最近在学习Tendermint的代码,记录下同步区块的流程,直接跳过P2P阶段,后续再写一篇文章记录P2P流程吧
blockchain/pool.go的OnStart()新建了一个goroutine来发起获取区块的请求
[plain]
- func (pool *BlockPool) OnStart() error {
- go pool.makeRequestersRoutine()
- pool.startTime = time.Now()
- return nil
- }
看下makeRequestersRoutine的代码
[plain]
- // Run spawns requesters as needed.
- func (pool *BlockPool) makeRequestersRoutine() {
- for {
- if !pool.IsRunning() {
- break
- }
- _, numPending, lenRequesters := pool.GetStatus()
- if numPending >= maxPendingRequests {
- // sleep for a bit.
- time.Sleep(requestIntervalMS * time.Millisecond)
- // check for timed out peers
- pool.removeTimedoutPeers()
- } else if lenRequesters >= maxTotalRequesters {
- // sleep for a bit.
- time.Sleep(requestIntervalMS * time.Millisecond)
- // check for timed out peers
- pool.removeTimedoutPeers()
- } else {
- // request for more blocks.
- pool.makeNextRequester()
- }
- }
- }
这里是一个for循环,获取当前pool的状态
1 如果当前pending的个数大于等于10000个,sleep 100ms,并且remove掉timeout的peer(remove timeout peer的逻辑是当前从该peer接收数据的bitrate小于10KB/S)
2 如果当前requester的个数大于等于10000个,同样sleep 100ms并且remove掉timeout的peer
3 否则调用makeNextRequester()新建一个请求
[plain]
- func (pool *BlockPool) makeNextRequester() {
- pool.mtx.Lock()
- defer pool.mtx.Unlock()
- nextHeight := pool.height + pool.requestersLen()
- request := newBPRequester(pool, nextHeight)
- // request.SetLogger(pool.Logger.With("height", nextHeight))
- pool.requesters[nextHeight] = request
- pool.numPending++
- err := request.Start()
- if err != nil {
- request.Logger.Error("Error starting request", "err", err)
- }
- }
新建请求的时候requester和pending的计数都会+1,然后调用request.Start(),这里最终调用的是blockchain/pool.go中的另外一个OnStart()(bpRequester的OnStart)
[plain]
- func (bpr *bpRequester) OnStart() error {
- go bpr.requestRoutine()
- return nil
- }
看下requestRoutine的代码
[plain]
- func (bpr *bpRequester) requestRoutine() {
- OUTER_LOOP:
- for {
- // Pick a peer to send request to.
- var peer *bpPeer = nil
- PICK_PEER_LOOP:
- for {
- if !bpr.IsRunning() || !bpr.pool.IsRunning() {
- return
- }
- peer = bpr.pool.pickIncrAvailablePeer(bpr.height)
- if peer == nil {
- //log.Info("No peers available", "height", height)
- time.Sleep(requestIntervalMS * time.Millisecond)
- continue PICK_PEER_LOOP
- }
- break PICK_PEER_LOOP
- }
- bpr.mtx.Lock()
- bpr.peerID = peer.id
- bpr.mtx.Unlock()
- // Send request and wait.
- bpr.pool.sendRequest(bpr.height, peer.id)
- ...
- }
- }
pickIncrAvailablePeer的作用是挑选一个拥有该高度(bpr.height)区块的可用节点;如果暂时没有可用节点,就sleep一会儿后重试
sendRequest其实只是往requestsCh这个channel里写入了一个BlockRequest
[plain]
- func (pool *BlockPool) sendRequest(height int64, peerID string) {
- if !pool.IsRunning() {
- return
- }
- pool.requestsCh <- BlockRequest{height, peerID}
- }
接着找requestsCh读取的地方,就在blockchain/reactor.go中:
[plain]
- func (bcR *BlockchainReactor) poolRoutine() {
- trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
- statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
- switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
- blocksSynced := 0
- chainID := bcR.initialState.ChainID
- state := bcR.initialState
- lastHundred := time.Now()
- lastRate := 0.0
- FOR_LOOP:
- for {
- select {
- case request := <-bcR.requestsCh: // chan BlockRequest
- peer := bcR.Switch.Peers().Get(request.PeerID)
- if peer == nil {
- continue FOR_LOOP // Peer has since been disconnected.
- }
- msg := &bcBlockRequestMessage{request.Height}
- queued := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
- if !queued {
- // We couldn't make the request, send-queue full.
- // The pool handles timeouts, just let it go.
- continue FOR_LOOP
- }
- case peerID := <-bcR.timeoutsCh: // chan string
- // Peer timed out.
- peer := bcR.Switch.Peers().Get(peerID)
- if peer != nil {
- bcR.Switch.StopPeerForError(peer, errors.New("BlockchainReactor Timeout"))
- }
- case <-statusUpdateTicker.C:
- // ask for status updates
- go bcR.BroadcastStatusRequest() // nolint: errcheck
- case <-switchToConsensusTicker.C:
- height, numPending, lenRequesters := bcR.pool.GetStatus()
- outbound, inbound, _ := bcR.Switch.NumPeers()
- bcR.Logger.Debug("Consensus ticker", "numPending", numPending, "total", lenRequesters,
- "outbound", outbound, "inbound", inbound)
- if bcR.pool.IsCaughtUp() {
- bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)
- bcR.pool.Stop()
- conR := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
- conR.SwitchToConsensus(state, blocksSynced)
- break FOR_LOOP
- }
- case <-trySyncTicker.C: // chan time
- // This loop can be slow as long as it's doing syncing work.
- SYNC_LOOP:
- for i := 0; i < 10; i++ {
- // See if there are any blocks to sync.
- first, second := bcR.pool.PeekTwoBlocks()
- //bcR.Logger.Info("TrySync peeked", "first", first, "second", second)
- if first == nil || second == nil {
- // We need both to sync the first block.
- break SYNC_LOOP
- }
- firstParts := first.MakePartSet(state.ConsensusParams.BlockPartSizeBytes)
- firstPartsHeader := firstParts.Header()
- firstID := types.BlockID{first.Hash(), firstPartsHeader}
- // Finally, verify the first block using the second's commit
- // NOTE: we can probably make this more efficient, but note that calling
- // first.Hash() doesn't verify the tx contents, so MakePartSet() is
- // currently necessary.
- err := state.Validators.VerifyCommit(
- chainID, firstID, first.Height, second.LastCommit)
- if err != nil {
- bcR.Logger.Error("Error in validation", "err", err)
- bcR.pool.RedoRequest(first.Height)
- break SYNC_LOOP
- } else {
- bcR.pool.PopRequest()
- bcR.store.SaveBlock(first, firstParts, second.LastCommit)
- // NOTE: we could improve performance if we
- // didn't make the app commit to disk every block
- // ... but we would need a way to get the hash without it persisting
- var err error
- state, err = bcR.blockExec.ApplyBlock(state, firstID, first)
- if err != nil {
- // TODO This is bad, are we zombie?
- cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
- }
- blocksSynced += 1
- // update the consensus params
- bcR.updateConsensusParams(state.ConsensusParams)
- if blocksSynced%100 == 0 {
- lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds())
- bcR.Logger.Info("Fast Sync Rate", "height", bcR.pool.height,
- "max_peer_height", bcR.pool.MaxPeerHeight(), "blocks/s", lastRate)
- lastHundred = time.Now()
- }
- }
- }
- continue FOR_LOOP
- case <-bcR.Quit:
- break FOR_LOOP
- }
- }
- }
1 收到requestsCh的数据后,发送bcBlockRequestMessage到对端节点(对端节点收到消息后返回对应高度的block)。该消息是在本文件中的Receive()函数里处理的
2 本函数有个trySyncTicker 50ms的timer一直在运行,作用是取出两个已经从对端peer获取到的block,用第二个block的LastCommit对第一个block做VerifyCommit,验证通过之后调用SaveBlock把第一个block存到tendermint内部,再调用ApplyBlock把它应用到application
3 也有个statusUpdateTicker 10s的timer,作用是broadcast bcStatusRequestMessage去获取对端节点的block高度
4 还有个switchToConsensusTicker 1s的timer也在运行,作用是检查本节点的block高度是否已经大于等于其他节点的最大高度,如果达到了(也就是区块同步完成了),就调用SwitchToConsensus从fast sync模式切换到consensus模式
登录 | 立即注册