以太坊源码解读(10)广播和同步 protocolManager及其handle()方法

前面提到了ProtocolManager,从字面上看是协议管理器,负责着p2p通信协议的管理。它连接了p2p的逻辑层peer与顶层peer之间的调用,从顶层将协议传递至逻辑层,再从逻辑层得到message传递到顶层。

1、fastSync规定了同步的模式 ,通过atmoic.LoadUint32(&pm.fastSync)  == 1或0来打开或关闭fastSync模式; 2、acceptTxs是节点是否接受交易的阀门,只有当pm.acceptTxs == 1时,节点才会接受交易。这个操作只会在同步结束后再开始,即同步的时候节点是不会接受交易的; 3、SubProtocols中是以太坊的通讯协议,通常只有一个值,即eth63。在以太坊中规定了一个常量ProtocolVersion = [eth63, eth62],它规定了以太坊的两个版本,这两种版本的ProtocolLength分别是[17,8]。 4、downloader是一个下载器,从远程网络节点中获取hashes和blocks。 5、fetcher则收集网络其他以太坊节点发过来的同步通知,进行验证,并做出相应的处理。 6、peers是经过验证可信任的通信节点的集合。

一、protocolManager的初始化和启动:NewProtocolManager和Start()

func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, 
mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, 
chaindb ethdb.Database) (*ProtocolManager, error) {
	// Create the protocol manager with the base fields
	manager := &ProtocolManager{
		networkID:   networkID,
		eventMux:    mux,
		txpool:      txpool,
		blockchain:  blockchain,
		chainconfig: config,
		peers:       newPeerSet(),
		newPeerCh:   make(chan *peer),
		noMorePeers: make(chan struct{}),
		txsyncCh:    make(chan *txsync),
		quitSync:    make(chan struct{}),
	}
	// Figure out whether to allow fast sync or not
	if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {
		log.Warn("Blockchain not empty, fast sync disabled")
		mode = downloader.FullSync
	}
	if mode == downloader.FastSync {
		manager.fastSync = uint32(1)
	}
	// Initiate a sub-protocol for every implemented version we can handle
	manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
	for i, version := range ProtocolVersions {
		// Skip protocol version if incompatible with the mode of operation
		if mode == downloader.FastSync && version < eth63 {
			continue
		}
		// Compatible; initialise the sub-protocol
		version := version // Closure for the run
		manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
			Name:    ProtocolName,
			Version: version,
			Length:  ProtocolLengths[i],
			Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
				peer := manager.newPeer(int(version), p, rw)
				select {
				case manager.newPeerCh <- peer:
					manager.wg.Add(1)
					defer manager.wg.Done()
					return manager.handle(peer)
				case <-manager.quitSync:
					return p2p.DiscQuitting
				}
			},
			NodeInfo: func() interface{} {
				return manager.NodeInfo()
			},
			PeerInfo: func(id discover.NodeID) interface{} {
				if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
					return p.Info()
				}
				return nil
			},
		})
	}
	if len(manager.SubProtocols) == 0 {
		return nil, errIncompatibleConfig
	}
	// Construct the different synchronisation mechanisms
	manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)

	validator := func(header *types.Header) error {
		return engine.VerifyHeader(blockchain, header, true)
	}
	heighter := func() uint64 {
		return blockchain.CurrentBlock().NumberU64()
	}
	inserter := func(blocks types.Blocks) (int, error) {
		// If fast sync is running, deny importing weird blocks
		if atomic.LoadUint32(&manager.fastSync) == 1 {
			log.Warn("Discarded bad propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash())
			return 0, nil
		}
		atomic.StoreUint32(&manager.acceptTxs, 1) // Mark initial sync done on any fetcher import
		return manager.blockchain.InsertChain(blocks)
	}
	manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)

	return manager, nil
}

