gRPC:构建高性能微服务

学习使用 gRPC 和 Protocol Buffers 构建高性能的微服务通信

gRPC:构建高性能微服务

在微服务架构中,服务之间的通信至关重要。传统的 RESTful API 虽然简单易用,但在性能、类型安全和代码生成方面存在不足。gRPC 是 Google 开源的高性能 RPC 框架,它使用 Protocol Buffers 作为接口定义语言和序列化格式,为微服务通信提供了更好的解决方案。

今天我们就来学习如何在 Go 中使用 gRPC。

什么是 gRPC?

gRPC 是一个现代的、开源的、高性能的远程过程调用(RPC)框架。它的核心特性包括:

  • Protocol Buffers:高效的二进制序列化格式
  • HTTP/2:支持多路复用、流控制、头部压缩
  • 强类型:通过 .proto 文件定义接口,自动生成代码
  • 双向流:支持客户端流、服务器流、双向流
  • 跨语言:支持 Go、Java、Python、C++ 等多种语言

安装 gRPC 工具

# 安装 Go gRPC 插件
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

# 安装 Protocol Buffers 编译器
# macOS
brew install protobuf

# Ubuntu
sudo apt install protobuf-compiler

# 验证安装
protoc --version

定义服务接口

创建一个 user.proto 文件:

syntax = "proto3";

package user;

option go_package = "./user";

// 用户服务
service UserService {
  // 获取用户
  rpc GetUser (GetUserRequest) returns (User);
  
  // 创建用户
  rpc CreateUser (CreateUserRequest) returns (User);
  
  // 列出用户
  rpc ListUsers (ListUsersRequest) returns (stream User);
  
  // 更新用户信息(双向流)
  rpc UpdateUsers (stream UpdateUserRequest) returns (stream User);
}

// 用户消息
message User {
  int64 id = 1;
  string name = 2;
  string email = 3;
  int32 age = 4;
  int64 created_at = 5;
}

// 请求消息
message GetUserRequest {
  int64 id = 1;
}

message CreateUserRequest {
  string name = 1;
  string email = 2;
  int32 age = 3;
}

message ListUsersRequest {
  int32 page = 1;
  int32 page_size = 2;
}

message UpdateUserRequest {
  int64 id = 1;
  string name = 2;
  string email = 3;
  int32 age = 4;
}

生成 Go 代码

protoc --go_out=. --go-grpc_out=. user.proto

这会生成两个文件:

  • user.pb.go:Protocol Buffers 消息定义
  • user_grpc.pb.go:gRPC 服务代码

实现 gRPC 服务器

package main

import (
	"context"
	"log"
	"net"
	"sync"
	"time"
	
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	
	pb "example/user"
)

type User struct {
	ID        int64
	Name      string
	Email     string
	Age       int32
	CreatedAt int64
}

type UserServiceServer struct {
	pb.UnimplementedUserServiceServer
	
	mu      sync.RWMutex
	users   map[int64]*User
	nextID  int64
}

func NewUserServiceServer() *UserServiceServer {
	return &UserServiceServer{
		users:  make(map[int64]*User),
		nextID: 1,
	}
}

// GetUser 实现获取用户
func (s *UserServiceServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	
	user, ok := s.users[req.Id]
	if !ok {
		return nil, status.Errorf(codes.NotFound, "用户 %d 不存在", req.Id)
	}
	
	return &pb.User{
		Id:        user.ID,
		Name:      user.Name,
		Email:     user.Email,
		Age:       user.Age,
		CreatedAt: user.CreatedAt,
	}, nil
}

// CreateUser 实现创建用户
func (s *UserServiceServer) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.User, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	
	user := &User{
		ID:        s.nextID,
		Name:      req.Name,
		Email:     req.Email,
		Age:       req.Age,
		CreatedAt: time.Now().Unix(),
	}
	
	s.users[user.ID] = user
	s.nextID++
	
	log.Printf("创建用户: %d (%s)", user.ID, user.Name)
	
	return &pb.User{
		Id:        user.ID,
		Name:      user.Name,
		Email:     user.Email,
		Age:       user.Age,
		CreatedAt: user.CreatedAt,
	}, nil
}

