引言
在分布式系统中,多个节点可能需要互斥访问共享资源。分布式锁是实现这种互斥访问的关键机制,但其实现比单机锁复杂得多,需要考虑网络分区、节点故障等问题。
本文将系统介绍分布式锁的设计原理和主流实现方案。
分布式锁的核心要求
理想的分布式锁应满足:
1. 互斥性(Mutual Exclusion)
- 同一时刻只有一个客户端持有锁
2. 不会死锁(Deadlock Free)
- 客户端崩溃或网络异常时,锁能自动释放
3. 容错性(Fault Tolerance)
- 部分节点故障时,锁服务仍可用
4. 性能(Performance)
- 加锁/解锁操作延迟低
- 支持高并发竞争
Redis分布式锁
基础实现(单节点)
type RedisLock struct {
client *redis.Client
key string
value string
expire time.Duration
}
func NewRedisLock(client *redis.Client, key string, expire time.Duration) *RedisLock {
return &RedisLock{
client: client,
key: key,
value: uuid.New().String(), // 唯一标识,防止误删
expire: expire,
}
}
func (l *RedisLock) Lock(ctx context.Context) (bool, error) {
// SET key value NX PX expire
// NX: 仅当key不存在时设置
// PX: 毫秒级过期时间
ok, err := l.client.SetNX(ctx, l.key, l.value, l.expire).Result()
if err != nil {
return false, err
}
return ok, nil
}
func (l *RedisLock) Unlock(ctx context.Context) error {
// 使用Lua脚本确保原子性:只删除自己持有的锁
script := `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`
result, err := l.client.Eval(ctx, script, []string{l.key}, l.value).Int()
if err != nil {
return err
}
if result == 0 {
return fmt.Errorf("lock not held or already expired")
}
return nil
}
// 使用示例
func processOrder(ctx context.Context, orderID string) error {
lock := NewRedisLock(redisClient, fmt.Sprintf("lock:order:%s", orderID), 30*time.Second)
locked, err := lock.Lock(ctx)
if err != nil {
return err
}
if !locked {
return fmt.Errorf("failed to acquire lock")
}
defer lock.Unlock(ctx)
// 处理订单逻辑
return doProcessOrder(ctx, orderID)
}
锁续期(Watchdog机制)
type RedisLockWithWatchdog struct {
*RedisLock
stopWatchdog chan struct{}
}
func (l *RedisLockWithWatchdog) Lock(ctx context.Context) (bool, error) {
locked, err := l.RedisLock.Lock(ctx)
if err != nil || !locked {
return locked, err
}
// 启动看门狗,定期续期
l.stopWatchdog = make(chan struct{})
go l.watchdog(ctx)
return true, nil
}
func (l *RedisLockWithWatchdog) watchdog(ctx context.Context) {
// 每过期时间的1/3续期一次
ticker := time.NewTicker(l.expire / 3)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 续期
script := `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("pexpire", KEYS[1], ARGV[2])
else
return 0
end
`
l.client.Eval(ctx, script, []string{l.key}, l.value, l.expire.Milliseconds())
case <-l.stopWatchdog:
return
case <-ctx.Done():
return
}
}
}
func (l *RedisLockWithWatchdog) Unlock(ctx context.Context) error {
// 停止看门狗
if l.stopWatchdog != nil {
close(l.stopWatchdog)
}
return l.RedisLock.Unlock(ctx)
}
Redlock算法(多节点)
// Redlock实现:多个独立Redis节点
type Redlock struct {
nodes []*redis.Client
key string
value string
expire time.Duration
quorum int // 需要获得锁的最小节点数
}
func NewRedlock(nodes []*redis.Client, key string, expire time.Duration) *Redlock {
return &Redlock{
nodes: nodes,
key: key,
value: uuid.New().String(),
expire: expire,
quorum: len(nodes)/2 + 1, // 多数派
}
}
func (r *Redlock) Lock(ctx context.Context) (bool, error) {
startTime := time.Now()
// 尝试在所有节点上加锁
successCount := 0
for _, node := range r.nodes {
locked, err := node.SetNX(ctx, r.key, r.value, r.expire).Result()
if err == nil && locked {
successCount++
}
}
// 计算实际有效时间
elapsedTime := time.Since(startTime)
validTime := r.expire - elapsedTime
// 检查是否达到quorum且有效时间足够
if successCount >= r.quorum && validTime > 0 {
return true, nil
}
// 未达到quorum,释放所有已获得的锁
r.Unlock(ctx)
return false, nil
}
func (r *Redlock) Unlock(ctx context.Context) {
script := `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`
// 尝试在所有节点上释放锁
for _, node := range r.nodes {
node.Eval(ctx, script, []string{r.key}, r.value)
}
}
Redlock争议:Martin Kleppmann指出Redlock在时钟跳跃、GC暂停等场景下可能失效。Antirez(Redis作者)进行了反驳。实际使用中需要权衡。
ZooKeeper分布式锁
临时顺序节点实现
type ZKLock struct {
conn *zk.Conn
lockPath string
lockNode string
}
func NewZKLock(conn *zk.Conn, lockPath string) *ZKLock {
return &ZKLock{
conn: conn,
lockPath: lockPath,
}
}
func (l *ZKLock) Lock(ctx context.Context) error {
// 确保锁目录存在
exists, _, err := l.conn.Exists(l.lockPath)
if err != nil {
return err
}
if !exists {
l.conn.Create(l.lockPath, []byte{}, 0, zk.WorldACL(zk.PermAll))
}
// 创建临时顺序节点
node, err := l.conn.CreateProtectedEphemeralSequential(
l.lockPath+"/lock-",
[]byte{},
zk.WorldACL(zk.PermAll),
)
if err != nil {
return err
}
l.lockNode = node
// 检查是否获得锁
return l.tryAcquireLock(ctx)
}
func (l *ZKLock) tryAcquireLock(ctx context.Context) error {
for {
// 获取所有子节点
children, _, err := l.conn.Children(l.lockPath)
if err != nil {
return err
}
// 排序
sort.Strings(children)
// 检查自己是否是最小节点
myNodeName := path.Base(l.lockNode)
if len(children) > 0 && children[0] == myNodeName {
// 获得锁
return nil
}
// 找到前一个节点
var prevNode string
for i, child := range children {
if child == myNodeName && i > 0 {
prevNode = children[i-1]
break
}
}
if prevNode == "" {
return fmt.Errorf("previous node not found")
}
// 监听前一个节点的删除事件
prevPath := l.lockPath + "/" + prevNode
watch, _, err := l.conn.ExistsW(prevPath)
if err != nil {
return err
}
select {
case <-watch:
// 前一个节点被删除,重新检查
continue
case <-ctx.Done():
return ctx.Err()
}
}
}
func (l *ZKLock) Unlock() error {
if l.lockNode != "" {
return l.conn.Delete(l.lockNode, -1)
}
return nil
}
优点:
- 强一致性(ZAB协议)
- 自动释放(会话超时)
- 避免羊群效应(只监听前一个节点)
缺点:
- 性能低于Redis
- 需要维护ZooKeeper集群
etcd分布式锁
基于Lease的实现
type EtcdLock struct {
client *clientv3.Client
key string
lease clientv3.Lease
leaseID clientv3.LeaseID
kv clientv3.KV
}
func NewEtcdLock(client *clientv3.Client, key string, ttl int64) (*EtcdLock, error) {
lease := clientv3.NewLease(client)
// 创建租约
leaseResp, err := lease.Grant(context.Background(), ttl)
if err != nil {
return nil, err
}
return &EtcdLock{
client: client,
key: key,
lease: lease,
leaseID: leaseResp.ID,
kv: clientv3.NewKV(client),
}, nil
}
func (l *EtcdLock) Lock(ctx context.Context) error {
// 使用事务确保原子性
cmp := clientv3.Compare(clientv3.CreateRevision(l.key), "=", 0)
// 如果key不存在,创建它
put := clientv3.OpPut(l.key, "locked", clientv3.WithLease(l.leaseID))
// 如果key已存在,获取当前值
get := clientv3.OpGet(l.key)
resp, err := l.kv.Txn(ctx).If(cmp).Then(put).Else(get).Commit()
if err != nil {
return err
}
if !resp.Succeeded {
// 锁已被占用,等待释放
return l.waitForLock(ctx)
}
// 启动keepalive
keepAliveCh, err := l.lease.KeepAlive(ctx, l.leaseID)
if err != nil {
return err
}
go func() {
for range keepAliveCh {
// 消费keepalive响应
}
}()
return nil
}
func (l *EtcdLock) waitForLock(ctx context.Context) error {
// 监听key的删除事件
watchCh := l.client.Watch(ctx, l.key)
for watchResp := range watchCh {
for _, event := range watchResp.Events {
if event.Type == mvccpb.DELETE {
// 锁被释放,尝试获取
return l.Lock(ctx)
}
}
}
return ctx.Err()
}
func (l *EtcdLock) Unlock(ctx context.Context) error {
// 删除key
_, err := l.kv.Delete(ctx, l.key)
if err != nil {
return err
}
// 释放租约
_, err = l.lease.Revoke(ctx, l.leaseID)
return err
}
使用etcd官方lock库
import "go.etcd.io/etcd/client/v3/concurrency"
type EtcdLockV2 struct {
client *clientv3.Client
session *concurrency.Session
mutex *concurrency.Mutex
}
func NewEtcdLockV2(client *clientv3.Client, key string) (*EtcdLockV2, error) {
// 创建会话(带TTL)
session, err := concurrency.NewSession(client, concurrency.WithTTL(10))
if err != nil {
return nil, err
}
return &EtcdLockV2{
client: client,
session: session,
mutex: concurrency.NewMutex(session, key),
}, nil
}
func (l *EtcdLockV2) Lock(ctx context.Context) error {
return l.mutex.Lock(ctx)
}
func (l *EtcdLockV2) Unlock(ctx context.Context) error {
err := l.mutex.Unlock(ctx)
l.session.Close()
return err
}
性能对比
func BenchmarkDistributedLocks(b *testing.B) {
// Redis锁
redisClient := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
b.Run("Redis", func(b *testing.B) {
for i := 0; i < b.N; i++ {
lock := NewRedisLock(redisClient, "bench:lock", 10*time.Second)
lock.Lock(context.Background())
lock.Unlock(context.Background())
}
})
// ZooKeeper锁
zkConn, _, _ := zk.Connect([]string{"localhost:2181"}, time.Second)
b.Run("ZooKeeper", func(b *testing.B) {
for i := 0; i < b.N; i++ {
lock := NewZKLock(zkConn, "/bench/lock")
lock.Lock(context.Background())
lock.Unlock()
}
})
// etcd锁
etcdClient, _ := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
b.Run("etcd", func(b *testing.B) {
for i := 0; i < b.N; i++ {
lock, _ := NewEtcdLockV2(etcdClient, "/bench/lock")
lock.Lock(context.Background())
lock.Unlock(context.Background())
}
})
}
// 结果(单次加锁+解锁):
// Redis: ~2-5 ms
// ZooKeeper: ~10-20 ms
// etcd: ~5-10 ms
最佳实践
锁粒度设计
// 粗粒度锁(性能差,但简单)
func processAllOrders() {
lock := NewRedisLock(client, "lock:orders", 30*time.Second)
lock.Lock(ctx)
defer lock.Unlock(ctx)
// 处理所有订单
}
// 细粒度锁(性能好,但复杂)
func processOrder(orderID string) {
lock := NewRedisLock(client, fmt.Sprintf("lock:order:%s", orderID), 30*time.Second)
lock.Lock(ctx)
defer lock.Unlock(ctx)
// 只处理特定订单
}
超时与重试
func acquireLockWithRetry(ctx context.Context, lock *RedisLock, maxRetries int) error {
for i := 0; i < maxRetries; i++ {
locked, err := lock.Lock(ctx)
if err != nil {
return err
}
if locked {
return nil
}
// 指数退避重试
backoff := time.Duration(math.Pow(2, float64(i))) * 100 * time.Millisecond
select {
case <-time.After(backoff):
continue
case <-ctx.Done():
return ctx.Err()
}
}
return fmt.Errorf("failed to acquire lock after %d retries", maxRetries)
}
总结
分布式锁方案选择:
| 方案 | 一致性 | 性能 | 复杂度 | 适用场景 |
|---|---|---|---|---|
| Redis | 最终一致 | 高 | 中 | 高并发、允许短暂不一致 |
| ZooKeeper | 强一致 | 中 | 高 | 对一致性要求高 |
| etcd | 强一致 | 中高 | 中 | Kubernetes生态、云原生 |
关键原则:
- 锁必须设置超时,防止死锁
- 使用唯一标识,防止误删
- 考虑网络分区和节点故障
- 根据业务场景选择合适的方案
延伸阅读
继续阅读
探索更多技术文章
浏览归档,发现更多关于系统设计、工具链和工程实践的内容。