以太坊源码解读(10)广播和同步 protocolManager及其handle()方法
前面提到了ProtocolManager,从字面上看是协议管理器,负责着p2p通信协议的管理。它连接了p2p的逻辑层peer与顶层peer之间的调用,从顶层将协议传递至逻辑层,再从逻辑层得到message传递到顶层。

1、fastSync规定了同步的模式 ,通过atmoic.LoadUint32(&pm.fastSync) == 1或0来打开或关闭fastSync模式; 2、acceptTxs是节点是否接受交易的阀门,只有当pm.acceptTxs == 1时,节点才会接受交易。这个操作只会在同步结束后再开始,即同步的时候节点是不会接受交易的; 3、SubProtocols中是以太坊的通讯协议,通常只有一个值,即eth63。在以太坊中规定了一个常量ProtocolVersion = [eth63, eth62],它规定了以太坊的两个版本,这两种版本的ProtocolLength分别是[17,8]。 4、downloader是一个下载器,从远程网络节点中获取hashes和blocks。 5、fetcher则收集网络其他以太坊节点发过来的同步通知,进行验证,并做出相应的处理。 6、peers是经过验证可信任的通信节点的集合。
一、protocolManager的初始化和启动:NewProtocolManager和Start()
func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkID uint64,
mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain,
chaindb ethdb.Database) (*ProtocolManager, error) {
// Create the protocol manager with the base fields
manager := &ProtocolManager{
networkID: networkID,
eventMux: mux,
txpool: txpool,
blockchain: blockchain,
chainconfig: config,
peers: newPeerSet(),
newPeerCh: make(chan *peer),
noMorePeers: make(chan struct{}),
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
}
// Figure out whether to allow fast sync or not
if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {
log.Warn("Blockchain not empty, fast sync disabled")
mode = downloader.FullSync
}
if mode == downloader.FastSync {
manager.fastSync = uint32(1)
}
// Initiate a sub-protocol for every implemented version we can handle
manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
for i, version := range ProtocolVersions {
// Skip protocol version if incompatible with the mode of operation
if mode == downloader.FastSync && version < eth63 {
continue
}
// Compatible; initialise the sub-protocol
version := version // Closure for the run
manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
Name: ProtocolName,
Version: version,
Length: ProtocolLengths[i],
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := manager.newPeer(int(version), p, rw)
select {
case manager.newPeerCh <- peer:
manager.wg.Add(1)
defer manager.wg.Done()
return manager.handle(peer)
case <-manager.quitSync:
return p2p.DiscQuitting
}
},
NodeInfo: func() interface{} {
return manager.NodeInfo()
},
PeerInfo: func(id discover.NodeID) interface{} {
if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
return p.Info()
}
return nil
},
})
}
if len(manager.SubProtocols) == 0 {
return nil, errIncompatibleConfig
}
// Construct the different synchronisation mechanisms
manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)
validator := func(header *types.Header) error {
return engine.VerifyHeader(blockchain, header, true)
}
heighter := func() uint64 {
return blockchain.CurrentBlock().NumberU64()
}
inserter := func(blocks types.Blocks) (int, error) {
// If fast sync is running, deny importing weird blocks
if atomic.LoadUint32(&manager.fastSync) == 1 {
log.Warn("Discarded bad propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash())
return 0, nil
}
atomic.StoreUint32(&manager.acceptTxs, 1) // Mark initial sync done on any fetcher import
return manager.blockchain.InsertChain(blocks)
}
manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)
return manager, nil
}
NewProtocolManager()的逻辑如下: 1、初始化protocolManager对象,包括事件复用、区块链、交易池以及节点等; 2、启动fastSync或fullSync,以太坊启动时默认的同步方式是fastSync模式,这种方式 直接下载区块头、区块体和状态,不执行交易,直到最后64个区块再采取fullSync模式,这种模式不下载状态,而是验证并执行每个交易,消耗CPU; 3、将ethereum协议配置到SubProtocols,设置ethereum的Run()方法,等待p2p.peer模块的调用; 4、新建downloader下载器; 5、定义validator、heighter和inserter三个函数辅助验证和插入新区块,并以此新建fetcher。
node.Start()会启动以太坊协议,ethereum.Start()会执行protocolManager.Start():
func (pm *ProtocolManager) Start(maxPeers int) {
pm.maxPeers = maxPeers
// broadcast transactions
pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
go pm.txBroadcastLoop()
// broadcast mined blocks
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop()
// start sync handlers
go pm.syncer()
go pm.txsyncLoop()
}
ProtocolManager.Start()启动了四条go程,分别是交易订阅广播协程(txBroadcastLoop)、挖矿订阅协程(minedBroadcastLoop)、节点定期同步协程(syncer)和交易同步协程(txsyncLoop)。

