以太坊源码解读(13)广播和同步 Downloader

protocolManager启动了四个go程,其中包含用于定期同步的协程:syncer,该协程调用protocolManager的synchronise方法调用,选择td最高的节点进行同步:

go pm.synchronise(pm.peers.BestPeer())

synchronise获取了节点的头区块和难度值td,确定了同步模式(fastSync或fullSync),最终调用downloader的Synchronise()方法:

pm.downloader.Synchronise(peer.id, pHead, pTd, mode)

Synchronise进一步执行downloader.synchronise,该函数主要做了以下几件事: 1、确保只有一个同步goroutine在执行:atomic.CompareAndSwapInt32(&d.synchronising, 0, 1); 2、重置queue和peers,分别是同步的hash队列和下载区块的节点; 3、设置同步模式:d.mode = mode; 4、初始化同步进程:d.syncWithPeer(p, hash, td);

syncWithPeer开启了区块的同步,其同步方式是基于hash chain:

func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
	d.mux.Post(StartEvent{})
	defer func() {
		// reset on error
		if err != nil {
			d.mux.Post(FailedEvent{err})
		} else {
			d.mux.Post(DoneEvent{})
		}
	}()
	if p.version < 62 {
		return errTooOld
	}

	log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", d.mode)
	defer func(start time.Time) {
		log.Debug("Synchronisation terminated", "elapsed", time.Since(start))
	}(time.Now())

	// Look up the sync boundaries: the common ancestor and the target block
	latest, err := d.fetchHeight(p)
	if err != nil {
		return err
	}
	height := latest.Number.Uint64()

	origin, err := d.findAncestor(p, height)
	if err != nil {
		return err
	}
	d.syncStatsLock.Lock()
	if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
		d.syncStatsChainOrigin = origin
	}
	d.syncStatsChainHeight = height
	d.syncStatsLock.Unlock()

	// Ensure our origin point is below any fast sync pivot point
	pivot := uint64(0)
	if d.mode == FastSync {
		if height <= uint64(fsMinFullBlocks) {
			origin = 0
		} else {
			pivot = height - uint64(fsMinFullBlocks)
			if pivot <= origin {
				origin = pivot - 1
			}
		}
	}
	d.committed = 1
	if d.mode == FastSync && pivot != 0 {
		d.committed = 0
	}
	// Initiate the sync using a concurrent header and content retrieval algorithm
	d.queue.Prepare(origin+1, d.mode)
	if d.syncInitHook != nil {
		d.syncInitHook(origin, height)
	}

	fetchers := []func() error{
		func() error { return d.fetchHeaders(p, origin+1, pivot) }, // Headers are always retrieved
		func() error { return d.fetchBodies(origin + 1) },          // Bodies are retrieved during normal and fast sync
		func() error { return d.fetchReceipts(origin + 1) },        // Receipts are retrieved during fast sync
		func() error { return d.processHeaders(origin+1, pivot, td) },
	}
	if d.mode == FastSync {
		fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) })
	} else if d.mode == FullSync {
		fetchers = append(fetchers, d.processFullSyncContent)
	}
	return d.spawnSync(fetchers)
}

1、首先获取远程节点peer的最新区块头:lastest,err = d.fetchHeight(p)。从节点p根据head hash(建立节点handshake的时候节点p传过来的最新的head hash)获取header,发送GetBlockHeadersMsg消息,节点p收到消息后从自己本地拿到header然后发送BlockHeaderMsg消息,当前节点一直等待知道收到header的消息。

2、找到本地链和远程链的共同祖先:orign, err = d.findAncestor(p, height)。从节点p寻找共同的祖先header。也是发送GetBlockHeadersMsg消息,带的参数是算出来的从某个高度拿某些数量的headers。拿到headers后从最新的header往前for循环,看本地的lightchain是否有这个header,有的话就找到了共同祖先。如果未找到,再从创世快开始二分法查找是否能找到共同的祖先,然后返回这个共同的祖先origin(是header)

3、拿到最新origin后,更新queue从共同祖先开始sync,d.queue.Prepare(origin+1, d.mode)

