以太坊源码解读(8)以太坊P2P模块——节点发现和K-桶维护

回顾一下,前面说到以太坊分布式网络采用了Kademlia协议,它的特点是: 1、采用了二叉树的拓扑结构; 2、每个节点都对整树进行拆分,分成n棵子树; 3、从每棵树中取K个节点,构成“k-桶”,每个节点控制着n个k-桶; 4、节点的距离是通过异或的二进制运算得到的; 5、k桶中的节点不是固定不变的,而是不断刷新变化的。

下面,我们来看看Kademlia协议在以太坊中的具体实现。

一、以太坊的k桶

以太坊的k值是16,也就是说每个k桶包含16个节点,一共256个k桶。K桶中记录了节点的NodeId,distance,endpoint,ip等信息,按照与target节点的距离进行排序。

distance 0:[2^0, 2^1)node0node1node2...node15
distance 1:[2^1, 2^2)node0node1node2...node15
distance 2:[2^2, 2^3)node0node1node2...node15
distance 3:[2^3, 2^4)node0node1node2...node15
...node0node1node2...node15
distance 255:[2^255, 2^256)node0node1node2...node15

这个表在源码里为Table对象(p2p/discover/table.go):

type Table struct {
	mutex   sync.Mutex        // protects buckets, bucket content, nursery, rand
	buckets [nBuckets]*bucket // index of known nodes by distance
	nursery []*Node           // bootstrap nodes
	rand    *mrand.Rand       // source of randomness, periodically reseeded
	ips     netutil.DistinctNetSet

	db         *nodeDB // database of known nodes
	refreshReq chan chan struct{}
	initDone   chan struct{}
	closeReq   chan struct{}
	closed     chan struct{}

	nodeAddedHook func(*Node) // for testing

	net  transport
	self *Node // metadata of the local node
}

这里有几项是比较重要的: 1)buckets    类型是[nBuckets]*bucket,可以看到这是一个数组,一个bucket就是一个K-桶,一共256个bucket; 2)nursery    信任的种子节点,一个节点启动的时候首先最多能够连接35个种子节点,其中5个是由以太坊官方提供的,另外30个是从数据库里取的; 3)db    以太坊中有两个数据库实例,一个是用来储存区块链,另一个用来储存p2p的节点。 4)refreshReq    刷新K桶事件的管道,其他节点或者其他应用场景可以通过这个管道强制刷新该节点的k桶。

二、table对象的相关方法

1、newTable()新建table

task1:根据外部或默认参数初始化Table类 task2:加载种子节点 task3:启动数据库刷新go程 task4:启动事件监听go程

func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string, bootnodes []*Node) 
(*Table, error) {
	// If no node database was given, use an in-memory one
	db, err := newNodeDB(nodeDBPath, nodeDBVersion, ourID)
	if err != nil {
		return nil, err
	}
        // 初始化Table类
	tab := &Table{
		net:        t,
		db:         db,
		self:       NewNode(ourID, ourAddr.IP, uint16(ourAddr.Port), uint16(ourAddr.Port)),
		refreshReq: make(chan chan struct{}),
		initDone:   make(chan struct{}),
		closeReq:   make(chan struct{}),
		closed:     make(chan struct{}),
		rand:       mrand.New(mrand.NewSource(0)),
		ips:        netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit},
	}
        // 加载种子节点
        // 首先,初始化K桶
	if err := tab.setFallbackNodes(bootnodes); err != nil {
		return nil, err
	}
	for i := range tab.buckets {
		tab.buckets[i] = &bucket{
			ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},
		}
	}
	tab.seedRand() 
	tab.loadSeedNodes() //从table.buckets中随机取30个节点加载种子节点到相应的bucket
	
	// 启动刷新数据库的go程
	tab.db.ensureExpirer()
        // 事件监听go程
	go tab.loop()
	return tab, nil
}

2、加载种子节点    loadSeedNodes()