NewProtocolManager()的逻辑如下: 1、初始化protocolManager对象,包括事件复用、区块链、交易池以及节点等; 2、启动fastSync或fullSync,以太坊启动时默认的同步方式是fastSync模式,这种方式 直接下载区块头、区块体和状态,不执行交易,直到最后64个区块再采取fullSync模式,这种模式不下载状态,而是验证并执行每个交易,消耗CPU; 3、将ethereum协议配置到SubProtocols,设置ethereum的Run()方法,等待p2p.peer模块的调用; 4、新建downloader下载器; 5、定义validator、heighter和inserter三个函数辅助验证和插入新区块,并以此新建fetcher。

node.Start()会启动以太坊协议,ethereum.Start()会执行protocolManager.Start():

func (pm *ProtocolManager) Start(maxPeers int) {
	pm.maxPeers = maxPeers

	// broadcast transactions
	pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
	pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
	go pm.txBroadcastLoop()

	// broadcast mined blocks
	pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
	go pm.minedBroadcastLoop()

	// start sync handlers
	go pm.syncer()
	go pm.txsyncLoop()
}

ProtocolManager.Start()启动了四条go程,分别是交易订阅广播协程(txBroadcastLoop)、挖矿订阅协程(minedBroadcastLoop)、节点定期同步协程(syncer)和交易同步协程(txsyncLoop)。

1. 创建一个新交易的订阅通道,并启动交易广播的goroutine。广播新出现的交易对象。txBroadcastLoop()会在txCh通道的收端持续等待,一旦接收到有关新交易的事件,会立即调用BroadcastTx()函数广播给那些尚无该交易对象的相邻个体。

2. 创建一个挖坑的订阅通道,并启动挖坑广播的goroutine。广播新挖掘出的区块。minedBroadcastLoop()持续等待本个体的新挖掘出区块事件,然后立即广播给需要的相邻个体。当不再订阅新挖掘区块事件时,这个函数才会结束等待并返回。很有意思的是,在收到新挖掘出区块事件后,minedBroadcastLoop()会连续调用两次BroadcastBlock(),两次调用仅仅一个bool型参数@propagate不一样,当该参数为true时,会将整个新区块依次发给相邻区块中的一小部分;而当其为false时,仅仅将新区块的Hash值和Number发送给所有相邻列表。

3. pm.syncer() 启动同步goroutine,定时的和网络其他节点同步,并处理网络节点的相关通知。定时与相邻个体进行区块全链的强制同步。syncer()首先启动fetcher成员,然后进入一个无限循环,每次循环中都会向相邻peer列表中“最优”的那个peer作一次区块全链同步。发起上述同步的理由分两种:如果有新登记(加入)的相邻个体,则在整个peer列表数目大于5时,发起之;如果没有新peer到达,则以10s为间隔定时的发起之。这里所谓"最优"指的是peer中所维护区块链的TotalDifficulty(td)最高,由于Td是全链中从创世块到最新头块的Difficulty值总和,所以Td值最高就意味着它的区块链是最新的,跟这样的peer作区块全链同步,显然改动量是最小的,此即"最优"。

4. pm.txsyncLoop() 启动交易同步goroutine,把新的交易均匀的同步给网路节点。ProtocolManager随着ethereum.Start()而启动,txsyncLoop()在s:=<-pm.txsyncCh处阻塞,直到新的节点出现,通过验证并启动peer.run()之后,通过pm.handle(peer)调用syncTransactions(peer),将交易打包成txsync{p, txs}结构,传入txsyncCh通道。从而使txssyncLoop()解除阻塞,执行Send(s)将交易发送到这个peer。

peer.broadcase()是每个Peer连接的广播函数,它只广播3种消息:交易、完整的区块、区块的Hash,这样表明了节点只会主动广播这3中类型的数据,剩余的数据同步,都是通过请求-响应的方式。

二、交给其他peer的回调函数handle()

