引言
分布式共识是分布式系统中最基础也最重要的问题之一。它确保多个节点在存在网络分区和节点故障的情况下,仍然能够对某个值达成一致。本文将深入探讨Raft和Paxos两大共识算法。
共识问题的本质
FLP不可能定理
FLP Impossibility Theorem (1985):
在异步网络模型中,即使只有一个节点可能发生故障,
也不存在能够保证终止性的确定性共识算法。
这意味着:
- 安全性(Safety):可以保证(不会达成错误的共识)
- 活性(Liveness):无法保证(可能永远无法达成共识)
实际解决方案:
- 引入随机性
- 使用超时机制(部分同步模型)
- 放宽终止性要求
共识算法的核心要求
1. 终止性(Termination)
- 所有正确节点最终都能决定一个值
2. 一致性(Agreement)
- 所有正确节点决定相同的值
3. 有效性(Validity)
- 决定的值必须是某个节点提议的值
4. 完整性(Integrity)
- 每个节点最多决定一次值
Raft算法详解
角色与状态
Raft中的三种角色:
┌─────────────┐
│ Leader │ 处理所有客户端请求
│ (领导者) │ 管理日志复制
└──────┬──────┘
│
│ 发送心跳和日志
│
▼
┌─────────────┐ ┌─────────────┐
│ Follower │ │ Follower │
│ (跟随者) │ │ (跟随者) │
└─────────────┘ └─────────────┘
状态转换:
Follower → Candidate(超时未收到心跳)
Candidate → Leader(获得多数票)
Candidate → Follower(发现更高任期,或收到当前任期合法Leader的AppendEntries)
Candidate → Candidate(选举超时仍未获得多数票,任期加一后重新发起选举)
Leader → Follower(发现更高任期)
Leader选举
// raft/election.go
package raft
import (
	"log"
	"math/rand"
	"sync"
	"time"
)
// NodeState is the role a Raft node currently plays.
type NodeState int