func (tab *Table) loadSeedNodes() {
	seeds := tab.db.querySeeds(seedCount, seedMaxAge)
	seeds = append(seeds, tab.nursery...)
	for i := range seeds {
		seed := seeds[i]
		age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.lastPongReceived(seed.ID)) }}
		log.Debug("Found seed node in database", "id", seed.ID, "addr", seed.addr(), "age", age)
		tab.add(seed)
	}
}

首先是从数据库里随机选取30个节点(seedCount),然后使用table.add()方法将每个节点加载到相应的bucket中。

func (tab *Table) add(n *Node) {
	tab.mutex.Lock()
	defer tab.mutex.Unlock()

	b := tab.bucket(n.sha)
	if !tab.bumpOrAdd(b, n) {
		// Node is not in table. Add it to the replacement list.
		tab.addReplacement(b, n)
	}
}

这里的添加不是直接添加,我们可以看到bucket的结构中有一个replacement列表,当entries是满的时候,新找到的节点不是直接抛弃,而是放到replacement列表中。

type bucket struct {
	entries      []*Node // live entries, sorted by time of last contact
	replacements []*Node // recently seen nodes to be used if revalidation fails
	ips          netutil.DistinctNetSet
}

我们来总结一下k-桶初始化的过程:

1、先新建table对象,连接本地database,如果本地没有database,则先新建一个空的database; 2、初始化K-桶,先获得k-桶信息的源节点:       a. 通过setFallbackNodes(bootnodes)来获得5个nursey节点;       b. 通过tab.loadSeedNodes()——>tab.db.querySeeds()来从本地database获得最多30个节点; 3、把上面的节点存入seeds,进行for循环; 4、在循环内执行tab.add(seed),计算seed节点与本节点的距离,选择相应距离的bucket。如果bucket不满,则用bump()存入bucket;如果bucket已满,则放入replacements。

3、刷新数据库    expireNodes()

实际上是要定期(1小时,nodeDBCleanupCycle = time.Hour)删除数据库中过期的节点。什么是过期的节点?在discovery/database.go中定义了nodeDBNodeExpiration = 24*time.Hour,即只有24小时之内ping过的节点才能得以保留。

func (db *nodeDB) expireNodes() error {
	threshold := time.Now().Add(-nodeDBNodeExpiration)

	// Find discovered nodes that are older than the allowance
	it := db.lvl.NewIterator(nil, nil)
	defer it.Release()

	for it.Next() {
		// Skip the item if not a discovery node
		id, field := splitKey(it.Key())
		if field != nodeDBDiscoverRoot {
			continue
		}
		// Skip the node if not expired yet (and not self)
		if !bytes.Equal(id[:], db.self[:]) {
			if seen := db.lastPongReceived(id); seen.After(threshold) {
				continue
			}
		}
		// Otherwise delete all associated information
		db.deleteNode(id)
	}
	return nil
}

4、事件监听    loop()

