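Ethereum Source Code Walkthrough (13): Broadcast and Sync, the Downloader
protocolManager starts four goroutines, one of which is syncer, the goroutine responsible for periodic synchronisation. It calls protocolManager's synchronise method and picks the peer with the highest td (total difficulty) to sync with: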
go pm.synchronise(pm.peers.BestPeer())
synchronise reads the peer's head block and its total difficulty td, decides the sync mode (fastSync or fullSync), and finally calls the downloader's Synchronise() method:
pm.downloader.Synchronise(peer.id, pHead, pTd, mode)
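Synchronise in turn calls downloader.synchronise, which mainly does the following:
1. Ensures that only one sync goroutine runs at a time: atomic.CompareAndSwapInt32(&d.synchronising, 0, 1);
2. Resets queue and peers, which are respectively the queue of hashes to sync and the set of peers that blocks are downloaded from;
3. Sets the sync mode: d.mode = mode;
4. Starts the sync process: d.syncWithPeer(p, hash, td).
syncWithPeer starts the actual block synchronisation, which is based on the hash chain: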
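The single-sync guard in step 1 can be illustrated in isolation. The following is a minimal sketch, not the downloader's code: the syncer type, its runs counter, and errBusy are hypothetical names introduced here; only atomic.CompareAndSwapInt32 on an int32 flag is taken from the text above.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// errBusy is a hypothetical error returned when a sync is already running.
var errBusy = errors.New("busy")

// syncer is a hypothetical stand-in for the Downloader.
type syncer struct {
	synchronising int32 // 0 = idle, 1 = a sync is in progress
	runs          int   // number of syncs that actually started
}

// synchronise runs work only if no other sync is in progress.
func (s *syncer) synchronise(work func()) error {
	// Atomically flip the flag from 0 to 1; fail if it was already 1.
	if !atomic.CompareAndSwapInt32(&s.synchronising, 0, 1) {
		return errBusy
	}
	// Release the flag when this sync finishes.
	defer atomic.StoreInt32(&s.synchronising, 0)

	s.runs++
	work()
	return nil
}

func main() {
	s := &syncer{}
	err := s.synchronise(func() {
		// A second attempt while the first is still running is rejected.
		fmt.Println("nested:", s.synchronise(func() {}))
	})
	fmt.Println("outer:", err, "runs:", s.runs)
}
```

Compare-and-swap is used rather than a mutex because a second caller should give up immediately instead of waiting for the running sync to finish.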
func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
	d.mux.Post(StartEvent{})
	defer func() {
		// reset on error
		if err != nil {
			d.mux.Post(FailedEvent{err})
		} else {
			d.mux.Post(DoneEvent{})
		}
	}()
	if p.version < 62 {
		return errTooOld
	}

	log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", d.mode)
	defer func(start time.Time) {
		log.Debug("Synchronisation terminated", "elapsed", time.Since(start))
	}(time.Now())

	// Look up the sync boundaries: the common ancestor and the target block
	latest, err := d.fetchHeight(p)
	if err != nil {
		return err
	}
	height := latest.Number.Uint64()

	origin, err := d.findAncestor(p, height)
	if err != nil {
		return err
	}
	d.syncStatsLock.Lock()
	if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
		d.syncStatsChainOrigin = origin
	}
	d.syncStatsChainHeight = height
	d.syncStatsLock.Unlock()

	// Ensure our origin point is below any fast sync pivot point
	pivot := uint64(0)
	if d.mode == FastSync {
		if height <= uint64(fsMinFullBlocks) {
			origin = 0
		} else {
			pivot = height - uint64(fsMinFullBlocks)
			if pivot <= origin {
				origin = pivot - 1
			}
		}
	}
	d.committed = 1
	if d.mode == FastSync && pivot != 0 {
		d.committed = 0
	}
	// Initiate the sync using a concurrent header and content retrieval algorithm
	d.queue.Prepare(origin+1, d.mode)
	if d.syncInitHook != nil {
		d.syncInitHook(origin, height)
	}

	fetchers := []func() error{
		func() error { return d.fetchHeaders(p, origin+1, pivot) }, // Headers are always retrieved
		func() error { return d.fetchBodies(origin + 1) },          // Bodies are retrieved during normal and fast sync
		func() error { return d.fetchReceipts(origin + 1) },        // Receipts are retrieved during fast sync
		func() error { return d.processHeaders(origin+1, pivot, td) },
	}
	if d.mode == FastSync {
		fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) })
	} else if d.mode == FullSync {
		fetchers = append(fetchers, d.processFullSyncContent)
	}
	return d.spawnSync(fetchers)
}
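1. First fetch the remote peer's latest block header: latest, err := d.fetchHeight(p). The header is requested from peer p by its head hash (the latest head hash that p sent during the handshake). The local node sends a GetBlockHeadersMsg; p looks up the header locally and replies with a BlockHeadersMsg; the local node blocks until the header arrives.
2. Find the common ancestor of the local and remote chains: origin, err := d.findAncestor(p, height). This also sends a GetBlockHeadersMsg, with parameters computed to request a certain number of headers starting from a certain height. The returned headers are then walked in a for loop from the newest backwards, checking whether the local lightchain has each one; the first header found locally is the common ancestor. If none is found, a binary search starting from the genesis block looks for the common ancestor instead. The result is returned as origin (used as a block number, as the later origin+1 shows).
3. With origin known, prepare the queue so that syncing starts right after the common ancestor: d.queue.Prepare(origin+1, d.mode).
4. Set up the fetchers: fetchHeaders, fetchBodies, fetchReceipts and processHeaders. In FastSync mode processFastSyncContent is appended to fetchers; in FullSync mode processFullSyncContent is appended. Finally spawnSync is called to start syncing.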
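The binary-search fallback in step 2 can be sketched on its own. This is a simplified illustration under stated assumptions, not the downloader's findAncestor: both chains are modelled as in-memory slices of hash strings indexed by block number (the real code asks the peer for headers over the network), they share the genesis block at index 0, and once they diverge they never agree again.

```go
package main

import "fmt"

// findAncestor returns the highest block number at which local and remote
// hold the same hash. Hypothetical helper: both chains are plain slices
// indexed by block number and share index 0 (genesis).
func findAncestor(local, remote []string) uint64 {
	// Invariant: start is a known common block, end is the first index
	// that is known (or assumed) not to be common.
	start, end := 0, len(remote)
	if len(local) < end {
		end = len(local)
	}
	for start+1 < end {
		mid := (start + end) / 2
		if local[mid] == remote[mid] {
			start = mid // still on the shared part of the chain
		} else {
			end = mid // already past the fork point
		}
	}
	return uint64(start)
}

func main() {
	local := []string{"g", "a", "b", "c", "d"}
	remote := []string{"g", "a", "b", "x", "y", "z"}
	fmt.Println(findAncestor(local, remote))
}
```

Binary search is valid here because a hash chain diverges only once: every block below the fork is common and every block above it is not.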
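Inside spawnSync, each fetcher is launched in its own goroutine and reports its result on an error channel:
for _, fn := range fetchers {
	fn := fn
	go func() { defer d.cancelWg.Done(); errc <- fn() }()
}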
I. Syncing headers: fetchHeaders
fetchHeaders defines a getHeaders function that requests the headers:
getHeaders := func(from uint64) {
	request = time.Now()

	ttl = d.requestTTL()
	timeout.Reset(ttl)

	if skeleton {
		p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from)
		go p.peer.RequestHeadersByNumber(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
	} else {
		p.log.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from)
		go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false)
	}
}
getHeaders(from)
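fetchHeaders supports two ways of syncing. skeleton defaults to true, meaning that a skeleton (headers at regular intervals) is fetched first and the gaps between skeleton headers are then filled in from other peers. In skeleton mode the request starts 192 blocks past the common ancestor (from+MaxHeaderFetch-1, where from is the block right after the ancestor) and asks for up to 128 headers, one every 192 blocks. In non-skeleton mode it asks for 192 consecutive headers starting at from.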
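The skeleton arithmetic is easier to see with concrete numbers. The sketch below only computes which block numbers a skeleton request covers; skeletonNumbers is a hypothetical helper, the constants carry the values quoted above (192 and 128), and it assumes the usual meaning of the skip parameter, namely that consecutive returned headers are skip+1 blocks apart.

```go
package main

import "fmt"

const (
	MaxHeaderFetch  = 192 // headers per request, as quoted in the text
	MaxSkeletonSize = 128 // skeleton headers per request, as quoted in the text
)

// skeletonNumbers lists the first count block numbers requested by
// RequestHeadersByNumber(from+MaxHeaderFetch-1, MaxSkeletonSize, MaxHeaderFetch-1, false).
// Hypothetical helper for illustration only.
func skeletonNumbers(from uint64, count int) []uint64 {
	nums := make([]uint64, 0, count)
	n := from + MaxHeaderFetch - 1 // first skeleton header
	for i := 0; i < count; i++ {
		nums = append(nums, n)
		n += MaxHeaderFetch // skip = MaxHeaderFetch-1, so the step is skip+1
	}
	return nums
}

func main() {
	// With the common ancestor at block 0, from is 1.
	fmt.Println(skeletonNumbers(1, 3))
	// One full skeleton spans this many headers once the gaps are filled.
	fmt.Println(MaxSkeletonSize * MaxHeaderFetch)
}
```

Each gap between two skeleton headers is exactly one full non-skeleton request (192 headers), which is what lets the gaps be filled from other peers in parallel.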
case packet := <-d.headerCh:
	// Make sure the active peer is giving us the skeleton headers
	if packet.PeerId() != p.id {
		log.Debug("Received skeleton from incorrect peer", "peer", packet.PeerId())
		break
	}
	headerReqTimer.UpdateSince(request)
	timeout.Stop()

	// If the skeleton's finished, pull any remaining head headers directly from the origin
	if packet.Items() == 0 && skeleton {
		skeleton = false
		getHeaders(from)
		continue
	}
	// If no more headers are inbound, notify the content fetchers and return
	if packet.Items() == 0 {
		// Don't abort header fetches while the pivot is downloading
		if atomic.LoadInt32(&d.committed) == 0 && pivot <= from {
			p.log.Debug("No headers, waiting for pivot commit")
			select {
			case <-time.After(fsHeaderContCheck):
				getHeaders(from)
				continue
			case <-d.cancelCh:
				return errCancelHeaderFetch
			}
		}
		// Pivot done (or not in fast sync) and no more headers, terminate the process
		p.log.Debug("No more headers available")
		select {
		case d.headerProcCh <- nil:
			return nil
		case <-d.cancelCh:
			return errCancelHeaderFetch
		}
	}
	headers := packet.(*headerPack).headers

	// If we received a skeleton batch, resolve internals concurrently
	if skeleton {
		filled, proced, err := d.fillHeaderSkeleton(from, headers)
		if err != nil {
			p.log.Debug("Skeleton chain invalid", "err", err)
			return errInvalidChain
		}
		headers = filled[proced:]
		from += uint64(proced)
	}
	// Insert all the new headers and fetch the next batch
	if len(headers) > 0 {
		p.log.Trace("Scheduling new headers", "count", len(headers), "from", from)
		select {
		case d.headerProcCh <- headers:
		case <-d.cancelCh:
			return errCancelHeaderFetch
		}
		from += uint64(len(headers))
	}
	getHeaders(from)
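The request issued by getHeaders(from) is answered asynchronously: the reply reaches the downloader through DeliverHeaders(p.id, headers) (FakePeer.dl.DeliverHeaders in the fake-peer implementation), which writes the received headers into the d.headerCh channel. The main loop of fetchHeaders then takes the headers out of headerCh:
1. If packet.Items() == 0 while skeleton is still true, the skeleton is finished: skeleton is set to false and the remaining headers are fetched sequentially;
2. If a skeleton batch was received, d.fillHeaderSkeleton(from, headers) downloads headers from other peers to fill it in;
3. Once filled, the headers are written to the headerProcCh channel (consumed by the header processing described next), from is advanced accordingly, and the next batch of headers is requested.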
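The other end of the headerProcCh channel is in processHeaders(), which takes batches of headers off the channel and processes them:
a) In fast or light sync, the headers are handled in chunks of at most maxHeadersProcess, and each chunk is written to the leveldb database via lightchain.InsertHeaderChain();
b) Then, in fast or full sync, d.queue.Schedule(chunk, origin) populates blockTaskPool/blockTaskQueue and (in fast mode only) receiptTaskPool/receiptTaskQueue, which the body and receipt syncing steps consume later.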
func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error {
	...
	// Wait for batches of headers to process
	gotHeaders := false

	for {
		select {
		...
		case headers := <-d.headerProcCh:
			if len(headers) == 0 {
				... // notify and terminate header processing
			}
			// Otherwise split the chunk of headers into batches and process them
			gotHeaders = true

			for len(headers) > 0 {
				// Terminate if something failed in between processing chunks
				select {
				case <-d.cancelCh:
					return errCancelHeaderProcessing
				default:
				}
				// Select the next chunk of headers to import
				limit := maxHeadersProcess
				if limit > len(headers) {
					limit = len(headers)
				}
				chunk := headers[:limit]

				// In case of header only syncing, validate the chunk immediately
				if d.mode == FastSync || d.mode == LightSync {
					// Collect the yet unknown headers to mark them as uncertain
					unknown := make([]*types.Header, 0, len(headers))
					for _, header := range chunk {
						if !d.lightchain.HasHeader(header.Hash(), header.Number.Uint64()) {
							unknown = append(unknown, header)
						}
					}
					// If we're importing pure headers, verify based on their recentness
					frequency := fsHeaderCheckFrequency
					if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
						frequency = 1
					}
					if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil {
						// If some headers were inserted, add them too to the rollback list
						if n > 0 {
							rollback = append(rollback, chunk[:n]...)
						}
						log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err)
						return errInvalidChain
					}
					// All verifications passed, store newly found uncertain headers
					rollback = append(rollback, unknown...)
					if len(rollback) > fsHeaderSafetyNet {
						rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
					}
				}
				// Unless we're doing light chains, schedule the headers for associated content retrieval
				if d.mode == FullSync || d.mode == FastSync {
					// If we've reached the allowed number of pending headers, stall a bit
					for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
						select {
						case <-d.cancelCh:
							return errCancelHeaderProcessing
						case <-time.After(time.Second):
						}
					}
					// Otherwise insert the headers for content retrieval
					inserts := d.queue.Schedule(chunk, origin)
					if len(inserts) != len(chunk) {
						log.Debug("Stale headers")
						return errBadPeer
					}
				}
				headers = headers[limit:]
				origin += uint64(limit)
			}
		...
		}
	}
}
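The chunking loop at the heart of processHeaders can be tried out separately. This is a minimal sketch that keeps only the slicing logic: processInBatches and its handle callback are hypothetical, headers are represented by their block numbers, and the limit of 2048 in main is an illustrative value rather than one taken from the text.

```go
package main

import "fmt"

// processInBatches hands headers to handle in chunks of at most limit,
// mirroring the "chunk := headers[:limit]; headers = headers[limit:]" loop.
// Hypothetical helper; limit must be greater than zero.
func processInBatches(headers []uint64, limit int, handle func(chunk []uint64)) {
	for len(headers) > 0 {
		n := limit
		if n > len(headers) {
			n = len(headers) // last chunk may be shorter
		}
		handle(headers[:n])
		headers = headers[n:]
	}
}

func main() {
	headers := make([]uint64, 5000)
	for i := range headers {
		headers[i] = uint64(i + 1)
	}
	var sizes []int
	processInBatches(headers, 2048, func(chunk []uint64) {
		sizes = append(sizes, len(chunk))
	})
	fmt.Println(sizes)
}
```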
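II. Syncing bodies: fetchBodies
fetchBodies mainly delegates to fetchParts():
1. ReserveBodies() first takes the bodies to be synced out of blockTaskPool;
2. fetch is called, which here is FetchBodies: it requests the bodies from a peer by sending a GetBlockBodiesMsg;
3. When the remote peer has handled the request it replies with a BlockBodiesMsg, which is written to the bodyCh channel;
4. When data arrives on bodyCh, the deliver function is called and the Transactions and Uncles are written to resultCache.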
func (d *Downloader) fetchBodies(from uint64) error {
	log.Debug("Downloading block bodies", "origin", from)

	var (
		deliver = func(packet dataPack) (int, error) {
			pack := packet.(*bodyPack)
			return d.queue.DeliverBodies(pack.peerID, pack.transactions, pack.uncles)
		}
		expire   = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) }
		capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) }
		setIdle  = func(p *peerConnection, accepted int) { p.SetBodiesIdle(accepted) }
	)
	err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
		d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
		d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "bodies")

	log.Debug("Block body download terminated", "err", err)
	return err
}
d.queue.DeliverBodies
func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) (int, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	reconstruct := func(header *types.Header, index int, result *fetchResult) error {
		if types.DeriveSha(types.Transactions(txLists[index])) != header.TxHash || types.CalcUncleHash(uncleLists[index]) != header.UncleHash {
			return errInvalidBody
		}
		result.Transactions = txLists[index]
		result.Uncles = uncleLists[index]
		return nil
	}
	return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, bodyReqTimer, len(txLists), reconstruct)
}
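III. Syncing receipts: fetchReceipts
fetchReceipts works just like fetchBodies and also delegates to fetchParts():
1. ReserveReceipts() first takes the receipts to be synced out of receiptTaskPool;
2. fetch is called, which here is FetchReceipts: it requests the receipts from a peer by sending a GetReceiptsMsg;
3. When the remote peer has handled the request it replies with a ReceiptsMsg, which is written to the receiptCh channel;
4. When data arrives on receiptCh, the deliver function is called and the Receipts are written to resultCache.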
func (d *Downloader) fetchReceipts(from uint64) error {
	log.Debug("Downloading transaction receipts", "origin", from)

	var (
		deliver = func(packet dataPack) (int, error) {
			pack := packet.(*receiptPack)
			return d.queue.DeliverReceipts(pack.peerID, pack.receipts)
		}
		expire   = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchReceipts(req) }
		capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) }
		setIdle  = func(p *peerConnection, accepted int) { p.SetReceiptsIdle(accepted) }
	)
	err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
		d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
		d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "receipts")

	log.Debug("Transaction receipt download terminated", "err", err)
	return err
}
d.queue.DeliverReceipts
func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	reconstruct := func(header *types.Header, index int, result *fetchResult) error {
		if types.DeriveSha(types.Receipts(receiptList[index])) != header.ReceiptHash {
			return errInvalidReceipt
		}
		result.Receipts = receiptList[index]
		return nil
	}
	return q.deliver(id, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, receiptReqTimer, len(receiptList), reconstruct)
}
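IV. Syncing state: processFullSyncContent & processFastSyncContent
processFullSyncContent handles syncing in fullSync mode. Because no Receipts are cached in resultCache in fullSync mode, this step is simple: take the body data out of the cache, execute the transactions to produce the state, and write the result into the blockchain: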
func (d *Downloader) processFullSyncContent() error {
	for {
		results := d.queue.Results(true)
		if len(results) == 0 {
			return nil
		}
		if d.chainInsertHook != nil {
			d.chainInsertHook(results)
		}
		if err := d.importBlockResults(results); err != nil {
			return err
		}
	}
}
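processFastSyncContent handles syncing in fastSync mode. Since Receipts, Transactions and Uncles are all in resultCache, the receipts have to be downloaded as well, and there is one extra step of downloading and verifying the "state" before anything is written into the blockchain:
1. Start downloading the state of the latest block: d.syncState(latest.Root);
2. Take the results to be processed out of the cache;
3. Set pivot to latestHeight - 64 and call splitAroundPivot() to split results around the pivot into three parts: beforeP, P and afterP;
4. Call commitFastSyncData on beforeP, writing both bodies and receipts into the blockchain;
5. For P, switch the state sync to the state of the P block, insert P's result (body and receipt) into the local blockchain via commitPivotBlock, and call FastSyncCommitHead to record the pivot's hash in the downloader as the hash of the last block of fast sync;
6. Call d.importBlockResults on afterP, which inserts the bodies into the blockchain without the receipts. These are the last 64 blocks, so at this point the database holds only their headers and bodies, with no receipts or state, and they have to be finished off the fullSync way.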
func (d *Downloader) processFastSyncContent(latest *types.Header) error {
	// Start syncing state of the reported head block. This should get us most of
	// the state of the pivot block.
	stateSync := d.syncState(latest.Root)
	defer stateSync.Cancel()
	go func() {
		if err := stateSync.Wait(); err != nil && err != errCancelStateFetch {
			d.queue.Close() // wake up WaitResults
		}
	}()
	// Figure out the ideal pivot block. Note, that this goalpost may move if the
	// sync takes long enough for the chain head to move significantly.
	pivot := uint64(0)
	if height := latest.Number.Uint64(); height > uint64(fsMinFullBlocks) {
		pivot = height - uint64(fsMinFullBlocks)
	}
	// To cater for moving pivot points, track the pivot block and subsequently
	// accumulated download results separately.
	var (
		oldPivot *fetchResult   // Locked in pivot block, might change eventually
		oldTail  []*fetchResult // Downloaded content after the pivot
	)
	for {
		// Wait for the next batch of downloaded data to be available, and if the pivot
		// block became stale, move the goalpost
		results := d.queue.Results(oldPivot == nil) // Block if we're not monitoring pivot staleness
		if len(results) == 0 {
			// If pivot sync is done, stop
			if oldPivot == nil {
				return stateSync.Cancel()
			}
			// If sync failed, stop
			select {
			case <-d.cancelCh:
				return stateSync.Cancel()
			default:
			}
		}
		if d.chainInsertHook != nil {
			d.chainInsertHook(results)
		}
		if oldPivot != nil {
			results = append(append([]*fetchResult{oldPivot}, oldTail...), results...)
		}
		// Split around the pivot block and process the two sides via fast/full sync
		if atomic.LoadInt32(&d.committed) == 0 {
			latest = results[len(results)-1].Header
			if height := latest.Number.Uint64(); height > pivot+2*uint64(fsMinFullBlocks) {
				log.Warn("Pivot became stale, moving", "old", pivot, "new", height-uint64(fsMinFullBlocks))
				pivot = height - uint64(fsMinFullBlocks)
			}
		}
		P, beforeP, afterP := splitAroundPivot(pivot, results)
		if err := d.commitFastSyncData(beforeP, stateSync); err != nil {
			return err
		}
		if P != nil {
			// If new pivot block found, cancel old state retrieval and restart
			if oldPivot != P {
				stateSync.Cancel()

				stateSync = d.syncState(P.Header.Root)
				defer stateSync.Cancel()
				go func() {
					if err := stateSync.Wait(); err != nil && err != errCancelStateFetch {
						d.queue.Close() // wake up WaitResults
					}
				}()
				oldPivot = P
			}
			// Wait for completion, occasionally checking for pivot staleness
			select {
			case <-stateSync.done:
				if stateSync.err != nil {
					return stateSync.err
				}
				if err := d.commitPivotBlock(P); err != nil {
					return err
				}
				oldPivot = nil

			case <-time.After(time.Second):
				oldTail = afterP
				continue
			}
		}
		// Fast sync done, pivot commit done, full import
		if err := d.importBlockResults(afterP); err != nil {
			return err
		}
	}
}
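The three-way split around the pivot is simple enough to sketch. The version below is an illustration under assumptions, not necessarily the downloader's own splitAroundPivot: the result type is a hypothetical stand-in for fetchResult that carries only a block number, and the return order (P, beforeP, afterP) follows the call site above.

```go
package main

import "fmt"

// result is a hypothetical stand-in for fetchResult; only the block number matters here.
type result struct {
	Number uint64
}

// splitAroundPivot partitions results into the pivot block itself, the
// blocks before it and the blocks after it.
func splitAroundPivot(pivot uint64, results []*result) (p *result, before, after []*result) {
	for _, r := range results {
		switch {
		case r.Number < pivot:
			before = append(before, r) // fast-synced: body and receipts stored as-is
		case r.Number == pivot:
			p = r // committed only once its state has been downloaded
		default:
			after = append(after, r) // imported the full-sync way
		}
	}
	return p, before, after
}

func main() {
	var results []*result
	for n := uint64(98); n <= 103; n++ {
		results = append(results, &result{Number: n})
	}
	p, before, after := splitAroundPivot(100, results)
	fmt.Println(p.Number, len(before), len(after))
}
```

If the batch does not contain the pivot block, p comes back nil, which matches the P != nil check in the loop above.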
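That covers the overall flow of the downloader module. It is considerably more complex than the fetcher, and many details have been left out here; if you are interested, dig into the source code yourself.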