分布式追踪:跨服务调用链路分析

学习使用 OpenTelemetry 实现分布式追踪,分析跨服务调用链路和性能瓶颈

分布式追踪:跨服务调用链路分析

在微服务架构中,一个用户请求可能经过多个服务。当出现问题时,如何快速定位是哪个环节出了问题?分布式追踪就是解决这个问题的利器。

本文将介绍如何使用 OpenTelemetry 实现分布式追踪。

什么是分布式追踪?

分布式追踪记录请求在多个服务间的完整调用链路,包括:

  • Trace:完整的请求链路
  • Span:链路中的单个操作
  • Context Propagation:跨服务传递追踪信息

OpenTelemetry 简介

OpenTelemetry 是一个统一的可观测性框架,提供:

  • Traces:分布式追踪
  • Metrics:指标收集
  • Logs:日志记录

安装依赖

# Fetch the OpenTelemetry API, SDK, Jaeger exporter and trace packages imported by the examples.
# NOTE(review): the gRPC example also imports
# go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc,
# which is not fetched here — confirm whether it should be added.
go get go.opentelemetry.io/otel
go get go.opentelemetry.io/otel/sdk
go get go.opentelemetry.io/otel/exporters/jaeger
go get go.opentelemetry.io/otel/trace

基础追踪实现

package main

import (
    "context"
    "log"
    "net/http"
    "time"
    
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/exporters/jaeger"
    "go.opentelemetry.io/otel/sdk/resource"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
    semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
    "go.opentelemetry.io/otel/trace"
)

// initTracer builds a TracerProvider that batches spans to a Jaeger collector
// and registers it as the global provider.
//
// It returns the provider so the caller can shut it down, or the error
// reported while constructing the Jaeger exporter.
func initTracer() (*sdktrace.TracerProvider, error) {
    // Spans are sent to the Jaeger collector HTTP endpoint.
    endpoint := jaeger.WithEndpoint("http://localhost:14268/api/traces")
    exp, err := jaeger.New(jaeger.WithCollectorEndpoint(endpoint))
    if err != nil {
        return nil, err
    }

    // The resource attaches the service name to the provider.
    res := resource.NewWithAttributes(
        semconv.SchemaURL,
        semconv.ServiceNameKey.String("my-service"),
    )

    provider := sdktrace.NewTracerProvider(
        sdktrace.WithBatcher(exp),
        sdktrace.WithResource(res),
    )
    otel.SetTracerProvider(provider)

    return provider, nil
}

// main installs the tracer provider, registers a traced handler on "/" and
// serves HTTP on :8080.
func main() {
    tp, err := initTracer()
    if err != nil {
        log.Fatal(err)
    }
    // Shut the provider down on exit with a bounded wait, and report a
    // failure instead of silently dropping it.
    defer func() {
        shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        if err := tp.Shutdown(shutdownCtx); err != nil {
            log.Printf("shutting down tracer provider: %v", err)
        }
    }()

    tracer := otel.Tracer("main")

    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        // This span represents the inbound HTTP request, so mark it as a
        // server span.
        ctx, span := tracer.Start(r.Context(), "handle-request",
            trace.WithSpanKind(trace.SpanKindServer),
        )
        defer span.End()

        // Describe the request on the span.
        span.SetAttributes(
            attribute.String("http.method", r.Method),
            attribute.String("http.url", r.URL.String()),
        )

        // Simulated processing; child spans are created from ctx.
        processRequest(ctx)

        if _, err := w.Write([]byte("Hello, World!")); err != nil {
            span.RecordError(err)
        }
    })

    log.Println("Server starting on :8080")
    // Log and return rather than log.Fatal so the deferred Shutdown still runs.
    if err := http.ListenAndServe(":8080", nil); err != nil {
        log.Printf("http server: %v", err)
    }
}

