forked from fuyao-w/papillon
-
Notifications
You must be signed in to change notification settings - Fork 0
/
raft.go
331 lines (307 loc) · 9.34 KB
/
raft.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
package papillon
import (
"container/list"
"fmt"
. "github.com/fuyao-w/common-util"
"golang.org/x/sync/errgroup"
"sync"
"sync/atomic"
"time"
)
type (
	// Raft holds the state of a single Raft node. Fields are grouped into
	// general state, main-loop, FSM, API and component sections.
	Raft struct {
		commitIndex *atomic.Uint64 // index of the latest log committed by the cluster; starts at 0 and is only updated after logs have been committed
		lastApply *atomic.Uint64 // latest index handed to the state machine; NOTE: this does not mean the FSM has already applied it
		currentTerm *atomic.Uint64 // current term; must be persisted
		configuration *AtomicVal[ClusterInfo] // copy of the cluster configuration (stored by setLatestConfiguration)
		conf *AtomicVal[*Config] // tunable configuration parameters (read via Conf)
		confReloadMu sync.Mutex // NOTE(review): presumably serializes configuration reloads — confirm against its users
		state State // node state
		run func() // main-loop function for the current state; called repeatedly by runState
		lastContact *AtomicVal[time.Time] // time of the last contact with the leader
		localInfo ServerInfo // address information of this node
		lastEntry *LockItem[lastEntry] // latest index/term of this node, for both the LogStore and the snapshot
		cluster cluster // cluster configuration (latest and committed)
		leaderInfo *LockItem[ServerInfo] // address of the current leader
		funcEg *errgroup.Group // runs the background goroutines started by goFunc
		shutDown shutDown // shutdown signal; runState returns once shutDown.C is readable
		logger Logger // logger
		//------- main loop -----------
		rpcCh <-chan *RPC // incoming RPC commands to handle
		commitCh chan struct{} // notification that logs were committed and can be applied to the FSM; may be buffered
		leaderState leaderState // leader context
		heartbeatTimeout <-chan time.Time // set by the main loop
		electionTimeout <-chan time.Time // set by the main loop
		candidateFromLeaderTransfer bool // whether this node is in the middle of a leadership transfer
		//------- fsm -----------
		fsm FSM // state machine; committed logs are applied through it
		fsmApplyCh chan []*LogFuture // committed-log notifications for the FSM goroutine
		fsmSnapshotCh chan *fsmSnapshotFuture // requests to take a snapshot from the FSM
		fsmRestoreCh chan *restoreFuture // tells the FSM to restore from a snapshot
		readOnly readOnly // tracks read-only query requests
		//----- API --------------------
		apiSnapshotBuildCh chan *apiSnapshotFuture // user requests to build a snapshot
		apiSnapshotRestoreCh chan *userRestoreFuture // user snapshot restores; new logs cannot be accepted while restoring, so this must be triggered from the runState goroutine
		apiLogApplyCh chan *LogFuture // log apply requests; kept on a dedicated channel so they can be batched
		commandCh chan *command // commands sent to the node (e.g. leadership verification); must be unbuffered
		stateChangeCh chan *StateChange // state change notifications (sent by setState)
		//----- components ------
		rpc RpcInterface // RPC component
		kvStore KVStorage // persists term and vote information
		logStore LogStore // log persistence
		snapshotStore SnapshotStore // snapshot storage
	}
	// lastEntry records the newest entry known from each of the two sources.
	lastEntry struct {
		snapshot lastLog // newest log entry covered by the latest snapshot
		log lastLog // newest log entry in the LogStore
	}
	// lastLog is the index/term pair of a single log entry.
	lastLog struct {
		index uint64
		term uint64
	}
	// leaderState is the leader context.
	leaderState struct {
		commitIndex *atomic.Uint64 // commit index computed from the replicas' progress; only updated when new logs are committed
		startIndex uint64 // latest index when the term started; prevents the commit index from moving backwards while committing
		leadershipTransfer *atomic.Bool // whether a leadership transfer is in progress (true: yes, false: no)
		matchIndex map[ServerID]uint64 // replicated (matched) index of each follower
		matchList []uint64 // values of matchIndex, kept around to reduce allocations
		replicate map[ServerID]*replication // replication state for every follower
		inflight *list.List // LogFutures waiting to be committed and applied to the FSM
		stepDown chan ServerID // leader step-down notification
		matchLock sync.Mutex // NOTE(review): presumably guards matchIndex/matchList — confirm against its users
	}
	// StateChange describes a state transition; setState sends one on stateChangeCh.
	StateChange struct {
		Before, After State
	}
	// readOnly tracks read-only requests and makes sure the FSM has applied up
	// to the requested index before answering; used for linearizable reads.
	readOnly struct {
		notifySet map[*readOnlyFuture]struct{} // pending read-only requests; only handled by the FSM goroutine
		request chan *readOnlyFuture // incoming read-only requests
	}
)
type (
	// ServerAddr is the network address of a server.
	ServerAddr string
	// ServerID is the identifier of a server.
	ServerID string
	// Suffrage tells whether a server has voting rights; see Voter and NonVoter.
	Suffrage int
	// ServerInfo is the address information of a server.
	ServerInfo struct {
		Suffrage Suffrage
		Addr     ServerAddr
		ID       ServerID
	}
)

