本篇为【写给go开发者的gRPC教程系列】第五篇

第一篇:protobuf根底

第二篇:通信办法

第三篇:拦截器

第四篇:过错处理

第五篇:metadata

本系列将持续更新,欢迎关注获取实时告诉


敞开生长之旅!这是我参加「日新方案 2 月更文应战」的第 3 天,点击查看活动概况

导语

和在一般HTTP恳求中相同,gRPC提供了在每一次RPC中携带的上下文结构:metadata。在Go语言中,它与context.Context紧密结合,帮助咱们完成服务端与客户端之间互相传递信息

什么是metadata

gRPC 的 metadata 简单了解,便是 HTTP Header 中的 key-value 对

  • metadata 是以 key-value 的办法存储数据的,其中 key 是 string 类型,而 value 是 []string,即一个字符串数组类型

  • metadata 使得 client 和 server 能够为对方提供关于本次调用的一些信息,就像一次HTTP恳求的Request HeaderResponse Header相同

  • HTTP Header 的生命周期是一次 HTTP 恳求,那么 metadata 的生命周期便是一次 RPC 调用

metadata 创立

运用New():

md := metadata.New(map[string]string{"key1":"value1","key2":"value2"})

运用Pairs():

要注意假如有相同的 key 会自动兼并

md := metadata.Pairs(
    "key1", "value1",
    "key1", "value1.2", // "key1" will have map value []string{"value1", "value1.2"}
    "key2", "value2",
)

兼并多个metadata

md1 :=  metadata.Pairs("k1", "v1", "k2", "v2")
md2 := metadata.New(map[string]string{"key1":"value1","key2":"value2"})
md := metadata.Join(md1, md2)

存储二进制数据

在 metadata 中,key 永远是 string 类型,但是 value 能够是 string 也能够是二进制数据。为了在 metadata 中存储二进制数据,咱们仅仅需要在 key 的后边加上一个 – bin 后缀。具有 – bin 后缀的 key 所对应的 value 在创立 metadata 时会被编码(base64),收到的时分会被解码:

md := metadata.Pairs(
    "key", "string value",
    "key-bin", string([]byte{96, 102}),
)

metadata 结构自身也有一些操作办法,参阅文档非常容易了解。这儿不再赘述:pkg.go.dev/google.gola…

metadata 发送与接纳

让咱们再次回忆下pb文件和生成出来的client与server端的接口

service OrderManagement {
    rpc getOrder(google.protobuf.StringValue) returns (Order);
}
type OrderManagementClient interface {
	GetOrder(ctx context.Context, 
           in *wrapperspb.StringValue, opts ...grpc.CallOption) (*Order, error)
}
type OrderManagementServer interface {
	GetOrder(context.Context, *wrapperspb.StringValue) (*Order, error)
	mustEmbedUnimplementedOrderManagementServer()
}

能够看到比较pb中的接口界说,生成出来的Go代码除了增加了error回来值,还多了context.Context

和过错处理相似,gRPC中的context.Context 也符合Go语言的运用习气:通常情况下咱们在函数首个参数放置context.Context用来传递一次RPC中有关的上下文,借助context.WithValue()ctx.Value()context增加变量或读取变量

metadata便是gRPC中能够传递的上下文信息之一,所以metadata的运用办法便是:metadata记录到context,从context读取metadata

写给go开发者的gRPC教程-metadata

Clinet发送Server接纳

client发送metadata,那便是把metadata存储到contex.Context

server接纳metadata,便是从contex.Context中读取Metadata

Clinet 发送 Metadata

Metadata放到contex.Context,有几种办法

运用NewOutgoingContext

将新创立的metadata增加到context中,这样会 覆盖 掉本来已有的metadata

// 将metadata增加到context中,获取新的context
md := metadata.Pairs("k1", "v1", "k1", "v2", "k2", "v3")
ctx := metadata.NewOutgoingContext(context.Background(), md)
// unary RPC
response, err := client.SomeRPC(ctx, someRequest)
// streaming RPC
stream, err := client.SomeStreamingRPC(ctx)

运用AppendToOutgoingContext

能够直接将 key-value 对增加到已有的context

  • 假如context中没有metadata,那么就会 创立 一个

  • 假如已有metadata,那么就将数据 增加 到本来的metadata

// 假如对应的 context 没有 metadata,那么就会创立一个
ctx := metadata.AppendToOutgoingContext(ctx, "k1", "v1", "k1", "v2", "k2", "v3")
// 假如已有 metadata 了,那么就将数据增加到本来的 metadata  (例如在拦截器中)
ctx := metadata.AppendToOutgoingContext(ctx, "k3", "v4")
// 一般RPC(unary RPC)
response, err := client.SomeRPC(ctx, someRequest)
// 流式RPC(streaming RPC)
stream, err := client.SomeStreamingRPC(ctx)

