以太坊源码解读(12)广播和同步 Fetcher

Fetcher是基于广播的同步工具,所以Fetcher必然是通过protocolManager的handleMsg()来调用的。下面就是handleMsg中收到NewBlockHashesMsg和NewBlockMsg时,均会调用fetcher的方法来进行相应的处理

	case msg.Code == NewBlockHashesMsg:
		var announces newBlockHashesData
		if err := msg.Decode(&announces); err != nil {
			return errResp(ErrDecode, "%v: %v", msg, err)
		}
		// Mark the hashes as present at the remote node
		for _, block := range announces {
			p.MarkBlock(block.Hash)
		}
		// Schedule all the unknown hashes for retrieval
		unknown := make(newBlockHashesData, 0, len(announces))
		for _, block := range announces {
			if !pm.blockchain.HasBlock(block.Hash, block.Number) {
				unknown = append(unknown, block)
			}
		}
		for _, block := range unknown {
			pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
		}

	case msg.Code == NewBlockMsg:
		// Retrieve and decode the propagated block
		var request newBlockData
		if err := msg.Decode(&request); err != nil {
			return errResp(ErrDecode, "%v: %v", msg, err)
		}
		request.Block.ReceivedAt = msg.ReceivedAt
		request.Block.ReceivedFrom = p

		// Mark the peer as owning the block and schedule it for import
		p.MarkBlock(request.Block.Hash())
		pm.fetcher.Enqueue(p.id, request.Block)

		// Assuming the block is importable by the peer, but possibly not yet done so,
		// calculate the head hash and TD that the peer truly must have.
		var (
			trueHead = request.Block.ParentHash()
			trueTD   = new(big.Int).Sub(request.TD, request.Block.Difficulty())
		)
		// Update the peers total difficulty if better than the previous
		if _, td := p.Head(); trueTD.Cmp(td) > 0 {
			p.SetHead(trueHead, trueTD)

			// Schedule a sync if above ours. Note, this will not fire a sync for a gap of
			// a singe block (as the true TD is below the propagated block), however this
			// scenario should easily be covered by the fetcher.
			currentBlock := pm.blockchain.CurrentBlock()
			if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {
				go pm.synchronise(p)
			}
		}

Fetcher到底是个什么东西呢?其实它是一个累积块通知的模块,然后对这些通知进行管理和调度,取得完整的block。

type Fetcher struct {

    notify chan *announce   // 收到区块hash值的通道
    inject chan *inject    // 收到完整区块的通道

    blockFilter chan chan []*types.Block
    headerFilter chan chan *headerFilterTask    // 过滤header的通道的通道
    bodyFilter chan chan *bodyFilterTask    // 过滤body的通道的通道

    done chan common.Hash
    quit chan struct{}

    // Announce states
    announces map[string]int    // Peer已经给了本节点多少区块头通知
    announced map[common.Hash][]*announce    // 已经announced的区块列表
    fetching map[common.Hash]*announce     // 正在fetching区块头的请求
    fetched map[common.Hash][]*announce     // 已经fetch到区块头,还差body的请求,用来获取body
    completing map[common.Hash]*announce    // 已经得到区块头的

    // Block cache
    queue  *prque.Prque               // queue,优先级队列,高度做优先级
    queues map[string]int              // queues,统计peer通告了多少块
    queued map[common.Hash]*inject     // queued,代表这个块如队列了

    // Callbacks
    getBlock       blockRetrievalFn   // Retrieves a block from the local chain
    verifyHeader   headerVerifierFn   // 验证区块头,包含了PoW验证
    broadcastBlock blockBroadcasterFn // 广播给peer
    chainHeight    chainHeightFn      // Retrieves the current chain's height
    insertChain    chainInsertFn      // 插入区块到链的函数
    dropPeer       peerDropFn         // Drops a peer for misbehaving

    // Testing hooks
    announceChangeHook func(common.Hash, bool) 
    queueChangeHook    func(common.Hash, bool) 
    fetchingHook       func([]common.Hash)     
    completingHook     func([]common.Hash)     
    importedHook       func(*types.Block)      
}

fetcher收到NewBlockHashesMsg消息后,要向发送消息的节点请求header,如果header中有交易或者叔区块,则继续向该节点请求交易和叔区块,最终生成完整区块。在这个过程中,fetcher处理区块时将区块设定为四种状态:announced、fetching、fetched和completing。

当收到NewBlockHashesMsg消息时,从广播通知里会获取到一个newBlockHashesData的列表。newBlockHashesData只包括block的hash值和block的number值。然后每个newBlockHashesData调用pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)方法,除了传入block的hash值和block的number值,还需要传入当前的时间戳,peer.go的两个函数指针。

