JobPlus知识库 IT 工业智能4.0 文章
Tendermint同步区块

最近在学习Tendermint的代码,记录下同步区块的流程,直接跳过P2P阶段,后续再写一篇文章记录P2P流程吧

blockchain/pool.go的OnStart()新建了gorountine来发起获取区块请求

[plain] 

  1. func (pool *BlockPool) OnStart() error {  
  2.     go pool.makeRequestersRoutine()  
  3.     pool.startTime = time.Now()  
  4.     return nil  
  5. }  

看下makeRequestersRoutine的代码

[plain]

  1. // Run spawns requesters as needed.  
  2. func (pool *BlockPool) makeRequestersRoutine() {  
  3.   
  4.     for {  
  5.         if !pool.IsRunning() {  
  6.             break  
  7.         }  
  8.   
  9.         _, numPending, lenRequesters := pool.GetStatus()  
  10.         if numPending >= maxPendingRequests {  
  11.             // sleep for a bit.  
  12.             time.Sleep(requestIntervalMS * time.Millisecond)  
  13.             // check for timed out peers  
  14.             pool.removeTimedoutPeers()  
  15.         } else if lenRequesters >= maxTotalRequesters {  
  16.             // sleep for a bit.  
  17.             time.Sleep(requestIntervalMS * time.Millisecond)  
  18.             // check for timed out peers  
  19.             pool.removeTimedoutPeers()  
  20.         } else {  
  21.             // request for more blocks.  
  22.             pool.makeNextRequester()  
  23.         }  
  24.     }  
  25. }  

这里是一个for循环,获取当前pool的状态