// loop schedules refresh, revalidate runs and coordinates shutdown.
func (tab *Table) loop() {
	var (
		revalidate     = time.NewTimer(tab.nextRevalidateTime()) // 验证节点是否可以ping通的时间通道
		refresh        = time.NewTicker(refreshInterval)
		copyNodes      = time.NewTicker(copyNodesInterval)
		revalidateDone = make(chan struct{})
		refreshDone    = make(chan struct{})           // where doRefresh reports completion
		waiting        = []chan struct{}{tab.initDone} // holds waiting callers while doRefresh runs
	)
	defer refresh.Stop()
	defer revalidate.Stop()
	defer copyNodes.Stop()

	// doRefresh用于执行lookup以保证k-桶是满的状态
	go tab.doRefresh(refreshDone)

loop:
	for {
		select {
		case <-refresh.C: // 定时刷新k桶事件,refreshInterval=30 min
			tab.seedRand()
			if refreshDone == nil {
				refreshDone = make(chan struct{})
				go tab.doRefresh(refreshDone)
			}
		case req := <-tab.refreshReq: // 刷新k桶的请求事件
			waiting = append(waiting, req)
			if refreshDone == nil {
				refreshDone = make(chan struct{})
				go tab.doRefresh(refreshDone)
			}
		case <-refreshDone:
			for _, ch := range waiting {
				close(ch)
			}
			waiting, refreshDone = nil, nil
		case <-revalidate.C: // 验证k桶节点有效性,10 second
			go tab.doRevalidate(revalidateDone)
		case <-revalidateDone:
			revalidate.Reset(tab.nextRevalidateTime())
		case <-copyNodes.C: // 定时(30秒)将节点存入数据库,如果某个节点在k桶中存在超过5分钟,则认为它是一个稳定的节点
			go tab.copyLiveNodes()
		case <-tab.closeReq:
			break loop
		}
	}

	if tab.net != nil {
		tab.net.close()
	}
	if refreshDone != nil {
		<-refreshDone
	}
	for _, ch := range waiting {
		close(ch)
	}
	tab.db.close()
	close(tab.closed)
}

通过这个函数,我们看到我们的table以及k-桶是如何维护的:

1、每30分钟自动刷新k-桶(刷新k-桶可以补充或保持table是满的状态,刚初始化的table可能并不是满的,需要不断的补充和更新); 2、每10秒钟就去验证k-桶中的节点是否可以ping通; 3、每30秒就将k-桶中存在超过5分钟的节点存入本地数据库,视作稳定节点;

三、节点的查找doRefresh()、lookup()

1、doRefresh()

// doRefresh通过lookup()去查找一个随机的节点来保持bucket满载。
func (tab *Table) doRefresh(done chan struct{}) {
	defer close(done)

	// 加载节点,这些节点在最近一次看见时依然是活动的
	tab.loadSeedNodes()

	// 先用自己的节点ID,运行lookup来发现邻居节点
	tab.lookup(tab.self.ID, false)

	for i := 0; i < 3; i++ {
		var target NodeID
                // 随机一个target,进行lookup
		crand.Read(target[:])
		tab.lookup(target, false)
	}
}

2、lookup函数

task1:从k桶中查找16个离target最近的节点,保存到result切片中; task2:节点发现主循环(使用上一步中查找到的节点进行挨个询问最近的节点,更新result,保证result中的16个节点是最近的)

流程图

func (tab *Table) lookup(targetID NodeID, refreshIfEmpty bool) []*Node {
	var (
		target         = crypto.Keccak256Hash(targetID[:])
		asked          = make(map[NodeID]bool) // 被访问过并接收到返回result切片的节点
		seen           = make(map[NodeID]bool) // 在result切片中但还没有访问的节点
		reply          = make(chan []*Node, alpha)
		pendingQueries = 0
		result         *nodesByDistance
	)
	// 不需要询问自己,放在asked里就不用再访问
	asked[tab.self.ID] = true
        // ---------------------------------------------------------
        // task1:从k桶中查找16个离targetId最近的点
        // ---------------------------------------------------------
	for {
		tab.mutex.Lock()
		// 初始化result切片,从k桶中最多取离目标最近的16个非初始节点
                // closest采用最笨的办法,就是遍历table中的每一个节点,比较距离
		result = tab.closest(target, bucketSize)
		tab.mutex.Unlock()
                // 如果从k桶中获取的节点数量大于0,或者上一次循环没有获取到初始节点,直接退出本次lookup
		if len(result.entries) > 0 || !refreshIfEmpty {
			break
		}
                // 如果一个都没找到,则发送刷新事件,从数据库中重新加载种子节点
		<-tab.refresh()
		refreshIfEmpty = false
	}

        // --------------------------------------------------------
        // task2:对result中16个节点进行邻近节点查询
        // 执行至此,说明result中不为空
        // --------------------------------------------------------
	for {
		// 并发查询,同时最多3个goroutine并发请求(通过pendingQueries参数进行控制)
		for i := 0; i < len(result.entries) && pendingQueries < alpha; i++ {
			n := result.entries[i]
			if !asked[n.ID] { // 只有未查询过的才能查询
				asked[n.ID] = true
				pendingQueries++
				go tab.findnode(n, targetID, reply)
			}
		}
                // 如果没有goroutine在请求,说明result中的节点都是最新的,且都询问过
		if pendingQueries == 0 {
			// we have asked all closest nodes, stop the search
			break
		}
		// 上面启动的3个goroutine返回值为reply,检查如果reply非空且没有seen过
		for _, n := range <-reply {
			if n != nil && !seen[n.ID] {
				seen[n.ID] = true
                                // push函数将节点放入result中,保证result数量不超过16
				result.push(n, bucketSize)
			}
		}
                // 到这里说明某个节点返回了结果,pendingQueries减少后又可以启动新的go程
		pendingQueries--
	}
	return result.entries
}