func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
	headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
	block := &announce{
		hash:        hash,
		number:      number,
		time:        time,
		origin:      peer,
		fetchHeader: headerFetcher,
		fetchBodies: bodyFetcher,
	}
	select {
	case f.notify <- block:
		return nil
	case <-f.quit:
		return errTerminated
	}
}

这里,Notify()方法将传入的参数拼成announce对象,然后send给f.notify通道。fetcher的主循环loop()中会从该通道取得这个announce,进行处理。 1、确保节点没有DOS攻击; 2、检查高度是否可用; 3、检查是否已经下载,即已经在fetching或comoleting状态列表中的announce 4、添加到announced中,供后面程序调度。 5、当f.announced==1时,代表有需要处理的announce,则重设定时器,由loop处理进行后续操作。

case notification := <-f.notify:
	// A block was announced, make sure the peer isn't DOSing us
	propAnnounceInMeter.Mark(1)

	count := f.announces[notification.origin] + 1
	if count > hashLimit {
		log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)
		propAnnounceDOSMeter.Mark(1)
		break
	}
	// If we have a valid block number, check that it's potentially useful
	if notification.number > 0 {
		if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
			log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)
			propAnnounceDropMeter.Mark(1)
			break
		}
	}
	// All is well, schedule the announce if block's not yet downloading
	if _, ok := f.fetching[notification.hash]; ok {
		break
	}
	if _, ok := f.completing[notification.hash]; ok {
		break
	}
	f.announces[notification.origin] = count
	f.announced[notification.hash] = append(f.announced[notification.hash], notification)
	if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {
		f.announceChangeHook(notification.hash, true)
	}
	if len(f.announced) == 1 {
		f.rescheduleFetch(fetchTimer)
	}

在loop中有另一个case来处理消息通知,这是一个基于定时器的通道: 1、遍历announced中的每个hash的消息,构建request; 2、把所有的request都发送出去,每个peer都有一个协程,并从peer获得应答(fetchHeader(hash));

case <-fetchTimer.C:
	// At least one block's timer ran out, check for needing retrieval
	request := make(map[string][]common.Hash)

	for hash, announces := range f.announced {
		if time.Since(announces[0].time) > arriveTimeout-gatherSlack {
			// Pick a random peer to retrieve from, reset all others
			announce := announces[rand.Intn(len(announces))]
			f.forgetHash(hash)

			// If the block still didn't arrive, queue for fetching
			if f.getBlock(hash) == nil {
				request[announce.origin] = append(request[announce.origin], hash)
				f.fetching[hash] = announce
			}
		}
	}
	// Send out all block header requests
	for peer, hashes := range request {
		log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)

		// Create a closure of the fetch and schedule in on a new thread
		fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
		go func() {
			if f.fetchingHook != nil {
				f.fetchingHook(hashes)
			}
			for _, hash := range hashes {
				headerFetchMeter.Mark(1)
				fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
			}
		}()
	}
	// Schedule the next fetch if blocks are still pending
	f.rescheduleFetch(fetchTimer)

这里的fetcherHeader函数实际上是在Notify中传入的,实际上使用的是RequestOneHeader,发送的消息是GetBlockHeadersMsg,用来请求多个区块头。

pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)

// RequestOneHeader is a wrapper around the header query functions to fetch a
// single header. It is used solely by the fetcher.
func (p *peer) RequestOneHeader(hash common.Hash) error {
    p.Log().Debug("Fetching single header", "hash", hash)
    return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false})
}

远程节点收到getBlockHeaderMsg后调用pm.blockchain.GetHeaderByHash或者pm.blockchain.GetHeaderByNumber从数据库中取出Header,发送给原来请求的节点,调用SendBlockHeaders(headers),发送BlockHeadersMsg。

BlockHeadersMsg是一个特殊的消息,因为这个消息不止是fetcher要用,downloader也要用。所以这里需要用一个fetcher.FilterHeaders()去把fetcher需要的区块头给过滤下来,剩下的结果直接交给downloader去下载。

case msg.Code == BlockHeadersMsg:
	// A batch of headers arrived to one of our previous requests
	var headers []*types.Header
	if err := msg.Decode(&headers); err != nil {
		return errResp(ErrDecode, "msg %v: %v", msg, err)
	}
	... 
	// Filter out any explicitly requested headers, deliver the rest to the downloader
	filter := len(headers) == 1
	if filter {
        ... 
		// Irrelevant of the fork checks, send the header to the fetcher just in case
		headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())
	}
	if len(headers) > 0 || !filter {
		err := pm.downloader.DeliverHeaders(p.id, headers)
		if err != nil {
			log.Debug("Failed to deliver headers", "err", err)
		}
	}

FilterHeaders()执行后不但要把需要的区块拿出来放到fetched或者completing中,还要将其余的没有请求过的headers返回再交给downloader。但是这个过程并不是在FilterHeaders()中执行的,而是在fetcher的主loop中执行的。