// ListUsers 实现列出用户(服务器流)
func (s *UserServiceServer) ListUsers(req *pb.ListUsersRequest, stream pb.UserService_ListUsersServer) error {
	s.mu.RLock()
	defer s.mu.RUnlock()
	
	count := 0
	for _, user := range s.users {
		if count >= int(req.PageSize) {
			break
		}
		
		err := stream.Send(&pb.User{
			Id:        user.ID,
			Name:      user.Name,
			Email:     user.Email,
			Age:       user.Age,
			CreatedAt: user.CreatedAt,
		})
		if err != nil {
			return err
		}
		
		count++
	}
	
	return nil
}

// UpdateUsers 实现批量更新(双向流)
func (s *UserServiceServer) UpdateUsers(stream pb.UserService_UpdateUsersServer) error {
	for {
		req, err := stream.Recv()
		if err != nil {
			return err
		}
		
		s.mu.Lock()
		user, ok := s.users[req.Id]
		if !ok {
			s.mu.Unlock()
			stream.Send(&pb.User{})
			continue
		}
		
		// 更新字段
		if req.Name != "" {
			user.Name = req.Name
		}
		if req.Email != "" {
			user.Email = req.Email
		}
		if req.Age > 0 {
			user.Age = req.Age
		}
		
		s.mu.Unlock()
		
		// 发送更新后的用户
		err = stream.Send(&pb.User{
			Id:        user.ID,
			Name:      user.Name,
			Email:     user.Email,
			Age:       user.Age,
			CreatedAt: user.CreatedAt,
		})
		if err != nil {
			return err
		}
	}
}

func main() {
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("监听失败: %v", err)
	}
	
	s := grpc.NewServer()
	pb.RegisterUserServiceServer(s, NewUserServiceServer())
	
	log.Println("gRPC 服务器启动在 :50051")
	if err := s.Serve(lis); err != nil {
		log.Fatalf("服务失败: %v", err)
	}
}

实现 gRPC 客户端

package main

import (
	"context"
	"io"
	"log"
	"time"
	
	"google.golang.org/grpc"
	
	pb "example/user"
)

func main() {
	// 连接服务器
	conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
	if err != nil {
		log.Fatalf("连接失败: %v", err)
	}
	defer conn.Close()
	
	client := pb.NewUserServiceClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	
	// 1. 创建用户
	log.Println("=== 创建用户 ===")
	user1, err := client.CreateUser(ctx, &pb.CreateUserRequest{
		Name:  "张三",
		Email: "zhangsan@example.com",
		Age:   25,
	})
	if err != nil {
		log.Fatalf("创建用户失败: %v", err)
	}
	log.Printf("创建成功: ID=%d, Name=%s", user1.Id, user1.Name)
	
	user2, err := client.CreateUser(ctx, &pb.CreateUserRequest{
		Name:  "李四",
		Email: "lisi@example.com",
		Age:   30,
	})
	if err != nil {
		log.Fatalf("创建用户失败: %v", err)
	}
	log.Printf("创建成功: ID=%d, Name=%s", user2.Id, user2.Name)
	
	// 2. 获取用户
	log.Println("\n=== 获取用户 ===")
	user, err := client.GetUser(ctx, &pb.GetUserRequest{Id: user1.Id})
	if err != nil {
		log.Fatalf("获取用户失败: %v", err)
	}
	log.Printf("用户信息: %+v", user)
	
	// 3. 列出用户(服务器流)
	log.Println("\n=== 列出用户 ===")
	stream, err := client.ListUsers(ctx, &pb.ListUsersRequest{
		Page:     1,
		PageSize: 10,
	})
	if err != nil {
		log.Fatalf("列出用户失败: %v", err)
	}
	
	for {
		user, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatalf("接收失败: %v", err)
		}
		log.Printf("用户: ID=%d, Name=%s", user.Id, user.Name)
	}
	
	// 4. 批量更新(双向流)
	log.Println("\n=== 批量更新 ===")
	updateStream, err := client.UpdateUsers(ctx)
	if err != nil {
		log.Fatalf("更新用户失败: %v", err)
	}
	
	// 发送更新请求
	updates := []*pb.UpdateUserRequest{
		{Id: user1.Id, Name: "张三(已更新)", Age: 26},
		{Id: user2.Id, Email: "lisi_new@example.com"},
	}
	
	for _, update := range updates {
		err := updateStream.Send(update)
		if err != nil {
			log.Fatalf("发送更新失败: %v", err)
		}
		
		// 接收更新后的用户
		user, err := updateStream.Recv()
		if err != nil {
			log.Fatalf("接收更新失败: %v", err)
		}
		log.Printf("更新成功: ID=%d, Name=%s, Email=%s", user.Id, user.Name, user.Email)
	}
	
	updateStream.CloseSend()
}