// Suffrage values.
const (
	// Voter servers take part in elections.
	Voter Suffrage = 0
	// NonVoter servers do not vote.
	NonVoter Suffrage = 1
)

// Text forms of Suffrage values; unknown is returned for anything else.
const (
	unknown     = "Unknown"
	voterStr    = "Voter"
	nonVoterStr = "NonVoter"
)
// observe registers future as a pending read-only request so that a later
// notify call can answer it.
func (r *readOnly) observe(future *readOnlyFuture) {
	pending := r.notifySet
	pending[future] = struct{}{}
}
// notify answers every pending request whose readIndex is at or below index,
// passing index back via responded, and removes it from the pending set.
func (r *readOnly) notify(index uint64) {
	for pending := range r.notifySet {
		if pending.readIndex > index {
			continue
		}
		pending.responded(index, nil)
		delete(r.notifySet, pending)
	}
	// Go maps never shrink after deletes, so once everything has been answered
	// swap in a fresh map and let the old buckets be garbage collected.
	if len(r.notifySet) > 0 {
		return
	}
	r.notifySet = make(map[*readOnlyFuture]struct{})
}
// MarshalText implements encoding.TextMarshaler, encoding s as "Voter" or
// "NonVoter". Any other value returns an error.
//
// It uses a value receiver so that Suffrage fields are encoded as text even
// when they are not addressable (e.g. a ServerInfo passed to json.Marshal by
// value). With a pointer receiver such values would silently be encoded as
// plain numbers, which UnmarshalText cannot decode again. *Suffrage still has
// this method, so existing callers are unaffected.
func (s Suffrage) MarshalText() (text []byte, err error) {
	suffrage := s.String()
	if suffrage == unknown {
		return nil, fmt.Errorf("unknown suffrage :%d", s)
	}
	return Str2Bytes(suffrage), nil
}

// UnmarshalText implements encoding.TextUnmarshaler and accepts exactly
// "Voter" or "NonVoter". On any other input s is left unchanged and the
// returned error quotes the offending text.
func (s *Suffrage) UnmarshalText(text []byte) error {
	switch Bytes2Str(text) {
	case voterStr:
		*s = Voter
	case nonVoterStr:
		*s = NonVoter
	default:
		// Report the input that failed to parse, not the receiver's old value.
		return fmt.Errorf("unknown suffrage :%q", text)
	}
	return nil
}

