以太坊源码解读(10)广播和同步 protocolManager及其handle()方法
前面提到了ProtocolManager,从字面上看是协议管理器,负责着p2p通信协议的管理。它连接了p2p的逻辑层peer与顶层peer之间的调用,从顶层将协议传递至逻辑层,再从逻辑层得到message传递到顶层。
1、fastSync规定了同步的模式 ,通过atmoic.LoadUint32(&pm.fastSync) == 1或0来打开或关闭fastSync模式; 2、acceptTxs是节点是否接受交易的阀门,只有当pm.acceptTxs == 1时,节点才会接受交易。这个操作只会在同步结束后再开始,即同步的时候节点是不会接受交易的; 3、SubProtocols中是以太坊的通讯协议,通常只有一个值,即eth63。在以太坊中规定了一个常量ProtocolVersion = [eth63, eth62],它规定了以太坊的两个版本,这两种版本的ProtocolLength分别是[17,8]。 4、downloader是一个下载器,从远程网络节点中获取hashes和blocks。 5、fetcher则收集网络其他以太坊节点发过来的同步通知,进行验证,并做出相应的处理。 6、peers是经过验证可信任的通信节点的集合。
一、protocolManager的初始化和启动:NewProtocolManager和Start()
func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) { // Create the protocol manager with the base fields manager := &ProtocolManager{ networkID: networkID, eventMux: mux, txpool: txpool, blockchain: blockchain, chainconfig: config, peers: newPeerSet(), newPeerCh: make(chan *peer), noMorePeers: make(chan struct{}), txsyncCh: make(chan *txsync), quitSync: make(chan struct{}), } // Figure out whether to allow fast sync or not if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 { log.Warn("Blockchain not empty, fast sync disabled") mode = downloader.FullSync } if mode == downloader.FastSync { manager.fastSync = uint32(1) } // Initiate a sub-protocol for every implemented version we can handle manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions)) for i, version := range ProtocolVersions { // Skip protocol version if incompatible with the mode of operation if mode == downloader.FastSync && version < eth63 { continue } // Compatible; initialise the sub-protocol version := version // Closure for the run manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{ Name: ProtocolName, Version: version, Length: ProtocolLengths[i], Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { peer := manager.newPeer(int(version), p, rw) select { case manager.newPeerCh <- peer: manager.wg.Add(1) defer manager.wg.Done() return manager.handle(peer) case <-manager.quitSync: return p2p.DiscQuitting } }, NodeInfo: func() interface{} { return manager.NodeInfo() }, PeerInfo: func(id discover.NodeID) interface{} { if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil { return p.Info() } return nil }, }) } if len(manager.SubProtocols) == 0 { return nil, errIncompatibleConfig } // Construct the different synchronisation mechanisms manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer) validator := func(header *types.Header) error { return engine.VerifyHeader(blockchain, header, true) } heighter := func() uint64 { return blockchain.CurrentBlock().NumberU64() } inserter := func(blocks types.Blocks) (int, error) { // If fast sync is running, deny importing weird blocks if atomic.LoadUint32(&manager.fastSync) == 1 { log.Warn("Discarded bad propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash()) return 0, nil } atomic.StoreUint32(&manager.acceptTxs, 1) // Mark initial sync done on any fetcher import return manager.blockchain.InsertChain(blocks) } manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer) return manager, nil }
NewProtocolManager()的逻辑如下: 1、初始化protocolManager对象,包括事件复用、区块链、交易池以及节点等; 2、启动fastSync或fullSync,以太坊启动时默认的同步方式是fastSync模式,这种方式 直接下载区块头、区块体和状态,不执行交易,直到最后64个区块再采取fullSync模式,这种模式不下载状态,而是验证并执行每个交易,消耗CPU; 3、将ethereum协议配置到SubProtocols,设置ethereum的Run()方法,等待p2p.peer模块的调用; 4、新建downloader下载器; 5、定义validator、heighter和inserter三个函数辅助验证和插入新区块,并以此新建fetcher。
node.Start()会启动以太坊协议,ethereum.Start()会执行protocolManager.Start():
func (pm *ProtocolManager) Start(maxPeers int) { pm.maxPeers = maxPeers // broadcast transactions pm.txsCh = make(chan core.NewTxsEvent, txChanSize) pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh) go pm.txBroadcastLoop() // broadcast mined blocks pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) go pm.minedBroadcastLoop() // start sync handlers go pm.syncer() go pm.txsyncLoop() }
ProtocolManager.Start()启动了四条go程,分别是交易订阅广播协程(txBroadcastLoop)、挖矿订阅协程(minedBroadcastLoop)、节点定期同步协程(syncer)和交易同步协程(txsyncLoop)。
1. 创建一个新交易的订阅通道,并启动交易广播的goroutine。广播新出现的交易对象。txBroadcastLoop()会在txCh通道的收端持续等待,一旦接收到有关新交易的事件,会立即调用BroadcastTx()函数广播给那些尚无该交易对象的相邻个体。
2. 创建一个挖坑的订阅通道,并启动挖坑广播的goroutine。广播新挖掘出的区块。minedBroadcastLoop()持续等待本个体的新挖掘出区块事件,然后立即广播给需要的相邻个体。当不再订阅新挖掘区块事件时,这个函数才会结束等待并返回。很有意思的是,在收到新挖掘出区块事件后,minedBroadcastLoop()会连续调用两次BroadcastBlock(),两次调用仅仅一个bool型参数@propagate不一样,当该参数为true时,会将整个新区块依次发给相邻区块中的一小部分;而当其为false时,仅仅将新区块的Hash值和Number发送给所有相邻列表。
3. pm.syncer() 启动同步goroutine,定时的和网络其他节点同步,并处理网络节点的相关通知。定时与相邻个体进行区块全链的强制同步。syncer()首先启动fetcher成员,然后进入一个无限循环,每次循环中都会向相邻peer列表中“最优”的那个peer作一次区块全链同步。发起上述同步的理由分两种:如果有新登记(加入)的相邻个体,则在整个peer列表数目大于5时,发起之;如果没有新peer到达,则以10s为间隔定时的发起之。这里所谓"最优"指的是peer中所维护区块链的TotalDifficulty(td)最高,由于Td是全链中从创世块到最新头块的Difficulty值总和,所以Td值最高就意味着它的区块链是最新的,跟这样的peer作区块全链同步,显然改动量是最小的,此即"最优"。
4. pm.txsyncLoop() 启动交易同步goroutine,把新的交易均匀的同步给网路节点。ProtocolManager随着ethereum.Start()而启动,txsyncLoop()在s:=<-pm.txsyncCh处阻塞,直到新的节点出现,通过验证并启动peer.run()之后,通过pm.handle(peer)调用syncTransactions(peer),将交易打包成txsync{p, txs}结构,传入txsyncCh通道。从而使txssyncLoop()解除阻塞,执行Send(s)将交易发送到这个peer。
peer.broadcase()是每个Peer连接的广播函数,它只广播3种消息:交易、完整的区块、区块的Hash,这样表明了节点只会主动广播这3中类型的数据,剩余的数据同步,都是通过请求-响应的方式。
二、交给其他peer的回调函数handle()
func (pm *ProtocolManager) handle(p *peer) error { // Ignore maxPeers if this is a trusted peer if pm.peers.Len() >= pm.maxPeers && !p.Peer.Info().Network.Trusted { return p2p.DiscTooManyPeers } p.Log().Debug("Ethereum peer connected", "name", p.Name()) // Execute the Ethereum handshake var ( genesis = pm.blockchain.Genesis() head = pm.blockchain.CurrentHeader() hash = head.Hash() number = head.Number.Uint64() td = pm.blockchain.GetTd(hash, number) ) if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil { p.Log().Debug("Ethereum handshake failed", "err", err) return err } if rw, ok := p.rw.(*meteredMsgReadWriter); ok { rw.Init(p.version) } // Register the peer locally if err := pm.peers.Register(p); err != nil { p.Log().Error("Ethereum peer registration failed", "err", err) return err } defer pm.removePeer(p.id) // Register the peer in the downloader. If the downloader considers it banned, we disconnect if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil { return err } // Propagate existing transactions. new transactions appearing // after this will be sent via broadcasts. pm.syncTransactions(p) // main loop. handle incoming messages. for { if err := pm.handleMsg(p); err != nil { p.Log().Debug("Ethereum message handling failed", "err", err) return err } } }
handle()函数针对一个新peer做了如下几件事:
- p.Handshakes()握手,与对方peer沟通己方的区块链状态;
- p.rw初始化一个读写通道,用以跟对方peer相互数据传输。
- p.peers.Register()注册对方peer,存入己方peer列表;只有handle()函数退出时,才会将这个peer移除出列表。
- Downloader成员注册这个新peer;Downloader会自己维护一个相邻peer列表。
- 调用syncTransactions(),用当前txpool中新累计的tx对象组装成一个txsync{}对象,推送到内部通道txsyncCh。
- 在无限循环中启动handleMsg(),当对方peer发出任何msg时,handleMsg()可以捕捉相应类型的消息并在己方进行处理。
以太坊的通信基于以太坊协议,以太坊协议是Service的实现体,其构造函数收集于Node中的map[reflect.Type]Service中。Node将其中的Protocol交给Server对象。当Node.Start()时,则Server执行Start(),即执行ethereum的Start()。随即会新建protocolManager,将以太坊协议交给后者,并实现了对protocol接口的实现:
manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{ Name: ProtocolName, Version: version, Length: ProtocolLengths[i], Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { peer := manager.newPeer(int(version), p, rw) select { case manager.newPeerCh <- peer: manager.wg.Add(1) defer manager.wg.Done() return manager.handle(peer) case <-manager.quitSync: return p2p.DiscQuitting } }, NodeInfo: func() interface{} { return manager.NodeInfo() }, PeerInfo: func(id discover.NodeID) interface{} { if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil { return p.Info() } return nil }, })
Server.Start()中启动一个单独线程(listenLoop())去监听某个端口有无主动发来的IP连接;另外一个单独线程启动run()函数,在无限循环里处理接收到的任何新消息新对象。在run()函数中,如果有远端peer发来连接请求(新的p2p.conn{}),则调用Server.newPeer()生成新的peer对象,并把Server.Protocols全交给peer。
handle()在ProtocolManager对象创建时包含在p2p.Protocol对象的Run方法中,一同被交给新的peer对象,负责对对方peer的沟通,向对方peer发起请求并等待接受对方的处理信息,故而是回调函数。
下一章:以太坊源码解读(11)广播和同步 同步模块
以太坊网络中不断重复着广播和同步,这样才能保证对以太坊全网的规范链的维护和统一。一、广播【广播】主要是广播区块、区块hash和交易,分别通过ProtocolManager.BroadcastBlo ...