// processRequest simulates request handling inside a "process-request" span
// and then performs the database call as a child of that span.
func processRequest(ctx context.Context) {
    tracer := otel.Tracer("main")
    // Keep the context returned by Start: it carries the new span, so the
    // span started by queryDatabase is parented to "process-request" rather
    // than to the caller's span.
    ctx, span := tracer.Start(ctx, "process-request")
    defer span.End()

    // Simulated work.
    time.Sleep(100 * time.Millisecond)

    // Database call, traced as a child span.
    queryDatabase(ctx)
}

// queryDatabase simulates a MySQL query inside a "query-database" span that
// is annotated with the database system and the statement text.
func queryDatabase(ctx context.Context) {
    // The derived context is dropped: nothing below starts a child span.
    _, dbSpan := otel.Tracer("main").Start(ctx, "query-database")
    defer dbSpan.End()

    system := attribute.String("db.system", "mysql")
    statement := attribute.String("db.statement", "SELECT * FROM users")
    dbSpan.SetAttributes(system, statement)

    // Stand-in for the latency of a real query.
    const queryLatency = 50 * time.Millisecond
    time.Sleep(queryLatency)
}

HTTP 中间件集成

package main

import (
    "net/http"
    
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/propagation"
    "go.opentelemetry.io/otel/trace"
)

// tracingMiddleware wraps next so that every request runs inside a server
// span named after the request path.
//
// The trace context is extracted from the incoming headers, the span is
// attached to the request context passed to next, and the response status
// code is recorded on the span once next returns.
func tracingMiddleware(next http.Handler) http.Handler {
    tracer := otel.Tracer("http-server")
    propagator := otel.GetTextMapPropagator()

    handle := func(w http.ResponseWriter, r *http.Request) {
        // Continue the caller's trace if the headers carry one.
        parent := propagator.Extract(r.Context(), propagation.HeaderCarrier(r.Header))

        startOpts := []trace.SpanStartOption{
            trace.WithSpanKind(trace.SpanKindServer),
            trace.WithAttributes(
                attribute.String("http.method", r.Method),
                attribute.String("http.url", r.URL.String()),
                attribute.String("http.host", r.Host),
                attribute.String("http.user_agent", r.UserAgent()),
            ),
        }
        ctx, span := tracer.Start(parent, r.URL.Path, startOpts...)
        defer span.End()

        // Capture the status the handler writes; it stays 200 if the handler
        // never calls WriteHeader.
        recorder := &responseWriter{ResponseWriter: w, statusCode: 200}
        next.ServeHTTP(recorder, r.WithContext(ctx))

        span.SetAttributes(attribute.Int("http.status_code", recorder.statusCode))
    }

    return http.HandlerFunc(handle)
}

// responseWriter wraps an http.ResponseWriter and remembers the status code
// of the response so it can be read after the handler has run.
type responseWriter struct {
    http.ResponseWriter
    statusCode  int  // final status code; the constructor initialises it to 200
    wroteHeader bool // true once the final status has been committed
}

// WriteHeader records code as the response status and forwards the call to
// the wrapped writer.
//
// Only the first final status is recorded, because net/http ignores any later
// WriteHeader call. Informational 1xx codes (other than 101) may precede the
// final status and are forwarded without being recorded.
func (rw *responseWriter) WriteHeader(code int) {
    informational := code >= 100 && code <= 199 && code != http.StatusSwitchingProtocols
    if !rw.wroteHeader && !informational {
        rw.statusCode = code
        rw.wroteHeader = true
    }
    rw.ResponseWriter.WriteHeader(code)
}

// Write forwards b to the wrapped writer. A Write before any WriteHeader
// implicitly commits a 200 status, so later WriteHeader calls no longer change
// the recorded code.
func (rw *responseWriter) Write(b []byte) (int, error) {
    if !rw.wroteHeader {
        rw.statusCode = http.StatusOK
        rw.wroteHeader = true
    }
    return rw.ResponseWriter.Write(b)
}

// Unwrap returns the wrapped writer so that http.ResponseController can reach
// optional interfaces (flushing, hijacking, deadlines) of the original writer.
func (rw *responseWriter) Unwrap() http.ResponseWriter {
    return rw.ResponseWriter
}

