project2c目的是实现RaftLog GC和Snapshot支持。在Raft和raftstore中均需要修改和新增代码。
问题分析
raft 一致性算法并没有考虑log无限增长的情况,若不做任何处理,随着系统的长时间运行,Raft节点中的RaftLog会占用大量内存;所以要引进applied index,把applied之前的条目定期压缩(compact)起来然后落盘,最后在内存删除它们,只需要记录最后applied的Index、Term
,以及一些状态。这就是RaftLog GC。
基于RaftLog GC,我们需要实现Snapshot支持来保障Raft算法的正常运行。这主要是在日志复制的过程中,leader需要给follower发送[next, LastIndex]
的条目以及next-1
的index和Term,很可能next-1
(最小索引)的条目已经被compact掉了,此时没法完成日志复制所需的匹配动作,因此leader需要发送一个Snapshot来帮助follower赶上进度。事实上,Project3中实现增加节点(Add Peer
)时,也是通过Snapshot来初始化新节点。
Snapshot的生成
tinykv已经提供了Raft节点获取Snapshot的接口r.RaftLog.storage.Snapshot()
。可以发现是一个异步的实现,将一个任务丢给了ps.regionSched
,这是消费者端。
之所以采用异步实现,是因为SnapShot通常比较大,所以一般Leader第一次调用r.RaftLog.storage.Snapshot()
可能拿不到结果,不过worker已经开始生成了,等后面再调用时就能够直接返回。
1
2
3
4
5
| // schedule snapshot generate task
ps.regionSched <- &runner.RegionTaskGen{
RegionId: ps.region.GetId(),
Notifier: ch,
}
|
生产者端是在kv/raftstore/runner/region_task.go(根据RegionTaskGen
就可以找到)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| func (r *regionTaskHandler) Handle(t worker.Task) {
switch t.(type) {
case *RegionTaskGen:
task := t.(*RegionTaskGen)
// It is safe for now to handle generating and applying snapshot concurrently,
// but it may not when merge is implemented.
r.ctx.handleGen(task.RegionId, task.Notifier)
case *RegionTaskApply:
task := t.(*RegionTaskApply)
r.ctx.handleApply(task.RegionId, task.Notifier, task.StartKey, task.EndKey, task.SnapMeta)
case *RegionTaskDestroy:
task := t.(*RegionTaskDestroy)
r.ctx.cleanUpRange(task.RegionId, task.StartKey, task.EndKey)
}
}
|
最后会走到doSnapshot
当中,关键的部分如下。这样就生成了一个SnapShot
1
2
3
4
5
6
7
| snapshot := &eraftpb.Snapshot{
Metadata: &eraftpb.SnapshotMetadata{
Index: key.Index,
Term: key.Term,
ConfState: &confState,
},
}
|
raft leader 发现某个节点落后较多(该节点的Next-1位置的entry已经被leader compact了),则给他发送一个Snapshot
Snapshot的Apply
SnapShot的Apply也应该采用如上的方法,以异步的方式,创意一个RegionTaskApply
结构体丢给ps.regionSched
。
在Raft中实现
对于Snapshot,在raft模块我们主要需要实现“leader获取并发送SnapShot”,“Follower处理Snapshot消息”,“log”,“ready”四块功能。
pb.Snapshot
类定义:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| type Snapshot struct {
Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
Metadata *SnapshotMetadata `protobuf:"bytes,2,opt,name=metadata" json:"metadata,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
type SnapshotMetadata struct {
ConfState *ConfState `protobuf:"bytes,1,opt,name=conf_state,json=confState" json:"conf_state,omitempty"`
Index uint64 `protobuf:"varint,2,opt,name=index,proto3" json:"index,omitempty"`
Term uint64 `protobuf:"varint,3,opt,name=term,proto3" json:"term,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
type ConfState struct {
// all node id
Nodes []uint64 `protobuf:"varint,1,rep,packed,name=nodes" json:"nodes,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
|
其中Metadata
是我们需要注意的关键数据。
leader获取并发送SnapShot
在“问题分析”中提到raft leader 发现某个节点落后较多(该节点的Next-1位置的entry已经被leader compact了),则给他发送一个Snapshot。
Compact实际做的事情就是日志截断。这里“被leader compact”的含义是由于截断leader中已经找不到Next-1(即论文中的preLogIndex
)位置的entry了,所以没办法通过单纯的sendAppend
进行日志同步(没办法获取preLogTerm
)。
所以需要做的修改是要在**sendAppend**
加入检查并发送Snapshot的逻辑。
那么首先的一个问题就是如何检查next-1
位置的entry已经被compact了?
已知preLogIndex=next-1
,通过r.RaftLog.Term(preLogIndex)
来获取preLogTerm
。
1
2
3
4
5
6
7
8
9
10
11
12
13
| func (l *RaftLog) Term(i uint64) (uint64, error) {
// Your Code Here (2A).
if len(l.entries) > 0 && i >= l.dummyIndex {
if i > l.LastIndex() {
return 0, ErrUnavailable
}
return l.entries[i-l.dummyIndex].Term, nil
} else {
term, err := l.storage.Term(i)
return term, err
}
}
|
如果发生compact,那么显然上述调用会走到else分支,再来看一下l.storage.Term
的实现kv/raftstore/peer_storage.go。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| func (ps *PeerStorage) Term(idx uint64) (uint64, error) {
if idx == ps.truncatedIndex() {
return ps.truncatedTerm(), nil
}
if err := ps.checkRange(idx, idx+1); err != nil {
return 0, err
}
if ps.truncatedTerm() == ps.raftState.LastTerm || idx == ps.raftState.LastIndex {
return ps.raftState.LastTerm, nil
}
var entry eraftpb.Entry
if err := engine_util.GetMeta(ps.Engines.Raft, meta.RaftLogKey(ps.region.Id, idx), &entry); err != nil {
return 0, err
}
return entry.Term, nil
}
func (ps *PeerStorage) checkRange(low, high uint64) error {
if low > high {
return errors.Errorf("low %d is greater than high %d", low, high)
} else if low <= ps.truncatedIndex() {
return raft.ErrCompacted
} else if high > ps.raftState.LastIndex+1 {
return errors.Errorf("entries' high %d is out of bound, lastIndex %d",
high, ps.raftState.LastIndex)
}
return nil
}
|
仔细阅读上面的代码,在checkRange
函数中找到了一个错误ErrCompacted
,它的条件是low <= ps.truncatedIndex()
,即小于被截断的最高Index,符合我们上面描述的Compact的操作。因此可以确定只要对这个错误进行检查即可。
1
2
3
4
5
6
7
8
| preLogTerm, err := r.RaftLog.Term(preLogIndex)
if err != nil {
if err == ErrCompacted {
r.sendSnapshot(to)
return false
}
return false
}
|
接下来考虑实现sendSnapshot
这一函数。
首先是Snapshot的获取,根据在“Snapshot生成”中的描述,通过r.RaftLog.storage.Snapshot()
可以异步生成一个SnapShot,由于SnapShot通常比较大,因此第一次调用可能会返回错误,即SnapShot还没有准备好,因此要对这种情况做判断,对应的错误是ErrSnapshotTemporailyUnavailable
。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| func (r *Raft) sendSnapshot(to uint64) {
// Your Code Here (2C).
snap, err := r.RaftLog.storage.Snapshot()
// because snapshot is handled asynchronously, so we should check if snapshot is valid
if err != nil {
if err == ErrSnapshotTemporarilyUnavailable {
return
}
panic(err)
}
r.msgs = append(r.msgs, pb.Message{
MsgType: pb.MessageType_MsgSnapshot,
From: r.id,
To: to,
Term: r.Term,
Snapshot: &snap,
})
// avoid snapshot is sent too frequently
r.Prs[to].Next = snap.Metadata.Index + 1
}
|
注意,在发送Snapshot成功之后,可以直接在leader更新目标节点的Next,避免需要频繁发送snapshot造成较高的带宽占用。
在raftstore中实现
processAdminRequest
根据文档和之前的分析,需要在process
的逻辑中增加对AdminCmdType_CompactLog
这一消息的处理,不同于普通的Request
,它的类型是AdminRequest
,要分开处理。
1
2
3
4
5
6
7
8
9
10
| func (d *peerMsgHandler) processAdminRequest(entry *eraftpb.Entry, cmd *raft_cmdpb.RaftCmdRequest)
func (d *peerMsgHandler) process(entry *eraftpb.Entry) {
cmd := &raft_cmdpb.RaftCmdRequest{}
cmd.Unmarshal(entry.Data)
if cmd.AdminRequest != nil {
d.processAdminRequest(entry, cmd)
}
...
}
|
对该消息的处理流程,文档中说明的比较详细,即先更新applyState.TruncatedState
的状态,然后通过接口d.ScheduleCompactLog
为 raftlog-gc worker 安排一个任务。Raftlog-gc worker 会异步完成实际的日志删除工作。
ApplySnapshot
appluSnapshot
即peer_storage
对于Ready()
获得的snapshot
进行实际应用,要做的事情基本上能根据前面的分析推出来:删除过时的数据(所有的数据)、更新各种状态、发送任务给region_worker进行实际应用。
删除过时数据,根据注释可知是ClearMeta
和ps.clearExtraData
,所需要的参数也十分简单。
1
2
| ps.clearMeta(kvWB, raftWB)
ps.clearExtraData(snapData.Region)
|
更新peer_storage的内存状态,RaftLocalState
、RaftApplyState
和 RegionLocalState
。
简要分析一下需要更新哪些状态,首先snapshot.MetaData
只有Index、Term、ConfState
三个字段。对于RaftLocalState
,很显然只要更新LastIndex
、LastTerm
(HardState
的更新在ApplySnapshot
的上层SaveReadyState
当中)。对于RaftApplyState
,AppliedIndex
肯定要更新到meta.Index
,TruncatedState
代表截断状态,同样有Index、Term
两个字段,很显然也要更新。文档中还指明“您还需要更新PeerStorage.snapState
到snap.SnapState_Applying
”。
1
2
3
4
5
6
7
8
9
10
| type RaftApplyState struct {
// Record the applied index of the state machine to make sure
// not apply any index twice after restart.
AppliedIndex uint64 `protobuf:"varint,1,opt,name=applied_index,json=appliedIndex,proto3" json:"applied_index,omitempty"`
// Record the index and term of the last raft log that have been truncated. (Used in 2C)
TruncatedState *RaftTruncatedState `protobuf:"bytes,2,opt,name=truncated_state,json=truncatedState" json:"truncated_state,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
|
但是文档中提到的RegionLocalState
怎么更新?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| type PeerStorage struct {
// current region information of the peer
region *metapb.Region
// current raft state of the peer
raftState *rspb.RaftLocalState
// current apply state of the peer
applyState *rspb.RaftApplyState
// current snapshot state
snapState snap.SnapState
// regionSched used to schedule task to region worker
regionSched chan<- worker.Task
// generate snapshot tried count
snapTriedCnt int
// Engine include two badger instance: Raft and Kv
Engines *engine_util.Engines
// Tag used for logging
Tag string
}
|
根据PeerStorage
定义,推断metapb.Region
即RegionLocalState
。在Snapshot.MetaData
中似乎没有找到与之相关的内容。
1
2
3
4
5
6
7
8
9
10
11
| type Region struct {
Id uint64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
// Region key range [start_key, end_key).
StartKey []byte `protobuf:"bytes,2,opt,name=start_key,json=startKey,proto3" json:"start_key,omitempty"`
EndKey []byte `protobuf:"bytes,3,opt,name=end_key,json=endKey,proto3" json:"end_key,omitempty"`
RegionEpoch *RegionEpoch `protobuf:"bytes,4,opt,name=region_epoch,json=regionEpoch" json:"region_epoch,omitempty"`
Peers []*Peer `protobuf:"bytes,5,rep,name=peers" json:"peers,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
|
但是在已经写好的代码当中,有一个RaftSnapshotData
类型的snapData
是将Snapshot
中的Data
解析后的数据,可以看到其中的region
数据,正是想要的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| type RaftSnapshotData struct {
Region *metapb.Region `protobuf:"bytes,1,opt,name=region" json:"region,omitempty"`
FileSize uint64 `protobuf:"varint,2,opt,name=file_size,json=fileSize,proto3" json:"file_size,omitempty"`
Data []*KeyValue `protobuf:"bytes,3,rep,name=data" json:"data,omitempty"`
Meta *SnapshotMeta `protobuf:"bytes,5,opt,name=meta" json:"meta,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
// Apply the peer with given snapshot
func (ps *PeerStorage) ApplySnapshot(snapshot *eraftpb.Snapshot, kvWB *engine_util.WriteBatch, raftWB *engine_util.WriteBatch) (*ApplySnapResult, error) {
log.Infof("%v begin to apply snapshot", ps.Tag)
snapData := new(rspb.RaftSnapshotData)
if err := snapData.Unmarshal(snapshot.Data); err != nil {
return nil, err
}
// Hint: things need to do here including: update peer storage state like raftState and applyState, etc,
// and send RegionTaskApply task to region worker through ps.regionSched, also remember call ps.clearMeta
// and ps.clearExtraData to delete stale data
// Your Code Here (2C).
return nil, nil
}
|
但是想用kvWB.SetMeta(meta.RegionStateKey(snapData.Region.Id), snapData.Region)
来持久化时却发现snapData.Region
参数不匹配,*rspb.RegionLocalState
。
后面在kv/raftstore/meta/values.go找到了WriteRegionState
函数,正是我们想要的。
1
2
3
4
5
6
| func WriteRegionState(kvWB *engine_util.WriteBatch, region *metapb.Region, state rspb.PeerState) {
regionState := new(rspb.RegionLocalState)
regionState.State = state
regionState.Region = region
kvWB.SetMeta(RegionStateKey(region.Id), regionState)
}
|
一个新的问题来了,之前不论是RaftLocalState
、RaftApplyState
还是SnapState
都是先在PeerStorage
内存中更新,再持久化,我们显然不能先对RegionState
持久化。
那么如何在内存中更新RegionState
?
在之前的分析当中以及文档当中都提到,需要“通过**PeerStorage.regionSched**
发送**runner.RegionTaskApply**
任务给到region_worker”,这里的RegionTask
是否可以理解为在内存中更新RegionState
呢?通过查看代码调用链runner.RegionTaskApply
->Handle
->r.ctx.handleApply
->applySnap
验证了这一想法。
简单参考func (ps *PeerStorage) Snapshot() (eraftpb.Snapshot, error)
中的代码实现(能够注意到它没有等待snapshot的生成,直接返回了一个ErrSnaoshotTemporarilyUnavailable
,对应我们在raft模块中的处理):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| func (ps *PeerStorage) Snapshot() (eraftpb.Snapshot, error) {
...
ch := make(chan *eraftpb.Snapshot, 1)
ps.snapState = snap.SnapState{
StateType: snap.SnapState_Generating,
Receiver: ch,
}
// schedule snapshot generate task
ps.regionSched <- &runner.RegionTaskGen{
RegionId: ps.region.GetId(),
Notifier: ch,
}
return snapshot, raft.ErrSnapshotTemporarilyUnavailable
}
|
但是此处不能直接返回,文档中指明了要“wait until region worker finishes
”。猜测是因为如果直接返回,raft节点可能在没有更新regionState的情况下发生错误。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| // send RegionTaskApply task to region worker
ch := make(chan bool, 1)
ps.regionSched <- &runner.RegionTaskApply{
RegionId: snapData.Region.Id,
Notifier: ch,
SnapMeta: snapshot.Metadata,
StartKey: snapData.Region.GetStartKey(),
EndKey: snapData.Region.GetEndKey(),
}
// according to document, need to wait until region worker finishes
<-ch
// regionState update
result := &ApplySnapResult{
PrevRegion: ps.region,
Region: snapData.Region,
}
meta.WriteRegionState(kvWB, snapData.Region, rspb.PeerState_Normal)
return result, nil
|