Server 接纳 Metedata

一般RPC与流式RPC的差异不大,都是从contex.Context中读取metadata

运用FromIncomingContext

一般RPC(unary RPC)

//Unary Call
func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
    md, ok := metadata.FromIncomingContext(ctx)
    // do something with metadata
}

流式RPC(streaming RPC)

//Streaming Call
func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
    md, ok := metadata.FromIncomingContext(stream.Context()) // get context from stream
    // do something with metadata
}

Server发送Clinet接纳

服务端发送的metadata被分成了headertrailer两者,因而客户端也能够读取两者

Server 发送 Metadata

对于**一般RPC(unary RPC)**server能够运用grpc包中提供的函数向client发送 headertrailer

  • grpc.SendHeader()
  • grpc.SetHeader()
  • grpc.SetTrailer()

对于**流式RPC(streaming RPC)server能够运用ServerStream接口中界说的函数向client发送headertrailer

  • ServerStream.SendHeader()
  • ServerStream.SetHeader()
  • ServerStream.SetTrailer()

一般RPC(unary RPC)

运用 grpc.SendHeader()grpc.SetTrailer() 办法 ,这两个函数将context.Context作为第一个参数

func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
  // 创立并发送header
  header := metadata.Pairs("header-key", "val")
  grpc.SendHeader(ctx, header)
  // 创立并发送trailer
  trailer := metadata.Pairs("trailer-key", "val")
  grpc.SetTrailer(ctx, trailer)
}

假如不想当即发送header,也能够运用grpc.SetHeader()grpc.SetHeader()能够被多次调用,在如下机遇会把多个metadata兼并发送出去

  • 调用grpc.SendHeader()
  • 第一个呼应被发送时
  • RPC完毕时(包括成功或失利)
func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
  // 创立header,在适当机遇会被发送
  header := metadata.Pairs("header-key1", "val1")
  grpc.SetHeader(ctx, header)
  // 创立header,在适当机遇会被发送
  header := metadata.Pairs("header-key2", "val2")
  grpc.SetHeader(ctx, header)
  // 创立并发送trailer
  trailer := metadata.Pairs("trailer-key", "val")
  grpc.SetTrailer(ctx, trailer)
}

流式RPC(streaming RPC)

运用 ServerStream.SendHeader()ServerStream.SetTrailer() 办法

func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
  // create and send header
  header := metadata.Pairs("header-key", "val")
  stream.SendHeader(header)
  // create and set trailer
  trailer := metadata.Pairs("trailer-key", "val")
  stream.SetTrailer(trailer)
}

假如不想当即发送header,也能够运用ServerStream.SetHeader()ServerStream.SetHeader()能够被多次调用,在如下机遇会把多个metadata兼并发送出去

  • 调用ServerStream.SendHeader()
  • 第一个呼应被发送时
  • RPC完毕时(包括成功或失利)
func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
  // create and send header
  header := metadata.Pairs("header-key", "val")
  stream.SetHeader(header)
  // create and set trailer
  trailer := metadata.Pairs("trailer-key", "val")
  stream.SetTrailer(trailer)
}

Client 接纳 Metadata

一般RPC(unary RPC)

**一般RPC(unary RPC)**运用grpc.Header()grpc.Trailer()办法来接纳 Metadata

// RPC using the context with new metadata.
var header, trailer metadata.MD
// Add Order
order := pb.Order{Id: "101", Items: []string{"iPhone XS", "Mac Book Pro"}, Destination: "San Jose, CA", Price: 2300.00}
res, err := client.AddOrder(ctx, &order, grpc.Header(&header), grpc.Trailer(&trailer))
if err != nil {
  panic(err)
}

流式RPC(streaming RPC)

**流式RPC(streaming RPC)**通过调用回来的 ClientStream接口的Header() Trailer()办法接纳 metadata

stream, err := client.SomeStreamingRPC(ctx)
// retrieve header
header, err := stream.Header()
stream.CloseAndRecv()
// retrieve trailer
trailer := stream.Trailer()

HeaderTrailer差异

根本差异:发送的机遇不同!

headers会在下面三种场景下被发送

  • SendHeader() 被调用时(包括grpc.SendHeaderstream.SendHeader)
  • 第一个呼应被发送时
  • RPC完毕时(包括成功或失利)

trailer会在rpc回来的时分,即这个恳求完毕的时分被发送

差异在流式RPC(streaming RPC)中比较明显:

