分布式追踪:跨服务调用链路分析
在微服务架构中,一个用户请求可能经过多个服务。当出现问题时,如何快速定位是哪个环节出了问题?分布式追踪就是解决这个问题的利器。
本文将介绍如何使用 OpenTelemetry 实现分布式追踪。
什么是分布式追踪?
分布式追踪记录请求在多个服务间的完整调用链路,包括:
- Trace:完整的请求链路
- Span:链路中的单个操作
- Context Propagation:跨服务传递追踪信息
OpenTelemetry 简介
OpenTelemetry 是一个统一的可观测性(Observability)框架,提供:
- Traces:分布式追踪
- Metrics:指标收集
- Logs:日志记录
安装依赖
go get go.opentelemetry.io/otel
go get go.opentelemetry.io/otel/sdk
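go get go.opentelemetry.io/otel/exporters/jaeger  # 注意:该 Jaeger 导出器已被官方弃用,新项目建议改用 OTLP 导出器(Jaeger 原生支持 OTLP)
go get go.opentelemetry.io/otel/trace
go get go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc  # 下文 gRPC 追踪一节需要
go get google.golang.org/grpc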
基础追踪实现
package main
import (
"context"
"log"
"net/http"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"
)
// initTracer 初始化追踪器
// initTracer builds a TracerProvider that batches spans to the Jaeger
// collector's HTTP endpoint, tags them with service.name "my-service", and
// installs it as the global provider. The caller owns the returned provider
// and should Shutdown it to flush buffered spans.
//
// NOTE(review): go.opentelemetry.io/otel/exporters/jaeger is deprecated
// upstream; Jaeger ingests OTLP natively, so an OTLP exporter is the
// suggested replacement.
func initTracer() (*sdktrace.TracerProvider, error) {
	const collectorURL = "http://localhost:14268/api/traces"

	exp, err := jaeger.New(
		jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(collectorURL)),
	)
	if err != nil {
		return nil, err
	}

	res := resource.NewWithAttributes(
		semconv.SchemaURL,
		semconv.ServiceNameKey.String("my-service"),
	)
	provider := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exp),
		sdktrace.WithResource(res),
	)

	otel.SetTracerProvider(provider)
	return provider, nil
}
// main installs the tracer and serves "/" on :8080. Each request is traced as
// a SERVER span named "handle-request", with processRequest running as a
// child operation.
func main() {
	tp, err := initTracer()
	if err != nil {
		log.Fatal(err)
	}
	// Flush buffered spans on exit. log.Fatal would skip this defer, so the
	// server error below is logged rather than fatal.
	defer func() {
		if err := tp.Shutdown(context.Background()); err != nil {
			log.Printf("tracer shutdown: %v", err)
		}
	}()

	tracer := otel.Tracer("main")
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		// This span represents handling an inbound request, so mark it SERVER.
		ctx, span := tracer.Start(r.Context(), "handle-request",
			trace.WithSpanKind(trace.SpanKindServer),
		)
		defer span.End()

		span.SetAttributes(
			attribute.String("http.method", r.Method),
			attribute.String("http.url", r.URL.String()),
		)

		// ctx carries the span, so work started here becomes its child.
		processRequest(ctx)

		if _, err := w.Write([]byte("Hello, World!")); err != nil {
			span.RecordError(err)
		}
	})

	log.Println("Server starting on :8080")
	// A nil Handler serves http.DefaultServeMux (where HandleFunc registered).
	// ReadHeaderTimeout guards against clients that never finish their headers.
	srv := &http.Server{Addr: ":8080", ReadHeaderTimeout: 5 * time.Second}
	if err := srv.ListenAndServe(); err != nil {
		log.Printf("server stopped: %v", err)
	}
}
// processRequest simulates 100ms of work under a "process-request" span and
// then queries the database as a child operation of that span.
func processRequest(ctx context.Context) {
	tracer := otel.Tracer("main")
	// Keep the context returned by Start: it carries the new span. Passing the
	// incoming ctx downstream would parent query-database to the caller's span
	// instead of to process-request.
	ctx, span := tracer.Start(ctx, "process-request")
	defer span.End()

	// Simulated work.
	time.Sleep(100 * time.Millisecond)

	// Database call, traced as a child of process-request.
	queryDatabase(ctx)
}
// queryDatabase stands in for a MySQL query. It opens a "query-database" span
// annotated with db.system and db.statement, then sleeps 50ms in place of
// real I/O.
func queryDatabase(ctx context.Context) {
	tracer := otel.Tracer("main")
	// A call out to a database is an outbound request, which the OpenTelemetry
	// semantic conventions model as a CLIENT span (the default would be INTERNAL).
	_, span := tracer.Start(ctx, "query-database",
		trace.WithSpanKind(trace.SpanKindClient),
	)
	defer span.End()

	span.SetAttributes(
		attribute.String("db.system", "mysql"),
		attribute.String("db.statement", "SELECT * FROM users"),
	)

	// Simulated query latency.
	time.Sleep(50 * time.Millisecond)
}
HTTP 中间件集成(注意:需先通过 otel.SetTextMapPropagator 注册传播器,否则默认的全局传播器是空操作,不会提取或注入任何追踪上下文;见下文完整示例中的 initTracer)
package main
import (
"net/http"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)
// tracingMiddleware wraps next so that each request is served inside a SERVER
// span.
//
// Incoming trace context is extracted from the request headers with the
// global propagator, so the span joins the caller's trace when one was
// propagated. Until otel.SetTextMapPropagator has been called, the global
// propagator is a no-op and every request starts a new trace.
//
// Request attributes are set when the span starts. The response status code
// is captured via responseWriter and added after next returns.
func tracingMiddleware(next http.Handler) http.Handler {
	tracer := otel.Tracer("http-server")
	propagator := otel.GetTextMapPropagator()
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Pick up the upstream span context, if any.
		ctx := propagator.Extract(r.Context(), propagation.HeaderCarrier(r.Header))

		// Name the span by method, not by r.URL.Path. Raw paths such as
		// /user/123 would give nearly every request its own span name
		// (unbounded cardinality). The concrete URL is still recorded in
		// the http.url attribute.
		ctx, span := tracer.Start(ctx, "HTTP "+r.Method,
			trace.WithSpanKind(trace.SpanKindServer),
			trace.WithAttributes(
				attribute.String("http.method", r.Method),
				attribute.String("http.url", r.URL.String()),
				attribute.String("http.host", r.Host),
				attribute.String("http.user_agent", r.UserAgent()),
			),
		)
		defer span.End()

		// Default to 200: net/http sends it implicitly if the handler never
		// calls WriteHeader.
		wrapped := &responseWriter{ResponseWriter: w, statusCode: 200}

		next.ServeHTTP(wrapped, r.WithContext(ctx))

		span.SetAttributes(attribute.Int("http.status_code", wrapped.statusCode))
	})
}
// responseWriter wraps an http.ResponseWriter to remember the final status
// code actually sent to the client, so middleware can record it on a span.
//
// statusCode should be initialised to 200: if a handler writes a body without
// calling WriteHeader, net/http sends 200 OK implicitly.
type responseWriter struct {
	http.ResponseWriter
	statusCode  int
	wroteHeader bool // true once the final status has been committed
}

// WriteHeader forwards code to the wrapped writer and records it if it is the
// first final status. net/http ignores (and logs) superfluous WriteHeader
// calls, so a later code is not what the client received. 1xx informational
// codes other than 101 may precede the final status and are not recorded.
func (rw *responseWriter) WriteHeader(code int) {
	informational := code >= 100 && code < 200 && code != http.StatusSwitchingProtocols
	if !rw.wroteHeader && !informational {
		rw.statusCode = code
		rw.wroteHeader = true
	}
	rw.ResponseWriter.WriteHeader(code)
}

// Write marks the status as committed before delegating. If WriteHeader has
// not been called, net/http's Write sends 200 OK first, so any later
// WriteHeader call cannot change the status.
func (rw *responseWriter) Write(b []byte) (int, error) {
	rw.wroteHeader = true
	return rw.ResponseWriter.Write(b)
}
HTTP 客户端追踪
package main
import (
"context"
"io"
"net/http"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)
// tracingTransport is an http.RoundTripper that wraps base. It starts a CLIENT
// span for each outgoing request and injects that span's trace context into
// the request headers. A nil base falls back to http.DefaultTransport.
type tracingTransport struct {
	base http.RoundTripper
}

// RoundTrip implements http.RoundTripper.
//
// The span ends when RoundTrip returns, typically once response headers have
// arrived or the request has failed. Reading the response body is not
// covered. Transport errors are recorded on the span and returned unchanged.
func (t *tracingTransport) RoundTrip(r *http.Request) (*http.Response, error) {
	tracer := otel.Tracer("http-client")
	propagator := otel.GetTextMapPropagator()

	ctx, span := tracer.Start(r.Context(), "HTTP "+r.Method,
		trace.WithSpanKind(trace.SpanKindClient),
		trace.WithAttributes(
			attribute.String("http.method", r.Method),
			attribute.String("http.url", r.URL.String()),
		),
	)
	defer span.End()

	// The RoundTripper contract forbids modifying the caller's request, and
	// r.WithContext would share r.Header with it. Clone deep-copies the
	// headers, so the trace headers go only into this copy.
	out := r.Clone(ctx)
	if out.Header == nil {
		// HeaderCarrier.Set writes into the map; a nil Header would panic.
		out.Header = make(http.Header)
	}
	propagator.Inject(ctx, propagation.HeaderCarrier(out.Header))

	base := t.base
	if base == nil {
		base = http.DefaultTransport
	}

	resp, err := base.RoundTrip(out)
	if err != nil {
		span.RecordError(err)
		return nil, err
	}

	span.SetAttributes(attribute.Int("http.status_code", resp.StatusCode))
	return resp, nil
}
// 创建带追踪的 HTTP 客户端
// newTracingClient returns an *http.Client whose transport is a
// tracingTransport over http.DefaultTransport. Requests sent through it are
// traced and carry the propagated trace context.
//
// NOTE(review): no client Timeout is set, so callers must bound requests
// with a context deadline.
func newTracingClient() *http.Client {
	rt := &tracingTransport{base: http.DefaultTransport}
	return &http.Client{Transport: rt}
}
// 使用示例
// callExternalService sends GET http://external-api.com/data through a tracing
// client, so the outgoing request is traced and carries the trace context
// from ctx. It returns any request-construction, transport, or body-read
// error.
//
// NOTE(review): non-2xx responses are not treated as errors, and the response
// content is discarded.
func callExternalService(ctx context.Context) error {
	client := newTracingClient()
	req, err := http.NewRequestWithContext(ctx, "GET", "http://external-api.com/data", nil)
	if err != nil {
		return err
	}
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// The content is thrown away, so stream it to io.Discard instead of
	// buffering the whole body with io.ReadAll. Reading to EOF still lets the
	// transport reuse the connection.
	_, err = io.Copy(io.Discard, resp.Body)
	return err
}
数据库追踪
package main
import (
"context"
"database/sql"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
type tracedDB struct {
*sql.DB
}
func (db *tracedDB) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
tracer := otel.Tracer("database")
ctx, span := tracer.Start(ctx, "db.query",
trace.WithAttributes(
attribute.String("db.system", "mysql"),
attribute.String("db.statement", query),
),
)
defer span.End()
rows, err := db.DB.QueryContext(ctx, query, args...)
if err != nil {
span.RecordError(err)
}
return rows, err
}
func (db *tracedDB) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
tracer := otel.Tracer("database")
ctx, span := tracer.Start(ctx, "db.exec",
trace.WithAttributes(
attribute.String("db.system", "mysql"),
attribute.String("db.statement", query),
),
)
defer span.End()
result, err := db.DB.ExecContext(ctx, query, args...)
if err != nil {
span.RecordError(err)
}
return result, err
}
gRPC 追踪
package main
import (
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
)
// 服务端
// main (server side) creates a gRPC server whose RPCs are traced by otelgrpc.
func main() {
	// otelgrpc.NewServerHandler is a gRPC stats handler that creates a server
	// span for every unary and streaming RPC. It replaces the deprecated
	// otelgrpc.UnaryServerInterceptor / StreamServerInterceptor pair.
	s := grpc.NewServer(
		grpc.StatsHandler(otelgrpc.NewServerHandler()),
	)
	defer s.GracefulStop()
	// ... register services on s, then call s.Serve(listener).
}
// 客户端
// main (client side) dials a gRPC server with otelgrpc tracing, so outgoing
// RPCs get client spans and propagate trace context.
func main() {
	conn, err := grpc.Dial("localhost:50051",
		// NOTE(review): grpc.WithInsecure is deprecated; with the
		// google.golang.org/grpc/credentials/insecure package use
		// grpc.WithTransportCredentials(insecure.NewCredentials()).
		grpc.WithInsecure(),
		// Stats-handler replacement for the deprecated
		// otelgrpc.UnaryClientInterceptor / StreamClientInterceptor pair.
		grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
	)
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	// ... create service clients from conn.
}
自定义 Span 属性
// processOrder validates and charges the order identified by orderID inside a
// "process-order" span.
//
// The span carries order.id and order.type. order.valid is set to false when
// validation fails and to true once it passes. An "order-processed" event is
// added only when both validation and payment succeed. Errors from either
// step are recorded on the span and returned unchanged.
//
// NOTE(review): span.RecordError does not change the span status. Consider
// also calling span.SetStatus(codes.Error, err.Error()) (package
// go.opentelemetry.io/otel/codes) so failed orders show as errors.
func processOrder(ctx context.Context, orderID string) error {
	ctx, span := otel.Tracer("order-service").Start(ctx, "process-order",
		trace.WithAttributes(
			attribute.String("order.id", orderID),
			attribute.String("order.type", "standard"),
		),
	)
	defer span.End()

	// Validation step: a failure marks the order invalid on the span.
	if verr := validateOrder(ctx, orderID); verr != nil {
		span.RecordError(verr)
		span.SetAttributes(attribute.Bool("order.valid", false))
		return verr
	}
	span.SetAttributes(attribute.Bool("order.valid", true))

	// Payment step.
	if perr := processPayment(ctx, orderID); perr != nil {
		span.RecordError(perr)
		return perr
	}

	completed := trace.WithAttributes(
		attribute.String("event.type", "order.completed"),
		attribute.String("order.id", orderID),
	)
	span.AddEvent("order-processed", completed)
	return nil
}
Jaeger 部署
# 使用 Docker 启动 Jaeger
docker run -d \
--name jaeger \
-e COLLECTOR_ZIPKIN_HOST_PORT=:9411 \
-p 5775:5775/udp \
-p 6831:6831/udp \
-p 6832:6832/udp \
-p 5778:5778 \
-p 16686:16686 \
-p 14268:14268 \
-p 14250:14250 \
-p 9411:9411 \
jaegertracing/all-in-one:latest
访问 http://localhost:16686 查看 Jaeger UI。
完整微服务追踪示例(复用前文定义的 tracingMiddleware、responseWriter 与 newTracingClient)
package main
import (
"context"
"log"
"net/http"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
)
// initTracer sets up tracing for the service named serviceName.
//
// It builds a TracerProvider that batches spans to the Jaeger collector at
// http://localhost:14268/api/traces and tags them with service.name. It
// installs that provider globally, along with a composite TraceContext +
// Baggage propagator, so trace context can be carried across service
// boundaries. The caller owns the returned provider and should Shutdown it to
// flush pending spans.
//
// NOTE(review): the otel Jaeger exporter is deprecated upstream; Jaeger
// accepts OTLP natively, so an OTLP exporter is the suggested replacement.
func initTracer(serviceName string) (*sdktrace.TracerProvider, error) {
	exp, err := jaeger.New(jaeger.WithCollectorEndpoint(
		jaeger.WithEndpoint("http://localhost:14268/api/traces"),
	))
	if err != nil {
		return nil, err
	}

	svc := resource.NewWithAttributes(
		semconv.SchemaURL,
		semconv.ServiceNameKey.String(serviceName),
	)
	provider := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exp),
		sdktrace.WithResource(svc),
	)
	otel.SetTracerProvider(provider)

	prop := propagation.NewCompositeTextMapPropagator(
		propagation.TraceContext{},
		propagation.Baggage{},
	)
	otel.SetTextMapPropagator(prop)

	return provider, nil
}
// API Gateway
// apiGatewayHandler calls the user service and then the order service, each
// via callService under an "api-gateway" span, and replies "OK".
//
// NOTE(review): callService reports nothing back, so the reply is "OK" even
// when a downstream call fails.
func apiGatewayHandler(w http.ResponseWriter, r *http.Request) {
	ctx, span := otel.Tracer("api-gateway").Start(r.Context(), "api-gateway")
	defer span.End()

	downstream := []string{
		"http://localhost:8081/user/123",   // user service
		"http://localhost:8082/orders/123", // order service
	}
	for _, target := range downstream {
		callService(ctx, target)
	}

	w.Write([]byte("OK"))
}
// callService sends GET url through the tracing HTTP client, wrapped in a
// "call-service" span that carries the target as "service.url".
//
// Failures are recorded on the span and otherwise swallowed. The function has
// no return value, so callers cannot observe them.
func callService(ctx context.Context, url string) {
	tracer := otel.Tracer("api-gateway")
	ctx, span := tracer.Start(ctx, "call-service")
	defer span.End()
	// Use span.SetAttributes rather than trace.WithAttributes: the trace
	// package is not imported in this example.
	span.SetAttributes(attribute.String("service.url", url))

	req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
	if err != nil {
		// Ignoring this error would hand a nil request to client.Do, which
		// panics.
		span.RecordError(err)
		return
	}

	resp, err := newTracingClient().Do(req)
	if err != nil {
		span.RecordError(err)
		return
	}
	defer resp.Body.Close()
}
// main starts the API gateway on :8080, with every request wrapped by
// tracingMiddleware.
func main() {
	tp, err := initTracer("api-gateway")
	if err != nil {
		log.Fatal(err)
	}
	// Flush buffered spans on exit. log.Fatal would skip this defer, so the
	// server error below is logged rather than fatal.
	defer func() {
		if err := tp.Shutdown(context.Background()); err != nil {
			log.Printf("tracer shutdown: %v", err)
		}
	}()

	// tracingMiddleware returns an http.Handler, so it is registered with
	// Handle (HandleFunc requires a func(ResponseWriter, *Request)).
	mux := http.NewServeMux()
	mux.Handle("/", tracingMiddleware(http.HandlerFunc(apiGatewayHandler)))

	srv := &http.Server{
		Addr:    ":8080",
		Handler: mux,
		// Guards against clients that never finish sending headers.
		ReadHeaderTimeout: 5 * time.Second,
	}

	log.Println("API Gateway starting on :8080")
	if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
		log.Printf("server error: %v", err)
	}
}
总结
分布式追踪让你能够:
- 可视化调用链路:看到请求经过的每个服务
- 定位性能瓶颈:找出最慢的环节
- 快速排错:定位错误发生的位置
- 理解依赖关系:服务间的调用关系
关键概念:
- Trace:完整的请求链路
- Span:链路中的单个操作
- Context Propagation:跨服务传递追踪信息
最佳实践:
- 在所有入口点创建 span
- 记录关键属性和事件
- 使用标准化的命名
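- 设置合理的采样率(例如 sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.TraceIDRatioBased(0.1))):新链路只采样 10%,而已有上游的请求沿用上游的采样决定)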
记住:在微服务架构中,没有追踪就像在迷宫中蒙眼行走。
继续阅读
探索更多技术文章
浏览归档,发现更多关于系统设计、工具链和工程实践的内容。