func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
	log.Trace("Filtering headers", "peer", peer, "headers", len(headers))

	// Send the filter channel to the fetcher
	filter := make(chan *headerFilterTask)

	select {
	case f.headerFilter <- filter:
	case <-f.quit:
		return nil
	}
	// Request the filtering of the header list
	select {
	case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}:
	case <-f.quit:
		return nil
	}
	// Retrieve the headers remaining after filtering
	select {
	case task := <-filter:
		return task.headers
	case <-f.quit:
		return nil
	}
}

FilterHeaders()创建了一个filter通道,里面传输headerFilterTask,然后在将filter通道传入f.headerFilter。在fetcher的主循环中,先从headerFilter中取出filter,然后再从filter中取出headerFilterTask,对其进行分类和筛选,然后将过滤掉的未请求的header传入filter,再通过filter传回FilterHeader(),作为函数的返回值。

case filter := <-f.headerFilter:

    var task *headerFilterTask
    select {
    case task = <-filter:
    case <-f.quit:
        return
    }
    headerFilterInMeter.Mark(int64(len(task.headers)))

    // unknown的不是fetcher请求的,complete放没有交易和uncle的区块,有头就够了,incomplete还需要获取uncle和交易的区块
    unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}
    // 遍历所有收到的header
    for _, header := range task.headers {
        hash := header.Hash()

        // 必须是正在获取的hash,并且对应请求的peer,并且未fetched,未completing,未queued
        if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {

            // 高度校验
            if header.Number.Uint64() != announce.number {
                log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)
                f.dropPeer(announce.origin)
                f.forgetHash(hash)
                continue
            }
            // 本地链没有当前区块方可保存
            if f.getBlock(hash) == nil {
                announce.header = header
                announce.time = task.time

                // 如果区块没有交易和uncle,加入到complete
                if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {
                    log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())

                    block := types.NewBlockWithHeader(header)
                    block.ReceivedAt = task.time

                    complete = append(complete, block)
                    f.completing[hash] = announce
                    continue
                }
                // 否则就是不完整的区块
                incomplete = append(incomplete, announce)
            } else {
                log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
                f.forgetHash(hash)
            }
        } else {
            // 没请求过的header
            unknown = append(unknown, header)
        }
    }
    // 把未知的区块头,再传递会filter
    headerFilterOutMeter.Mark(int64(len(unknown)))
    select {
    case filter <- &headerFilterTask{headers: unknown, time: task.time}:
    case <-f.quit:
        return
    }
    // 把未完整的区块加入到fetched,跳过已经在completeing中的,然后触发completeTimer定时器
    for _, announce := range incomplete {
        hash := announce.header.Hash()
        if _, ok := f.completing[hash]; ok {
            continue
        }
        f.fetched[hash] = append(f.fetched[hash], announce)
        if len(f.fetched) == 1 {
            f.rescheduleComplete(completeTimer)
        }
    }

    // 把只有头的区块入队列
    for _, block := range complete {
        if announce := f.completing[block.Hash()]; announce != nil {
            f.enqueue(announce.origin, block)
        }
    }

上面fetcher主循环里主要做了以下几件事:

1、从f.headerFilter取出filter,然后取出过滤任务task。 2、它把区块头分成3类:unknown这不是分是要返回给调用者的,即handleMsg(), incomplete存放还需要获取body的区块头,complete存放只包含区块头的区块。遍历所有的区块头,填到到对应的分类中。 3、把unknonw中的区块返回给handleMsg()。 4、把incomplete的区块头获取状态移动到fetched状态,然后触发completeTimer定时器,以便去处理complete的区块。 5、把compelete的区块加入到queued。

到这里从fetched状态到completing状态的转变类似于上面从announced到fetching状态,就是通过completeTimer定时器的相关逻辑进行处理,流程类似于fetchTmer,即先转变状态,然后构建request。这里实际上调用了RequestBodies函数,发出了一个GetBlockBodiesMsg,从远程节点获得一个BlockBodiesMsg。

case <-completeTimer.C:
    // 至少有1个header已经获取完了
    request := make(map[string][]common.Hash)

    // 遍历所有待获取body的announce
    for hash, announces := range f.fetched {
        // Pick a random peer to retrieve from, reset all others
        // 随机选一个Peer发送请求,因为可能已经有很多Peer通知它这个区块了
        announce := announces[rand.Intn(len(announces))]
        f.forgetHash(hash)

        // 如果本地没有这个区块,则放入到completing,创建请求
        if f.getBlock(hash) == nil {
            request[announce.origin] = append(request[announce.origin], hash)
            f.completing[hash] = announce
        }
    }

    // 发送所有的请求,获取body,依然是每个peer一个单独协程
    for peer, hashes := range request {
        log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)

        if f.completingHook != nil {
            f.completingHook(hashes)
        }
        bodyFetchMeter.Mark(int64(len(hashes)))
        go f.completing[hashes[0]].fetchBodies(hashes)
    }
    f.rescheduleComplete(completeTimer)

