// Server API for TinyKv service
typeTinyKvServerinterface{// KV commands with mvcc/txn supported.
KvGet(context.Context,*kvrpcpb.GetRequest)(*kvrpcpb.GetResponse,error)KvScan(context.Context,*kvrpcpb.ScanRequest)(*kvrpcpb.ScanResponse,error)KvPrewrite(context.Context,*kvrpcpb.PrewriteRequest)(*kvrpcpb.PrewriteResponse,error)KvCommit(context.Context,*kvrpcpb.CommitRequest)(*kvrpcpb.CommitResponse,error)KvCheckTxnStatus(context.Context,*kvrpcpb.CheckTxnStatusRequest)(*kvrpcpb.CheckTxnStatusResponse,error)KvBatchRollback(context.Context,*kvrpcpb.BatchRollbackRequest)(*kvrpcpb.BatchRollbackResponse,error)KvResolveLock(context.Context,*kvrpcpb.ResolveLockRequest)(*kvrpcpb.ResolveLockResponse,error)// RawKV commands.
RawGet(context.Context,*kvrpcpb.RawGetRequest)(*kvrpcpb.RawGetResponse,error)RawPut(context.Context,*kvrpcpb.RawPutRequest)(*kvrpcpb.RawPutResponse,error)RawDelete(context.Context,*kvrpcpb.RawDeleteRequest)(*kvrpcpb.RawDeleteResponse,error)RawScan(context.Context,*kvrpcpb.RawScanRequest)(*kvrpcpb.RawScanResponse,error)// Raft commands (tinykv <-> tinykv).
Raft(TinyKv_RaftServer)errorSnapshot(TinyKv_SnapshotServer)error// Coprocessor
Coprocessor(context.Context,*coprocessor.Request)(*coprocessor.Response,error)}
server实现:kv/server/server.go
1
2
3
4
5
6
7
8
9
typeServerstruct{storagestorage.Storage// (Used in 4B)
Latches*latches.Latches// 锁集合,实现多版本并发控制
// coprocessor API handler, out of course scope
copHandler*coprocessor.CopHandler}
typeraftWorkerstruct{pr*router// receiver of messages should sent to raft, including:
// * raft command from `raftStorage`
// * raft inner messages from other peers sent by network
raftChchanmessage.Msgctx*GlobalContextcloseCh<-chanstruct{}}funcnewRaftWorker(ctx*GlobalContext,pm*router)*raftWorker{return&raftWorker{raftCh:pm.peerSender,ctx:ctx,pr:pm,}}
func(d*peerMsgHandler)HandleMsg(msgmessage.Msg){switchmsg.Type{casemessage.MsgTypeRaftMessage:raftMsg:=msg.Data.(*rspb.RaftMessage)iferr:=d.onRaftMsg(raftMsg);err!=nil{log.Errorf("%s handle raft message error %v",d.Tag,err)}casemessage.MsgTypeRaftCmd:raftCMD:=msg.Data.(*message.MsgRaftCmd)d.proposeRaftCommand(raftCMD.Request,raftCMD.Callback)casemessage.MsgTypeTick:d.onTick()casemessage.MsgTypeSplitRegion:split:=msg.Data.(*message.MsgSplitRegion)log.Infof("%s on split with %v",d.Tag,split.SplitKey)d.onPrepareSplitRegion(split.RegionEpoch,split.SplitKey,split.Callback)casemessage.MsgTypeRegionApproximateSize:d.onApproximateRegionSize(msg.Data.(uint64))casemessage.MsgTypeGcSnap:gcSnap:=msg.Data.(*message.MsgGCSnap)d.onGCSnap(gcSnap.Snaps)casemessage.MsgTypeStart:d.startTicker()}}
1
2
3
4
5
6
7
8
func(d*peerMsgHandler)proposeRaftCommand(msg*raft_cmdpb.RaftCmdRequest,cb*message.Callback){err:=d.preProposeRaftCommand(msg)iferr!=nil{cb.Done(ErrResp(err))return}// Your Code Here (2B).
}
// Append the given entries to the raft log and update ps.raftState also delete log entries that will
// never be committed
func(ps*PeerStorage)Append(entries[]eraftpb.Entry,raftWB*engine_util.WriteBatch)error{// Your Code Here (2B).
returnnil}
func(ps*PeerStorage)SaveReadyState(ready*raft.Ready)(*ApplySnapResult,error){// Hint: you may call `Append()` and `ApplySnapshot()` in this function
// Your Code Here (2B/2C).
// 1. append
wb:=&engine_util.WriteBatch{}ps.Append(ready.Entries,wb)// 2. update hardstate and save raftState
if!raft.IsEmptyHardState(ready.HardState){ps.raftState.HardState=&ready.HardState}wb.SetMeta(meta.RaftStateKey(ps.region.Id),ps.raftState)// 3. write to DB
ps.Engines.WriteRaft(wb)returnnil,nil}
func(d*peerMsgHandler)proposeRaftCommand(msg*raft_cmdpb.RaftCmdRequest,cb*message.Callback){err:=d.preProposeRaftCommand(msg)iferr!=nil{cb.Done(ErrResp(err))return}// Your Code Here (2B).
return}
// Record the callback of the proposals
// (Used in 2B)
proposals[]*proposaltypeproposalstruct{// index + term for unique identification
indexuint64termuint64cb*message.Callback}
// if return true, d.proposals[0] is the target proposal
func(d*peerMsgHandler)clearStaleAndGetTargetProposal(entry*eraftpb.Entry)bool{d.clearStaleProposals(entry)iflen(d.proposals)>0&&d.proposals[0].index==entry.Index{p:=d.proposals[0]ifp.term!=entry.Term{NotifyStaleReq(entry.Term,p.cb)d.proposals=d.proposals[1:]returnfalse}else{returntrue}}else{returnfalse}}func(d*peerMsgHandler)clearStaleProposals(entry*eraftpb.Entry){variintfori=0;i<len(d.proposals)&&d.proposals[i].index<entry.Index;i++{d.proposals[i].cb.Done(ErrResp(&util.ErrStaleCommand{}))}d.proposals=d.proposals[i:]}