1. 创建一个新交易的订阅通道,并启动交易广播的goroutine。广播新出现的交易对象。txBroadcastLoop()会在txCh通道的收端持续等待,一旦接收到有关新交易的事件,会立即调用BroadcastTx()函数广播给那些尚无该交易对象的相邻个体。
2. 创建一个挖坑的订阅通道,并启动挖坑广播的goroutine。广播新挖掘出的区块。minedBroadcastLoop()持续等待本个体的新挖掘出区块事件,然后立即广播给需要的相邻个体。当不再订阅新挖掘区块事件时,这个函数才会结束等待并返回。很有意思的是,在收到新挖掘出区块事件后,minedBroadcastLoop()会连续调用两次BroadcastBlock(),两次调用仅仅一个bool型参数@propagate不一样,当该参数为true时,会将整个新区块依次发给相邻区块中的一小部分;而当其为false时,仅仅将新区块的Hash值和Number发送给所有相邻列表。
3. pm.syncer() 启动同步goroutine,定时的和网络其他节点同步,并处理网络节点的相关通知。定时与相邻个体进行区块全链的强制同步。syncer()首先启动fetcher成员,然后进入一个无限循环,每次循环中都会向相邻peer列表中“最优”的那个peer作一次区块全链同步。发起上述同步的理由分两种:如果有新登记(加入)的相邻个体,则在整个peer列表数目大于5时,发起之;如果没有新peer到达,则以10s为间隔定时的发起之。这里所谓"最优"指的是peer中所维护区块链的TotalDifficulty(td)最高,由于Td是全链中从创世块到最新头块的Difficulty值总和,所以Td值最高就意味着它的区块链是最新的,跟这样的peer作区块全链同步,显然改动量是最小的,此即"最优"。
4. pm.txsyncLoop() 启动交易同步goroutine,把新的交易均匀的同步给网路节点。ProtocolManager随着ethereum.Start()而启动,txsyncLoop()在s:=<-pm.txsyncCh处阻塞,直到新的节点出现,通过验证并启动peer.run()之后,通过pm.handle(peer)调用syncTransactions(peer),将交易打包成txsync{p, txs}结构,传入txsyncCh通道。从而使txssyncLoop()解除阻塞,执行Send(s)将交易发送到这个peer。
peer.broadcase()是每个Peer连接的广播函数,它只广播3种消息:交易、完整的区块、区块的Hash,这样表明了节点只会主动广播这3中类型的数据,剩余的数据同步,都是通过请求-响应的方式。
二、交给其他peer的回调函数handle()
func (pm *ProtocolManager) handle(p *peer) error {
// Ignore maxPeers if this is a trusted peer
if pm.peers.Len() >= pm.maxPeers && !p.Peer.Info().Network.Trusted {
return p2p.DiscTooManyPeers
}
p.Log().Debug("Ethereum peer connected", "name", p.Name())
// Execute the Ethereum handshake
var (
genesis = pm.blockchain.Genesis()
head = pm.blockchain.CurrentHeader()
hash = head.Hash()
number = head.Number.Uint64()
td = pm.blockchain.GetTd(hash, number)
)
if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil {
p.Log().Debug("Ethereum handshake failed", "err", err)
return err
}
if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
rw.Init(p.version)
}
// Register the peer locally
if err := pm.peers.Register(p); err != nil {
p.Log().Error("Ethereum peer registration failed", "err", err)
return err
}
defer pm.removePeer(p.id)
// Register the peer in the downloader. If the downloader considers it banned, we disconnect
if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
return err
}
// Propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
pm.syncTransactions(p)
// main loop. handle incoming messages.
for {
if err := pm.handleMsg(p); err != nil {
p.Log().Debug("Ethereum message handling failed", "err", err)
return err
}
}
}
handle()函数针对一个新peer做了如下几件事:
- p.Handshakes()握手,与对方peer沟通己方的区块链状态;
- p.rw初始化一个读写通道,用以跟对方peer相互数据传输。
- p.peers.Register()注册对方peer,存入己方peer列表;只有handle()函数退出时,才会将这个peer移除出列表。
- Downloader成员注册这个新peer;Downloader会自己维护一个相邻peer列表。
- 调用syncTransactions(),用当前txpool中新累计的tx对象组装成一个txsync{}对象,推送到内部通道txsyncCh。
- 在无限循环中启动handleMsg(),当对方peer发出任何msg时,handleMsg()可以捕捉相应类型的消息并在己方进行处理。
以太坊的通信基于以太坊协议,以太坊协议是Service的实现体,其构造函数收集于Node中的map[reflect.Type]Service中。Node将其中的Protocol交给Server对象。当Node.Start()时,则Server执行Start(),即执行ethereum的Start()。随即会新建protocolManager,将以太坊协议交给后者,并实现了对protocol接口的实现:
manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
Name: ProtocolName,
Version: version,
Length: ProtocolLengths[i],
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := manager.newPeer(int(version), p, rw)
select {
case manager.newPeerCh <- peer:
manager.wg.Add(1)
defer manager.wg.Done()
return manager.handle(peer)
case <-manager.quitSync:
return p2p.DiscQuitting
}
},
NodeInfo: func() interface{} {
return manager.NodeInfo()
},
PeerInfo: func(id discover.NodeID) interface{} {
if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
return p.Info()
}
return nil
},
})
Server.Start()中启动一个单独线程(listenLoop())去监听某个端口有无主动发来的IP连接;另外一个单独线程启动run()函数,在无限循环里处理接收到的任何新消息新对象。在run()函数中,如果有远端peer发来连接请求(新的p2p.conn{}),则调用Server.newPeer()生成新的peer对象,并把Server.Protocols全交给peer。
handle()在ProtocolManager对象创建时包含在p2p.Protocol对象的Run方法中,一同被交给新的peer对象,负责对对方peer的沟通,向对方peer发起请求并等待接受对方的处理信息,故而是回调函数。

下一章:以太坊源码解读(11)广播和同步 同步模块
以太坊网络中不断重复着广播和同步,这样才能保证对以太坊全网的规范链的维护和统一。一、广播【广播】主要是广播区块、区块hash和交易,分别通过ProtocolManager.BroadcastBlo ...
AI 中文社