// The three Raft roles, numbered 0, 1, 2 in this order.
const (
	// Follower answers RequestVote and AppendEntries RPCs from other nodes.
	Follower NodeState = 0
	// Candidate is campaigning for leadership in its current term.
	Candidate NodeState = 1
	// Leader handles client requests and drives log replication.
	Leader NodeState = 2
)
// RaftNode is the state of a single Raft participant. All fields are intended
// to be read and written only while holding mu.
type RaftNode struct {
	id          string
	state       NodeState
	currentTerm int
	votedFor    string     // "" means no vote has been cast in currentTerm
	log         []LogEntry // presumably index 0 is a sentinel entry — TODO confirm

	// Election state.
	electionTimeout time.Duration
	lastHeartbeat   time.Time
	votesReceived   map[string]bool

	// Leader-only state.
	nextIndex  map[string]int // per follower: index of the next log entry to send
	matchIndex map[string]int // per follower: highest log index known to be replicated

	// Fields used by the election, replication and client-handling methods.
	mu            sync.Mutex
	peers         []string    // the other cluster members, excluding this node
	leaderID      string      // last known leader, returned to redirected clients
	commitIndex   int         // highest log index known to be committed
	lastApplied   int         // highest log index applied to stateMachine
	electionTimer *time.Timer // pending election timeout

	// rpcClient and stateMachine are anonymous interfaces listing exactly the
	// methods called on them, so any transport / state machine with these
	// methods can be plugged in.
	rpcClient interface {
		Call(peer, method string, args, reply any) error
	}
	stateMachine interface {
		Apply(command any)
	}
}
// StartElection turns the node into a Candidate for a new term. It increments
// currentTerm, votes for itself, re-arms the election timer and asks every peer
// for a vote in its own goroutine.
//
// It acquires rn.mu itself, so it must not be called with the lock held. A
// node that is already Leader ignores the call, which makes a late
// election-timer firing harmless.
func (rn *RaftNode) StartElection() {
	rn.mu.Lock()
	defer rn.mu.Unlock()

	if rn.state == Leader {
		return
	}

	rn.state = Candidate
	rn.currentTerm++
	rn.votedFor = rn.id
	rn.votesReceived = map[string]bool{rn.id: true}

	// Re-arm the randomized timer so a split vote leads to a new election.
	rn.resetElectionTimer()

	log.Printf("Node %s starting election for term %d", rn.id, rn.currentTerm)

	// In a single-node cluster our own vote is already a majority; no RPC
	// reply would ever arrive to trigger the promotion.
	if len(rn.votesReceived)*2 > len(rn.peers)+1 {
		rn.convertToLeader()
		return
	}

	// Send RequestVote RPCs to every other node concurrently.
	for _, peer := range rn.peers {
		go rn.sendRequestVote(peer)
	}
}
type (
	// RequestVoteArgs is the payload a Candidate sends when soliciting a vote.
	RequestVoteArgs struct {
		Term         int    // the candidate's current term
		CandidateID  string // the node asking for the vote
		LastLogIndex int    // index of the candidate's last log entry
		LastLogTerm  int    // term of the candidate's last log entry
	}

	// RequestVoteReply is a voter's answer to RequestVoteArgs.
	RequestVoteReply struct {
		Term        int  // the voter's term; a higher value makes the candidate step down
		VoteGranted bool // true when the vote was granted
	}
)
// sendRequestVote asks one peer for its vote in the current election and
// tallies the answer. It is meant to run in its own goroutine. It takes rn.mu
// itself and never holds it across the RPC.
//
// A reply is ignored if, by the time it arrives, this node is no longer a
// Candidate or has moved to another term. Otherwise a late vote from an old
// election could promote the node, or promote it a second time.
func (rn *RaftNode) sendRequestVote(peer string) {
	rn.mu.Lock()
	if rn.state != Candidate {
		rn.mu.Unlock()
		return
	}
	lastLogIndex := len(rn.log) - 1 // assumes a non-empty log (sentinel at 0) — TODO confirm
	args := RequestVoteArgs{
		Term:         rn.currentTerm,
		CandidateID:  rn.id,
		LastLogIndex: lastLogIndex,
		LastLogTerm:  rn.log[lastLogIndex].Term,
	}
	rn.mu.Unlock()

	var reply RequestVoteReply
	if err := rn.rpcClient.Call(peer, "Raft.RequestVote", &args, &reply); err != nil {
		log.Printf("Failed to send RequestVote to %s: %v", peer, err)
		return
	}

	rn.mu.Lock()
	defer rn.mu.Unlock()

	// A higher term anywhere makes our candidacy obsolete.
	if reply.Term > rn.currentTerm {
		rn.convertToFollower(reply.Term)
		return
	}
	// Drop answers that belong to an election we are no longer running.
	if rn.state != Candidate || rn.currentTerm != args.Term {
		return
	}
	if !reply.VoteGranted {
		return
	}

	rn.votesReceived[peer] = true
	// Strict majority of the whole cluster: the peers plus this node.
	if len(rn.votesReceived)*2 > len(rn.peers)+1 {
		rn.convertToLeader()
	}
}
// RequestVote is the RPC handler a Candidate calls to ask for this node's vote.
// It always returns nil; the outcome is reported through reply.
//
// The vote is granted only when all of the following hold:
//   - the candidate's term is not older than ours;
//   - we have not already voted for a different candidate;
//   - the candidate's log is at least as up-to-date as ours (compare the last
//     entry's term first, then the last index).
func (rn *RaftNode) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) error {
	rn.mu.Lock()
	defer rn.mu.Unlock()

	reply.VoteGranted = false

	// Stale candidate: refuse and report our newer term.
	if args.Term < rn.currentTerm {
		reply.Term = rn.currentTerm
		return nil
	}
	// A newer term sends us back to Follower before deciding.
	// NOTE(review): relies on convertToFollower adopting args.Term and clearing
	// votedFor for the new term — confirm.
	if args.Term > rn.currentTerm {
		rn.convertToFollower(args.Term)
	}
	// Report the term as it stands after any update above.
	reply.Term = rn.currentTerm

	if rn.votedFor != "" && rn.votedFor != args.CandidateID {
		return nil // already voted for someone else
	}

	lastLogIndex := len(rn.log) - 1
	lastLogTerm := rn.log[lastLogIndex].Term
	upToDate := args.LastLogTerm > lastLogTerm ||
		(args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex)
	if !upToDate {
		return nil
	}

	rn.votedFor = args.CandidateID
	reply.VoteGranted = true
	// Having just voted, postpone our own election.
	rn.resetElectionTimer()
	return nil
}
// convertToLeader promotes the node to Leader for its current term,
// initializes per-follower replication state and starts the heartbeat loop.
// Callers must hold rn.mu.
func (rn *RaftNode) convertToLeader() {
	log.Printf("Node %s becoming leader for term %d", rn.id, rn.currentTerm)
	rn.state = Leader
	rn.leaderID = rn.id // so HandleClientRequest-style redirects point at us

	// Writing to a nil map panics; allocate lazily if nobody did yet.
	if rn.nextIndex == nil {
		rn.nextIndex = make(map[string]int, len(rn.peers))
	}
	if rn.matchIndex == nil {
		rn.matchIndex = make(map[string]int, len(rn.peers))
	}
	// Optimistically assume each follower is up to date. Replication backs
	// nextIndex off on a mismatch.
	for _, peer := range rn.peers {
		rn.nextIndex[peer] = len(rn.log)
		rn.matchIndex[peer] = 0
	}

	go rn.sendHeartbeats()
}
// sendHeartbeats calls sendAppendEntriesToAll immediately and then every
// 100ms, for as long as this node is still Leader of the term in which the
// loop started. The immediate first round announces the new leader before
// followers' election timers expire.
//
// Each check is made under rn.mu, but the lock is not held while
// broadcasting. Capturing the term stops an old loop if the node steps down
// and later wins a newer term (which starts its own loop).
func (rn *RaftNode) sendHeartbeats() {
	rn.mu.Lock()
	term := rn.currentTerm
	rn.mu.Unlock()

	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	for {
		rn.mu.Lock()
		stillLeader := rn.state == Leader && rn.currentTerm == term
		rn.mu.Unlock()
		if !stillLeader {
			return
		}
		rn.sendAppendEntriesToAll()
		<-ticker.C
	}
}
// resetElectionTimer re-arms the election timer with a fresh random timeout in
// [150ms, 300ms). Randomization makes it unlikely that several followers time
// out together and split the vote. Callers are expected to hold rn.mu.
//
// When the timer fires, the callback re-checks two things under rn.mu:
//   - the node is not Leader;
//   - no reset happened in the meantime. This matters because Timer.Stop cannot
//     cancel a callback that has already started.
//
// It then calls StartElection with the lock released.
func (rn *RaftNode) resetElectionTimer() {
	timeout := time.Duration(150+rand.Intn(150)) * time.Millisecond
	rn.electionTimeout = timeout
	rn.lastHeartbeat = time.Now()

	if rn.electionTimer != nil {
		rn.electionTimer.Stop()
	}
	rn.electionTimer = time.AfterFunc(timeout, func() {
		rn.mu.Lock()
		// If the timer was re-armed after this callback was already scheduled,
		// lastHeartbeat is recent and the check fails, so the stale firing is ignored.
		expired := rn.state != Leader && time.Since(rn.lastHeartbeat) >= rn.electionTimeout
		rn.mu.Unlock()
		if expired {
			rn.StartElection()
		}
	})
}
日志复制
// raft/replication.go
package raft
type (
	// LogEntry is one slot of the replicated log.
	LogEntry struct {
		Term    int // term in which the entry was created
		Command any // opaque payload handed to the state machine
	}

	// AppendEntriesArgs is sent by the leader both as a heartbeat (no Entries)
	// and to replicate log entries.
	AppendEntriesArgs struct {
		Term         int        // the leader's term
		LeaderID     string     // lets followers redirect clients
		PrevLogIndex int        // index of the entry immediately before Entries
		PrevLogTerm  int        // term of the entry at PrevLogIndex
		Entries      []LogEntry // entries to append; empty for a heartbeat
		LeaderCommit int        // the leader's commit index
	}

	// AppendEntriesReply is a follower's answer to AppendEntriesArgs.
	AppendEntriesReply struct {
		Term    int  // the follower's term, so a stale leader can step down
		Success bool // true when the log matched at PrevLogIndex and entries were accepted
	}
)
// AppendEntries is the RPC handler the leader uses both for heartbeats (no
// entries) and for log replication. It always returns nil; reply.Success
// reports whether the entries were accepted. The handler takes rn.mu itself.
func (rn *RaftNode) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) error {
	rn.mu.Lock()
	defer rn.mu.Unlock()

	reply.Success = false

	// Reject a stale leader and report our newer term.
	if args.Term < rn.currentTerm {
		reply.Term = rn.currentTerm
		return nil
	}
	if args.Term > rn.currentTerm {
		rn.convertToFollower(args.Term)
	} else if rn.state == Candidate {
		// Another node already won this term: abandon our candidacy. Step down
		// directly rather than through convertToFollower, which (not visible
		// here) may reset votedFor and allow a second vote in this term.
		rn.state = Follower
	}
	reply.Term = rn.currentTerm
	rn.leaderID = args.LeaderID

	// We heard from a valid leader, so postpone our own election.
	rn.resetElectionTimer()

	// Consistency check: our entry at PrevLogIndex must carry PrevLogTerm.
	// Index 0 is not checked (presumably a sentinel entry).
	if args.PrevLogIndex > 0 {
		if args.PrevLogIndex >= len(rn.log) {
			return nil // our log is too short
		}
		if rn.log[args.PrevLogIndex].Term != args.PrevLogTerm {
			// Mismatch: drop the conflicting entry and everything after it.
			rn.log = rn.log[:args.PrevLogIndex]
			return nil
		}
	}

	// Append the new entries, truncating our log at the first conflict.
	for i, entry := range args.Entries {
		index := args.PrevLogIndex + 1 + i
		if index < len(rn.log) {
			if rn.log[index].Term == entry.Term {
				continue // already present
			}
			rn.log = rn.log[:index]
		}
		rn.log = append(rn.log, entry)
	}

	// Advance commitIndex, but never past the last entry this RPC vouched for.
	// Anything after it in our log has not been checked against the leader's
	// log and must not be committed.
	if args.LeaderCommit > rn.commitIndex {
		newCommit := args.LeaderCommit
		if lastNew := args.PrevLogIndex + len(args.Entries); lastNew < newCommit {
			newCommit = lastNew
		}
		if newCommit > rn.commitIndex {
			rn.commitIndex = newCommit
			rn.applyCommittedEntries()
		}
	}

	reply.Success = true
	return nil
}
// sendAppendEntries sends peer the entries it is missing (or an empty
// heartbeat) and updates nextIndex/matchIndex from the reply. It takes rn.mu
// itself and releases it for the duration of the RPC.
func (rn *RaftNode) sendAppendEntries(peer string) {
	rn.mu.Lock()
	if rn.state != Leader {
		rn.mu.Unlock()
		return
	}
	// Keep nextIndex within [1, len(log)] so the PrevLogIndex lookup stays in
	// range. This assumes a non-empty log (sentinel at index 0) — TODO confirm.
	nextIndex := rn.nextIndex[peer]
	if nextIndex > len(rn.log) {
		nextIndex = len(rn.log)
	}
	if nextIndex < 1 {
		nextIndex = 1
	}
	if rn.nextIndex[peer] != nextIndex {
		rn.nextIndex[peer] = nextIndex
	}
	args := AppendEntriesArgs{
		Term:         rn.currentTerm,
		LeaderID:     rn.id,
		PrevLogIndex: nextIndex - 1,
		PrevLogTerm:  rn.log[nextIndex-1].Term,
		// Copy the entries: the RPC runs unlocked, and rn.log's backing array
		// may be rewritten (truncate + append) in the meantime.
		Entries:      append([]LogEntry(nil), rn.log[nextIndex:]...),
		LeaderCommit: rn.commitIndex,
	}
	rn.mu.Unlock()

	var reply AppendEntriesReply
	if err := rn.rpcClient.Call(peer, "Raft.AppendEntries", &args, &reply); err != nil {
		log.Printf("Failed to send AppendEntries to %s: %v", peer, err)
		return
	}

	rn.mu.Lock()
	defer rn.mu.Unlock()

	if reply.Term > rn.currentTerm {
		rn.convertToFollower(reply.Term)
		return
	}
	// Ignore replies to RPCs sent in an earlier term or before we stepped down.
	if rn.state != Leader || rn.currentTerm != args.Term {
		return
	}

	if reply.Success {
		// Replies may arrive out of order, so never move matchIndex backwards.
		if match := args.PrevLogIndex + len(args.Entries); match > rn.matchIndex[peer] {
			rn.matchIndex[peer] = match
		}
		rn.nextIndex[peer] = rn.matchIndex[peer] + 1
		rn.updateCommitIndex()
		return
	}

	// Log mismatch: back off one entry for the next attempt. Only do so if no
	// other reply changed nextIndex meanwhile, and never go below 1.
	if rn.nextIndex[peer] == nextIndex && nextIndex > 1 {
		rn.nextIndex[peer] = nextIndex - 1
	}
}
// updateCommitIndex moves commitIndex up to the highest index n that satisfies
// two conditions, then applies the newly committed entries:
//   - the entry at n is replicated on a strict majority of the whole cluster
//     (this leader plus its peers);
//   - the entry at n belongs to the current term.
//
// Entries from earlier terms therefore become committed only together with a
// later current-term entry. Callers must hold rn.mu.
func (rn *RaftNode) updateCommitIndex() {
	clusterSize := len(rn.peers) + 1
	for n := len(rn.log) - 1; n > rn.commitIndex; n-- {
		if rn.log[n].Term != rn.currentTerm {
			continue
		}
		replicas := 1 // the leader itself
		for _, peer := range rn.peers {
			if rn.matchIndex[peer] >= n {
				replicas++
			}
		}
		// Strict majority: e.g. 3 of 4 nodes, 3 of 5 nodes.
		if replicas*2 > clusterSize {
			rn.commitIndex = n
			rn.applyCommittedEntries()
			return
		}
	}
}
// applyCommittedEntries hands every committed-but-unapplied log entry to the
// state machine in index order. lastApplied is advanced before each Apply call,
// and each application is logged.
func (rn *RaftNode) applyCommittedEntries() {
	for rn.commitIndex > rn.lastApplied {
		rn.lastApplied += 1
		cmd := rn.log[rn.lastApplied].Command
		rn.stateMachine.Apply(cmd)
		log.Printf("Applied command at index %d", rn.lastApplied)
	}
}
客户端交互
// raft/client.go
package raft
type (
	// ClientRequest carries one command a client wants appended to the log.
	ClientRequest struct {
		Command any
	}

	// ClientResponse reports the outcome of a client request.
	ClientResponse struct {
		Success  bool   // true once the command's log entry is committed
		LeaderID string // redirect hint when this node is not the leader
		Error    string // human-readable reason when Success is false
	}
)
// HandleClientRequest appends a client command to the leader's log. It then
// blocks, polling every 10ms, until the command's entry is committed or this
// node stops being leader of the term in which it appended the entry.
// Non-leaders reject the request and return the last known leader in
// resp.LeaderID. It always returns nil; the outcome is in resp.
//
// NOTE(review): there is no timeout. If the entry never commits and this node
// keeps believing it is leader, the caller waits indefinitely.
func (rn *RaftNode) HandleClientRequest(req *ClientRequest, resp *ClientResponse) error {
	rn.mu.Lock()
	// Only the leader may append client commands.
	if rn.state != Leader {
		resp.Success = false
		resp.LeaderID = rn.leaderID
		resp.Error = "Not the leader"
		rn.mu.Unlock()
		return nil
	}
	term := rn.currentTerm
	rn.log = append(rn.log, LogEntry{Term: term, Command: req.Command})
	index := len(rn.log) - 1
	rn.mu.Unlock()

	rn.sendAppendEntriesToAll()

	for {
		rn.mu.Lock()
		// "commitIndex >= index" only proves success if the entry at index is
		// still the one we wrote. That is the case exactly when it carries our
		// term: a term has one leader, and it writes each index once.
		if rn.commitIndex >= index && index < len(rn.log) && rn.log[index].Term == term {
			rn.mu.Unlock()
			resp.Success = true
			return nil
		}
		if rn.state != Leader || rn.currentTerm != term {
			rn.mu.Unlock()
			resp.Success = false
			resp.Error = "No longer the leader"
			return nil
		}
		rn.mu.Unlock()
		time.Sleep(10 * time.Millisecond)
	}
}
Paxos算法概述
Basic Paxos
Paxos中的角色:
Proposer(提议者):提出提案
Acceptor(接受者):接受或拒绝提案
Learner(学习者):学习被选中的值
两阶段协议:
Phase 1: Prepare
┌──────────┐
│ Proposer │
└────┬─────┘
│ 1a. Prepare(n)
│ n = 提案编号
▼
┌──────────┐
│ Acceptor │ ──▶ 如果 n > 已见过的最大编号
└──────────┘ 承诺不接受更小编号的提案
返回已接受的最大编号提案(如果有)
Phase 2: Accept
┌──────────┐
│ Proposer │
└────┬─────┘
│ 2a. Accept(n, v)
│ v = Phase 1 多数派响应中编号最大的已接受提案的值;
│ 仅当所有响应都不含已接受提案时,才可使用自己提议的值
▼
┌──────────┐
│ Acceptor │ ──▶ 如果尚未对编号大于 n 的Prepare作出承诺
└──────────┘ 则接受该提案
Multi-Paxos优化
Multi-Paxos优化:
1. Leader选举
- 选出一个稳定的Leader
- 只有Leader可以提议
- 减少Prepare阶段
2. 管道化
- 连续发送多个Accept请求
- 提高吞吐量
3. 快速提交
- 如果Prepare阶段的多数派响应显示:同一个提案(相同编号、相同值)已被多数Acceptor接受
- 则该值已被选中,可直接通知Learner,无需再执行Accept阶段(注意:仅仅"值相同"但编号不同,并不足以说明该值已被选中)
实际系统中的应用:
- Google Chubby
- Apache ZooKeeper(ZAB协议,思想与Multi-Paxos相近,但并非Paxos的实现)
etcd实战应用
分布式锁
// etcd/lock.go
package etcd
import (
"context"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)
// DistributedLock is a mutual-exclusion lock backed by etcd.
type DistributedLock struct {
// client is the etcd client passed to NewDistributedLock.
client *clientv3.Client
// session is created in NewDistributedLock and closed by Unlock.
session *concurrency.Session
// mutex is the etcd Mutex built on session for the lock key.
mutex *concurrency.Mutex
}
// NewDistributedLock creates a lock on lockKey, backed by a new etcd session.
//
// The session gets a 10-second TTL by default. Any extra opts (for example
// concurrency.WithTTL or concurrency.WithContext) are applied after that
// default, so they can override it. Existing two-argument calls keep working
// unchanged.
func NewDistributedLock(client *clientv3.Client, lockKey string, opts ...concurrency.SessionOption) (*DistributedLock, error) {
	sessionOpts := append([]concurrency.SessionOption{concurrency.WithTTL(10)}, opts...)
	session, err := concurrency.NewSession(client, sessionOpts...)
	if err != nil {
		return nil, err
	}
	return &DistributedLock{
		client:  client,
		session: session,
		mutex:   concurrency.NewMutex(session, lockKey),
	}, nil
}
// Lock acquires the lock through the underlying etcd Mutex. ctx bounds the
// wait, and any error from the Mutex is returned as-is.
func (dl *DistributedLock) Lock(ctx context.Context) error {
	m := dl.mutex
	if err := m.Lock(ctx); err != nil {
		return err
	}
	return nil
}
// Unlock releases the lock and then closes the session. After it returns, this
// DistributedLock should not be locked again; create a new one instead.
//
// The session is closed even if releasing the mutex fails. The first error
// encountered is returned: the unlock error takes precedence over the close error.
func (dl *DistributedLock) Unlock(ctx context.Context) error {
	err := dl.mutex.Unlock(ctx)
	if closeErr := dl.session.Close(); err == nil {
		err = closeErr
	}
	return err
}
// 使用示例
// main demonstrates guarding a critical section (processOrders) with a
// DistributedLock on "/locks/order-processing".
func main() {
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	lock, err := NewDistributedLock(client, "/locks/order-processing")
	if err != nil {
		log.Fatal(err)
	}

	ctx := context.Background()

	// Acquire the lock.
	if err := lock.Lock(ctx); err != nil {
		log.Fatal(err)
	}
	// Report, rather than silently drop, a failure to release the lock.
	defer func() {
		if err := lock.Unlock(ctx); err != nil {
			log.Printf("releasing lock: %v", err)
		}
	}()

	// Critical section.
	processOrders()
}
Leader选举
// etcd/election.go
package etcd
import (
"context"
"log"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)
// LeaderElection wraps an etcd concurrency.Election for one participant,
// identified by nodeID.
//
// NOTE(review): isLeader is written by the goroutine started in Observe and
// read by IsLeader without any synchronization. Using both concurrently is a
// data race; consider sync/atomic.Bool.
type LeaderElection struct {
client *clientv3.Client
// session is created in NewLeaderElection; election is built on it.
session *concurrency.Session
election *concurrency.Election
nodeID string
// isLeader is set by Campaign, cleared by Resign, and updated by Observe.
isLeader bool
}
// NewLeaderElection prepares nodeID to take part in the election under
// electionKey. It opens a new etcd session with default options and returns
// any session-creation error unchanged.
func NewLeaderElection(client *clientv3.Client, nodeID, electionKey string) (*LeaderElection, error) {
	session, err := concurrency.NewSession(client)
	if err != nil {
		return nil, err
	}
	le := &LeaderElection{
		client:  client,
		session: session,
		nodeID:  nodeID,
	}
	le.election = concurrency.NewElection(session, electionKey)
	return le, nil
}
// Campaign runs for leadership with nodeID as the proposed value. It logs the
// attempt; on success it records that this node is leader and logs that too.
// etcd's error, if any, is returned unchanged.
func (le *LeaderElection) Campaign(ctx context.Context) error {
	log.Printf("Node %s campaigning for leadership", le.nodeID)
	err := le.election.Campaign(ctx, le.nodeID)
	if err == nil {
		le.isLeader = true
		log.Printf("Node %s became the leader", le.nodeID)
	}
	return err
}
// Resign first clears the local leadership flag, then gives up leadership in
// etcd. The flag stays cleared even if the etcd call fails; that error is
// returned unchanged.
func (le *LeaderElection) Resign(ctx context.Context) error {
	le.isLeader = false
	if err := le.election.Resign(ctx); err != nil {
		return err
	}
	return nil
}
// Observe forwards the current leader's ID each time etcd reports a leader
// value. It updates this node's isLeader flag before sending, so a receiver
// reacting to the ID sees a matching IsLeader.
//
// The returned channel is closed when etcd's observe stream ends. The
// forwarding goroutine also exits on ctx cancellation even if nobody is reading
// the channel, so it cannot block forever.
func (le *LeaderElection) Observe(ctx context.Context) <-chan string {
	ch := make(chan string)
	go func() {
		defer close(ch)
		for resp := range le.election.Observe(ctx) {
			if len(resp.Kvs) == 0 {
				continue
			}
			leaderID := string(resp.Kvs[0].Value)
			le.isLeader = leaderID == le.nodeID
			select {
			case ch <- leaderID:
			case <-ctx.Done():
				return
			}
		}
	}()
	return ch
}
// IsLeader reports whether this node believes it is the leader. It returns
// false once the etcd session has ended (its Done channel is closed). In that
// case the session's lease is no longer refreshed, so leadership may already
// belong to another node. Without this check, a node whose lease expired
// would keep acting as leader.
func (le *LeaderElection) IsLeader() bool {
	select {
	case <-le.session.Done():
		return false
	default:
		return le.isLeader
	}
}
// 使用示例
// main runs for the "/elections/scheduler" role as node-1. After winning, it
// calls performLeaderDuties about once per second for as long as IsLeader
// reports true, and resigns on exit.
func main() {
	cfg := clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 5 * time.Second,
	}
	client, err := clientv3.New(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	const nodeID = "node-1"
	election, err := NewLeaderElection(client, nodeID, "/elections/scheduler")
	if err != nil {
		log.Fatal(err)
	}

	ctx := context.Background()
	if err := election.Campaign(ctx); err != nil {
		log.Fatal(err)
	}
	defer election.Resign(ctx)

	for {
		if !election.IsLeader() {
			break
		}
		performLeaderDuties()
		time.Sleep(time.Second)
	}
}
总结
分布式共识算法是构建可靠分布式系统的基石:
Raft:易于理解和实现,适合大多数场景
- 强Leader模型简化了设计
- 日志复制机制清晰直观
- etcd、Consul等系统采用
Paxos:理论基础扎实,变体丰富、灵活(实际性能与Raft相当,取决于具体实现与优化)
- Multi-Paxos优化了性能
- ZooKeeper的ZAB协议与其思想相近(但ZAB并非Paxos的实现)
- 实现复杂度较高
实际应用:
- 配置管理(etcd、Consul)
- 分布式锁
- Leader选举
- 服务发现
关键原则:
- 理解CAP定理的约束
- 选择合适的共识算法
- 正确处理网络分区和节点故障
- 监控共识性能和延迟
延伸阅读
继续阅读
探索更多技术文章
浏览归档,发现更多关于系统设计、工具链和工程实践的内容。