Raft Algorithm: Log Replication

news/2024/5/15 14:36:56 / Source: https://blog.csdn.net/mucaoyx/article/details/131722193

1. Overview of the Log Replication Flow

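During leader election the cluster eventually elects one leader, and the remaining nodes become followers. Besides sending heartbeats to the followers, the leader handles client requests: it appends each client update to its own log and sends it to all followers in Append Entries messages. A follower that has stored the entries replies to the leader. Once an entry is stored on a majority of the cluster (the leader counts itself), the leader considers it committed, applies it to its state machine, and answers the client. The leader then tells the followers about the new commit index in subsequent Append Entries messages (heartbeats included), and each follower applies the committed entry to its own state machine.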
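The majority rule above can be illustrated with a small Go sketch. `quorumIndex` is a hypothetical helper, not etcd's API: given each member's highest replicated index (the leader's own last index included), it returns the highest index stored on a majority. As the comment notes, real Raft additionally requires the entry at that index to belong to the leader's current term before it may be committed.

```go
package main

import (
	"fmt"
	"sort"
)

// quorumIndex returns the highest log index stored on a majority of the
// cluster, given every member's match index (leader included).
// Hypothetical helper: real Raft must also check that the entry at the
// returned index is from the leader's current term before committing it.
func quorumIndex(match []uint64) uint64 {
	if len(match) == 0 {
		return 0
	}
	sorted := append([]uint64(nil), match...)
	// Descending order: sorted[k] is stored on at least k+1 members.
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] > sorted[j] })
	majority := len(sorted)/2 + 1
	return sorted[majority-1]
}

func main() {
	// Leader at 5, followers at 3 and 4: index 4 is on 2 of 3 nodes.
	fmt.Println(quorumIndex([]uint64{5, 3, 4})) // 4
	// Five members: index 6 is stored on three of them.
	fmt.Println(quorumIndex([]uint64{7, 7, 3, 2, 6})) // 6
}
```

With an even cluster such as four nodes at `[5, 5, 1, 1]`, a majority is three, so the result is 1: two copies of index 5 are not enough.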

Reference: https://blog.csdn.net/qq_43949280/article/details/122669244

2. Log Replication in the etcd raft Module

2.1 Sending Messages

As mentioned above, the leader handles client update operations, so that is where we start reading the code.

Besides the raft module, the etcd code base contains a raftexample module (contrib/raftexample), which is an example of how to use the raft module.

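After looking through the module's files, the entry point for data storage appears to be kvstore.go. It contains a newKVStore(...) function, and any code that wants to use the kvstore struct has to call newKVStore(...) to create it.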

Searching for the callers of newKVStore(...) shows that the only call site is in main.go:

// contrib/raftexample/main.go
func main() {
	cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers")
	id := flag.Int("id", 1, "node ID")
	kvport := flag.Int("port", 9121, "key-value server port")
	join := flag.Bool("join", false, "join an existing cluster")
	flag.Parse()

	proposeC := make(chan string)
	defer close(proposeC)
	confChangeC := make(chan raftpb.ConfChange)
	defer close(confChangeC)

	// raft provides a commit stream for the proposals from the http api
	var kvs *kvstore // declare the kvstore
	getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() }
	commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)

	kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC) // ref-1: create the kvstore

	// the key-value http handler will propose updates to raft
	serveHttpKVAPI(kvs, *kvport, confChangeC, errorC) // ref-2: use the kvstore
}

Next, let's look at serveHttpKVAPI(...), the function that uses the kvstore at ref-2:

// serveHttpKVAPI starts a key-value server with a GET/PUT API and listens.
func serveHttpKVAPI(kv *kvstore, port int, confChangeC chan<- raftpb.ConfChange, errorC <-chan error) {
	srv := http.Server{ // create the HTTP server
		Addr: ":" + strconv.Itoa(port),
		Handler: &httpKVAPI{
			store:       kv, // hand the kvstore above to httpKVAPI's store field
			confChangeC: confChangeC,
		},
	}
	go func() { // start the HTTP server
		if err := srv.ListenAndServe(); err != nil {
			log.Fatal(err)
		}
	}()

	// exit when raft goes down
	if err, ok := <-errorC; ok {
		log.Fatal(err)
	}
}

The key type now is httpKVAPI, which handles HTTP requests on top of a raft-backed key-value store. Here it is:

// contrib/raftexample/httpapi.go
// Handler for a http based key-value store backed by raft
type httpKVAPI struct {
	store       *kvstore
	confChangeC chan<- raftpb.ConfChange
}

func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	key := r.RequestURI
	defer r.Body.Close()
	switch {
	case r.Method == "PUT": // ref-3: key-value pairs are set with PUT, as described in the module's README
		v, err := ioutil.ReadAll(r.Body) // read the body sent by the client
		if err != nil {
			log.Printf("Failed to read on PUT (%v)\n", err)
			http.Error(w, "Failed on PUT", http.StatusBadRequest)
			return
		}

		h.store.Propose(key, string(v)) // ref-4: the kvstore stores the key-value pair

		// Optimistic-- no waiting for ack from raft. Value is not yet
		// committed so a subsequent GET on the key may return old value
		w.WriteHeader(http.StatusNoContent)
	case r.Method == "GET":
		if v, ok := h.store.Lookup(key); ok {
			w.Write([]byte(v))
		} else {
			http.Error(w, "Failed to GET", http.StatusNotFound)
		}
	case r.Method == "POST":
		url, err := ioutil.ReadAll(r.Body)
		if err != nil {
			log.Printf("Failed to read on POST (%v)\n", err)
			http.Error(w, "Failed on POST", http.StatusBadRequest)
			return
		}

		nodeId, err := strconv.ParseUint(key[1:], 0, 64)
		if err != nil {
			log.Printf("Failed to convert ID for conf change (%v)\n", err)
			http.Error(w, "Failed on POST", http.StatusBadRequest)
			return
		}

		cc := raftpb.ConfChange{
			Type:    raftpb.ConfChangeAddNode,
			NodeID:  nodeId,
			Context: url,
		}
		h.confChangeC <- cc

		// As above, optimistic that raft will apply the conf change
		w.WriteHeader(http.StatusNoContent)
	case r.Method == "DELETE":
		nodeId, err := strconv.ParseUint(key[1:], 0, 64)
		if err != nil {
			log.Printf("Failed to convert ID for conf change (%v)\n", err)
			http.Error(w, "Failed on DELETE", http.StatusBadRequest)
			return
		}

		cc := raftpb.ConfChange{
			Type:   raftpb.ConfChangeRemoveNode,
			NodeID: nodeId,
		}
		h.confChangeC <- cc

		// As above, optimistic that raft will apply the conf change
		w.WriteHeader(http.StatusNoContent)
	default:
		w.Header().Set("Allow", "PUT")
		w.Header().Add("Allow", "GET")
		w.Header().Add("Allow", "POST")
		w.Header().Add("Allow", "DELETE")
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
	}
}

At ref-4 we finally reach the kvstore's entry point for storing a key-value pair: the Propose(...) method. Here are its details:

// contrib/raftexample/kvstore.go
func (s *kvstore) Propose(k string, v string) {
	var buf bytes.Buffer
	// gob-encode the key-value pair into buf
	if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {
		log.Fatal(err)
	}
	s.proposeC <- buf.String() // send the encoded bytes over the channel
}

The code hands the data to the proposeC channel, so the question now is: where is this channel read?
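What travels over proposeC is just a gob-encoded pair packed into a string. The sketch below shows the round trip; the `kv` field names `Key` and `Val` are an assumption for illustration, and `encodeProposal`/`decodeProposal` are hypothetical helpers. The decode step is what a consumer of committed entries has to do before updating its map.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"log"
)

// kv mirrors the pair that kvstore.Propose encodes (field names assumed).
type kv struct {
	Key string
	Val string
}

// encodeProposal gob-encodes a pair into the string sent on proposeC.
func encodeProposal(k, v string) (string, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {
		return "", err
	}
	return buf.String(), nil
}

// decodeProposal reverses encodeProposal.
func decodeProposal(data string) (kv, error) {
	var pair kv
	err := gob.NewDecoder(bytes.NewBufferString(data)).Decode(&pair)
	return pair, err
}

func main() {
	proposeC := make(chan string, 1)
	s, err := encodeProposal("/my-key", "hello")
	if err != nil {
		log.Fatal(err)
	}
	proposeC <- s

	got, err := decodeProposal(<-proposeC)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(got.Key, got.Val) // /my-key hello
}
```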

First find the type that declares the proposeC field, then look at where that field is used: it is a channel passed in when the kvstore value is created.

Following it further, the channel turns out to be a parameter of newKVStore(...), the function we looked at at the start.

Let's go back to the code at ref-1 and see where the channel passed to newKVStore comes from:

// contrib/raftexample/main.go
func main() {
	cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers")
	id := flag.Int("id", 1, "node ID")
	kvport := flag.Int("port", 9121, "key-value server port")
	join := flag.Bool("join", false, "join an existing cluster")
	flag.Parse()

	proposeC := make(chan string) // create proposeC
	defer close(proposeC)
	confChangeC := make(chan raftpb.ConfChange)
	defer close(confChangeC)

	// raft provides a commit stream for the proposals from the http api
	var kvs *kvstore // declare the kvstore
	getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() }
	commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC) // ref-5: pass proposeC to the raft node

	kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC) // ref-1: create the kvstore

	// the key-value http handler will propose updates to raft
	serveHttpKVAPI(kvs, *kvport, confChangeC, errorC) // ref-2: use the kvstore
}

So proposeC must be read somewhere inside newRaftNode(...), which is called at ref-5:

// contrib/raftexample/raft.go
// newRaftNode initiates a raft instance and returns a committed log entry
// channel and error channel. Proposals for log updates are sent over the
// provided the proposal channel. All log entries are replayed over the
// commit channel, followed by a nil message (to indicate the channel is
// current), then new log entries. To shutdown, close proposeC and read errorC.
func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string,
	confChangeC <-chan raftpb.ConfChange) (<-chan *commit, <-chan error, <-chan *snap.Snapshotter) {

	commitC := make(chan *commit)
	errorC := make(chan error)

	rc := &raftNode{
		proposeC:    proposeC, // ref-6: proposeC is stored in the proposeC field
		confChangeC: confChangeC,
		commitC:     commitC,
		errorC:      errorC,
		id:          id,
		peers:       peers,
		join:        join,
		waldir:      fmt.Sprintf("raftexample-%d", id),
		snapdir:     fmt.Sprintf("raftexample-%d-snap", id),
		getSnapshot: getSnapshot,
		snapCount:   defaultSnapshotCount,
		stopc:       make(chan struct{}),
		httpstopc:   make(chan struct{}),
		httpdonec:   make(chan struct{}),

		logger: zap.NewExample(),

		snapshotterReady: make(chan *snap.Snapshotter, 1),
		// rest of structure populated after WAL replay
	}
	go rc.startRaft()
	return commitC, errorC, rc.snapshotterReady
}

Next we follow the uses of raftNode's proposeC field; there is only one place where data is read from it.

Here is the code that reads it:

// contrib/raftexample/raft.go
func (rc *raftNode) serveChannels() {
	snap, err := rc.raftStorage.Snapshot()
	if err != nil {
		panic(err)
	}
	rc.confState = snap.Metadata.ConfState
	rc.snapshotIndex = snap.Metadata.Index
	rc.appliedIndex = snap.Metadata.Index

	defer rc.wal.Close()

	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	// send proposals over raft
	go func() {
		confChangeCount := uint64(0)

		for rc.proposeC != nil && rc.confChangeC != nil {
			select {
			case prop, ok := <-rc.proposeC: // read the encoded key-value data
				if !ok {
					rc.proposeC = nil
				} else {
					// blocks until accepted by raft state machine
					rc.node.Propose(context.TODO(), []byte(prop)) // ref-7: handle the client's key-value write
				}

			case cc, ok := <-rc.confChangeC:
				if !ok {
					rc.confChangeC = nil
				} else {
					confChangeCount++
					cc.ID = confChangeCount
					rc.node.ProposeConfChange(context.TODO(), cc)
				}
			}
		}
		// client closed channel; shutdown raft if not already
		close(rc.stopc)
	}()

	// ... (omitted)
}
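The proposal goroutine above uses a common Go pattern: receive with `v, ok := <-ch`, treat `ok == false` as "closed", and leave the loop once either input is closed. A minimal sketch of that loop follows; `confChange` is a hypothetical stand-in for raftpb.ConfChange, and the `propose`/`proposeConf` callbacks stand in for rc.node.Propose and rc.node.ProposeConfChange.

```go
package main

import "fmt"

// confChange is a hypothetical stand-in for raftpb.ConfChange.
type confChange struct {
	ID     uint64
	NodeID uint64
}

// forward mirrors the proposal goroutine in serveChannels: it forwards
// proposals and conf changes, numbers conf changes with a local counter,
// and returns as soon as either input channel has been closed.
func forward(proposeC <-chan string, confChangeC <-chan confChange,
	propose func(string), proposeConf func(confChange)) {
	confChangeCount := uint64(0)
	for proposeC != nil && confChangeC != nil {
		select {
		case prop, ok := <-proposeC:
			if !ok {
				proposeC = nil // closed: the loop condition now fails
			} else {
				propose(prop)
			}
		case cc, ok := <-confChangeC:
			if !ok {
				confChangeC = nil
			} else {
				confChangeCount++
				cc.ID = confChangeCount
				proposeConf(cc)
			}
		}
	}
}

func main() {
	proposeC := make(chan string, 2)
	confChangeC := make(chan confChange) // never written in this demo
	proposeC <- "a"
	proposeC <- "b"
	close(proposeC) // buffered values are still delivered before ok == false

	var props []string
	forward(proposeC, confChangeC,
		func(p string) { props = append(props, p) },
		func(confChange) {})
	fmt.Println(props) // [a b]
}
```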

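Next, let's see how raftNode handles the key-value write at ref-7. rc.node has the interface type Node, so we need to find the implementation of its Propose(...) method: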

The raft module has only one implementation, in node.go:

// raft/node.go
func (n *node) Propose(ctx context.Context, data []byte) error {
	return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}

The data is wrapped in a pb.Entry, and the pb.Message type is set to pb.MsgProp. Next, stepWait(...):

// raft/node.go
func (n *node) stepWait(ctx context.Context, m pb.Message) error {
	return n.stepWithWaitOption(ctx, m, true)
}

// Step advances the state machine using msgs. The ctx.Err() will be returned,
// if any.
func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {
	if m.Type != pb.MsgProp { // any message type other than pb.MsgProp goes to recvc
		select {
		case n.recvc <- m:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		case <-n.done:
			return ErrStopped
		}
	}
	ch := n.propc             // proposals go to the propc channel
	pm := msgWithResult{m: m} // wrap the message in a msgWithResult
	if wait {                 // the caller (stepWait) passes true
		pm.result = make(chan error, 1) // channel that receives the processing result
	}
	select {
	case ch <- pm: // ref-7: send the wrapped message
		if !wait {
			return nil
		}
	case <-ctx.Done():
		return ctx.Err()
	case <-n.done:
		return ErrStopped
	}
	select {
	case err := <-pm.result: // ref-8: wait for the result
		if err != nil {
			return err
		}
	case <-ctx.Done():
		return ctx.Err()
	case <-n.done:
		return ErrStopped
	}
	return nil
}

ref-7 sends the message out, so where is it read?

Following the call sites, we find where propc is consumed:

// raft/node.go
func (n *node) run() {
	var propc chan msgWithResult
	var readyc chan Ready
	var advancec chan struct{}
	var rd Ready

	r := n.rn.raft

	lead := None

	for {
		if advancec != nil {
			readyc = nil
		} else if n.rn.HasReady() {
			// Populate a Ready. Note that this Ready is not guaranteed to
			// actually be handled. We will arm readyc, but there's no guarantee
			// that we will actually send on it. It's possible that we will
			// service another channel instead, loop around, and then populate
			// the Ready again. We could instead force the previous Ready to be
			// handled first, but it's generally good to emit larger Readys plus
			// it simplifies testing (by emitting less frequently and more
			// predictably).
			rd = n.rn.readyWithoutAccept()
			readyc = n.readyc
		}

		if lead != r.lead {
			if r.hasLeader() {
				if lead == None {
					r.logger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term)
				} else {
					r.logger.Infof("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term)
				}
				propc = n.propc // a leader is known: start reading the node's propc
			} else {
				r.logger.Infof("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term)
				propc = nil
			}
			lead = r.lead
		}

		select {
		// TODO: maybe buffer the config propose if there exists one (the way
		// described in raft dissertation)
		// Currently it is dropped in Step silently.
		case pm := <-propc: // read from propc
			m := pm.m // extract the pb.Message
			m.From = r.id
			err := r.Step(m) // ref-9
			if pm.result != nil {
				pm.result <- err
				close(pm.result)
			}
		// ... (other cases omitted)
		case <-advancec:
			n.rn.Advance(rd)
			rd = Ready{}
			advancec = nil
		case c := <-n.status:
			c <- getStatus(r)
		case <-n.stop:
			close(n.done)
			return
		}
	}
}
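The `propc = nil` branch in run() relies on a Go rule: receiving from a nil channel blocks forever, so a nil channel variable effectively removes that case from the `select`. While no leader is known, proposals simply stay unread. A minimal sketch of the trick, with a hypothetical `pickSource` function and `hasLeader` playing the role of r.hasLeader():

```go
package main

import "fmt"

// pickSource shows the nil-channel trick used for propc in run(): when
// hasLeader is false, propc stays nil and its case can never be selected.
func pickSource(hasLeader bool, proposals, other chan string) string {
	var propc chan string
	if hasLeader {
		propc = proposals
	}

	select {
	case p := <-propc:
		return "proposal: " + p
	case o := <-other:
		return "other: " + o
	}
}

func main() {
	proposals := make(chan string, 1)
	other := make(chan string, 1)
	proposals <- "put x"
	other <- "tick"

	// No leader: the pending proposal is ignored and stays queued.
	fmt.Println(pickSource(false, proposals, other)) // other: tick
	// Leader known: the proposal is now delivered.
	fmt.Println(pickSource(true, proposals, other)) // proposal: put x
}
```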

The run() method was analyzed in the previous post, so I won't repeat it here. Let's look at how the message is handled in Step(...) at ref-9:

// raft/raft.go
func (r *raft) Step(m pb.Message) error {
	// Handle the message term, which may result in our stepping down to a follower.
	switch { // compare the message term with our own
	case m.Term == 0: // no term was set on the way here, so this case is taken
		// local message
	case m.Term > r.Term:
		// ... (omitted)
	case m.Term < r.Term:
		// ... (omitted)
	}

	switch m.Type {
	case pb.MsgHup:
		// ... (omitted)
	case pb.MsgVote, pb.MsgPreVote:
		// ... (omitted)
	default:
		err := r.step(r, m) // ref-10: handle the message
		if err != nil {
			return err
		}
	}
	return nil
}

Next, how is the message handled at ref-10? r.step is a function field, so we look at the places where it is assigned.

Since we are analyzing the leader, we can go straight to the one relevant site, where stepLeader is assigned to r.step:

// raft/raft.go
func (r *raft) becomeLeader() {
	// TODO(xiangli) remove the panic when the raft implementation is stable
	if r.state == StateFollower {
		panic("invalid transition [follower -> leader]")
	}
	r.step = stepLeader // ref-11: assign stepLeader to the step field
	r.reset(r.Term)
	r.tick = r.tickHeartbeat
	r.lead = r.id
	r.state = StateLeader
	// ... (omitted)
}
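Swapping a function field on every role change is what lets Step stay role-agnostic. The toy sketch below shows the same dispatch shape; `node`, `message`, and both step functions are hypothetical simplifications, and unlike etcd's stepFollower (which forwards proposals to a known leader) this follower simply drops them.

```go
package main

import (
	"errors"
	"fmt"
)

type msgType int

const (
	msgProp msgType = iota // a client proposal
	msgApp                 // an append from the leader
)

type message struct {
	Type msgType
	Data string
}

// node is a toy stand-in for raft's struct: step holds the role-specific
// handler, the way r.step does.
type node struct {
	role string
	log  []string
	step func(n *node, m message) error
}

var errDropped = errors.New("proposal dropped")

// stepLeader accepts proposals into its log (a real leader would then broadcast).
func stepLeader(n *node, m message) error {
	if m.Type == msgProp {
		n.log = append(n.log, m.Data)
	}
	return nil
}

// stepFollower appends what the leader sends; in this toy it drops proposals.
func stepFollower(n *node, m message) error {
	switch m.Type {
	case msgProp:
		return errDropped
	case msgApp:
		n.log = append(n.log, m.Data)
	}
	return nil
}

func (n *node) becomeLeader()   { n.role, n.step = "leader", stepLeader }
func (n *node) becomeFollower() { n.role, n.step = "follower", stepFollower }

// Step is where role-independent handling would go before delegating.
func (n *node) Step(m message) error { return n.step(n, m) }

func main() {
	n := &node{}
	n.becomeFollower()
	err := n.Step(message{Type: msgProp, Data: "x"})
	fmt.Println(n.role, err) // follower proposal dropped
	n.becomeLeader()
	err = n.Step(message{Type: msgProp, Data: "x"})
	fmt.Println(n.role, err, n.log) // leader <nil> [x]
}
```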

The key now is the stepLeader function (becomeLeader was also covered in the previous post). Here are its details:

// raft/raft.go
func stepLeader(r *raft, m pb.Message) error {
	// These message types do not require any progress for m.From.
	switch m.Type {
	case pb.MsgBeat:
		// ... (omitted)
		return nil
	case pb.MsgCheckQuorum:
		// ... (omitted)
		return nil
	case pb.MsgProp: // as traced above, the message type is MsgProp, so this branch runs
		if len(m.Entries) == 0 {
			r.logger.Panicf("%x stepped empty MsgProp", r.id)
		}
		if r.prs.Progress[r.id] == nil {
			// If we are not currently a member of the range (i.e. this node
			// was removed from the configuration while serving as leader),
			// drop any new proposals.
			return ErrProposalDropped
		}
		if r.leadTransferee != None {
			r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
			return ErrProposalDropped
		}

		for i := range m.Entries {
			e := &m.Entries[i]
			var cc pb.ConfChangeI
			if e.Type == pb.EntryConfChange { // a configuration change
				var ccc pb.ConfChange
				if err := ccc.Unmarshal(e.Data); err != nil {
					panic(err)
				}
				cc = ccc
			} else if e.Type == pb.EntryConfChangeV2 { // a V2 configuration change
				var ccc pb.ConfChangeV2
				if err := ccc.Unmarshal(e.Data); err != nil {
					panic(err)
				}
				cc = ccc
			}
			if cc != nil {
				alreadyPending := r.pendingConfIndex > r.raftLog.applied
				alreadyJoint := len(r.prs.Config.Voters[1]) > 0
				wantsLeaveJoint := len(cc.AsV2().Changes) == 0

				var refused string
				if alreadyPending {
					refused = fmt.Sprintf("possible unapplied conf change at index %d (applied to %d)", r.pendingConfIndex, r.raftLog.applied)
				} else if alreadyJoint && !wantsLeaveJoint {
					refused = "must transition out of joint config first"
				} else if !alreadyJoint && wantsLeaveJoint {
					refused = "not in joint state; refusing empty conf change"
				}

				if refused != "" {
					r.logger.Infof("%x ignoring conf change %v at config %s: %s", r.id, cc, r.prs.Config, refused)
					m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
				} else {
					r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
				}
			}
		}

		if !r.appendEntry(m.Entries...) { // ref-13: append the entries to the raft log
			return ErrProposalDropped
		}
		r.bcastAppend() // ref-12: broadcast the entries to the other nodes
		return nil
	case pb.MsgReadIndex:
		// ... (omitted)
		return nil
	}

	// All other message types require a progress for m.From (pr).
	pr := r.prs.Progress[m.From]
	if pr == nil {
		r.logger.Debugf("%x no progress available for %x", r.id, m.From)
		return nil
	}
	switch m.Type {
	// ... (omitted)
	}
	return nil
}

The code at ref-12 is our focus. Let's see how the entries are broadcast:

// raft/raft.go
// bcastAppend sends RPC, with entries to all peers that are not up-to-date
// according to the progress recorded in r.prs.
func (r *raft) bcastAppend() {
	// r.prs tracks the other nodes; Visit iterates over every tracked node so a message can be sent to each
	r.prs.Visit(func(id uint64, _ *tracker.Progress) {
		if id == r.id {
			return
		}
		r.sendAppend(id) // ref-14: send the data to the other node
	})
}

Next, how the data is sent to each of the other nodes:

// raft/raft.go
// sendAppend sends an append RPC with new entries (if any) and the
// current commit index to the given peer.
func (r *raft) sendAppend(to uint64) {
	r.maybeSendAppend(to, true)
}

// maybeSendAppend sends an append RPC with new entries to the given peer,
// if necessary. Returns true if a message was sent. The sendIfEmpty
// argument controls whether messages with no entries will be sent
// ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch).
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
	pr := r.prs.Progress[to]
	if pr.IsPaused() {
		return false
	}
	m := pb.Message{}
	m.To = to

	// Read the term and entries back from r.raftLog; this is where the entries stored by appendEntry earlier come into play.
	term, errt := r.raftLog.term(pr.Next - 1)
	ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
	if len(ents) == 0 && !sendIfEmpty {
		return false
	}

	if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
		// ... (error handling omitted)
	} else {
		// build the message to send
		m.Type = pb.MsgApp // note the message type: pb.MsgApp
		m.Index = pr.Next - 1
		m.LogTerm = term
		m.Entries = ents
		m.Commit = r.raftLog.committed
		if n := len(m.Entries); n != 0 {
			switch pr.State {
			// optimistically increase the next when in StateReplicate
			case tracker.StateReplicate:
				last := m.Entries[n-1].Index
				pr.OptimisticUpdate(last)
				pr.Inflights.Add(last)
			case tracker.StateProbe:
				pr.ProbeSent = true
			default:
				r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
			}
		}
	}
	r.send(m) // send the message
	return true
}
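The fields maybeSendAppend fills in follow directly from the peer's Next index: Index/LogTerm name the entry just before Next (the follower's consistency check uses them), Entries start at Next, and in replicate state Next is advanced optimistically. The sketch below assumes simplified `progress` and `appendMsg` types (hypothetical, not tracker.Progress or pb.Message) and omits the size limit, the snapshot fallback, and the inflight window.

```go
package main

import "fmt"

type entry struct {
	Index, Term uint64
}

// progress is a simplified stand-in for tracker.Progress.
type progress struct {
	Match, Next uint64
	Replicate   bool // true ~ StateReplicate, false ~ StateProbe
}

// appendMsg is a simplified stand-in for a pb.MsgApp message.
type appendMsg struct {
	Index, LogTerm, Commit uint64
	Entries                []entry
}

// buildAppend assembles an append for one peer; ents[i] holds index i+1,
// and index 0 is the empty prefix with term 0.
func buildAppend(ents []entry, committed uint64, pr *progress) appendMsg {
	prev := pr.Next - 1
	var prevTerm uint64
	if prev > 0 {
		prevTerm = ents[prev-1].Term
	}
	m := appendMsg{Index: prev, LogTerm: prevTerm, Commit: committed}
	m.Entries = append(m.Entries, ents[prev:]...)
	if n := len(m.Entries); n > 0 && pr.Replicate {
		pr.Next = m.Entries[n-1].Index + 1 // optimistic: assume the append will succeed
	}
	return m
}

func main() {
	ents := []entry{{1, 1}, {2, 1}, {3, 2}, {4, 2}}
	pr := &progress{Match: 2, Next: 3, Replicate: true}
	m := buildAppend(ents, 2, pr)
	fmt.Println(m.Index, m.LogTerm, len(m.Entries), pr.Next) // 2 1 2 5
}
```

In probe state Next is left alone, so a rejected append makes the leader back off and retry instead of racing ahead.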

The key point now is how r.send(m) actually sends the message:

// raft/raft.go
// send schedules persisting state to a stable storage and AFTER that
// sending the message (as part of next Ready message processing).
func (r *raft) send(m pb.Message) {
	if m.From == None {
		m.From = r.id
	}
	if m.Type == pb.MsgVote || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVote || m.Type == pb.MsgPreVoteResp {
		if m.Term == 0 {
			// All {pre-,}campaign messages need to have the term set when
			// sending.
			// - MsgVote: m.Term is the term the node is campaigning for,
			//   non-zero as we increment the term when campaigning.
			// - MsgVoteResp: m.Term is the new r.Term if the MsgVote was
			//   granted, non-zero for the same reason MsgVote is
			// - MsgPreVote: m.Term is the term the node will campaign,
			//   non-zero as we use m.Term to indicate the next term we'll be
			//   campaigning for
			// - MsgPreVoteResp: m.Term is the term received in the original
			//   MsgPreVote if the pre-vote was granted, non-zero for the
			//   same reasons MsgPreVote is
			panic(fmt.Sprintf("term should be set when sending %s", m.Type))
		}
	} else {
		if m.Term != 0 {
			panic(fmt.Sprintf("term should not be set when sending %s (was %d)", m.Type, m.Term))
		}
		// do not attach term to MsgProp, MsgReadIndex
		// proposals are a way to forward to the leader and
		// should be treated as local message.
		// MsgReadIndex is also forwarded to leader.
		if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex {
			m.Term = r.Term
		}
	}
	r.msgs = append(r.msgs, m) // append m to r.msgs
}
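Note that send() does no I/O: it stamps the term and queues the message in r.msgs. The sketch below reproduces those term rules on a hypothetical `outbox` type, returning an error where raft panics; message type names are plain strings for readability and are not pb's enum.

```go
package main

import "fmt"

type msgType string

const (
	msgVote        msgType = "MsgVote"
	msgVoteResp    msgType = "MsgVoteResp"
	msgPreVote     msgType = "MsgPreVote"
	msgPreVoteResp msgType = "MsgPreVoteResp"
	msgProp        msgType = "MsgProp"
	msgReadIndex   msgType = "MsgReadIndex"
	msgApp         msgType = "MsgApp"
	msgAppResp     msgType = "MsgAppResp"
)

type message struct {
	Type       msgType
	From, Term uint64
}

// outbox queues messages instead of sending them, like r.msgs.
type outbox struct {
	id, term uint64
	msgs     []message
}

// send applies raft.send's term rules, returning an error instead of panicking.
func (o *outbox) send(m message) error {
	if m.From == 0 {
		m.From = o.id
	}
	switch m.Type {
	case msgVote, msgVoteResp, msgPreVote, msgPreVoteResp:
		if m.Term == 0 {
			return fmt.Errorf("term should be set when sending %s", m.Type)
		}
	default:
		if m.Term != 0 {
			return fmt.Errorf("term should not be set when sending %s (was %d)", m.Type, m.Term)
		}
		// Proposals and read-index requests are local messages forwarded
		// to the leader, so they carry no term.
		if m.Type != msgProp && m.Type != msgReadIndex {
			m.Term = o.term
		}
	}
	o.msgs = append(o.msgs, m)
	return nil
}

func main() {
	o := &outbox{id: 1, term: 3}
	_ = o.send(message{Type: msgApp})
	_ = o.send(message{Type: msgProp})
	fmt.Println(o.msgs)                         // [{MsgApp 1 3} {MsgProp 1 0}]
	fmt.Println(o.send(message{Type: msgVote})) // term should be set when sending MsgVote
}
```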

The message is appended to r.msgs. Where is r.msgs read? In only one place, where it is assigned to a field of Ready:

// raft/node.go
func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
	rd := Ready{
		Entries:          r.raftLog.unstableEntries(),
		CommittedEntries: r.raftLog.nextEnts(),
		Messages:         r.msgs, // r.msgs becomes Ready.Messages
	}
	// ... (other processing omitted)
	return rd
}

Ready.Messages is handed to the network in raftexample's serveChannels (contrib/raftexample/raft.go):

func (rc *raftNode) serveChannels() {
	// ... (omitted)

	// event loop on raft state machine updates
	for {
		select {
		case <-ticker.C:
			rc.node.Tick()

		// store raft entries to wal, then publish over commit channel
		case rd := <-rc.node.Ready():
			rc.wal.Save(rd.HardState, rd.Entries)
			if !raft.IsEmptySnap(rd.Snapshot) {
				rc.saveSnap(rd.Snapshot)
				rc.raftStorage.ApplySnapshot(rd.Snapshot)
				rc.publishSnapshot(rd.Snapshot)
			}
			rc.raftStorage.Append(rd.Entries)
			rc.transport.Send(rd.Messages) // hand the messages to the transport, which is provided by etcd's etcdserver module
			applyDoneC, ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries))
			if !ok {
				rc.stop()
				return
			}
			rc.maybeTriggerSnapshot(applyDoneC)
			rc.node.Advance()

		case err := <-rc.transport.ErrorC:
			rc.writeError(err)
			return

		case <-rc.stopc:
			rc.stop()
			return
		}
	}
}

2.2 Receiving Messages

In the cluster, followers receive messages from the leader. Let's go straight to becomeFollower:

// raft/raft.go
func (r *raft) becomeFollower(term uint64, lead uint64) {
	r.step = stepFollower // ref-15: set the function that handles incoming messages
	r.reset(term)
	r.tick = r.tickElection
	r.lead = lead
	r.state = StateFollower
	r.logger.Infof("%x became follower at term %d", r.id, r.Term)
}

Next, the key function stepFollower:

// raft/raft.go
func stepFollower(r *raft, m pb.Message) error {
	switch m.Type {
	case pb.MsgProp:
		if r.lead == None {
			r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
			return ErrProposalDropped
		} else if r.disableProposalForwarding {
			r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term)
			return ErrProposalDropped
		}
		m.To = r.lead
		r.send(m)
	case pb.MsgApp: // the leader sent pb.MsgApp above, so this branch runs
		r.electionElapsed = 0
		r.lead = m.From
		r.handleAppendEntries(m) // handle the entries carried by the message
	case pb.MsgHeartbeat:
		r.electionElapsed = 0
		r.lead = m.From
		r.handleHeartbeat(m)
	case pb.MsgSnap:
		r.electionElapsed = 0
		r.lead = m.From
		r.handleSnapshot(m)
	case pb.MsgTransferLeader:
		if r.lead == None {
			r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term)
			return nil
		}
		m.To = r.lead
		r.send(m)
	case pb.MsgTimeoutNow:
		r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From)
		// Leadership transfers never use pre-vote even if r.preVote is true; we
		// know we are not recovering from a partition so there is no need for the
		// extra round trip.
		r.hup(campaignTransfer)
	case pb.MsgReadIndex:
		if r.lead == None {
			r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
			return nil
		}
		m.To = r.lead
		r.send(m)
	case pb.MsgReadIndexResp:
		if len(m.Entries) != 1 {
			r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
			return nil
		}
		r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
	}
	return nil
}

Next, the key function handleAppendEntries:

// raft/raft.go
func (r *raft) handleAppendEntries(m pb.Message) {
	if m.Index < r.raftLog.committed { // these entries are already committed here; just reply with our committed index
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
		return
	}

	// try to append the entries
	if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
	} else {
		// ... (omitted)
	}
}

The key step now is r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...):

// raft/log.go
// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
// it returns (last index of new entries, true).
func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
	if l.matchTerm(index, logTerm) {
		lastnewi = index + uint64(len(ents))
		ci := l.findConflict(ents)
		switch {
		case ci == 0:
		case ci <= l.committed:
			l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
		default:
			offset := index + 1
			l.append(ents[ci-offset:]...) // ref-16: append the entries to the log
		}
		l.commitTo(min(committed, lastnewi)) // advance the commit index, never past the last new entry
		return lastnewi, true
	}
	return 0, false
}
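The follower-side consistency check is easier to follow on a toy log. The sketch below re-implements the structure of maybeAppend on a hypothetical in-memory `memLog` (not etcd's raftLog): reject if the entry before the batch does not match, find the first conflicting entry, overwrite from there, and move the commit index forward to at most the last new entry.

```go
package main

import "fmt"

// entry is a simplified pb.Entry.
type entry struct{ Index, Term uint64 }

// memLog is a hypothetical in-memory log: ents[i] holds index i+1, and
// index 0 stands for the empty prefix with term 0.
type memLog struct {
	ents      []entry
	committed uint64
}

func (l *memLog) lastIndex() uint64 { return uint64(len(l.ents)) }

// matchTerm reports whether we hold index i with term t.
func (l *memLog) matchTerm(i, t uint64) bool {
	if i == 0 {
		return t == 0
	}
	return i <= l.lastIndex() && l.ents[i-1].Term == t
}

// findConflict returns the index of the first incoming entry we lack or
// hold with a different term, or 0 if every entry already matches.
func (l *memLog) findConflict(ents []entry) uint64 {
	for _, e := range ents {
		if !l.matchTerm(e.Index, e.Term) {
			return e.Index
		}
	}
	return 0
}

func minU64(a, b uint64) uint64 {
	if a < b {
		return a
	}
	return b
}

// maybeAppend follows the structure of raftLog.maybeAppend on the toy log.
func (l *memLog) maybeAppend(index, logTerm, committed uint64, ents ...entry) (uint64, bool) {
	if !l.matchTerm(index, logTerm) {
		return 0, false // consistency check failed; the leader must back off
	}
	lastnewi := index + uint64(len(ents))
	if ci := l.findConflict(ents); ci != 0 {
		if ci <= l.committed {
			panic(fmt.Sprintf("entry %d conflicts with committed entry", ci))
		}
		// Drop our entries from ci onward and take the leader's instead;
		// ents[0] has index index+1, so ents[ci-index-1] has index ci.
		l.ents = append(l.ents[:ci-1], ents[ci-index-1:]...)
	}
	// Like commitTo: only move forward, and never past what the leader sent.
	if c := minU64(committed, lastnewi); c > l.committed {
		l.committed = c
	}
	return lastnewi, true
}

func main() {
	// The follower holds a stale entry 3 from term 1; the leader has term 2 there.
	l := &memLog{ents: []entry{{1, 1}, {2, 1}, {3, 1}}, committed: 1}
	last, ok := l.maybeAppend(2, 1, 3, entry{3, 2}, entry{4, 2})
	fmt.Println(last, ok, l.ents, l.committed) // 4 true [{1 1} {2 1} {3 2} {4 2}] 3
}
```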

The code at ref-16 performs the append. Here are the details:

// raft/log.go
func (l *raftLog) append(ents ...pb.Entry) uint64 {
	if len(ents) == 0 {
		return l.lastIndex()
	}
	if after := ents[0].Index - 1; after < l.committed {
		l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
	}
	l.unstable.truncateAndAppend(ents)
	return l.lastIndex()
}

// raft/log_unstable.go
func (u *unstable) truncateAndAppend(ents []pb.Entry) {
	after := ents[0].Index
	switch {
	case after == u.offset+uint64(len(u.entries)):
		// after is the next index in the u.entries
		// directly append
		u.entries = append(u.entries, ents...)
	case after <= u.offset:
		u.logger.Infof("replace the unstable entries from index %d", after)
		// The log is being truncated to before our current offset
		// portion, so set the offset and replace the entries
		u.offset = after
		u.entries = ents
	default:
		// truncate to after and copy to u.entries
		// then append
		u.logger.Infof("truncate the unstable entries before index %d", after)
		u.entries = append([]pb.Entry{}, u.slice(u.offset, after)...)
		u.entries = append(u.entries, ents...)
	}
}
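The three cases of truncateAndAppend are easiest to see with concrete indexes. The sketch below uses a hypothetical `unstable` type with the same offset/entries layout (entries[i] has index offset+i); unlike etcd's u.slice, it does no bounds checking of its own, so an `after` beyond the end simply panics with a slice-bounds error.

```go
package main

import "fmt"

type entry struct{ Index, Term uint64 }

// unstable mimics the layout of raft's unstable log: entries[i] has index offset+i.
type unstable struct {
	offset  uint64
	entries []entry
}

// truncateAndAppend handles the same three cases as the raft version.
func (u *unstable) truncateAndAppend(ents []entry) {
	after := ents[0].Index
	switch {
	case after == u.offset+uint64(len(u.entries)):
		// Contiguous with what we have: plain append.
		u.entries = append(u.entries, ents...)
	case after <= u.offset:
		// Starts at or before our first unstable index: replace everything.
		u.offset = after
		u.entries = ents
	default:
		// Keep [offset, after) in a fresh slice, then append the new entries.
		kept := append([]entry{}, u.entries[:after-u.offset]...)
		u.entries = append(kept, ents...)
	}
}

func main() {
	u := &unstable{offset: 5, entries: []entry{{5, 1}, {6, 1}, {7, 1}}}
	u.truncateAndAppend([]entry{{8, 1}})         // case 1: append
	u.truncateAndAppend([]entry{{7, 2}, {8, 2}}) // case 3: truncate at 7
	fmt.Println(u.offset, u.entries)             // 5 [{5 1} {6 1} {7 2} {8 2}]
	u.truncateAndAppend([]entry{{4, 3}})         // case 2: replace
	fmt.Println(u.offset, u.entries)             // 4 [{4 3}]
}
```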

That concludes the analysis of the log replication flow.
