gRPC流式通信实战:双向流、服务端推送与实时数据处理

深入讲解gRPC四种通信模式,涵盖Unary、Server Streaming、Client Streaming与Bidirectional Streaming,提供Go语言的完整实战代码,包括实时聊天、日志流、文件上传等场景。

引言

gRPC不仅支持传统的请求-响应模式,还提供了强大的流式通信能力。流式RPC在实时数据推送、大文件传输、双向通信等场景中具有显著优势。

gRPC四种通信模式

gRPC通信模式对比:
┌─────────────────────────────────────────┐
│ 1. Unary RPC(一元RPC)                  │
│    Client ──── Request ────▶ Server      │
│    Client ◀─── Response ──── Server      │
│    最常用,类似HTTP请求                  │
│                                         │
│ 2. Server Streaming(服务端流)          │
│    Client ──── Request ────▶ Server      │
│    Client ◀─── Stream ────── Server      │
│    服务端持续推送数据                    │
│                                         │
│ 3. Client Streaming(客户端流)          │
│    Client ──── Stream ─────▶ Server      │
│    Client ◀─── Response ──── Server      │
│    客户端持续上传数据                    │
│                                         │
│ 4. Bidirectional Streaming(双向流)     │
│    Client ◀──▶ Stream ◀──▶  Server       │
│    双方独立地读写流                      │
│    最灵活,最复杂                        │
└─────────────────────────────────────────┘

Protocol Buffer定义

syntax = "proto3";

package streamservice;

option go_package = "github.com/example/streamservice";

// 一元RPC
service ChatService {
  rpc SendMessage(Message) returns (MessageAck);
  
  // 服务端流:订阅频道消息
  rpc SubscribeChannel(ChannelRequest) returns (stream Message);
  
  // 客户端流:批量发送消息
  rpc SendMessages(stream Message) returns (BatchAck);
  
  // 双向流:实时聊天
  rpc Chat(stream Message) returns (stream Message);
}

// 日志服务
service LogService {
  // 服务端流:实时日志
  rpc StreamLogs(LogRequest) returns (stream LogEntry);
  
  // 客户端流:批量上报日志
  rpc UploadLogs(stream LogEntry) returns (UploadResult);
}

// 文件服务
service FileService {
  // 客户端流:上传文件
  rpc UploadFile(stream FileChunk) returns (UploadResult);
  
  // 服务端流:下载文件
  rpc DownloadFile(FileRequest) returns (stream FileChunk);
  
  // 双向流:断点续传
  rpc TransferFile(stream FileTransferMsg) returns (stream FileTransferMsg);
}

message Message {
  string id = 1;
  string channel_id = 2;
  string user_id = 3;
  string content = 4;
  int64 timestamp = 5;
  MessageType type = 6;
}

enum MessageType {
  TEXT = 0;
  IMAGE = 1;
  FILE = 2;
  SYSTEM = 3;
}

message MessageAck {
  string message_id = 1;
  bool success = 2;
  string error = 3;
}

message BatchAck {
  int32 total_received = 1;
  int32 total_success = 2;
  repeated string failed_ids = 3;
}

message ChannelRequest {
  string channel_id = 1;
  string user_id = 2;
  int64 since_timestamp = 3;
}

message LogRequest {
  string service_name = 1;
  LogLevel min_level = 2;
  string pattern = 3;
}

enum LogLevel {
  DEBUG = 0;
  INFO = 1;
  WARN = 2;
  ERROR = 3;
}

message LogEntry {
  string service_name = 1;
  LogLevel level = 2;
  string message = 3;
  int64 timestamp = 4;
  map<string, string> metadata = 5;
}

message UploadResult {
  bool success = 1;
  int64 total_received = 2;
  string file_id = 3;
}

message FileChunk {
  string file_id = 1;
  int64 offset = 2;
  bytes data = 3;
  bool is_last = 4;
  FileMeta meta = 5;
}

message FileMeta {
  string filename = 1;
  int64 total_size = 2;
  string content_type = 3;
  string checksum = 4;
}

message FileRequest {
  string file_id = 1;
  int64 offset = 2;
  int32 chunk_size = 3;
}

