Go : grpc下载文件+Status+Code

简介

在使用 grpc 的过程中, 无论是服务器端还是客户端, 如果出错时直接返回普通的 error, 另一端收到的只会是 rpc error: code = Unknown desc = xxxxxx, 很难据此判断具体的错误类型。下面使用 grpc 包内的 status 和 codes 包, 让返回的错误便于解析, 并更好地复用 grpc 的错误机制, 同时顺带演示一些小技巧。

代码 

download.proto

syntax = "proto3";

//package download;

//option csharp_namespace = "Google.Protobuf.WellKnownTypes";
option go_package = "../proto";
//option java_package = "com.google.protobuf";
//option java_outer_classname = "AnyProto";
//option java_multiple_files = true;
option objc_class_prefix = "GPB";

// DownloadService exposes a single server-streaming RPC for fetching a file.
service DownloadService{
    // Download takes one DownloadRequest and answers with a stream of
    // DownloadResponse messages.
    rpc Download(DownloadRequest) returns (stream DownloadResponse) {};
}

// DownloadRequest names the file the client wants to receive.
message DownloadRequest {
    // FilePath is the path of the file to download.
    string FilePath = 1;
}

// DownloadResponse is one message of the Download response stream.
message DownloadResponse {
    // Buffer carries a chunk of raw bytes. Note that it uses field number 2;
    // no field number 1 is declared in this message.
    bytes  Buffer = 2;

}

指令生成 download.pb.go

protoc -I ./ --go_out=plugins=grpc:. ./download.proto

server.go

package main