4、配置fetchers,包括fetchHeaders、fetchBodies、fetchReceipts和processHeaders。如果同步模式是FastSync,则在fetchers后添加processFastSyncContent,如果同步模式是FullSync,则在fetchers后面processFullSyncContent。最后调用spawnSync开始同步。

for _, fn := range fetchers { 
  fn := fn 
  go func() { defer d.cancelWg.Done(); errc <- fn() }() 
}

一、同步Headers:fetchHeaders

fetchHeaders中定义了getHeaders函数来获取Headers:

getHeaders := func(from uint64) {
	request = time.Now()

	ttl = d.requestTTL()
	timeout.Reset(ttl)

	if skeleton {
		p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from)
		go p.peer.RequestHeadersByNumber(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
	} else {
		p.log.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from)
		go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false)
	}
}

getHeaders(from)

fetchHeaders规定了两种同步方式,默认skeleton为true,表示先获取骨架(间隔的headers),然后再从其他节点填充骨架间的headers。从查找到的共同祖先区块+192个区块位置开始,每隔192个区块,获取128个区块头。非skeleton方式,从共同祖先区块开始,获取192个区块头。

case packet := <-d.headerCh:
	// Make sure the active peer is giving us the skeleton headers
	if packet.PeerId() != p.id {
		log.Debug("Received skeleton from incorrect peer", "peer", packet.PeerId())
		break
	}
	headerReqTimer.UpdateSince(request)
	timeout.Stop()

	// If the skeleton's finished, pull any remaining head headers directly from the origin
	if packet.Items() == 0 && skeleton {
		skeleton = false
		getHeaders(from)
		continue
	}
	// If no more headers are inbound, notify the content fetchers and return
	if packet.Items() == 0 {
		// Don't abort header fetches while the pivot is downloading
		if atomic.LoadInt32(&d.committed) == 0 && pivot <= from {
			p.log.Debug("No headers, waiting for pivot commit")
			select {
			case <-time.After(fsHeaderContCheck):
				getHeaders(from)
				continue
			case <-d.cancelCh:
				return errCancelHeaderFetch
			}
		}
		// Pivot done (or not in fast sync) and no more headers, terminate the process
		p.log.Debug("No more headers available")
		select {
		case d.headerProcCh <- nil:
			return nil
		case <-d.cancelCh:
			return errCancelHeaderFetch
		}
	}
	headers := packet.(*headerPack).headers

	// If we received a skeleton batch, resolve internals concurrently
	if skeleton {
		filled, proced, err := d.fillHeaderSkeleton(from, headers)
		if err != nil {
			p.log.Debug("Skeleton chain invalid", "err", err)
			return errInvalidChain
		}
		headers = filled[proced:]
		from += uint64(proced)
	}
	// Insert all the new headers and fetch the next batch
	if len(headers) > 0 {
		p.log.Trace("Scheduling new headers", "count", len(headers), "from", from)
		select {
		case d.headerProcCh <- headers:
		case <-d.cancelCh:
			return errCancelHeaderFetch
		}
		from += uint64(len(headers))
	}
	getHeaders(from)

getHeaders(from)执行调用了FakePeer.dl.DeliverHeaders(p.id, headers),将得到的headers写入d.headerCh通道。紧接着fetchHeaders的主循环从headerCh通道中取出headers。 1、首先如果packet.Items() == 0,则表明skeleton已经完成,将skeleton设置为false,将剩余的headers按顺序获取; 2、如果收到了一个skeleton,则调用d.fillHeaderSkeleton(from, headers)从其他节点下载headers进行填充; 3、填充完毕后,headers后写入channel headerProcCh(下面的处理headers中处理),同时把from赋值为新的from,然后进行下一批headers的获取。