message FileTransferMsg {
  oneof payload {
    TransferRequest request = 1;
    FileChunk chunk = 2;
    TransferAck ack = 3;
    TransferComplete complete = 4;
  }
}

message TransferRequest {
  string file_id = 1;
  int64 offset = 2;
  Direction direction = 3;
}

enum Direction {
  UPLOAD = 0;
  DOWNLOAD = 1;
}

message TransferAck {
  int64 received_offset = 1;
  bool success = 2;
}

message TransferComplete {
  string file_id = 1;
  string checksum = 2;
}

服务端流实现

实时消息订阅

package server

import (
    "fmt"
    "log"
    "sync"
    "time"
    
    pb "github.com/example/streamservice/proto"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

type ChatServer struct {
    pb.UnimplementedChatServiceServer
    
    mu          sync.RWMutex
    channels    map[string][]chan *pb.Message
    messageStore map[string][]*pb.Message
}

func NewChatServer() *ChatServer {
    return &ChatServer{
        channels:     make(map[string][]chan *pb.Message),
        messageStore: make(map[string][]*pb.Message),
    }
}

// 服务端流:订阅频道消息
func (s *ChatServer) SubscribeChannel(req *pb.ChannelRequest, stream pb.ChatService_SubscribeChannelServer) error {
    channelID := req.ChannelId
    userID := req.UserId
    
    log.Printf("User %s subscribing to channel %s", userID, channelID)
    
    // 创建消息通道
    msgChan := make(chan *pb.Message, 100)
    
    s.mu.Lock()
    s.channels[channelID] = append(s.channels[channelID], msgChan)
    s.mu.Unlock()
    
    // 确保退出时清理
    defer func() {
        s.mu.Lock()
        chans := s.channels[channelID]
        for i, ch := range chans {
            if ch == msgChan {
                s.channels[channelID] = append(chans[:i], chans[i+1:]...)
                break
            }
        }
        s.mu.Unlock()
        close(msgChan)
        log.Printf("User %s unsubscribed from channel %s", userID, channelID)
    }()
    
    // 先发送历史消息
    s.mu.RLock()
    if messages, ok := s.messageStore[channelID]; ok {
        for _, msg := range messages {
            if msg.Timestamp > req.SinceTimestamp {
                if err := stream.Send(msg); err != nil {
                    s.mu.RUnlock()
                    return err
                }
            }
        }
    }
    s.mu.RUnlock()
    
    // 持续监听新消息
    for {
        select {
        case msg, ok := <-msgChan:
            if !ok {
                return nil
            }
            if err := stream.Send(msg); err != nil {
                return err
            }
        case <-stream.Context().Done():
            return stream.Context().Err()
        }
    }
}

// 发送消息(会触发流推送)
func (s *ChatServer) SendMessage(ctx context.Context, msg *pb.Message) (*pb.MessageAck, error) {
    msg.Id = generateID()
    msg.Timestamp = time.Now().UnixMilli()
    
    // 存储消息
    s.mu.Lock()
    s.messageStore[msg.ChannelId] = append(s.messageStore[msg.ChannelId], msg)
    
    // 广播给所有订阅者
    if chans, ok := s.channels[msg.ChannelId]; ok {
        for _, ch := range chans {
            select {
            case ch <- msg:
            default:
                log.Printf("Warning: subscriber channel full, dropping message")
            }
        }
    }
    s.mu.Unlock()
    
    return &pb.MessageAck{
        MessageId: msg.Id,
        Success:   true,
    }, nil
}

实时日志流

type LogServer struct {
    pb.UnimplementedLogServiceServer
    
    logCollector *LogCollector
}

func (s *LogServer) StreamLogs(req *pb.LogRequest, stream pb.LogService_StreamLogsServer) error {
    serviceName := req.ServiceName
    minLevel := req.MinLevel
    pattern := req.Pattern
    
    log.Printf("Streaming logs for service=%s, minLevel=%v, pattern=%s",
        serviceName, minLevel, pattern)
    
    // 创建日志过滤器
    filter := &LogFilter{
        ServiceName: serviceName,
        MinLevel:    minLevel,
        Pattern:     pattern,
    }
    
    // 订阅日志流
    logChan := s.logCollector.Subscribe(filter)
    defer s.logCollector.Unsubscribe(filter, logChan)
    
    for {
        select {
        case entry := <-logChan:
            if err := stream.Send(entry); err != nil {
                return err
            }
        case <-stream.Context().Done():
            return nil
        }
    }
}

type LogCollector struct {
    mu          sync.RWMutex
    subscribers map[*LogFilter]chan *pb.LogEntry
}

type LogFilter struct {
    ServiceName string
    MinLevel    pb.LogLevel
    Pattern     string
}

func (c *LogCollector) Subscribe(filter *LogFilter) chan *pb.LogEntry {
    c.mu.Lock()
    defer c.mu.Unlock()
    
    ch := make(chan *pb.LogEntry, 256)
    c.subscribers[filter] = ch
    return ch
}

func (c *LogCollector) Unsubscribe(filter *LogFilter, ch chan *pb.LogEntry) {
    c.mu.Lock()
    defer c.mu.Unlock()
    
    delete(c.subscribers, filter)
    close(ch)
}

func (c *LogCollector) Collect(entry *pb.LogEntry) {
    c.mu.RLock()
    defer c.mu.RUnlock()
    
    for filter, ch := range c.subscribers {
        if matchesFilter(entry, filter) {
            select {
            case ch <- entry:
            default:
                // 通道满,丢弃
            }
        }
    }
}

func matchesFilter(entry *pb.LogEntry, filter *LogFilter) bool {
    if filter.ServiceName != "" && entry.ServiceName != filter.ServiceName {
        return false
    }
    if entry.Level < filter.MinLevel {
        return false
    }
    if filter.Pattern != "" {
        matched, _ := regexp.MatchString(filter.Pattern, entry.Message)
        return matched
    }
    return true
}

客户端流实现

批量消息发送

func (s *ChatServer) SendMessages(stream pb.ChatService_SendMessagesServer) error {
    var totalReceived int32
    var totalSuccess int32
    var failedIDs []string
    
    for {
        msg, err := stream.Recv()
        if err == io.EOF {
            return stream.SendAndClose(&pb.BatchAck{
                TotalReceived: totalReceived,
                TotalSuccess:  totalSuccess,
                FailedIds:     failedIDs,
            })
        }
        if err != nil {
            return err
        }
        
        totalReceived++
        
        // 处理消息
        msg.Id = generateID()
        msg.Timestamp = time.Now().UnixMilli()
        
        if err := s.processMessage(msg); err != nil {
            failedIDs = append(failedIDs, msg.Id)
            log.Printf("Failed to process message: %v", err)
            continue
        }
        
        totalSuccess++
    }
}

日志批量上传

func (s *LogServer) UploadLogs(stream pb.LogService_UploadLogsServer) error {
    var totalReceived int64
    batch := make([]*pb.LogEntry, 0, 100)
    
    for {
        entry, err := stream.Recv()
        if err == io.EOF {
            // 处理剩余的批次
            if len(batch) > 0 {
                if err := s.flushBatch(batch); err != nil {
                    log.Printf("Failed to flush final batch: %v", err)
                }
            }
            
            return stream.SendAndClose(&pb.UploadResult{
                Success:       true,
                TotalReceived: totalReceived,
            })
        }
        if err != nil {
            return err
        }
        
        totalReceived++
        batch = append(batch, entry)
        
        // 达到批次大小,刷盘
        if len(batch) >= 100 {
            if err := s.flushBatch(batch); err != nil {
                log.Printf("Failed to flush batch: %v", err)
            }
            batch = batch[:0]
        }
    }
}

func (s *LogServer) flushBatch(batch []*pb.LogEntry) error {
    // 批量写入数据库或Elasticsearch
    return s.logStorage.BulkInsert(batch)
}

文件上传

func (s *FileServer) UploadFile(stream pb.FileService_UploadFileServer) error {
    var fileID string
    var file *os.File
    var totalBytes int64
    var meta *pb.FileMeta
    
    for {
        chunk, err := stream.Recv()
        if err == io.EOF {
            file.Close()
            
            // 验证文件大小
            if meta != nil && totalBytes != meta.TotalSize {
                os.Remove(file.Name())
                return status.Errorf(codes.DataLoss,
                    "file size mismatch: expected %d, got %d",
                    meta.TotalSize, totalBytes)
            }
            
            // 验证校验和
            if meta != nil && meta.Checksum != "" {
                actualChecksum := calculateChecksum(file.Name())
                if actualChecksum != meta.Checksum {
                    os.Remove(file.Name())
                    return status.Errorf(codes.DataLoss,
                        "checksum mismatch: expected %s, got %s",
                        meta.Checksum, actualChecksum)
                }
            }
            
            return stream.SendAndClose(&pb.UploadResult{
                Success:       true,
                TotalReceived: totalBytes,
                FileId:        fileID,
            })
        }
        if err != nil {
            return err
        }
        
        // 第一个chunk包含文件元信息
        if file == nil {
            fileID = chunk.FileId
            if fileID == "" {
                fileID = generateID()
            }
            
            if chunk.Meta != nil {
                meta = chunk.Meta
            }
            
            filePath := fmt.Sprintf("/storage/files/%s", fileID)
            file, err = os.Create(filePath)
            if err != nil {
                return status.Errorf(codes.Internal, "failed to create file: %v", err)
            }
        }
        
        // 写入数据
        n, err := file.WriteAt(chunk.Data, chunk.Offset)
        if err != nil {
            file.Close()
            os.Remove(file.Name())
            return status.Errorf(codes.Internal, "failed to write chunk: %v", err)
        }
        
        totalBytes += int64(n)
        
        if chunk.IsLast {
            file.Close()
            return stream.SendAndClose(&pb.UploadResult{
                Success:       true,
                TotalReceived: totalBytes,
                FileId:        fileID,
            })
        }
    }
}

双向流实现

实时聊天

func (s *ChatServer) Chat(stream pb.ChatService_ChatServer) error {
    // 获取用户信息(从metadata)
    md, ok := metadata.FromIncomingContext(stream.Context())
    if !ok {
        return status.Error(codes.Unauthenticated, "missing metadata")
    }
    
    userID := md.Get("user_id")[0]
    channelID := md.Get("channel_id")[0]
    
    log.Printf("User %s joined chat in channel %s", userID, channelID)
    
    // 注册到频道
    msgChan := make(chan *pb.Message, 100)
    s.mu.Lock()
    s.channels[channelID] = append(s.channels[channelID], msgChan)
    s.mu.Unlock()
    
    // 发送加入通知
    joinMsg := &pb.Message{
        Id:        generateID(),
        ChannelId: channelID,
        UserId:    "system",
        Content:   fmt.Sprintf("User %s joined the chat", userID),
        Timestamp: time.Now().UnixMilli(),
        Type:      pb.MessageType_SYSTEM,
    }
    
    s.broadcastToChannel(channelID, joinMsg)
    
    // 退出时清理
    defer func() {
        s.mu.Lock()
        chans := s.channels[channelID]
        for i, ch := range chans {
            if ch == msgChan {
                s.channels[channelID] = append(chans[:i], chans[i+1:]...)
                break
            }
        }
        s.mu.Unlock()
        close(msgChan)
        
        // 发送离开通知
        leaveMsg := &pb.Message{
            Id:        generateID(),
            ChannelId: channelID,
            UserId:    "system",
            Content:   fmt.Sprintf("User %s left the chat", userID),
            Timestamp: time.Now().UnixMilli(),
            Type:      pb.MessageType_SYSTEM,
        }
        s.broadcastToChannel(channelID, leaveMsg)
        
        log.Printf("User %s left chat in channel %s", userID, channelID)
    }()
    
    // 双向通信
    errChan := make(chan error, 2)
    
    // 接收消息
    go func() {
        for {
            msg, err := stream.Recv()
            if err != nil {
                errChan <- err
                return
            }
            
            msg.Id = generateID()
            msg.UserId = userID
            msg.Timestamp = time.Now().UnixMilli()
            
            s.broadcastToChannel(channelID, msg)
        }
    }()
    
    // 发送消息
    go func() {
        for msg := range msgChan {
            if err := stream.Send(msg); err != nil {
                errChan <- err
                return
            }
        }
    }()
    
    // 等待错误或上下文取消
    select {
    case err := <-errChan:
        if err == io.EOF {
            return nil
        }
        return err
    case <-stream.Context().Done():
        return stream.Context().Err()
    }
}

客户端调用示例

订阅消息流

package main

import (
    "context"
    "io"
    "log"
    
    pb "github.com/example/streamservice/proto"
    "google.golang.org/grpc"
)

func subscribeMessages() {
    conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()
    
    client := pb.NewChatServiceClient(conn)
    
    stream, err := client.SubscribeChannel(context.Background(), &pb.ChannelRequest{
        ChannelId:      "general",
        UserId:         "user123",
        SinceTimestamp: 0,
    })
    if err != nil {
        log.Fatal(err)
    }
    
    for {
        msg, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Printf("Error receiving message: %v", err)
            break
        }
        
        fmt.Printf("[%s] %s: %s\n", 
            time.UnixMilli(msg.Timestamp).Format("15:04:05"),
            msg.UserId,
            msg.Content)
    }
}