看到这里,我们发现lookup只返回了一个result.entries,但是这些新找到的节点如何更新到K桶里呢?原来在lookup执行的过程中,就开启了go程,执行tab.findnode(),这个函数直接将找到的节点add进了K桶中。

func (tab *Table) findnode(n *Node, targetID NodeID, reply chan<- []*Node) {
        // 查找失败的节点会储存在本地数据库中
	fails := tab.db.findFails(n.ID)
	r, err := tab.net.findnode(n.ID, n.addr(), targetID)
	if err != nil || len(r) == 0 {
		fails++
		tab.db.updateFindFails(n.ID, fails)
		log.Trace("Findnode failed", "id", n.ID, "failcount", fails, "err", err)
                // 如果有5次以上fails,该节点会被抛弃,从表中删除
		if fails >= maxFindnodeFailures {
			log.Trace("Too many findnode failures, dropping", "id", n.ID, "failcount", fails)
			tab.delete(n)
		}
	} else if fails > 0 {
		tab.db.updateFindFails(n.ID, fails-1)
	}

	// Grab as many nodes as possible. Some of them might not be alive anymore, but we'll
	// just remove those again during revalidation.
	for _, n := range r {
		tab.add(n)
	}
	reply <- r
}

所以上面提到的k-桶的维护中每30分钟就要刷新k桶,即调用doRefresh(),doRefresh首先对自身的节点查询,更新了最近的16个节点,然后又对随机的节点进行lookup查询,更新了相应k-桶中的16个节点。

一张图来回顾一下整个table新建和维护的过程:

四、节点查找的通信协议

节点的查找是基于UDP的通信协议:

分类功能描述构成
PING探测一个节点,判断是否在线
type ping struct {
   Version    uint
   From, To   rpcEndpoint
   Expiration uint64
   Rest []rlp.RawValue `rlp:"tail"`
}
PONGPING命令响应
type pong struct {
   To rpcEndpoint
   ReplyTok   []byte 
   Expiration uint64 
   Rest []rlp.RawValue `rlp:"tail"`
}
FINDNODE向节点查询某个与目标节点ID距离接近的节点
type findnode struct {
   Target     NodeID 
   Expiration uint64
   Rest []rlp.RawValue `rlp:"tail"`
}
NEIGHBORSFIND_NODE命令响应,发送与目标节点ID距离接近的K桶中的节点
type neighbors struct {
   Nodes      []rpcNode
   Expiration uint64
   Rest []rlp.RawValue `rlp:"tail"`
}

下一章:以太坊源码解读(9)以太坊P2P模块——底层网络构建和启动

以太坊的底层p2pServer,大约可以分为三层: 1、底层:table对象、node对象,它们分别定义了底层的路由表以及本地节点的数据结构、搜索和验证;     1)datab ...