拦截器(中间件)

gRPC 支持拦截器,类似于 HTTP 的中间件:

// 服务器拦截器
func loggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	start := time.Now()
	log.Printf("开始处理: %s", info.FullMethod)
	
	resp, err := handler(ctx, req)
	
	duration := time.Since(start)
	log.Printf("完成处理: %s (%v)", info.FullMethod, duration)
	
	return resp, err
}

// 客户端拦截器
func clientLoggingInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	start := time.Now()
	log.Printf("客户端调用: %s", method)
	
	err := invoker(ctx, method, req, reply, cc, opts...)
	
	duration := time.Since(start)
	log.Printf("客户端完成: %s (%v)", method, duration)
	
	return err
}

// 使用拦截器
func main() {
	// 服务器
	s := grpc.NewServer(
		grpc.UnaryInterceptor(loggingInterceptor),
	)
	
	// 客户端
	conn, err := grpc.Dial(
		"localhost:50051",
		grpc.WithInsecure(),
		grpc.WithUnaryInterceptor(clientLoggingInterceptor),
	)
}

错误处理

gRPC 使用状态码来表示错误:

import (
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// 服务器端返回错误
func (s *Server) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
	user, ok := s.users[req.Id]
	if !ok {
		return nil, status.Errorf(codes.NotFound, "用户 %d 不存在", req.Id)
	}
	return user, nil
}

// 客户端处理错误
user, err := client.GetUser(ctx, &pb.GetUserRequest{Id: 123})
if err != nil {
	st, ok := status.FromError(err)
	if ok {
		switch st.Code() {
		case codes.NotFound:
			log.Println("用户不存在")
		case codes.InvalidArgument:
			log.Println("参数错误")
		case codes.Unauthenticated:
			log.Println("未认证")
		default:
			log.Printf("错误: %v", err)
		}
	}
}

认证和授权

// 服务器拦截器:验证 Token
func authInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	// 从 metadata 获取 token
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return nil, status.Error(codes.Unauthenticated, "缺少 metadata")
	}
	
	tokens := md.Get("authorization")
	if len(tokens) == 0 {
		return nil, status.Error(codes.Unauthenticated, "缺少 token")
	}
	
	// 验证 token
	token := tokens[0]
	userID, err := validateToken(token)
	if err != nil {
		return nil, status.Error(codes.Unauthenticated, "无效的 token")
	}
	
	// 将用户信息放入 context
	ctx = context.WithValue(ctx, "userID", userID)
	
	return handler(ctx, req)
}

// 客户端:发送 Token
type tokenAuth struct {
	token string
}

func (t *tokenAuth) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
	return map[string]string{
		"authorization": t.token,
	}, nil
}

func (t *tokenAuth) RequireTransportSecurity() bool {
	return false
}

// 使用
conn, err := grpc.Dial(
	"localhost:50051",
	grpc.WithInsecure(),
	grpc.WithPerRPCCredentials(&tokenAuth{token: "your-token"}),
)

小结

今天我们学习了 gRPC:

  1. 基础概念:gRPC vs REST,Protocol Buffers
  2. 服务定义:编写 .proto 文件,生成代码
  3. 服务器实现:Unary RPC、服务器流、双向流
  4. 客户端调用:同步调用、流式调用
  5. 拦截器:日志、认证等中间件
  6. 错误处理:状态码和错误信息

gRPC 为微服务通信提供了高性能、强类型的解决方案。在构建复杂的分布式系统时,gRPC 是一个值得考虑的选择。

练习时间

  1. 实现一个 gRPC 网关,将 gRPC 服务暴露为 REST API
  2. 创建一个带有认证和授权的 gRPC 服务
  3. 实现 gRPC 服务的服务发现和负载均衡
  4. 使用 gRPC 的反射功能实现动态客户端

我们下篇见!

继续阅读

探索更多技术文章

浏览归档,发现更多关于系统设计、工具链和工程实践的内容。

全部文章 返回首页