以太坊源码解读(12)广播和同步 Fetcher
Fetcher是基于广播的同步工具,所以Fetcher必然是通过protocolManager的handleMsg()来调用的。下面就是handleMsg中收到NewBlockHashesMsg和NewBlockMsg时,均会调用fetcher的方法来进行相应的处理
case msg.Code == NewBlockHashesMsg: var announces newBlockHashesData if err := msg.Decode(&announces); err != nil { return errResp(ErrDecode, "%v: %v", msg, err) } // Mark the hashes as present at the remote node for _, block := range announces { p.MarkBlock(block.Hash) } // Schedule all the unknown hashes for retrieval unknown := make(newBlockHashesData, 0, len(announces)) for _, block := range announces { if !pm.blockchain.HasBlock(block.Hash, block.Number) { unknown = append(unknown, block) } } for _, block := range unknown { pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies) } case msg.Code == NewBlockMsg: // Retrieve and decode the propagated block var request newBlockData if err := msg.Decode(&request); err != nil { return errResp(ErrDecode, "%v: %v", msg, err) } request.Block.ReceivedAt = msg.ReceivedAt request.Block.ReceivedFrom = p // Mark the peer as owning the block and schedule it for import p.MarkBlock(request.Block.Hash()) pm.fetcher.Enqueue(p.id, request.Block) // Assuming the block is importable by the peer, but possibly not yet done so, // calculate the head hash and TD that the peer truly must have. var ( trueHead = request.Block.ParentHash() trueTD = new(big.Int).Sub(request.TD, request.Block.Difficulty()) ) // Update the peers total difficulty if better than the previous if _, td := p.Head(); trueTD.Cmp(td) > 0 { p.SetHead(trueHead, trueTD) // Schedule a sync if above ours. Note, this will not fire a sync for a gap of // a singe block (as the true TD is below the propagated block), however this // scenario should easily be covered by the fetcher. currentBlock := pm.blockchain.CurrentBlock() if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 { go pm.synchronise(p) } }
Fetcher到底是个什么东西呢?其实它是一个累积块通知的模块,然后对这些通知进行管理和调度,取得完整的block。
type Fetcher struct { notify chan *announce // 收到区块hash值的通道 inject chan *inject // 收到完整区块的通道 blockFilter chan chan []*types.Block headerFilter chan chan *headerFilterTask // 过滤header的通道的通道 bodyFilter chan chan *bodyFilterTask // 过滤body的通道的通道 done chan common.Hash quit chan struct{} // Announce states announces map[string]int // Peer已经给了本节点多少区块头通知 announced map[common.Hash][]*announce // 已经announced的区块列表 fetching map[common.Hash]*announce // 正在fetching区块头的请求 fetched map[common.Hash][]*announce // 已经fetch到区块头,还差body的请求,用来获取body completing map[common.Hash]*announce // 已经得到区块头的 // Block cache queue *prque.Prque // queue,优先级队列,高度做优先级 queues map[string]int // queues,统计peer通告了多少块 queued map[common.Hash]*inject // queued,代表这个块如队列了 // Callbacks getBlock blockRetrievalFn // Retrieves a block from the local chain verifyHeader headerVerifierFn // 验证区块头,包含了PoW验证 broadcastBlock blockBroadcasterFn // 广播给peer chainHeight chainHeightFn // Retrieves the current chain's height insertChain chainInsertFn // 插入区块到链的函数 dropPeer peerDropFn // Drops a peer for misbehaving // Testing hooks announceChangeHook func(common.Hash, bool) queueChangeHook func(common.Hash, bool) fetchingHook func([]common.Hash) completingHook func([]common.Hash) importedHook func(*types.Block) }
fetcher收到NewBlockHashesMsg消息后,要向发送消息的节点请求header,如果header中有交易或者叔区块,则继续向该节点请求交易和叔区块,最终生成完整区块。在这个过程中,fetcher处理区块时将区块设定为四种状态:announced、fetching、fetched和completing。
当收到NewBlockHashesMsg消息时,从广播通知里会获取到一个newBlockHashesData的列表。newBlockHashesData只包括block的hash值和block的number值。然后每个newBlockHashesData调用pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)方法,除了传入block的hash值和block的number值,还需要传入当前的时间戳,peer.go的两个函数指针。
func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time, headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error { block := &announce{ hash: hash, number: number, time: time, origin: peer, fetchHeader: headerFetcher, fetchBodies: bodyFetcher, } select { case f.notify <- block: return nil case <-f.quit: return errTerminated } }
这里,Notify()方法将传入的参数拼成announce对象,然后send给f.notify通道。fetcher的主循环loop()中会从该通道取得这个announce,进行处理。 1、确保节点没有DOS攻击; 2、检查高度是否可用; 3、检查是否已经下载,即已经在fetching或comoleting状态列表中的announce 4、添加到announced中,供后面程序调度。 5、当f.announced==1时,代表有需要处理的announce,则重设定时器,由loop处理进行后续操作。
case notification := <-f.notify: // A block was announced, make sure the peer isn't DOSing us propAnnounceInMeter.Mark(1) count := f.announces[notification.origin] + 1 if count > hashLimit { log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit) propAnnounceDOSMeter.Mark(1) break } // If we have a valid block number, check that it's potentially useful if notification.number > 0 { if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist { log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist) propAnnounceDropMeter.Mark(1) break } } // All is well, schedule the announce if block's not yet downloading if _, ok := f.fetching[notification.hash]; ok { break } if _, ok := f.completing[notification.hash]; ok { break } f.announces[notification.origin] = count f.announced[notification.hash] = append(f.announced[notification.hash], notification) if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 { f.announceChangeHook(notification.hash, true) } if len(f.announced) == 1 { f.rescheduleFetch(fetchTimer) }
在loop中有另一个case来处理消息通知,这是一个基于定时器的通道: 1、遍历announced中的每个hash的消息,构建request; 2、把所有的request都发送出去,每个peer都有一个协程,并从peer获得应答(fetchHeader(hash));
case <-fetchTimer.C: // At least one block's timer ran out, check for needing retrieval request := make(map[string][]common.Hash) for hash, announces := range f.announced { if time.Since(announces[0].time) > arriveTimeout-gatherSlack { // Pick a random peer to retrieve from, reset all others announce := announces[rand.Intn(len(announces))] f.forgetHash(hash) // If the block still didn't arrive, queue for fetching if f.getBlock(hash) == nil { request[announce.origin] = append(request[announce.origin], hash) f.fetching[hash] = announce } } } // Send out all block header requests for peer, hashes := range request { log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes) // Create a closure of the fetch and schedule in on a new thread fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes go func() { if f.fetchingHook != nil { f.fetchingHook(hashes) } for _, hash := range hashes { headerFetchMeter.Mark(1) fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals } }() } // Schedule the next fetch if blocks are still pending f.rescheduleFetch(fetchTimer)
这里的fetcherHeader函数实际上是在Notify中传入的,实际上使用的是RequestOneHeader,发送的消息是GetBlockHeadersMsg,用来请求多个区块头。
pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies) // RequestOneHeader is a wrapper around the header query functions to fetch a // single header. It is used solely by the fetcher. func (p *peer) RequestOneHeader(hash common.Hash) error { p.Log().Debug("Fetching single header", "hash", hash) return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false}) }
远程节点收到getBlockHeaderMsg后调用pm.blockchain.GetHeaderByHash或者pm.blockchain.GetHeaderByNumber从数据库中取出Header,发送给原来请求的节点,调用SendBlockHeaders(headers),发送BlockHeadersMsg。
BlockHeadersMsg是一个特殊的消息,因为这个消息不止是fetcher要用,downloader也要用。所以这里需要用一个fetcher.FilterHeaders()去把fetcher需要的区块头给过滤下来,剩下的结果直接交给downloader去下载。
case msg.Code == BlockHeadersMsg: // A batch of headers arrived to one of our previous requests var headers []*types.Header if err := msg.Decode(&headers); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } ... // Filter out any explicitly requested headers, deliver the rest to the downloader filter := len(headers) == 1 if filter { ... // Irrelevant of the fork checks, send the header to the fetcher just in case headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now()) } if len(headers) > 0 || !filter { err := pm.downloader.DeliverHeaders(p.id, headers) if err != nil { log.Debug("Failed to deliver headers", "err", err) } }
FilterHeaders()执行后不但要把需要的区块拿出来放到fetched或者completing中,还要将其余的没有请求过的headers返回再交给downloader。但是这个过程并不是在FilterHeaders()中执行的,而是在fetcher的主loop中执行的。
func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header { log.Trace("Filtering headers", "peer", peer, "headers", len(headers)) // Send the filter channel to the fetcher filter := make(chan *headerFilterTask) select { case f.headerFilter <- filter: case <-f.quit: return nil } // Request the filtering of the header list select { case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}: case <-f.quit: return nil } // Retrieve the headers remaining after filtering select { case task := <-filter: return task.headers case <-f.quit: return nil } }
FilterHeaders()创建了一个filter通道,里面传输headerFilterTask,然后在将filter通道传入f.headerFilter。在fetcher的主循环中,先从headerFilter中取出filter,然后再从filter中取出headerFilterTask,对其进行分类和筛选,然后将过滤掉的未请求的header传入filter,再通过filter传回FilterHeader(),作为函数的返回值。
case filter := <-f.headerFilter: var task *headerFilterTask select { case task = <-filter: case <-f.quit: return } headerFilterInMeter.Mark(int64(len(task.headers))) // unknown的不是fetcher请求的,complete放没有交易和uncle的区块,有头就够了,incomplete还需要获取uncle和交易的区块 unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{} // 遍历所有收到的header for _, header := range task.headers { hash := header.Hash() // 必须是正在获取的hash,并且对应请求的peer,并且未fetched,未completing,未queued if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil { // 高度校验 if header.Number.Uint64() != announce.number { log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number) f.dropPeer(announce.origin) f.forgetHash(hash) continue } // 本地链没有当前区块方可保存 if f.getBlock(hash) == nil { announce.header = header announce.time = task.time // 如果区块没有交易和uncle,加入到complete if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) { log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash()) block := types.NewBlockWithHeader(header) block.ReceivedAt = task.time complete = append(complete, block) f.completing[hash] = announce continue } // 否则就是不完整的区块 incomplete = append(incomplete, announce) } else { log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash()) f.forgetHash(hash) } } else { // 没请求过的header unknown = append(unknown, header) } } // 把未知的区块头,再传递会filter headerFilterOutMeter.Mark(int64(len(unknown))) select { case filter <- &headerFilterTask{headers: unknown, time: task.time}: case <-f.quit: return } // 把未完整的区块加入到fetched,跳过已经在completeing中的,然后触发completeTimer定时器 for _, announce := range incomplete { hash := announce.header.Hash() if _, ok := f.completing[hash]; ok { continue } f.fetched[hash] = append(f.fetched[hash], announce) if len(f.fetched) == 1 { f.rescheduleComplete(completeTimer) } } // 把只有头的区块入队列 for _, block := range complete { if announce := f.completing[block.Hash()]; announce != nil { f.enqueue(announce.origin, block) } }
上面fetcher主循环里主要做了以下几件事:
1、从f.headerFilter取出filter,然后取出过滤任务task。 2、它把区块头分成3类:unknown这不是分是要返回给调用者的,即handleMsg(), incomplete存放还需要获取body的区块头,complete存放只包含区块头的区块。遍历所有的区块头,填到到对应的分类中。 3、把unknonw中的区块返回给handleMsg()。 4、把incomplete的区块头获取状态移动到fetched状态,然后触发completeTimer定时器,以便去处理complete的区块。 5、把compelete的区块加入到queued。
到这里从fetched状态到completing状态的转变类似于上面从announced到fetching状态,就是通过completeTimer定时器的相关逻辑进行处理,流程类似于fetchTmer,即先转变状态,然后构建request。这里实际上调用了RequestBodies函数,发出了一个GetBlockBodiesMsg,从远程节点获得一个BlockBodiesMsg。
case <-completeTimer.C: // 至少有1个header已经获取完了 request := make(map[string][]common.Hash) // 遍历所有待获取body的announce for hash, announces := range f.fetched { // Pick a random peer to retrieve from, reset all others // 随机选一个Peer发送请求,因为可能已经有很多Peer通知它这个区块了 announce := announces[rand.Intn(len(announces))] f.forgetHash(hash) // 如果本地没有这个区块,则放入到completing,创建请求 if f.getBlock(hash) == nil { request[announce.origin] = append(request[announce.origin], hash) f.completing[hash] = announce } } // 发送所有的请求,获取body,依然是每个peer一个单独协程 for peer, hashes := range request { log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes) if f.completingHook != nil { f.completingHook(hashes) } bodyFetchMeter.Mark(int64(len(hashes))) go f.completing[hashes[0]].fetchBodies(hashes) } f.rescheduleComplete(completeTimer)
响应BlockBodiesMsg的处理与headers类似,需要有一个FilterBodies来过滤出fetcher请求的bodies,剩余的交给downloader。所以这部分就不在赘述。主要来看一下fetcher主循环处理bodies过滤的逻辑:
1、它要的区块,单独取出来存到blocks中,它不要的继续留在task中。 2、判断是不是fetcher请求的方法:如果交易列表和叔块列表计算出的hash值与区块头中的一样,并且消息来自请求的Peer,则就是fetcher请求的。 3、将blocks中的区块加入到queued。
case filter := <-f.bodyFilter: var task *bodyFilterTask select { case task = <-filter: case <-f.quit: return } bodyFilterInMeter.Mark(int64(len(task.transactions))) blocks := []*types.Block{} // 获取的每个body的txs列表和uncle列表 // 遍历每个区块的txs列表和uncle列表,计算hash后判断是否是当前fetcher请求的body for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ { matched := false // 遍历所有保存的请求,因为tx和uncle,不知道它是属于哪个区块的,只能去遍历所有的请求 for hash, announce := range f.completing { if f.queued[hash] == nil { // 把传入的每个块的hash和unclehash和它请求出去的记录进行对比,匹配则说明是fetcher请求的区块body txnHash := types.DeriveSha(types.Transactions(task.transactions[i])) uncleHash := types.CalcUncleHash(task.uncles[i]) if txnHash == announce.header.TxHash && uncleHash == announce.header.UncleHash && announce.origin == task.peer { // Mark the body matched, reassemble if still unknown matched = true // 如果当前链还没有这个区块,则收集这个区块,合并成新区块 if f.getBlock(hash) == nil { block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i]) block.ReceivedAt = task.time blocks = append(blocks, block) } else { f.forgetHash(hash) } } } } // 从task中移除fetcher请求的数据 if matched { task.transactions = append(task.transactions[:i], task.transactions[i+1:]...) task.uncles = append(task.uncles[:i], task.uncles[i+1:]...) i-- continue } } // 将剩余的数据返回 bodyFilterOutMeter.Mark(int64(len(task.transactions))) select { case filter <- task: case <-f.quit: return } // 把收集的区块加入到队列 for _, block := range blocks { if announce := f.completing[block.Hash()]; announce != nil { f.enqueue(announce.origin, block) } } }
至此,fetcher获取完整区块的流程讲完了,最后一步就是讲queued中的区块插入区块链。
1、调用共识引擎的方法f.verifyHeader(block.Header()),验证blockHeader的有效性。 2、如果没问题就广播出去,告诉全世界我的区块链更新了一个新区块。 3、调用f.insertChain(types.Blocks{block}) 插入本地区块链。
// insert spawns a new goroutine to run a block insertion into the chain. If the // block's number is at the same height as the current import phase, it updates // the phase states accordingly. func (f *Fetcher) insert(peer string, block *types.Block) { hash := block.Hash() // Run the import on a new thread log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash) go func() { defer func() { f.done <- hash }() // If the parent's unknown, abort insertion parent := f.getBlock(block.ParentHash()) if parent == nil { log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash()) return } // Quickly validate the header and propagate the block if it passes switch err := f.verifyHeader(block.Header()); err { case nil: // All ok, quickly propagate to our peers propBroadcastOutTimer.UpdateSince(block.ReceivedAt) go f.broadcastBlock(block, true) case consensus.ErrFutureBlock: // Weird future block, don't fail, but neither propagate default: // Something went very wrong, drop the peer log.Debug("Propagated block verification failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err) f.dropPeer(peer) return } // Run the actual import and log any issues if _, err := f.insertChain(types.Blocks{block}); err != nil { log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err) return } // If import succeeded, broadcast the block propAnnounceOutTimer.UpdateSince(block.ReceivedAt) go f.broadcastBlock(block, false) // Invoke the testing hook if needed if f.importedHook != nil { f.importedHook(block) } }() }
下一章:以太坊源码解读(13)广播和同步 Downloader
protocolManager启动了四个go程,其中包含用于定期同步的协程:syncer,该协程调用protocolManager的synchronise方法调用,选择td最高的节点进 ...