本篇为【写给go开发者的gRPC教程系列】第五篇
第一篇:protobuf根底
第二篇:通信办法
第三篇:拦截器
第四篇:过错处理
第五篇:metadata
本系列将持续更新,欢迎关注获取实时告诉
敞开生长之旅!这是我参加「日新方案 2 月更文应战」的第 3 天,点击查看活动概况
导语
和在一般
HTTP
恳求中相同,gRPC提供了在每一次RPC中携带的上下文结构:metadata
。在Go语言中,它与context.Context
紧密结合,帮助咱们完成服务端与客户端之间互相传递信息
什么是metadata
?
gRPC 的 metadata
简单了解,便是 HTTP Header
中的 key-value 对
-
metadata
是以key-value
的办法存储数据的,其中 key 是 string 类型,而 value 是 []string,即一个字符串数组类型 -
metadata
使得 client 和 server 能够为对方提供关于本次调用的一些信息,就像一次HTTP恳求的Request Header
和Response Header
相同 -
HTTP Header
的生命周期是一次 HTTP 恳求,那么metadata
的生命周期便是一次 RPC 调用
metadata 创立
运用New():
md := metadata.New(map[string]string{"key1":"value1","key2":"value2"})
运用Pairs():
要注意假如有相同的 key 会自动兼并
md := metadata.Pairs(
"key1", "value1",
"key1", "value1.2", // "key1" will have map value []string{"value1", "value1.2"}
"key2", "value2",
)
兼并多个metadata
md1 := metadata.Pairs("k1", "v1", "k2", "v2")
md2 := metadata.New(map[string]string{"key1":"value1","key2":"value2"})
md := metadata.Join(md1, md2)
存储二进制数据
在 metadata 中,key 永远是 string 类型,但是 value 能够是 string 也能够是二进制数据。为了在 metadata 中存储二进制数据,咱们仅仅需要在 key 的后边加上一个 – bin 后缀。具有 – bin 后缀的 key 所对应的 value 在创立 metadata 时会被编码(base64),收到的时分会被解码:
md := metadata.Pairs(
"key", "string value",
"key-bin", string([]byte{96, 102}),
)
metadata 结构自身也有一些操作办法,参阅文档非常容易了解。这儿不再赘述:pkg.go.dev/google.gola…
metadata 发送与接纳
让咱们再次回忆下pb文件和生成出来的client与server端的接口
service OrderManagement {
rpc getOrder(google.protobuf.StringValue) returns (Order);
}
type OrderManagementClient interface {
GetOrder(ctx context.Context,
in *wrapperspb.StringValue, opts ...grpc.CallOption) (*Order, error)
}
type OrderManagementServer interface {
GetOrder(context.Context, *wrapperspb.StringValue) (*Order, error)
mustEmbedUnimplementedOrderManagementServer()
}
能够看到比较pb中的接口界说,生成出来的Go代码除了增加了error
回来值,还多了context.Context
和过错处理相似,gRPC中的context.Context
也符合Go语言的运用习气:通常情况下咱们在函数首个参数放置context.Context
用来传递一次RPC中有关的上下文,借助context.WithValue()
或ctx.Value()
往context
增加变量或读取变量
metadata
便是gRPC中能够传递的上下文信息之一,所以metadata
的运用办法便是:metadata
记录到context
,从context
读取metadata
Clinet发送Server接纳
client
发送metadata
,那便是把metadata
存储到contex.Context
server
接纳metadata
,便是从contex.Context
中读取Metadata
Clinet 发送 Metadata
把Metadata
放到contex.Context
,有几种办法
运用NewOutgoingContext
将新创立的metadata
增加到context
中,这样会 覆盖 掉本来已有的metadata
// 将metadata增加到context中,获取新的context
md := metadata.Pairs("k1", "v1", "k1", "v2", "k2", "v3")
ctx := metadata.NewOutgoingContext(context.Background(), md)
// unary RPC
response, err := client.SomeRPC(ctx, someRequest)
// streaming RPC
stream, err := client.SomeStreamingRPC(ctx)
运用AppendToOutgoingContext
能够直接将 key-value 对增加到已有的context
中
-
假如
context
中没有metadata
,那么就会 创立 一个 -
假如已有
metadata
,那么就将数据 增加 到本来的metadata
// 假如对应的 context 没有 metadata,那么就会创立一个
ctx := metadata.AppendToOutgoingContext(ctx, "k1", "v1", "k1", "v2", "k2", "v3")
// 假如已有 metadata 了,那么就将数据增加到本来的 metadata (例如在拦截器中)
ctx := metadata.AppendToOutgoingContext(ctx, "k3", "v4")
// 一般RPC(unary RPC)
response, err := client.SomeRPC(ctx, someRequest)
// 流式RPC(streaming RPC)
stream, err := client.SomeStreamingRPC(ctx)
Server 接纳 Metedata
一般RPC与流式RPC的差异不大,都是从contex.Context
中读取metadata
运用FromIncomingContext
一般RPC(unary RPC)
//Unary Call
func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
md, ok := metadata.FromIncomingContext(ctx)
// do something with metadata
}
流式RPC(streaming RPC)
//Streaming Call
func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
md, ok := metadata.FromIncomingContext(stream.Context()) // get context from stream
// do something with metadata
}
Server发送Clinet接纳
服务端发送的metadata
被分成了header
和 trailer
两者,因而客户端也能够读取两者
Server 发送 Metadata
对于**一般RPC(unary RPC)**server能够运用grpc包中提供的函数向client发送 header
和trailer
grpc.SendHeader()
grpc.SetHeader()
grpc.SetTrailer()
对于**流式RPC(streaming RPC)server能够运用ServerStream接口中界说的函数向client发送header
和 trailer
ServerStream.SendHeader()
ServerStream.SetHeader()
ServerStream.SetTrailer()
一般RPC(unary RPC)
运用 grpc.SendHeader()
和 grpc.SetTrailer()
办法 ,这两个函数将context.Context
作为第一个参数
func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
// 创立并发送header
header := metadata.Pairs("header-key", "val")
grpc.SendHeader(ctx, header)
// 创立并发送trailer
trailer := metadata.Pairs("trailer-key", "val")
grpc.SetTrailer(ctx, trailer)
}
假如不想当即发送header
,也能够运用grpc.SetHeader()
。grpc.SetHeader()
能够被多次调用,在如下机遇会把多个metadata
兼并发送出去
- 调用
grpc.SendHeader()
- 第一个呼应被发送时
- RPC完毕时(包括成功或失利)
func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
// 创立header,在适当机遇会被发送
header := metadata.Pairs("header-key1", "val1")
grpc.SetHeader(ctx, header)
// 创立header,在适当机遇会被发送
header := metadata.Pairs("header-key2", "val2")
grpc.SetHeader(ctx, header)
// 创立并发送trailer
trailer := metadata.Pairs("trailer-key", "val")
grpc.SetTrailer(ctx, trailer)
}
流式RPC(streaming RPC)
运用 ServerStream.SendHeader()
和 ServerStream.SetTrailer()
办法
func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
// create and send header
header := metadata.Pairs("header-key", "val")
stream.SendHeader(header)
// create and set trailer
trailer := metadata.Pairs("trailer-key", "val")
stream.SetTrailer(trailer)
}
假如不想当即发送header
,也能够运用ServerStream.SetHeader()
。ServerStream.SetHeader()
能够被多次调用,在如下机遇会把多个metadata
兼并发送出去
- 调用
ServerStream.SendHeader()
- 第一个呼应被发送时
- RPC完毕时(包括成功或失利)
func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
// create and send header
header := metadata.Pairs("header-key", "val")
stream.SetHeader(header)
// create and set trailer
trailer := metadata.Pairs("trailer-key", "val")
stream.SetTrailer(trailer)
}
Client 接纳 Metadata
一般RPC(unary RPC)
**一般RPC(unary RPC)**运用grpc.Header()
和grpc.Trailer()
办法来接纳 Metadata
// RPC using the context with new metadata.
var header, trailer metadata.MD
// Add Order
order := pb.Order{Id: "101", Items: []string{"iPhone XS", "Mac Book Pro"}, Destination: "San Jose, CA", Price: 2300.00}
res, err := client.AddOrder(ctx, &order, grpc.Header(&header), grpc.Trailer(&trailer))
if err != nil {
panic(err)
}
流式RPC(streaming RPC)
**流式RPC(streaming RPC)**通过调用回来的 ClientStream
接口的Header()
和 Trailer()
办法接纳 metadata
stream, err := client.SomeStreamingRPC(ctx)
// retrieve header
header, err := stream.Header()
stream.CloseAndRecv()
// retrieve trailer
trailer := stream.Trailer()
Header
和Trailer
差异
根本差异:发送的机遇不同!
✨ headers
会在下面三种场景下被发送
-
SendHeader()
被调用时(包括grpc.SendHeader
和stream.SendHeader
) - 第一个呼应被发送时
- RPC完毕时(包括成功或失利)
✨ trailer
会在rpc回来的时分,即这个恳求完毕的时分被发送
差异在流式RPC(streaming RPC)中比较明显:
因为trailer
是在服务端发送完恳求之后才发送的,所以client获取trailer
的时分需要在stream.CloseAndRecv
或者stream.Recv
回来非nil过错 (包括 io.EOF)之后
假如stream.CloseAndRecv
之前调用stream.Trailer()
获取的是空
stream, err := client.SomeStreamingRPC(ctx)
// retrieve header
header, err := stream.Header()
// retrieve trailer
// `trailer`会在rpc回来的时分,即这个恳求完毕的时分被发送
// 因而此刻调用`stream.Trailer()`获取的是空
trailer := stream.Trailer()
stream.CloseAndRecv()
// retrieve trailer
// `trailer`会在rpc回来的时分,即这个恳求完毕的时分被发送
// 因而此刻调用`stream.Trailer()`才能够获取到值
trailer := stream.Trailer()
运用场景
既然咱们把metadata
类比成HTTP Header
,那么metadata
的运用场景也能够借鉴HTTP
的Header
。如传递用户token
进行用户认证,传递trace
进行链路追寻等
拦截器中的metadata
在拦截器中,咱们不光能够获取或修正接纳到的metadata
,乃至还能够截取并修正要发送出去的metadata
还记得拦截器如何完成么?假如现已忘了快快回忆一下吧:
举个例子:
咱们在客户端拦截器中从要发送给服务端的metadata
中读取一个时间戳字段,假如没有则弥补这个时间戳字段
注意这儿用到了一个上文没有说到的FromOutgoingContext(ctx)
函数
func orderUnaryClientInterceptor(ctx context.Context, method string, req, reply interface{},
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
var s string
// 获取要发送给服务端的`metadata`
md, ok := metadata.FromOutgoingContext(ctx)
if ok && len(md.Get("time")) > 0 {
s = md.Get("time")[0]
} else {
// 假如没有则弥补这个时间戳字段
s = "inter" + strconv.FormatInt(time.Now().UnixNano(), 10)
ctx = metadata.AppendToOutgoingContext(ctx, "time", s)
}
log.Printf("call timestamp: %s", s)
// Invoking the remote method
err := invoker(ctx, method, req, reply, cc, opts...)
return err
}
func main() {
conn, err := grpc.Dial("127.0.0.1:8009",
grpc.WithInsecure(),
grpc.WithChainUnaryInterceptor(
orderUnaryClientInterceptor,
),
)
if err != nil {
panic(err)
}
c := pb.NewOrderManagementClient(conn)
ctx = metadata.AppendToOutgoingContext(context.Background(), "time",
"raw"+strconv.FormatInt(time.Now().UnixNano(), 10))
// RPC using the context with new metadata.
var header, trailer metadata.MD
// Add Order
order := pb.Order{
Id: "101",
Items: []string{"iPhone XS", "Mac Book Pro"},
Destination: "San Jose, CA",
Price: 2300.00,
}
res, err := c.AddOrder(ctx, &order)
if err != nil {
panic(err)
}
}
以上的思路在server同样适用。根据以上原理咱们能够完成链路追寻、用户认证等功能
过错信息
还记得过错处理一文中留下的问题么:gRPC 中如何传递过错音讯Status
的呢?没错!也是运用的metadata
或者说http2.0
的header
。Status
的三种信息别离运用了三个header
头
-
Grpc-Status
: 传递Status
的code
-
Grpc-Message
: 传递Status
的message
-
Grpc-Status-Details-Bin
: 传递Status
的details
func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) error {
// ...
h := ht.rw.Header()
h.Set("Grpc-Status", fmt.Sprintf("%d", st.Code()))
if m := st.Message(); m != "" {
h.Set("Grpc-Message", encodeGrpcMessage(m))
}
if p := st.Proto(); p != nil && len(p.Details) > 0 {
stBytes, err := proto.Marshal(p)
if err != nil {
// TODO: return error instead, when callers are able to handle it.
panic(err)
}
h.Set("Grpc-Status-Details-Bin", encodeBinHeader(stBytes))
}
// ...
}
总结
一张图总结下整个metadata
的运用办法
推荐阅读
- 写给go开发者的gRPC教程-拦截器
- 写给go开发者的gRPC教程-过错处理
参阅资料
- gRPC 中的 Metadata
- pkg.go.dev/grpc@v1.44.…
- concept of metadata
- Documentation/grpc-metadata
✨ 微信大众号【凉凉的知识库】同步更新,欢迎关注获取最新最有用的后端知识 ✨