响应BlockBodiesMsg的处理与headers类似,需要有一个FilterBodies来过滤出fetcher请求的bodies,剩余的交给downloader。所以这部分就不在赘述。主要来看一下fetcher主循环处理bodies过滤的逻辑:

1、它要的区块,单独取出来存到blocks中,它不要的继续留在task中。 2、判断是不是fetcher请求的方法:如果交易列表和叔块列表计算出的hash值与区块头中的一样,并且消息来自请求的Peer,则就是fetcher请求的。 3、将blocks中的区块加入到queued。

case filter := <-f.bodyFilter:
    var task *bodyFilterTask
    select {
    case task = <-filter:
    case <-f.quit:
        return
    }
    bodyFilterInMeter.Mark(int64(len(task.transactions)))

    blocks := []*types.Block{}
    // 获取的每个body的txs列表和uncle列表
    // 遍历每个区块的txs列表和uncle列表,计算hash后判断是否是当前fetcher请求的body
    for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ {
        matched := false

        // 遍历所有保存的请求,因为tx和uncle,不知道它是属于哪个区块的,只能去遍历所有的请求
        for hash, announce := range f.completing {
            if f.queued[hash] == nil {
                // 把传入的每个块的hash和unclehash和它请求出去的记录进行对比,匹配则说明是fetcher请求的区块body
                txnHash := types.DeriveSha(types.Transactions(task.transactions[i]))
                uncleHash := types.CalcUncleHash(task.uncles[i])

                if txnHash == announce.header.TxHash && uncleHash == announce.header.UncleHash && announce.origin == task.peer {
                    // Mark the body matched, reassemble if still unknown
                    matched = true

                    // 如果当前链还没有这个区块,则收集这个区块,合并成新区块
                    if f.getBlock(hash) == nil {
                        block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])
                        block.ReceivedAt = task.time

                        blocks = append(blocks, block)
                    } else {
                        f.forgetHash(hash)
                    }
                }
            }
        }
        // 从task中移除fetcher请求的数据
        if matched {
            task.transactions = append(task.transactions[:i], task.transactions[i+1:]...)
            task.uncles = append(task.uncles[:i], task.uncles[i+1:]...)
            i--
            continue
        }
    }

    // 将剩余的数据返回
    bodyFilterOutMeter.Mark(int64(len(task.transactions)))
    select {
    case filter <- task:
    case <-f.quit:
        return
    }
    // 把收集的区块加入到队列
    for _, block := range blocks {
        if announce := f.completing[block.Hash()]; announce != nil {
            f.enqueue(announce.origin, block)
        }
    }
}

至此,fetcher获取完整区块的流程讲完了,最后一步就是讲queued中的区块插入区块链。

1、调用共识引擎的方法f.verifyHeader(block.Header()),验证blockHeader的有效性。 2、如果没问题就广播出去,告诉全世界我的区块链更新了一个新区块。 3、调用f.insertChain(types.Blocks{block}) 插入本地区块链。

// insert spawns a new goroutine to run a block insertion into the chain. If the
// block's number is at the same height as the current import phase, it updates
// the phase states accordingly.
func (f *Fetcher) insert(peer string, block *types.Block) {
	hash := block.Hash()

	// Run the import on a new thread
	log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
	go func() {
		defer func() { f.done <- hash }()

		// If the parent's unknown, abort insertion
		parent := f.getBlock(block.ParentHash())
		if parent == nil {
			log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash())
			return
		}
		// Quickly validate the header and propagate the block if it passes
		switch err := f.verifyHeader(block.Header()); err {
		case nil:
			// All ok, quickly propagate to our peers
			propBroadcastOutTimer.UpdateSince(block.ReceivedAt)
			go f.broadcastBlock(block, true)

		case consensus.ErrFutureBlock:
			// Weird future block, don't fail, but neither propagate

		default:
			// Something went very wrong, drop the peer
			log.Debug("Propagated block verification failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
			f.dropPeer(peer)
			return
		}
		// Run the actual import and log any issues
		if _, err := f.insertChain(types.Blocks{block}); err != nil {
			log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
			return
		}
		// If import succeeded, broadcast the block
		propAnnounceOutTimer.UpdateSince(block.ReceivedAt)
		go f.broadcastBlock(block, false)

		// Invoke the testing hook if needed
		if f.importedHook != nil {
			f.importedHook(block)
		}
	}()
}

 

下一章:以太坊源码解读(13)广播和同步 Downloader

protocolManager启动了四个go程,其中包含用于定期同步的协程:syncer,该协程调用protocolManager的synchronise方法调用,选择td最高的节点进 ...