case packet := <-d.headerCh:
	// Make sure the active peer is giving us the skeleton headers
	if packet.PeerId() != p.id {
		log.Debug("Received skeleton from incorrect peer", "peer", packet.PeerId())
		break
	}
	headerReqTimer.UpdateSince(request)
	timeout.Stop()

	// If the skeleton's finished, pull any remaining head headers directly from the origin
	if packet.Items() == 0 && skeleton {
		skeleton = false
		getHeaders(from)
		continue
	}
	// If no more headers are inbound, notify the content fetchers and return
	if packet.Items() == 0 {
		// Don't abort header fetches while the pivot is downloading
		if atomic.LoadInt32(&d.committed) == 0 && pivot <= from {
			p.log.Debug("No headers, waiting for pivot commit")
			select {
			case <-time.After(fsHeaderContCheck):
				getHeaders(from)
				continue
			case <-d.cancelCh:
				return errCancelHeaderFetch
			}
		}
		// Pivot done (or not in fast sync) and no more headers, terminate the process
		p.log.Debug("No more headers available")
		select {
		case d.headerProcCh <- nil:
			return nil
		case <-d.cancelCh:
			return errCancelHeaderFetch
		}
	}
	headers := packet.(*headerPack).headers

	// If we received a skeleton batch, resolve internals concurrently
	if skeleton {
		filled, proced, err := d.fillHeaderSkeleton(from, headers)
		if err != nil {
			p.log.Debug("Skeleton chain invalid", "err", err)
			return errInvalidChain
		}
		headers = filled[proced:]
		from += uint64(proced)
	}
	// Insert all the new headers and fetch the next batch
	if len(headers) > 0 {
		p.log.Trace("Scheduling new headers", "count", len(headers), "from", from)
		select {
		case d.headerProcCh <- headers:
		case <-d.cancelCh:
			return errCancelHeaderFetch
		}
		from += uint64(len(headers))
	}
	getHeaders(from)

channel headerProcCh通道的另一端在processHeaders()中,processHeaders()从通道中取出一部分headers进行处理。 a) 如果是fast或者light sync,每1K个header处理,调用lightchain.InsertHeaderChain()写入header到leveldb数据库; b) 然后如果当前是fast或者full sync模式后,d.queue.Schedule(chunk, origin)赋值blockTaskPool/blockTaskQueue和receiptTaskPool/receiptTaskQueue(only fast 模式下),供后续同步body和同步receipt使用;

func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error {
    ...
    // Wait for batches of headers to process
    gotHeaders := false

    for {
    	select {
    	...
    	case headers := <-d.headerProcCh:
    		if len(headers) == 0 {
    			...
    			// 通知并终止header processing
    		}
    		// Otherwise split the chunk of headers into batches and process them
    		gotHeaders = true

     		for len(headers) > 0 {
    			// Terminate if something failed in between processing chunks
    			select {
    			case <-d.cancelCh:
					return errCancelHeaderProcessing
    			default:
    			}
    			// Select the next chunk of headers to import
    			limit := maxHeadersProcess
    			if limit > len(headers) {
    				limit = len(headers)
    			}
    			chunk := headers[:limit]

    			// In case of header only syncing, validate the chunk immediately
    			if d.mode == FastSync || d.mode == LightSync {
    				// Collect the yet unknown headers to mark them as uncertain
    				unknown := make([]*types.Header, 0, len(headers))
    				for _, header := range chunk {
    					if !d.lightchain.HasHeader(header.Hash(), header.Number.Uint64()) {
    						unknown = append(unknown, header)
    					}
    				}
    				// If we're importing pure headers, verify based on their recentness
    				frequency := fsHeaderCheckFrequency
    				if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
    					frequency = 1
    				}
    				if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil {
    					// If some headers were inserted, add them too to the rollback list
    					if n > 0 {
    						rollback = append(rollback, chunk[:n]...)
    					}
    					log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err)
    					return errInvalidChain
    				}
    				// All verifications passed, store newly found uncertain headers
    				rollback = append(rollback, unknown...)
    				if len(rollback) > fsHeaderSafetyNet {
    					rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
     				}
    			}
    			// Unless we're doing light chains, schedule the headers for associated content retrieval
    			if d.mode == FullSync || d.mode == FastSync {
    				// If we've reached the allowed number of pending headers, stall a bit
    				for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
    					select {
    					case <-d.cancelCh:
    						return errCancelHeaderProcessing
    					case <-time.After(time.Second):
    					}
    				}
    				// Otherwise insert the headers for content retrieval
    				inserts := d.queue.Schedule(chunk, origin)
    				if len(inserts) != len(chunk) {
    					log.Debug("Stale headers")
    					return errBadPeer
    				}
    			}
    			headers = headers[limit:]
     			origin += uint64(limit)
    		}
		...
    	}
    }
}