HTTP 客户端追踪

package main

import (
    "context"
    "io"
    "net/http"
    
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/propagation"
    "go.opentelemetry.io/otel/trace"
)

// tracingTransport is an http.RoundTripper that wraps each outgoing request
// in a client span and propagates the trace context through request headers.
type tracingTransport struct {
    base http.RoundTripper // transport that performs the request; nil means http.DefaultTransport
}

// RoundTrip starts a client span for r, injects the trace context into the
// outgoing headers, delegates to the base transport and records either the
// transport error or the response status code on the span.
func (t *tracingTransport) RoundTrip(r *http.Request) (*http.Response, error) {
    tracer := otel.Tracer("http-client")
    propagator := otel.GetTextMapPropagator()

    // Client span describing the outgoing call.
    ctx, span := tracer.Start(r.Context(), "HTTP "+r.Method,
        trace.WithSpanKind(trace.SpanKindClient),
        trace.WithAttributes(
            attribute.String("http.method", r.Method),
            attribute.String("http.url", r.URL.String()),
        ),
    )
    defer span.End()

    // A RoundTripper must not modify the caller's request, so the trace
    // headers are injected into a clone that owns its own header map.
    req := r.Clone(ctx)
    if req.Header == nil {
        req.Header = make(http.Header)
    }
    propagator.Inject(ctx, propagation.HeaderCarrier(req.Header))

    base := t.base
    if base == nil {
        base = http.DefaultTransport
    }

    resp, err := base.RoundTrip(req)
    if err != nil {
        span.RecordError(err)
        return nil, err
    }

    span.SetAttributes(attribute.Int("http.status_code", resp.StatusCode))

    return resp, nil
}

// newTracingClient returns an http.Client whose transport is a
// tracingTransport layered on top of http.DefaultTransport.
func newTracingClient() *http.Client {
    transport := &tracingTransport{base: http.DefaultTransport}
    client := &http.Client{Transport: transport}
    return client
}

// callExternalService is a usage example: it sends a traced GET request to
// the external API and consumes the response body.
//
// It returns any error from building the request, performing it, or reading
// the body.
func callExternalService(ctx context.Context) error {
    client := newTracingClient()

    req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://external-api.com/data", nil)
    if err != nil {
        return err
    }

    resp, err := client.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    // The payload is not used, so stream it to io.Discard instead of
    // buffering the whole body in memory.
    _, err = io.Copy(io.Discard, resp.Body)
    return err
}

数据库追踪

package main

import (
    "context"
    "database/sql"
    
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/trace"
)

// tracedDB embeds *sql.DB and overrides QueryContext and ExecContext so that
// each of those calls is recorded as a span.
type tracedDB struct {
    *sql.DB
}

// QueryContext runs query on the embedded *sql.DB inside a "db.query" span
// that carries the database system and the statement text.
//
// The rows and error from the underlying call are returned unchanged; a
// non-nil error is also recorded on the span.
func (db *tracedDB) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
    spanAttrs := trace.WithAttributes(
        attribute.String("db.system", "mysql"),
        attribute.String("db.statement", query),
    )
    ctx, span := otel.Tracer("database").Start(ctx, "db.query", spanAttrs)
    defer span.End()

    rows, err := db.DB.QueryContext(ctx, query, args...)
    if err == nil {
        return rows, nil
    }

    span.RecordError(err)
    return rows, err
}

// ExecContext executes query on the embedded *sql.DB inside a "db.exec" span
// that carries the database system and the statement text.
//
// The result and error from the underlying call are returned unchanged; a
// non-nil error is also recorded on the span.
func (db *tracedDB) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
    startOpts := []trace.SpanStartOption{
        trace.WithAttributes(
            attribute.String("db.system", "mysql"),
            attribute.String("db.statement", query),
        ),
    }
    spanCtx, span := otel.Tracer("database").Start(ctx, "db.exec", startOpts...)
    defer span.End()

    res, execErr := db.DB.ExecContext(spanCtx, query, args...)
    if execErr != nil {
        span.RecordError(execErr)
    }

    return res, execErr
}

