你将会学到

  1. 一个完整的gRPC流实例,包括单向流与双向流的操作
  2. 如何实现gRPC流服务端代码
  3. 如何实现gRPC流客户端代码

准备

  1. 新建一个文件夹 go-grpc-simple-stream
  2. 在go-grpc-simple文件夹下建立三个目录: client, proto,server
  3. 使用 go mod 管理代码
  4. 在 go-grpc-simple-stream 目录下执行 go mod init go-grpc-simple-stream

编写 proto 文件

在 go-grpc-simple-stream/proto 目录下新建 hello.proto 文件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
syntax = "proto3";

package hello;

service HelloService {
  //  定义一个服务端推送客户的单向流
  rpc ServerToClient(StreamRequest) returns (stream StreamResponse){};
  // 定义一个客户端推送服务端的单向流
  rpc ClientToServer(stream StreamRequest) returns (StreamResponse){};
  //  定义一个服务端与客户端的双向流
  rpc AllStream(stream StreamRequest) returns (stream StreamResponse){};
}
// stream 请求结构
message StreamRequest {
    string data = 1;
}
// stream 响应结构
message StreamResponse {
    string data = 1;
}

生成 pb go 代码

在 go-grpc-simple-stream/proto 目录下新建 gen.sh 文件

如何是 window 系统,则直接在将 protoc 命名复制在 cmd 下执行即可。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
#!/usr/bin/env bash
set -x

# 第一步: 安装 protoc 插件
# 打开下面URL, 跟据自己的系统选择对应的 protoc-3.x.x-linux|osx|win
# https://github.com/protocolbuffers/protobuf/releases
# 下载完后,加入到环境变量或path中, 保证全局可用。
# 验证: protoc --version

# 第二步:引用 proto, protoc-gen-go, grpc 共3个工具包
# 安装 golang 的proto工具包
# go get -u github.com/golang/protobuf/proto
# 安装 goalng 的proto编译支持
# go get -u github.com/golang/protobuf/protoc-gen-go
# 安装 GRPC 包
# go get -u google.golang.org/grpc

protoc --go_out=plugins=grpc:. *.proto

执行以上脚本命令会在go-grpc-simple-stream/proto 目录下生成 hello.pb.go 文件

实现服务端代码

在 go-grpc-simple-stream/server目录下新建 main.go 文件

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package main

import (
	"fmt"
	hello "go-grpc-simple-stream/proto"
	"google.golang.org/grpc"
	"io"
	"log"
	"net"
	"os"
	"sync"
	"time"
)
const (
	Addr = "localhost:3721"
)

type HelloServiceImpl struct {
}

// 服务端->客户端 推送单向流
func (h *HelloServiceImpl) ServerToClient(req *hello.StreamRequest, server hello.HelloService_ServerToClientServer) error {
	// 模拟一个向客户端推送10次的单向流
	var i int
	for {
		// 打印接受客户端的消息
		log.Printf("单向流.接受客户端的消息:%s\n", req.GetData())
		// 向客户端发送消息
		err := server.Send(&hello.StreamResponse{Data: fmt.Sprintf("%d 单向流.%s", i, time.Now().Format("15:04:05"))})
		if err != nil {
			break
		}
		i ++
		if i > 3 {
			server.Send(&hello.StreamResponse{Data: "单向流.服务端推送完啦"})
			break
		}
		time.Sleep(time.Second)
	}
	return nil
}
// 客户端->服务端 推送单向流
// 需要循环的接受来自客户端的消息,至到 io.EOF
func (h *HelloServiceImpl) ClientToServer(server hello.HelloService_ClientToServerServer) error {
	for {
		// 接受客户端的消息
		data, err := server.Recv()
		if err != nil {
			// 无数据时跳出循环
			if err == io.EOF {
				break
			}
			return err
		}
		log.Printf("单向流.接受到客户端的消息:%s", data.GetData())
	}
	err := server.SendAndClose(&hello.StreamResponse{
		Data: "单向流.接受客户端消息完毕",
	})
	if err != nil {
		return err
	}
	return nil
}
// 双向流,即可以从服务端不断发送流数据,也可以不断的接受客户端发送过来的流数据。
// 所以需要处理发送与接受,需要采用两个协程处理。
func (h *HelloServiceImpl) AllStream(server hello.HelloService_AllStreamServer) error {
	wg := sync.WaitGroup{}
	wg.Add(2)
	// 处理服务端向客户端发送的流数据
	go func() {
		defer wg.Done()
		i := 0
		for {
			err := server.Send(&hello.StreamResponse{
				Data: fmt.Sprintf("%d,来自双向流的服务端:%s",i, time.Now().Format("2006-01-02 15:04:05")),
			})
			if err != nil {
				break
			}
			i ++
			if i > 3 {
				break
			}
			time.Sleep(time.Second)
		}

	}()
	// 处理客户端向服务端发送过来的流数据。
	go func() {
		for {
			data, err := server.Recv()
			if err != nil {
				if err == io.EOF {
					break
				}
				log.Fatalln(err)
			}
			log.Printf("来自客户端的消息:%s\n", data.GetData())
		}
	}()
	wg.Wait()
	return nil
}