// String implements fmt.Stringer. It returns "Voter", "NonVoter", or
// "Unknown" for any other value. The value receiver lets fmt use it for plain
// Suffrage values as well as pointers.
func (s Suffrage) String() string {
	switch s {
	case Voter:
		return voterStr
	case NonVoter:
		return nonVoterStr
	default:
		return unknown
	}
}
// Conf returns the *Config currently stored in r.conf.
func (r *Raft) Conf() *Config {
	current := r.conf.Load()
	return current
}
// setLatestConfiguration records configuration as the latest cluster
// configuration at log position index. It also stores it in r.configuration.
func (r *Raft) setLatestConfiguration(index uint64, configuration ClusterInfo) {
	// Cluster state first, then the atomically-readable copy; order preserved.
	r.cluster.setLatest(index, configuration)
	r.configuration.Store(configuration)
}
// setCommitConfiguration records configuration as the committed cluster
// configuration at log position index. Unlike setLatestConfiguration, it does
// not touch r.configuration.
func (r *Raft) setCommitConfiguration(index uint64, configuration ClusterInfo) {
	r.cluster.setCommit(
		index,
		configuration,
	)
}
func (r *Raft) getLatestIndex() uint64 {
entry := r.lastEntry.Get()
return Max(entry.log.index, entry.snapshot.index)
}
func (r *Raft) getLatestTerm() uint64 {
entry := r.lastEntry.Get()
return Max(entry.log.term, entry.snapshot.term)
}
func (r *Raft) getLatestEntry() (term uint64, index uint64) {
entry := r.lastEntry.Get()
return Max(entry.log.term, entry.snapshot.term), Max(entry.log.index, entry.snapshot.index)
}
// clearLeaderInfo forgets the known leader by resetting leaderInfo to the
// zero ServerInfo.
func (r *Raft) clearLeaderInfo() {
	reset := func(info *ServerInfo) { *info = ServerInfo{} }
	r.updateLeaderInfo(reset)
}
// updateLeaderInfo applies act to the stored leader info through
// LockItem.Action.
func (r *Raft) updateLeaderInfo(act func(s *ServerInfo)) {
	leader := r.leaderInfo
	leader.Action(act)
}
// goFunc starts each function in funcList on its own goroutine through
// r.funcEg. The wrappers always report a nil error to the group.
func (r *Raft) goFunc(funcList ...func()) {
	for i := range funcList {
		task := funcList[i] // per-iteration copy for the closure
		r.funcEg.Go(func() error {
			task()
			return nil
		})
	}
}
// waitShutDown is currently a no-op: it neither waits on r.funcEg nor on any
// other signal.
// NOTE(review): the name suggests it should block until background goroutines
// have exited (e.g. via r.funcEg.Wait()) — confirm the intent before relying on it.
func (r *Raft) waitShutDown() {
}
// getCurrentTerm atomically loads the current term.
func (r *Raft) getCurrentTerm() uint64 {
	term := r.currentTerm.Load()
	return term
}
// getLastApply atomically loads the latest index handed to the state machine.
func (r *Raft) getLastApply() uint64 {
	applied := r.lastApply.Load()
	return applied
}

// setLastApply atomically records index as the latest index handed to the
// state machine.
func (r *Raft) setLastApply(index uint64) {
	counter := r.lastApply
	counter.Store(index)
}
// setupLeadershipTransfer flips the leadership-transfer flag to status, but
// only if it currently holds the opposite value. It reports whether the flip
// happened, so two racing attempts to set the same value cannot both succeed.
func (l *leaderState) setupLeadershipTransfer(status bool) (success bool) {
	return l.leadershipTransfer.CompareAndSwap(!status, status)
}
// getLeadershipTransfer reports whether the leadership-transfer flag is set.
func (l *leaderState) getLeadershipTransfer() (status bool) {
	status = l.leadershipTransfer.Load()
	return status
}
// isVoter reports whether the server has voting rights (Suffrage == Voter).
func (s *ServerInfo) isVoter() bool {
	switch s.Suffrage {
	case Voter:
		return true
	default:
		return false
	}
}
// buildRPCHeader returns a new RPCHeader that identifies this node by its
// local ID and address.
func (r *Raft) buildRPCHeader() *RPCHeader {
	return &RPCHeader{
		ID:   r.localInfo.ID,
		Addr: r.localInfo.Addr,
	}
}
// setLastContact stamps r.lastContact with the current time.
func (r *Raft) setLastContact() {
	now := time.Now()
	r.lastContact.Store(now)
}

// getLastContact returns the time currently stored in r.lastContact.
func (r *Raft) getLastContact() time.Time {
	contact := r.lastContact.Load()
	return contact
}
// getCommitIndex atomically loads the cluster commit index.
func (r *Raft) getCommitIndex() uint64 {
	committed := r.commitIndex.Load()
	return committed
}

