以太坊源码解读(11)广播和同步 同步模块
以太坊网络中不断重复着广播和同步,这样才能保证对以太坊全网的规范链的维护和统一。
一、广播
【广播】主要是广播区块、区块hash和交易,分别通过ProtocolManager.BroadcastBlock和ProtocolManager.BroadcastTxs两个方法执行。广播有下面几种情形:
- 1、minedBroadcastLoop()监听到新区块事件后,把新区块和区块hash分别广播出去;
- 2、从远程节点同步完成后,将CurrentBlock广播出去,此时广播的是区块hash;
- 3、txBlockcastLoop()监听到区块池的新增交易事件时会广播交易;
ProtocolManager.BroadcastBlock
- 1、筛选p2p节点中不含当前区块的节点;
- 2、如果propagate为true,将区块block和总难度td发送给一部分节点,节点数为根号n;
- 3、如果propagate为false,将区块的hash发送给所有的节点。
func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) { hash := block.Hash() peers := pm.peers.PeersWithoutBlock(hash) // If propagation is requested, send to a subset of the peer if propagate { // Calculate the TD of the block (it's not imported yet, so block.Td is not valid) var td *big.Int if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil { td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1)) } else { log.Error("Propagating dangling block", "number", block.Number(), "hash", hash) return } // Send the block to a subset of our peers transfer := peers[:int(math.Sqrt(float64(len(peers))))] for _, peer := range transfer { peer.AsyncSendNewBlock(block, td) } log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt))) return } // Otherwise if the block is indeed in out own chain, announce it if pm.blockchain.HasBlock(hash, block.NumberU64()) { for _, peer := range peers { peer.AsyncSendNewBlockHash(block) } log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt))) } }
BroadcastBlock的调用时机:
1、挖矿成功:
Ethereum.Start() —> ProtocolManager.Start() —> ProtocolManager.minedBroadcastLoop —> ProtocolManager.BroadcastBlock
2、同步新区块
ProtocolManager.synchronise —> ProtocolManager.BroadcastBlock
ProtocolManager.BroadcastTxs
func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) { var txset = make(map[*peer]types.Transactions) // Broadcast transactions to a batch of peers not knowing about it for _, tx := range txs { peers := pm.peers.PeersWithoutTx(tx.Hash()) for _, peer := range peers { txset[peer] = append(txset[peer], tx) } log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers)) } // FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))] for peer, txs := range txset { peer.AsyncSendTransactions(txs) } }
该方法将针对每一个交易,准确的将它发送到所有没有该交易的节点。
BroadcastTxs的调用时机:接收到交易池添加新交易的事件
Ethereum.Start() —> ProtocolManager.Start —> ProtocolManager.txBroadcastLoop —> ProtocolManager.BraodcastTxs
二、同步
ProtocolManager.synchronise是负责同步的函数:
- 1、确保当前区块的高度小于拥有的最高区块高度的p2p节点的高度;
- 2、获取同步模式:fastSync或fullSync;
- 3、使用该同步模式进行一次同步;
- 4、同步完成后打开交易处理阀门atomic.StoreUint32(&pm.acceptTxs, 1),允许以太坊节点接受其他节点广播的交易;
- 5、将CurrentBlock广播出去。
// synchronise tries to sync up our local block chain with a remote peer. func (pm *ProtocolManager) synchronise(peer *peer) { // Short circuit if no peers are available if peer == nil { return } // Make sure the peer's TD is higher than our own currentBlock := pm.blockchain.CurrentBlock() td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64()) pHead, pTd := peer.Head() if pTd.Cmp(td) <= 0 { return } // Otherwise try to sync with the downloader mode := downloader.FullSync if atomic.LoadUint32(&pm.fastSync) == 1 { // Fast sync was explicitly requested, and explicitly granted mode = downloader.FastSync } else if currentBlock.NumberU64() == 0 && pm.blockchain.CurrentFastBlock().NumberU64() > 0 { // The database seems empty as the current block is the genesis. Yet the fast // block is ahead, so fast sync was enabled for this node at a certain point. // The only scenario where this can happen is if the user manually (or via a // bad block) rolled back a fast sync node below the sync point. In this case // however it's safe to reenable fast sync. atomic.StoreUint32(&pm.fastSync, 1) mode = downloader.FastSync } if mode == downloader.FastSync { // Make sure the peer's total difficulty we are synchronizing is higher. if pm.blockchain.GetTdByHash(pm.blockchain.CurrentFastBlock().Hash()).Cmp(pTd) >= 0 { return } } // Run the sync cycle, and disable fast sync if we've went past the pivot block if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil { return } if atomic.LoadUint32(&pm.fastSync) == 1 { log.Info("Fast sync complete, auto disabling") atomic.StoreUint32(&pm.fastSync, 0) } atomic.StoreUint32(&pm.acceptTxs, 1) // Mark initial sync done if head := pm.blockchain.CurrentBlock(); head.NumberU64() > 0 { // We've completed a sync cycle, notify all peers of new state. This path is // essential in star-topology networks where a gateway node needs to notify // all its out-of-date peers of the availability of a new block. This failure // scenario will most often crop up in private and hackathon networks with // degenerate connectivity, but it should be healthy for the mainnet too to // more reliably update peers or the local TD state. go pm.BroadcastBlock(head, false) } }
ProtocolManger负责区块链的同步,其中有两个工具用于同步区块链:【Downloader】和【Fetcher】。
Fetcher:简单来说,fetcher模块收集其他Peer通知它的信息:NewBlockMsg或NewBlockHashMsg。根据通知的消息,获取完整的区块,然后传递给eth模块把区块插入区块链。如果是完整区块,就可以传递给eth插入区块,如果只有区块Hash,则需要从其他的Peer获取此完整的区块,然后再传递给eth插入区块。
Downloader:负责区块链同步的主要工作,在最开始的同步中从远端节点下载区块链信息,上面的ProtocolManager.synchronise就是用downloader从其他节点下载区块的。
Downloader同步有两种模式,分别是fastSync和fullSync模式。
fastSync模式:是一种不需要执行交易,而是直接下载状态和收据的同步方式,用于使本地链快速的跟上规范链的更新速度。
它的总体逻辑是:
- 1、从k桶中拥有最高td的节点,同步所有区块头;
- 2、同步了区块头后,从其他节点同步状态,收据和交易;
- 3、到达(距离最高区块高度64)时,关闭fastSync,采用fullSync模式;
- 4、同步完成后,广播自己的头区块。
fullSync模式:验证所有的交易并执行交易,在本地生成状态和收据。这种同步方式缓慢且消耗CPU和磁盘。
下一章:以太坊源码解读(12)广播和同步 Fetcher
Fetcher是基于广播的同步工具,所以Fetcher必然是通过protocolManager的handleMsg()来调用的。下面就是handleMsg中收到NewBlockHashesM ...