func (pm *ProtocolManager) handle(p *peer) error {
	// Ignore maxPeers if this is a trusted peer
	if pm.peers.Len() >= pm.maxPeers && !p.Peer.Info().Network.Trusted {
		return p2p.DiscTooManyPeers
	}
	p.Log().Debug("Ethereum peer connected", "name", p.Name())

	// Execute the Ethereum handshake
	var (
		genesis = pm.blockchain.Genesis()
		head    = pm.blockchain.CurrentHeader()
		hash    = head.Hash()
		number  = head.Number.Uint64()
		td      = pm.blockchain.GetTd(hash, number)
	)
	if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil {
		p.Log().Debug("Ethereum handshake failed", "err", err)
		return err
	}
	if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
		rw.Init(p.version)
	}
	// Register the peer locally
	if err := pm.peers.Register(p); err != nil {
		p.Log().Error("Ethereum peer registration failed", "err", err)
		return err
	}
	defer pm.removePeer(p.id)

	// Register the peer in the downloader. If the downloader considers it banned, we disconnect
	if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
		return err
	}
	// Propagate existing transactions. new transactions appearing
	// after this will be sent via broadcasts.
	pm.syncTransactions(p)

	// main loop. handle incoming messages.
	for {
		if err := pm.handleMsg(p); err != nil {
			p.Log().Debug("Ethereum message handling failed", "err", err)
			return err
		}
	}
}

handle()函数针对一个新peer做了如下几件事:

  1. p.Handshakes()握手,与对方peer沟通己方的区块链状态;
  2. p.rw初始化一个读写通道,用以跟对方peer相互数据传输。
  3. p.peers.Register()注册对方peer,存入己方peer列表;只有handle()函数退出时,才会将这个peer移除出列表。
  4. Downloader成员注册这个新peer;Downloader会自己维护一个相邻peer列表。
  5. 调用syncTransactions(),用当前txpool中新累计的tx对象组装成一个txsync{}对象,推送到内部通道txsyncCh。
  6. 在无限循环中启动handleMsg(),当对方peer发出任何msg时,handleMsg()可以捕捉相应类型的消息并在己方进行处理。

以太坊的通信基于以太坊协议,以太坊协议是Service的实现体,其构造函数收集于Node中的map[reflect.Type]Service中。Node将其中的Protocol交给Server对象。当Node.Start()时,则Server执行Start(),即执行ethereum的Start()。随即会新建protocolManager,将以太坊协议交给后者,并实现了对protocol接口的实现:

manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
			Name:    ProtocolName,
			Version: version,
			Length:  ProtocolLengths[i],
			Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
				peer := manager.newPeer(int(version), p, rw)
				select {
				case manager.newPeerCh <- peer:
					manager.wg.Add(1)
					defer manager.wg.Done()
					return manager.handle(peer)
				case <-manager.quitSync:
					return p2p.DiscQuitting
				}
			},
			NodeInfo: func() interface{} {
				return manager.NodeInfo()
			},
			PeerInfo: func(id discover.NodeID) interface{} {
				if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
					return p.Info()
				}
				return nil
			},
		})

Server.Start()中启动一个单独线程(listenLoop())去监听某个端口有无主动发来的IP连接;另外一个单独线程启动run()函数,在无限循环里处理接收到的任何新消息新对象。在run()函数中,如果有远端peer发来连接请求(新的p2p.conn{}),则调用Server.newPeer()生成新的peer对象,并把Server.Protocols全交给peer。

handle()在ProtocolManager对象创建时包含在p2p.Protocol对象的Run方法中,一同被交给新的peer对象,负责对对方peer的沟通,向对方peer发起请求并等待接受对方的处理信息,故而是回调函数。

下一章:以太坊源码解读(11)广播和同步 同步模块

以太坊网络中不断重复着广播和同步,这样才能保证对以太坊全网的规范链的维护和统一。一、广播【广播】主要是广播区块、区块hash和交易,分别通过ProtocolManager.BroadcastBlo ...