// setCommitIndex atomically stores commitIndex as the cluster commit index.
func (r *Raft) setCommitIndex(commitIndex uint64) {
	counter := r.commitIndex
	counter.Store(commitIndex)
}
// runState drives the main loop. It keeps invoking the current state's run
// function until shutDown.C becomes readable. Shutdown is polled without
// blocking before every iteration.
func (r *Raft) runState() {
	stopped := func() bool {
		select {
		case <-r.shutDown.C:
			return true
		default:
			return false
		}
	}
	for !stopped() {
		r.run()
	}
}
// setState switches the node to state. It publishes a StateChange with the
// previous and new state on r.stateChangeCh via overrideNotify.
func (r *Raft) setState(state State) {
	// Capture the old state before it is overwritten.
	change := &StateChange{Before: r.GetState(), After: state}
	r._setState(state)
	overrideNotify(r.stateChangeCh, change)
}
// setFollower transitions to Follower and installs cycleFollower as the
// main-loop function.
func (r *Raft) setFollower() {
	next := r.cycleFollower
	r.setState(Follower)
	r.run = next
}
// setCandidate transitions to Candidate and installs cycleCandidate as the
// main-loop function.
func (r *Raft) setCandidate() {
	next := r.cycleCandidate
	r.setState(Candidate)
	r.run = next
}
// setLeader transitions to Leader, records this node's ID and address as the
// current leader, and installs cycleLeader as the main-loop function.
func (r *Raft) setLeader() {
	r.setState(Leader)
	r.updateLeaderInfo(func(leader *ServerInfo) {
		leader.ID = r.localInfo.ID
		leader.Addr = r.localInfo.Addr
	})
	r.run = r.cycleLeader
}
// setShutDown moves the node into the ShutDown state. It must only be called
// by Raft.ShutDown.
// It clears r.run before calling onShutDown. runState calls r.run after a
// non-blocking check of shutDown.C, so a nil r.run would panic if shutDown.C
// were not yet readable.
// NOTE(review): presumably onShutDown closes shutDown.C — confirm.
func (r *Raft) setShutDown() {
	r.setState(ShutDown)
	r.run = nil
	r.onShutDown()
}
// setLatestLog records (term, index) as the newest entry in the LogStore.
func (r *Raft) setLatestLog(term, index uint64) {
	newest := lastLog{term: term, index: index}
	r.lastEntry.Action(func(e *lastEntry) {
		e.log = newest
	})
}

// getLatestLog returns the term and index of the newest LogStore entry.
func (r *Raft) getLatestLog() (term, index uint64) {
	l := r.lastEntry.Get().log
	return l.term, l.index
}

// getLatestSnapshot returns the term and index of the newest entry covered by
// the latest snapshot.
func (r *Raft) getLatestSnapshot() (term, index uint64) {
	s := r.lastEntry.Get().snapshot
	return s.term, s.index
}

// setLatestSnapshot records (term, index) as the newest entry covered by the
// latest snapshot.
func (r *Raft) setLatestSnapshot(term, index uint64) {
	newest := lastLog{term: term, index: index}
	r.lastEntry.Action(func(e *lastEntry) {
		e.snapshot = newest
	})
}
// getLatestCluster returns the server list of the latest cluster
// configuration. The slice is shared with r.cluster, not copied.
func (r *Raft) getLatestCluster() []ServerInfo {
	servers := r.cluster.latest.Servers
	return servers
}
// inConfiguration reports whether a server with the given id appears in the
// latest cluster configuration.
func (r *Raft) inConfiguration(id ServerID) bool {
	servers := r.cluster.latest.Servers
	for i := range servers {
		if servers[i].ID == id {
			return true
		}
	}
	return false
}
// canVote reports whether id is in the latest cluster configuration with
// Voter suffrage. Only the first matching entry is consulted, and ids that
// are not in the configuration cannot vote.
func (r *Raft) canVote(id ServerID) bool {
	servers := r.getLatestCluster()
	for i := range servers {
		if servers[i].ID != id {
			continue
		}
		return servers[i].isVoter()
	}
	return false
}