1 如果当前pending的个数大于等于10000个,sleep 100ms,并且remove掉timeout的peer(remove timeout peer的逻辑是当前接收peer的bitrate小于10KB/S

2 如果当前request的个数大于等于10000个,sleep 100ms并且remove timeout的peer

3 新建一个请求

[plain] 

  1. func (pool *BlockPool) makeNextRequester() {  
  2.     pool.mtx.Lock()  
  3.     defer pool.mtx.Unlock()  
  4.   
  5.     nextHeight := pool.height + pool.requestersLen()  
  6.     request := newBPRequester(pool, nextHeight)  
  7.     // request.SetLogger(pool.Logger.With("height", nextHeight))  
  8.   
  9.     pool.requesters[nextHeight] = request  
  10.     pool.numPending++  
  11.   
  12.     err := request.Start()  
  13.     if err != nil {  
  14.         request.Logger.Error("Error starting request", "err", err)  
  15.     }  
  16. }  

新建请求的时候rquester和pending都会+1,然后调用request.Start(),这个调用的是blockchain/pool.go的另外一个OnStart()

[plain]

  1. func (bpr *bpRequester) OnStart() error {  
  2.     go bpr.requestRoutine()  
  3.     return nil  
  4. }  

看下requestRoutine的代码

[plain] 

  1. func (bpr *bpRequester) requestRoutine() {  
  2. OUTER_LOOP:  
  3.     for {  
  4.         // Pick a peer to send request to.  
  5.         var peer *bpPeer = nil  
  6.     PICK_PEER_LOOP:  
  7.         for {  
  8.             if !bpr.IsRunning() || !bpr.pool.IsRunning() {  
  9.                 return  
  10.             }  
  11.             peer = bpr.pool.pickIncrAvailablePeer(bpr.height)  
  12.             if peer == nil {  
  13.                 //log.Info("No peers available", "height", height)  
  14.                 time.Sleep(requestIntervalMS * time.Millisecond)  
  15.                 continue PICK_PEER_LOOP  
  16.             }  
  17.             break PICK_PEER_LOOP  
  18.         }  
  19.         bpr.mtx.Lock()  
  20.         bpr.peerID = peer.id  
  21.         bpr.mtx.Unlock()  
  22.   
  23.         // Send request and wait.  
  24.         bpr.pool.sendRequest(bpr.height, peer.id)  
  25.         ...  
  26.     }  
  27. }  

pickIncrAvailablePeer的作用是找到有高度(bpr.height)的节点

sendRequest其实是写了一个channel

[plain] 

  1. func (pool *BlockPool) sendRequest(height int64, peerID string) {  
  2.     if !pool.IsRunning() {  
  3.         return  
  4.     }  
  5.     pool.requestsCh <- BlockRequest{height, peerID}  
  6. }  

接着找requestsCh读取的地方,就在blockchain/reactor.go中:

[plain] 

  1. func (bcR *BlockchainReactor) poolRoutine() {  
  2.   
  3.     trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)  
  4.     statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)  
  5.     switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second)  
  6.   
  7.     blocksSynced := 0  
  8.   
  9.     chainID := bcR.initialState.ChainID  
  10.     state := bcR.initialState  
  11.   
  12.     lastHundred := time.Now()  
  13.     lastRate := 0.0  
  14.   
  15. FOR_LOOP:  
  16.     for {  
  17.         select {  
  18.         case request := <-bcR.requestsCh: // chan BlockRequest  
  19.             peer := bcR.Switch.Peers().Get(request.PeerID)  
  20.             if peer == nil {  
  21.                 continue FOR_LOOP // Peer has since been disconnected.  
  22.             }  
  23.             msg := &bcBlockRequestMessage{request.Height}  
  24.             queued := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})  
  25.             if !queued {  
  26.                 // We couldn't make the request, send-queue full.  
  27.                 // The pool handles timeouts, just let it go.  
  28.                 continue FOR_LOOP  
  29.             }  
  30.         case peerID := <-bcR.timeoutsCh: // chan string  
  31.             // Peer timed out.  
  32.             peer := bcR.Switch.Peers().Get(peerID)  
  33.             if peer != nil {  
  34.                 bcR.Switch.StopPeerForError(peer, errors.New("BlockchainReactor Timeout"))  
  35.             }  
  36.         case <-statusUpdateTicker.C:  
  37.             // ask for status updates  
  38.             go bcR.BroadcastStatusRequest() // nolint: errcheck  
  39.         case <-switchToConsensusTicker.C:  
  40.             height, numPending, lenRequesters := bcR.pool.GetStatus()  
  41.             outbound, inbound, _ := bcR.Switch.NumPeers()  
  42.             bcR.Logger.Debug("Consensus ticker", "numPending", numPending, "total", lenRequesters,  
  43.                 "outbound", outbound, "inbound", inbound)  
  44.             if bcR.pool.IsCaughtUp() {  
  45.                 bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)  
  46.                 bcR.pool.Stop()  
  47.   
  48.                 conR := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)  
  49.                 conR.SwitchToConsensus(state, blocksSynced)  
  50.   
  51.                 break FOR_LOOP  
  52.             }  
  53.         case <-trySyncTicker.C: // chan time  
  54.             // This loop can be slow as long as it's doing syncing work.  
  55.         SYNC_LOOP:  
  56.             for i := 0; i < 10; i++ {  
  57.                 // See if there are any blocks to sync.  
  58.                 first, second := bcR.pool.PeekTwoBlocks()  
  59.                 //bcR.Logger.Info("TrySync peeked", "first", first, "second", second)  
  60.                 if first == nil || second == nil {  
  61.                     // We need both to sync the first block.  
  62.                     break SYNC_LOOP  
  63.                 }  
  64.                 firstParts := first.MakePartSet(state.ConsensusParams.BlockPartSizeBytes)  
  65.                 firstPartsHeader := firstParts.Header()  
  66.                 firstID := types.BlockID{first.Hash(), firstPartsHeader}  
  67.                 // Finally, verify the first block using the second's commit  
  68.                 // NOTE: we can probably make this more efficient, but note that calling  
  69.                 // first.Hash() doesn't verify the tx contents, so MakePartSet() is  
  70.                 // currently necessary.  
  71.                 err := state.Validators.VerifyCommit(  
  72.                     chainID, firstID, first.Height, second.LastCommit)  
  73.                 if err != nil {  
  74.                     bcR.Logger.Error("Error in validation", "err", err)  
  75.                     bcR.pool.RedoRequest(first.Height)  
  76.                     break SYNC_LOOP  
  77.                 } else {  
  78.                     bcR.pool.PopRequest()  
  79.   
  80.                     bcR.store.SaveBlock(first, firstParts, second.LastCommit)  
  81.   
  82.                     // NOTE: we could improve performance if we  
  83.                     // didn't make the app commit to disk every block  
  84.                     // ... but we would need a way to get the hash without it persisting  
  85.                     var err error  
  86.                     state, err = bcR.blockExec.ApplyBlock(state, firstID, first)  
  87.                     if err != nil {  
  88.                         // TODO This is bad, are we zombie?  
  89.                         cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))  
  90.                     }  
  91.                     blocksSynced += 1  
  92.   
  93.                     // update the consensus params  
  94.                     bcR.updateConsensusParams(state.ConsensusParams)  
  95.   
  96.                     if blocksSynced%100 == 0 {  
  97.                         lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds())  
  98.                         bcR.Logger.Info("Fast Sync Rate", "height", bcR.pool.height,  
  99.                             "max_peer_height", bcR.pool.MaxPeerHeight(), "blocks/s", lastRate)  
  100.                         lastHundred = time.Now()  
  101.                     }  
  102.                 }  
  103.             }  
  104.             continue FOR_LOOP  
  105.         case <-bcR.Quit:  
  106.             break FOR_LOOP  
  107.         }  
  108.     }  
  109. }  

1 收到reqeustCh的数据,发送bcBlockRequestMessage到对端节点(对端节点收到消息后返回高度的block)。是在本文件中的Receive()函数处理的

2 本函数有个trySyncTicker 50ms的timer一直在运行,作用是拿到两个已经从对端peer获取的block,经过VerifyCommit之后,SaveBlock存在tendermint内部,再调用ApplyBlock存到application

3 也有个statusUpdateTicker 10s的timer,作用是broadcast bcStatusRequestMessage去获取对端节点的block高度

4 还有个switchToConsensusTicker 1s的timer也在运行,作用是看本节点的block高度有没有大于等于其他节点的最大高度,如果到达了(也就是同步区块完成了)就SwitchToConsensus切换fast mode到consensus mode


如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!

¥ 打赏支持
18人赞 举报
分享到
用户评价(0)

暂无评价,你也可以发布评价哦:)

扫码APP

扫描使用APP

扫码使用

扫描使用小程序