双向聊天客户端

func chatClient() {
    conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()
    
    client := pb.NewChatServiceClient(conn)
    
    ctx := metadata.AppendToOutgoingContext(context.Background(),
        "user_id", "user123",
        "channel_id", "general")
    
    stream, err := client.Chat(ctx)
    if err != nil {
        log.Fatal(err)
    }
    
    // 接收消息
    go func() {
        for {
            msg, err := stream.Recv()
            if err != nil {
                log.Printf("Error: %v", err)
                return
            }
            fmt.Printf("[%s] %s: %s\n",
                time.UnixMilli(msg.Timestamp).Format("15:04:05"),
                msg.UserId, msg.Content)
        }
    }()
    
    // 发送消息(从stdin读取)
    scanner := bufio.NewScanner(os.Stdin)
    for scanner.Scan() {
        msg := &pb.Message{
            Content: scanner.Text(),
            Type:    pb.MessageType_TEXT,
        }
        if err := stream.Send(msg); err != nil {
            log.Printf("Send error: %v", err)
            break
        }
    }
}

流式通信最佳实践

流式通信最佳实践:
┌─────────────────────────────────────────┐
│ 1. 背压控制                              │
│    - 使用带缓冲的channel                 │
│    - 监控发送队列长度                     │
│    - 队列满时丢弃或断开连接               │
│                                         │
│ 2. 心跳保活                              │
│    - 定期发送ping/pong                   │
│    - 检测连接是否存活                     │
│    - 超时自动断开                         │
│                                         │
│ 3. 流量控制                              │
│    - 使用gRPC窗口大小控制                 │
│    - 限制单个消息大小                     │
│    - 限制并发流数量                       │
│                                         │
│ 4. 错误处理                              │
│    - 优雅处理stream关闭                  │
│    - 支持断线重连                         │
│    - 记录详细的错误日志                   │
│                                         │
│ 5. 资源清理                              │
│    - 使用defer确保资源释放               │
│    - 监控活跃流数量                       │
│    - 设置合理的超时时间                   │
│                                         │
│ 6. 消息确认                              │
│    - 重要消息需要ACK                      │
│    - 支持消息重传                         │
│    - 记录消息序列号                       │
└─────────────────────────────────────────┘

总结

gRPC流式通信选型

模式适用场景复杂度性能
Unary简单请求-响应
Server Stream实时推送、日志流
Client Stream批量上传、数据采集
Bidirectional聊天、协作编辑

关键原则

  1. 选择合适的模式:不要过度使用双向流
  2. 实现背压控制:防止慢消费者拖垮系统
  3. 心跳保活:检测并清理死连接
  4. 优雅关闭:确保资源正确释放
  5. 监控指标:跟踪活跃流数量、消息吞吐量
  6. 错误恢复:支持断线重连和消息重传

延伸阅读

继续阅读

探索更多技术文章

浏览归档,发现更多关于系统设计、工具链和工程实践的内容。

全部文章 返回首页