import (
	pb "download/proto"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// gDownloader is the package-level Downloader used by DownloadServer.Download
// for every incoming request.
var gDownloader = &Downloader{}

// DownloadServer is the gRPC handler type for DownloadService. It carries no
// state of its own; the work is delegated to the package-level gDownloader.
type DownloadServer struct {

}

// Download implements the DownloadService Download RPC. It hands the requested
// path to the package-level downloader and forwards every chunk the downloader
// produces to the client as a DownloadResponse on the stream.
//
// The downloader's sentinel errors are translated into gRPC status errors that
// carry this package's custom codes (EOF, NORMALLY_EXIST) so the client can
// recognise them through the status package. Any other result, including nil,
// is returned unchanged.
func (d *DownloadServer) Download(in *pb.DownloadRequest, stream pb.DownloadService_DownloadServer) error {
	// sendChunk wraps one chunk of file data in a response message and
	// writes it to the stream.
	sendChunk := func(chunk []byte) error {
		resp := &pb.DownloadResponse{Buffer: chunk}
		return stream.Send(resp)
	}

	switch result := gDownloader.Download(in.FilePath, sendChunk); result {
	case EndOfError:
		return status.Error(codes.Code(EOF), CodeToStr(EOF))
	case NormallyCloseError:
		return status.Error(codes.Code(NORMALLY_EXIST), CodeToStr(NORMALLY_EXIST))
	default:
		return result
	}
}

client.go

package main

import (
	"context"
	pb "download/proto"
	"fmt"
	"google.golang.org/grpc"
	"google.golang.org/grpc/status"
	"log"
)

// download issues the Download RPC with request r and prints every response
// received on the stream.
//
// It returns nil once the stream ends with a status whose code equals this
// package's EOF constant. Any other error, from opening the stream or from
// receiving on it, is returned to the caller as is.
func download(client pb.DownloadServiceClient, r *pb.DownloadRequest) error {
	stream, err := client.Download(context.Background(), r)
	if err != nil {
		return err
	}

	for {
		chunk, recvErr := stream.Recv()

		// The custom EOF code is checked before the generic error test so
		// that the server's end-of-file status is not reported as a failure.
		if st := status.Convert(recvErr); st.Code() == EOF {
			fmt.Println("文件正常结束...")
			return nil
		}
		if recvErr != nil {
			return recvErr
		}

		fmt.Println("返回值:", chunk)
	}
}

// Download connects to the gRPC server at host over an insecure connection and
// runs the sample download against it. The requested file path is hard-coded
// for demonstration purposes.
//
// A dial failure is logged and returned to the caller. The previous version
// used log.Fatalf, which terminates the process, so the error return that
// followed it could never run and callers had no chance to handle the failure.
func Download(host string) error {
	conn, err := grpc.Dial(host, grpc.WithInsecure())
	if err != nil {
		log.Printf("grpc.Dial err: %v", err)
		return err
	}
	defer conn.Close()

	grpcClient := pb.NewDownloadServiceClient(conn)

	return download(grpcClient, &pb.DownloadRequest{FilePath: "D:\\Softwares\\360\\360Safe\\360Base.dll"})
}

service.go

package main

import (
	pb "download/proto"
	"google.golang.org/grpc"
	"log"
	"net"
)

// RunDownloadService listens on the given network and address, registers a
// DownloadServer on a new gRPC server and serves until Serve returns.
//
// Failures are logged and returned instead of terminating the process: the
// previous log.Fatalf calls exited immediately, which made the error return
// values unreachable. The Serve failure is also logged under its own name
// rather than being reported as a net.Listen error.
func RunDownloadService(network, host string) error {
	lis, err := net.Listen(network, host)
	if err != nil {
		log.Printf("net.Listen err: %v", err)
		return err
	}

	server := grpc.NewServer()
	pb.RegisterDownloadServiceServer(server, &DownloadServer{})

	if err := server.Serve(lis); err != nil {
		log.Printf("server.Serve err: %v", err)
		return err
	}
	return nil
}

codes.go

package main

import "io"

// Application-defined result codes. The values start at 1024 and increase by
// one per constant, so EOF is 1024 and NORMALLY_EXIST is 1025.
const (
	// EOF marks that the file was read to its end.
	EOF = 1024 + iota
	// NORMALLY_EXIST marks that a download was stopped on request
	// (its message in codeToStr is "normally close").
	NORMALLY_EXIST
)

// codeToStr maps each application-defined code to its human-readable message.
var codeToStr = map[int] string {
	// Reuses the text of the standard library's io.EOF error.
	EOF: io.EOF.Error(),
	NORMALLY_EXIST: "normally close",
}

// CodeToStr returns the message registered for code in codeToStr, or the
// empty string when the code has no entry.
func CodeToStr(code int) string {
	if msg, known := codeToStr[code]; known {
		return msg
	}
	return ""
}

downloader.go

package main

import (
	"errors"
	"io"
	"os"
	"sync"
)

// NormallyCloseError is returned by Downloader.Download when the download was
// stopped through its exit channel. Its text is the NORMALLY_EXIST message.
var NormallyCloseError = errors.New(CodeToStr(NORMALLY_EXIST))
// EndOfError is returned by Downloader.Download when the file has been read
// to its end. Its text is the EOF message.
var EndOfError = errors.New(CodeToStr(EOF))

// Downloader reads files in chunks and keeps track of the downloads that are
// currently running so that they can be waited for or asked to stop.
type Downloader struct {
	// waiter counts running Download calls (Add on entry, Done on exit).
	waiter sync.WaitGroup
	// mu is locked by the exported methods that read or modify exitChans.
	mu sync.Mutex

	// exitChans holds one channel per running Download; a value sent on a
	// channel makes the corresponding Download return NormallyCloseError.
	exitChans []chan int
}

// WaitForDone blocks until every running Download call has returned.
func (d *Downloader) WaitForDone() {
	d.waiter.Wait()
}

// CacelAllAndWaitForDone signals every running Download to stop, waits until
// all of them have returned and then drops any exit channels still registered.
//
// The method deliberately does not hold d.mu itself. sync.Mutex is not
// reentrant, so locking here and then calling SendToExitChans (which locks
// d.mu again) deadlocked. Waiting on the WaitGroup with the lock held would
// also block every Download's deferred FreeExitChan, so the wait could never
// complete. Each step below takes the lock on its own for just as long as it
// needs it.
//
// A Download that starts after the signals were sent is not cancelled; the
// wait then lasts until it finishes on its own.
func (d *Downloader) CacelAllAndWaitForDone() {
	d.SendToExitChans()

	d.waiter.Wait()

	d.FreeAllExitChan()
}

// NewExitChan creates a new exit channel, registers it in d.exitChans and
// returns it. It holds d.mu while doing so.
func (d *Downloader) NewExitChan() chan int {
	d.mu.Lock()
	defer d.mu.Unlock()

	return d.newExitChan()
}

// FreeExitChan removes the exit channel c from d.exitChans while holding d.mu.
// A channel that is not registered is ignored.
func (d *Downloader) FreeExitChan(c chan int) {
	d.mu.Lock()
	defer d.mu.Unlock()

	d.freeExitChan(c)
}

// FreeAllExitChan removes every registered exit channel while holding d.mu.
func (d *Downloader) FreeAllExitChan() {
	d.mu.Lock()
	defer d.mu.Unlock()

	d.freeAllExitChan()
}

// SendToExitChans posts a stop signal to every registered exit channel.
//
// The send is non-blocking and happens while d.mu is held. The previous
// version started one goroutine per channel that captured the shared loop
// index and read d.exitChans after the lock had been released; that races
// with FreeExitChan/FreeAllExitChan and can signal the wrong channel, block
// forever or index past the end of the slice. A channel whose buffer already
// holds a pending signal is skipped, because one signal is enough to stop its
// Download.
func (d *Downloader) SendToExitChans() {
	d.mu.Lock()
	defer d.mu.Unlock()

	for _, ch := range d.exitChans {
		select {
		case ch <- 1:
		default:
			// A signal is already pending (or the channel cannot accept
			// one right now); do not block while holding the lock.
		}
	}
}

// newExitChan allocates an exit channel with room for one pending signal,
// appends it to d.exitChans and returns it. It does no locking itself; the
// exported NewExitChan wraps it with d.mu.
func (d *Downloader) newExitChan() chan int {
	// append accepts a nil slice, so no explicit initialisation is needed.
	ch := make(chan int, 1)
	d.exitChans = append(d.exitChans, ch)
	return ch
}

// freeExitChan removes the first occurrence of c from d.exitChans, keeping the
// order of the remaining channels. Nothing happens when c is not registered.
// It does no locking itself; the exported FreeExitChan wraps it with d.mu.
func (d *Downloader) freeExitChan(c chan int) {
	for idx := 0; idx < len(d.exitChans); idx++ {
		if d.exitChans[idx] != c {
			continue
		}
		// Clear the slot, then close the gap by shifting the tail left.
		d.exitChans[idx] = nil
		d.exitChans = append(d.exitChans[:idx], d.exitChans[idx+1:]...)
		return
	}
}

// freeAllExitChan clears every slot of d.exitChans and then resets the slice
// to nil. It does no locking itself; the exported FreeAllExitChan wraps it
// with d.mu.
func (d *Downloader) freeAllExitChan() {
	registered := d.exitChans
	for idx := 0; idx < len(registered); idx++ {
		registered[idx] = nil
	}
	d.exitChans = nil
}

// Download opens the file at dst and passes its contents to f in chunks of at
// most 1024 bytes, in file order.
//
// The chunk slice handed to f is reused for the next read, so f must not keep
// it after returning.
//
// Return values:
//   - the error from os.Open when the file cannot be opened;
//   - NormallyCloseError when a stop signal arrives on this download's exit
//     channel (checked before each read);
//   - the error returned by f, which aborts the download;
//   - EndOfError once the file has been read to its end;
//   - any other read error.
//
// Differences from the previous version: an error from f used to be swallowed
// and reported as nil, so a failed send looked like success. End of file was
// inferred from a short read instead of io.EOF. Bytes returned together with a
// read error were dropped.
func (d *Downloader) Download(dst string, f func(buffer []byte) error) error {
	d.waiter.Add(1)
	defer d.waiter.Done()

	// Deferred calls run in reverse order, so the exit channel is
	// unregistered before the WaitGroup is released.
	exitChan := d.NewExitChan()
	defer d.FreeExitChan(exitChan)

	file, err := os.Open(dst)
	if err != nil {
		return err
	}
	defer file.Close()

	const maxSize = 1024
	buffer := make([]byte, maxSize)

	for {
		// Non-blocking check for a stop request before each read.
		select {
		case <-exitChan:
			return NormallyCloseError
		default:
		}

		n, readErr := file.Read(buffer)
		// Deliver any bytes that were read before looking at the error, as
		// the io.Reader contract recommends.
		if n > 0 {
			if err := f(buffer[:n]); err != nil {
				return err
			}
		}
		if readErr == io.EOF {
			return EndOfError
		}
		if readErr != nil {
			return readErr
		}
	}
}

资源

代码 - https://download.csdn.net/download/halo_hsuh/12547363

猜你喜欢

转载自blog.csdn.net/halo_hsuh/article/details/106934388
今日推荐