二、同步Body:fetchbodies

fetchBodies方法中主要是调用了fetchParts() 1、首先ReserveBodies()从bodyTaskPool中取出要同步的body; 2、调用fetch,也就是调用这里的FetchBodies从节点获取body,发送GetBlockBodiesMsg消息; 3、对端节点处理完成后发回消息BlockBodiesMsg,写入channel bodyCh; 4、收到channel bodyCh的数据后,调用deliver函数,将Transactions和Uncles写入resultCache。

func (d *Downloader) fetchBodies(from uint64) error {
    log.Debug("Downloading block bodies", "origin", from)

    var (
    	deliver = func(packet dataPack) (int, error) {
    		pack := packet.(*bodyPack)
    		return d.queue.DeliverBodies(pack.peerID, pack.transactions, pack.uncles)
    	}
    	expire   = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
    	fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) }
    	capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) }
    	setIdle  = func(p *peerConnection, accepted int) { p.SetBodiesIdle(accepted) }
    )
    err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
    	d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
    	d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "bodies")

    log.Debug("Block body download terminated", "err", err)
    return err
}

d.q.DeliverBodies

func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) (int, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	reconstruct := func(header *types.Header, index int, result *fetchResult) error {
		if types.DeriveSha(types.Transactions(txLists[index])) != header.TxHash || types.CalcUncleHash(uncleLists[index]) != header.UncleHash {
			return errInvalidBody
		}
		result.Transactions = txLists[index]
		result.Uncles = uncleLists[index]
		return nil
	}
	return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, bodyReqTimer, len(txLists), reconstruct)
}

三、同步Receipts:fetchReceipts

fetchReceipts方法与fetchBodies如出一辙,也是调用了fetchParts() 1、首先ReserveBodies()从ReceiptTaskPool中取出要同步的Receipt; 2、调用fetch,也就是调用这里的FetchReceipts从节点获取receipts,发送GetReceiptsMsg消息; 3、对端节点处理完成后发回消息ReceiptsMsg,写入channel receiptCh; 4、收到channel receiptCh的数据后,调用deliver函数,将Receipts写入resultCache。

func (d *Downloader) fetchReceipts(from uint64) error {
	log.Debug("Downloading transaction receipts", "origin", from)

	var (
		deliver = func(packet dataPack) (int, error) {
			pack := packet.(*receiptPack)
			return d.queue.DeliverReceipts(pack.peerID, pack.receipts)
		}
		expire   = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchReceipts(req) }
		capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) }
		setIdle  = func(p *peerConnection, accepted int) { p.SetReceiptsIdle(accepted) }
	)
	err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
		d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
		d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "receipts")

	log.Debug("Transaction receipt download terminated", "err", err)
	return err
}

d.q.DeliverReceipts

func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	reconstruct := func(header *types.Header, index int, result *fetchResult) error {
		if types.DeriveSha(types.Receipts(receiptList[index])) != header.ReceiptHash {
			return errInvalidReceipt
		}
		result.Receipts = receiptList[index]
		return nil
	}
	return q.deliver(id, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, receiptReqTimer, len(receiptList), reconstruct)
}

四、同步状态:processFullSyncContent & processFastSyncContent

processFullSyncContent是fullSycn模式下的同步,因为在fullSync模式下Receipts没有缓存到resultCache中,所以这一步逻辑很简单,直接从缓存中取出body数据,然后执行交易生成状态,最后写进区块链即可:

func (d *Downloader) processFullSyncContent() error {
	for {
		results := d.queue.Results(true)
		if len(results) == 0 {
			return nil
		}
		if d.chainInsertHook != nil {
			d.chainInsertHook(results)
		}
		if err := d.importBlockResults(results); err != nil {
			return err
		}
	}
}

processFasrSyncContent是fastSync模式下的同步,由于Receipts、Transactions、Uncles都在resultCache中,逻辑上要下载收据然后还要多一步下载“状态”并检验,然后再写进区块链:

1、下载最新区块的状态d.syncState(lastest.Root); 2、从缓存中拿到去处理的数据results; 3、这只pivot为latestHeight - 64,调用splitAroundPivot()方法以pivot为中心,将results分为三个部分:beforeP,P,afterP; 4、对beforeP的部分调用commitFastSyncData,将body和receipt都写入区块链; 5、对P的部分更新状态信息为P block的状态,把P对应的result(包含body和receipt)调用commitPivotBlock插入本地区块链中,并调用FastSyncCommitHead记录这个pivot的hash值,存在downloader中,标记为快速同步的最后一个区块hash值; 6、对afterP调用d.importBlockResults,将body插入区块链,而不插入receipt。因为是最后64个区块,所以此时数据库中只有header和body,没有receipt和状态,要通过fullSync模式进行最后的同步。

func (d *Downloader) processFastSyncContent(latest *types.Header) error {
	// Start syncing state of the reported head block. This should get us most of
	// the state of the pivot block.
	stateSync := d.syncState(latest.Root)
	defer stateSync.Cancel()
	go func() {
		if err := stateSync.Wait(); err != nil && err != errCancelStateFetch {
			d.queue.Close() // wake up WaitResults
		}
	}()
	// Figure out the ideal pivot block. Note, that this goalpost may move if the
	// sync takes long enough for the chain head to move significantly.
	pivot := uint64(0)
	if height := latest.Number.Uint64(); height > uint64(fsMinFullBlocks) {
		pivot = height - uint64(fsMinFullBlocks)
	}
	// To cater for moving pivot points, track the pivot block and subsequently
	// accumulated download results separately.
	var (
		oldPivot *fetchResult   // Locked in pivot block, might change eventually
		oldTail  []*fetchResult // Downloaded content after the pivot
	)
	for {
		// Wait for the next batch of downloaded data to be available, and if the pivot
		// block became stale, move the goalpost
		results := d.queue.Results(oldPivot == nil) // Block if we're not monitoring pivot staleness
		if len(results) == 0 {
			// If pivot sync is done, stop
			if oldPivot == nil {
				return stateSync.Cancel()
			}
			// If sync failed, stop
			select {
			case <-d.cancelCh:
				return stateSync.Cancel()
			default:
			}
		}
		if d.chainInsertHook != nil {
			d.chainInsertHook(results)
		}
		if oldPivot != nil {
			results = append(append([]*fetchResult{oldPivot}, oldTail...), results...)
		}
		// Split around the pivot block and process the two sides via fast/full sync
		if atomic.LoadInt32(&d.committed) == 0 {
			latest = results[len(results)-1].Header
			if height := latest.Number.Uint64(); height > pivot+2*uint64(fsMinFullBlocks) {
				log.Warn("Pivot became stale, moving", "old", pivot, "new", height-uint64(fsMinFullBlocks))
				pivot = height - uint64(fsMinFullBlocks)
			}
		}
		P, beforeP, afterP := splitAroundPivot(pivot, results)
		if err := d.commitFastSyncData(beforeP, stateSync); err != nil {
			return err
		}
		if P != nil {
			// If new pivot block found, cancel old state retrieval and restart
			if oldPivot != P {
				stateSync.Cancel()

				stateSync = d.syncState(P.Header.Root)
				defer stateSync.Cancel()
				go func() {
					if err := stateSync.Wait(); err != nil && err != errCancelStateFetch {
						d.queue.Close() // wake up WaitResults
					}
				}()
				oldPivot = P
			}
			// Wait for completion, occasionally checking for pivot staleness
			select {
			case <-stateSync.done:
				if stateSync.err != nil {
					return stateSync.err
				}
				if err := d.commitPivotBlock(P); err != nil {
					return err
				}
				oldPivot = nil

			case <-time.After(time.Second):
				oldTail = afterP
				continue
			}
		}
		// Fast sync done, pivot commit done, full import
		if err := d.importBlockResults(afterP); err != nil {
			return err
		}
	}
}

至此,downloader模块的大致流程就梳理清楚了。但这个模块要比fetcher复杂的多,还有很多细节没有提到,感兴趣的话可以自己去撸源码。

下一章:以太坊源码解读(14)交易池流程

一、交易池的概念和原理 首先了解一下交易池的工作概况: 1、交易池中交易来源:本地的交易和远端节点广播的交易; 2、交易池中交易去向:被Miner模块获取并验证,用于挖矿;挖矿成功后写进区块并被广播; 3 ...