gRPC 追踪

package main

import (
    "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
    "google.golang.org/grpc"
)

// Server side: create a gRPC server with the OpenTelemetry interceptors
// installed for both unary and streaming RPCs.
// NOTE(review): this snippet declares two functions named main; presumably
// the server and client are meant to live in separate programs — confirm.
func main() {
    s := grpc.NewServer(
        grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
        grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor()),
    )
    // ...
}

// Client side: dial localhost:50051 with the OpenTelemetry interceptors
// installed for both unary and streaming RPCs.
func main() {
    conn, err := grpc.Dial("localhost:50051",
        // NOTE(review): grpc.WithInsecure is deprecated in grpc-go in favour of
        // grpc.WithTransportCredentials(insecure.NewCredentials()) — confirm
        // against the grpc-go version in use.
        grpc.WithInsecure(),
        grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
        grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()),
    )
    // NOTE(review): the elided code must check err and use/close conn,
    // otherwise both variables are unused and the snippet does not compile.
    // ...
}

自定义 Span 属性

// processOrder validates and then pays for the order identified by orderID,
// all inside a "process-order" span tagged with the order id and type.
//
// The validation outcome is stored in the "order.valid" attribute. A failure
// in either step is recorded on the span and returned. On success an
// "order-processed" event is added and nil is returned.
func processOrder(ctx context.Context, orderID string) error {
    ctx, span := otel.Tracer("order-service").Start(ctx, "process-order",
        trace.WithAttributes(
            attribute.String("order.id", orderID),
            attribute.String("order.type", "standard"),
        ),
    )
    defer span.End()

    // Step 1: validation.
    validationErr := validateOrder(ctx, orderID)
    if validationErr != nil {
        span.RecordError(validationErr)
        span.SetAttributes(attribute.Bool("order.valid", false))
        return validationErr
    }
    span.SetAttributes(attribute.Bool("order.valid", true))

    // Step 2: payment.
    paymentErr := processPayment(ctx, orderID)
    if paymentErr != nil {
        span.RecordError(paymentErr)
        return paymentErr
    }

    // Mark completion with an event on the span.
    completed := trace.WithAttributes(
        attribute.String("event.type", "order.completed"),
        attribute.String("order.id", orderID),
    )
    span.AddEvent("order-processed", completed)

    return nil
}

Jaeger 部署

# Start Jaeger (all-in-one image) with Docker.
# Port 16686 serves the Jaeger UI and 14268 is the collector HTTP endpoint the
# Go exporter posts to; 9411 is the port named in COLLECTOR_ZIPKIN_HOST_PORT.
# NOTE(review): the remaining published ports look like agent/gRPC ports —
# confirm which are still needed for the image version in use.
docker run -d \
  --name jaeger \
  -e COLLECTOR_ZIPKIN_HOST_PORT=:9411 \
  -p 5775:5775/udp \
  -p 6831:6831/udp \
  -p 6832:6832/udp \
  -p 5778:5778 \
  -p 16686:16686 \
  -p 14268:14268 \
  -p 14250:14250 \
  -p 9411:9411 \
  jaegertracing/all-in-one:latest

访问 http://localhost:16686 查看 Jaeger UI。

完整微服务追踪示例

package main

import (
    "context"
    "log"
    "net/http"
    "time"
    
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/exporters/jaeger"
    "go.opentelemetry.io/otel/propagation"
    "go.opentelemetry.io/otel/sdk/resource"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
    semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
    "go.opentelemetry.io/otel/trace"
)