因为trailer是在服务端发送完恳求之后才发送的,所以client获取trailer的时分需要在stream.CloseAndRecv或者stream.Recv 回来非nil过错 (包括 io.EOF)之后

假如stream.CloseAndRecv之前调用stream.Trailer()获取的是空

stream, err := client.SomeStreamingRPC(ctx)
// retrieve header
header, err := stream.Header()
// retrieve trailer 
// `trailer`会在rpc回来的时分,即这个恳求完毕的时分被发送
// 因而此刻调用`stream.Trailer()`获取的是空
trailer := stream.Trailer()
stream.CloseAndRecv()
// retrieve trailer 
// `trailer`会在rpc回来的时分,即这个恳求完毕的时分被发送
// 因而此刻调用`stream.Trailer()`才能够获取到值
trailer := stream.Trailer()

运用场景

既然咱们把metadata类比成HTTP Header,那么metadata的运用场景也能够借鉴HTTPHeader。如传递用户token进行用户认证,传递trace进行链路追寻等

拦截器中的metadata

在拦截器中,咱们不光能够获取或修正接纳到的metadata,乃至还能够截取并修正要发送出去的metadata

还记得拦截器如何完成么?假如现已忘了快快回忆一下吧:

举个例子:

咱们在客户端拦截器中从要发送给服务端的metadata中读取一个时间戳字段,假如没有则弥补这个时间戳字段

注意这儿用到了一个上文没有说到的FromOutgoingContext(ctx)函数

func orderUnaryClientInterceptor(ctx context.Context, method string, req, reply interface{},
	cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	var s string
	// 获取要发送给服务端的`metadata`
	md, ok := metadata.FromOutgoingContext(ctx)
	if ok && len(md.Get("time")) > 0 {
		s = md.Get("time")[0]
	} else {
        // 假如没有则弥补这个时间戳字段
		s = "inter" + strconv.FormatInt(time.Now().UnixNano(), 10)
		ctx = metadata.AppendToOutgoingContext(ctx, "time", s)
	}
	log.Printf("call timestamp: %s", s)
	// Invoking the remote method
	err := invoker(ctx, method, req, reply, cc, opts...)
	return err
}
func main() {
	conn, err := grpc.Dial("127.0.0.1:8009",
		grpc.WithInsecure(),
		grpc.WithChainUnaryInterceptor(
			orderUnaryClientInterceptor,
		),
	)
	if err != nil {
		panic(err)
	}
    c := pb.NewOrderManagementClient(conn)
	ctx = metadata.AppendToOutgoingContext(context.Background(), "time",
		"raw"+strconv.FormatInt(time.Now().UnixNano(), 10))
	// RPC using the context with new metadata.
	var header, trailer metadata.MD
	// Add Order
	order := pb.Order{
		Id:          "101",
		Items:       []string{"iPhone XS", "Mac Book Pro"},
		Destination: "San Jose, CA",
		Price:       2300.00,
	}
	res, err := c.AddOrder(ctx, &order)
	if err != nil {
		panic(err)
	}
}

以上的思路在server同样适用。根据以上原理咱们能够完成链路追寻、用户认证等功能

过错信息

还记得过错处理一文中留下的问题么:gRPC 中如何传递过错音讯Status的呢?没错!也是运用的metadata或者说http2.0headerStatus的三种信息别离运用了三个header

  • Grpc-Status: 传递Statuscode
  • Grpc-Message: 传递Statusmessage
  • Grpc-Status-Details-Bin: 传递Statusdetails
func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) error {
	// ...
		h := ht.rw.Header()
		h.Set("Grpc-Status", fmt.Sprintf("%d", st.Code()))
		if m := st.Message(); m != "" {
			h.Set("Grpc-Message", encodeGrpcMessage(m))
		}
		if p := st.Proto(); p != nil && len(p.Details) > 0 {
			stBytes, err := proto.Marshal(p)
			if err != nil {
				// TODO: return error instead, when callers are able to handle it.
				panic(err)
			}
			h.Set("Grpc-Status-Details-Bin", encodeBinHeader(stBytes))
		}
    // ...
}

总结

一张图总结下整个metadata的运用办法

写给go开发者的gRPC教程-metadata

推荐阅读

  • 写给go开发者的gRPC教程-拦截器
  • 写给go开发者的gRPC教程-过错处理

参阅资料

  • gRPC 中的 Metadata
  • pkg.go.dev/grpc@v1.44.…
  • concept of metadata
  • Documentation/grpc-metadata

✨ 微信大众号【凉凉的知识库】同步更新,欢迎关注获取最新最有用的后端知识 ✨