func main() {
	log.SetFlags(log.Lshortfile|log.LstdFlags)
	// 构造一个 gRPC 服务对象
	grpcServer := grpc.NewServer()
	// 然后使用 protoc 工具生成的 go 代码函数(RegisterHelloServiceServer)  注册我们实现的 HelloServiceImpl 服务
	hello.RegisterHelloServiceServer(grpcServer, new(HelloServiceImpl))
	// 通过 grpcServer.Serve 在一个监听端口上提供 gRPC 服务
	lis, err := net.Listen("tcp", Addr)
	if err != nil {
		log.Fatal(err)
	}
	// 打印一下运行时的进程ID和地址
	go func() {
		log.Printf("PID:%d, %s\n", os.Getpid(), Addr)
	}()
	err = grpcServer.Serve(lis)
	if err != nil {
		log.Fatal(err)
	}
}

运行 go run .

实现客户端代码

在 go-grpc-simple-stream/client 目录下新建 main.go 文件

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package main

import (
	"context"
	"fmt"
	hello "go-grpc-simple-stream/proto"
	"google.golang.org/grpc"
	"io"
	"log"
	"sync"
	"time"
)

const (
	Addr = "localhost:3721"
)

func main() {
	log.SetFlags(log.Lshortfile|log.LstdFlags)
	conn, err := grpc.Dial(Addr, grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	client := hello.NewHelloServiceClient(conn)
	// 获取服务端向客户端不断推送的数据流。
	GetServerStream(client)
	// 客户端不断的向服务器推送单向流
	PutServerStream(client)
	// 双向流
	AllStream(client)
}
// 双向流
func AllStream(client hello.HelloServiceClient) {
	all, err := client.AllStream(context.Background())
	if err != nil {
		log.Fatalln(err)
	}
	wg := sync.WaitGroup{}
	wg.Add(2)
	// 处理服务端->客户端的流
	go func() {
		defer wg.Done()
		for {
			resp, err := all.Recv()
			if err != nil {
				if err == io.EOF {
					break
				}
				log.Fatalln(err)
			}
			log.Printf("双向流,接受到服务的数据:%s\n", resp.GetData())
		}
	}()
	// 处理 客户端->服务端的流
	go func() {
		defer wg.Done()
		i := 0
		for {
			err = all.Send(&hello.StreamRequest{
				Data: fmt.Sprintf("%d,%s", i, time.Now().Format("15:04:05")),
			})
			if err != nil {
				log.Println(err)
				break
			}
			i ++
			if i > 3 {
				break
			}
			time.Sleep(time.Second)
		}
	}()
	wg.Wait()
}
// 客户端不断的向服务器推送单向流
func PutServerStream(client hello.HelloServiceClient) {
	i := 0
	// 向服务端推送流
	put, err := client.ClientToServer(context.Background())
	if err != nil {
		log.Fatalln(err)
	}
	for {
		err = put.Send(&hello.StreamRequest{
			Data: fmt.Sprintf("%d,%s", i, time.Now().Format("15:04:05")),
		})
		if err != nil {
			break
		}
		i ++
		if i > 3 {
			break
		}
		time.Sleep(time.Second)
	}
	// 接受
	resp, err := put.CloseAndRecv()
	if err != nil {
		if err != io.EOF {
			log.Fatalln(err)
		}
	}
	// 接受服务端响应的数据
	if resp != nil {
		log.Println(resp.GetData()) // 接受完毕
	}

}
// 获取服务端向客户端不断推送的数据流。
func GetServerStream(client hello.HelloServiceClient) {
	// 向服务器发一个数据标识
	req := hello.StreamRequest{Data: "客户端"}
	// 调用 ServerToClient 函数,准备接受服务端单向流
	resp, err := client.ServerToClient(context.Background(), &req)
	if err != nil {
		log.Fatalln(err)
	}
	for {
		data, err := resp.Recv()
		if err != nil {
			// 遇到 io.EOF 表示服务端流关闭
			if err == io.EOF {
				break
			}
			log.Println(err)
			break
		}
		log.Printf("服务端推送的单向流:%s\n", data)
	}
}

执行 go run .

go mod 管理代码

在 go-grpc-simple 目录下

1
go mod tidy

源码

github.com/go-grpc-simple-stream

推荐学习

  1. gRPC入门 简介
  2. gRPC入门 Protobuf
  3. gRPC入门 搭建完整gRPC
  4. gRPC入门 实现双向流