// initTracer configures tracing for the service named serviceName.
//
// It creates a Jaeger exporter and a batching TracerProvider tagged with the
// service name, then registers the provider and a composite trace-context +
// baggage propagator globally. It returns the provider so the caller can shut
// it down, or the error from creating the exporter.
func initTracer(serviceName string) (*sdktrace.TracerProvider, error) {
    collector := jaeger.WithCollectorEndpoint(
        jaeger.WithEndpoint("http://localhost:14268/api/traces"),
    )
    jaegerExporter, err := jaeger.New(collector)
    if err != nil {
        return nil, err
    }

    serviceResource := resource.NewWithAttributes(
        semconv.SchemaURL,
        semconv.ServiceNameKey.String(serviceName),
    )
    tp := sdktrace.NewTracerProvider(
        sdktrace.WithBatcher(jaegerExporter),
        sdktrace.WithResource(serviceResource),
    )

    // Register the globals used by otel.Tracer and otel.GetTextMapPropagator.
    propagator := propagation.NewCompositeTextMapPropagator(
        propagation.TraceContext{},
        propagation.Baggage{},
    )
    otel.SetTracerProvider(tp)
    otel.SetTextMapPropagator(propagator)

    return tp, nil
}

// apiGatewayHandler is the API gateway entry point. Inside an "api-gateway"
// span it calls the user service and then the order service, and finally
// replies with "OK".
func apiGatewayHandler(w http.ResponseWriter, r *http.Request) {
    ctx, span := otel.Tracer("api-gateway").Start(r.Context(), "api-gateway")
    defer span.End()

    // Downstream calls are made sequentially: user service first, then the
    // order service.
    downstream := [...]string{
        "http://localhost:8081/user/123",
        "http://localhost:8082/orders/123",
    }
    for _, target := range downstream {
        callService(ctx, target)
    }

    w.Write([]byte("OK"))
}

// callService performs a traced GET request to url inside a "call-service"
// span tagged with the target URL. Failures are recorded on the span; nothing
// is returned to the caller.
func callService(ctx context.Context, url string) {
    tracer := otel.Tracer("api-gateway")
    ctx, span := tracer.Start(ctx, "call-service",
        trace.WithAttributes(attribute.String("service.url", url)),
    )
    defer span.End()

    req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
    if err != nil {
        // A malformed URL yields a nil request; passing it to client.Do
        // would panic, so record the error and stop here.
        span.RecordError(err)
        return
    }

    client := newTracingClient()
    resp, err := client.Do(req)
    if err != nil {
        span.RecordError(err)
        return
    }
    defer resp.Body.Close()
}

// main initialises tracing for the "api-gateway" service, mounts the gateway
// handler behind the tracing middleware and serves HTTP on :8080.
func main() {
    tp, err := initTracer("api-gateway")
    if err != nil {
        log.Fatal(err)
    }
    // Shut the provider down on exit with a bounded wait, and report a
    // failure instead of silently dropping it.
    defer func() {
        shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        if err := tp.Shutdown(shutdownCtx); err != nil {
            log.Printf("shutting down tracer provider: %v", err)
        }
    }()

    // tracingMiddleware returns an http.Handler, so it must be registered
    // with http.Handle; http.HandleFunc only accepts a plain function.
    http.Handle("/", tracingMiddleware(http.HandlerFunc(apiGatewayHandler)))

    log.Println("API Gateway starting on :8080")
    // Log and return rather than log.Fatal so the deferred Shutdown still runs.
    if err := http.ListenAndServe(":8080", nil); err != nil {
        log.Printf("http server: %v", err)
    }
}

总结

分布式追踪让你能够:

  1. 可视化调用链路:看到请求经过的每个服务
  2. 定位性能瓶颈:找出最慢的环节
  3. 快速排错:定位错误发生的位置
  4. 理解依赖关系:服务间的调用关系

关键概念:

  • Trace:完整的请求链路
  • Span:链路中的单个操作
  • Context Propagation:跨服务传递追踪信息

最佳实践:

  • 在所有入口点创建 span
  • 记录关键属性和事件
  • 使用标准化的命名
  • 设置合理的采样率

记住:在微服务架构中,没有追踪就像在迷宫中蒙眼行走。

继续阅读

探索更多技术文章

浏览归档,发现更多关于系统设计、工具链和工程实践的内容。

全部文章 返回首页