以太坊源码解读(8)以太坊P2P模块——节点发现和K-桶维护
回顾一下,前面说到以太坊分布式网络采用了Kademlia协议,它的特点是: 1、采用了二叉树的拓扑结构; 2、每个节点都对整树进行拆分,分成n棵子树; 3、从每棵树中取K个节点,构成“k-桶”,每个节点控制着n个k-桶; 4、节点的距离是通过异或的二进制运算得到的; 5、k桶中的节点不是固定不变的,而是不断刷新变化的。
下面,我们来看看Kademlia协议在以太坊中的具体实现。
一、以太坊的k桶
以太坊的k值是16,也就是说每个k桶包含16个节点,一共256个k桶。K桶中记录了节点的NodeId,distance,endpoint,ip等信息,按照与target节点的距离进行排序。
| distance 0:[2^0, 2^1) | node0 | node1 | node2 | ... | node15 |
| distance 1:[2^1, 2^2) | node0 | node1 | node2 | ... | node15 |
| distance 2:[2^2, 2^3) | node0 | node1 | node2 | ... | node15 |
| distance 3:[2^3, 2^4) | node0 | node1 | node2 | ... | node15 |
| ... | node0 | node1 | node2 | ... | node15 |
| distance 255:[2^255, 2^256) | node0 | node1 | node2 | ... | node15 |
这个表在源码里为Table对象(p2p/discover/table.go):
type Table struct {
mutex sync.Mutex // protects buckets, bucket content, nursery, rand
buckets [nBuckets]*bucket // index of known nodes by distance
nursery []*Node // bootstrap nodes
rand *mrand.Rand // source of randomness, periodically reseeded
ips netutil.DistinctNetSet
db *nodeDB // database of known nodes
refreshReq chan chan struct{}
initDone chan struct{}
closeReq chan struct{}
closed chan struct{}
nodeAddedHook func(*Node) // for testing
net transport
self *Node // metadata of the local node
}
这里有几项是比较重要的: 1)buckets 类型是[nBuckets]*bucket,可以看到这是一个数组,一个bucket就是一个K-桶,一共256个bucket; 2)nursery 信任的种子节点,一个节点启动的时候首先最多能够连接35个种子节点,其中5个是由以太坊官方提供的,另外30个是从数据库里取的; 3)db 以太坊中有两个数据库实例,一个是用来储存区块链,另一个用来储存p2p的节点。 4)refreshReq 刷新K桶事件的管道,其他节点或者其他应用场景可以通过这个管道强制刷新该节点的k桶。
二、table对象的相关方法
1、newTable()新建table
task1:根据外部或默认参数初始化Table类 task2:加载种子节点 task3:启动数据库刷新go程 task4:启动事件监听go程
func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string, bootnodes []*Node)
(*Table, error) {
// If no node database was given, use an in-memory one
db, err := newNodeDB(nodeDBPath, nodeDBVersion, ourID)
if err != nil {
return nil, err
}
// 初始化Table类
tab := &Table{
net: t,
db: db,
self: NewNode(ourID, ourAddr.IP, uint16(ourAddr.Port), uint16(ourAddr.Port)),
refreshReq: make(chan chan struct{}),
initDone: make(chan struct{}),
closeReq: make(chan struct{}),
closed: make(chan struct{}),
rand: mrand.New(mrand.NewSource(0)),
ips: netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit},
}
// 加载种子节点
// 首先,初始化K桶
if err := tab.setFallbackNodes(bootnodes); err != nil {
return nil, err
}
for i := range tab.buckets {
tab.buckets[i] = &bucket{
ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},
}
}
tab.seedRand()
tab.loadSeedNodes() //从table.buckets中随机取30个节点加载种子节点到相应的bucket
// 启动刷新数据库的go程
tab.db.ensureExpirer()
// 事件监听go程
go tab.loop()
return tab, nil
}
2、加载种子节点 loadSeedNodes()
func (tab *Table) loadSeedNodes() {
seeds := tab.db.querySeeds(seedCount, seedMaxAge)
seeds = append(seeds, tab.nursery...)
for i := range seeds {
seed := seeds[i]
age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.lastPongReceived(seed.ID)) }}
log.Debug("Found seed node in database", "id", seed.ID, "addr", seed.addr(), "age", age)
tab.add(seed)
}
}
首先是从数据库里随机选取30个节点(seedCount),然后使用table.add()方法将每个节点加载到相应的bucket中。
func (tab *Table) add(n *Node) {
tab.mutex.Lock()
defer tab.mutex.Unlock()
b := tab.bucket(n.sha)
if !tab.bumpOrAdd(b, n) {
// Node is not in table. Add it to the replacement list.
tab.addReplacement(b, n)
}
}
这里的添加不是直接添加,我们可以看到bucket的结构中有一个replacement列表,当entries是满的时候,新找到的节点不是直接抛弃,而是放到replacement列表中。
type bucket struct {
entries []*Node // live entries, sorted by time of last contact
replacements []*Node // recently seen nodes to be used if revalidation fails
ips netutil.DistinctNetSet
}
我们来总结一下k-桶初始化的过程:
1、先新建table对象,连接本地database,如果本地没有database,则先新建一个空的database; 2、初始化K-桶,先获得k-桶信息的源节点: a. 通过setFallbackNodes(bootnodes)来获得5个nursey节点; b. 通过tab.loadSeedNodes()——>tab.db.querySeeds()来从本地database获得最多30个节点; 3、把上面的节点存入seeds,进行for循环; 4、在循环内执行tab.add(seed),计算seed节点与本节点的距离,选择相应距离的bucket。如果bucket不满,则用bump()存入bucket;如果bucket已满,则放入replacements。
3、刷新数据库 expireNodes()
实际上是要定期(1小时,nodeDBCleanupCycle = time.Hour)删除数据库中过期的节点。什么是过期的节点?在discovery/database.go中定义了nodeDBNodeExpiration = 24*time.Hour,即只有24小时之内ping过的节点才能得以保留。
func (db *nodeDB) expireNodes() error {
threshold := time.Now().Add(-nodeDBNodeExpiration)
// Find discovered nodes that are older than the allowance
it := db.lvl.NewIterator(nil, nil)
defer it.Release()
for it.Next() {
// Skip the item if not a discovery node
id, field := splitKey(it.Key())
if field != nodeDBDiscoverRoot {
continue
}
// Skip the node if not expired yet (and not self)
if !bytes.Equal(id[:], db.self[:]) {
if seen := db.lastPongReceived(id); seen.After(threshold) {
continue
}
}
// Otherwise delete all associated information
db.deleteNode(id)
}
return nil
}
4、事件监听 loop()
// loop schedules refresh, revalidate runs and coordinates shutdown.
func (tab *Table) loop() {
var (
revalidate = time.NewTimer(tab.nextRevalidateTime()) // 验证节点是否可以ping通的时间通道
refresh = time.NewTicker(refreshInterval)
copyNodes = time.NewTicker(copyNodesInterval)
revalidateDone = make(chan struct{})
refreshDone = make(chan struct{}) // where doRefresh reports completion
waiting = []chan struct{}{tab.initDone} // holds waiting callers while doRefresh runs
)
defer refresh.Stop()
defer revalidate.Stop()
defer copyNodes.Stop()
// doRefresh用于执行lookup以保证k-桶是满的状态
go tab.doRefresh(refreshDone)
loop:
for {
select {
case <-refresh.C: // 定时刷新k桶事件,refreshInterval=30 min
tab.seedRand()
if refreshDone == nil {
refreshDone = make(chan struct{})
go tab.doRefresh(refreshDone)
}
case req := <-tab.refreshReq: // 刷新k桶的请求事件
waiting = append(waiting, req)
if refreshDone == nil {
refreshDone = make(chan struct{})
go tab.doRefresh(refreshDone)
}
case <-refreshDone:
for _, ch := range waiting {
close(ch)
}
waiting, refreshDone = nil, nil
case <-revalidate.C: // 验证k桶节点有效性,10 second
go tab.doRevalidate(revalidateDone)
case <-revalidateDone:
revalidate.Reset(tab.nextRevalidateTime())
case <-copyNodes.C: // 定时(30秒)将节点存入数据库,如果某个节点在k桶中存在超过5分钟,则认为它是一个稳定的节点
go tab.copyLiveNodes()
case <-tab.closeReq:
break loop
}
}
if tab.net != nil {
tab.net.close()
}
if refreshDone != nil {
<-refreshDone
}
for _, ch := range waiting {
close(ch)
}
tab.db.close()
close(tab.closed)
}
通过这个函数,我们看到我们的table以及k-桶是如何维护的:
1、每30分钟自动刷新k-桶(刷新k-桶可以补充或保持table是满的状态,刚初始化的table可能并不是满的,需要不断的补充和更新); 2、每10秒钟就去验证k-桶中的节点是否可以ping通; 3、每30秒就将k-桶中存在超过5分钟的节点存入本地数据库,视作稳定节点;
三、节点的查找doRefresh()、lookup()
1、doRefresh()
// doRefresh通过lookup()去查找一个随机的节点来保持bucket满载。
func (tab *Table) doRefresh(done chan struct{}) {
defer close(done)
// 加载节点,这些节点在最近一次看见时依然是活动的
tab.loadSeedNodes()
// 先用自己的节点ID,运行lookup来发现邻居节点
tab.lookup(tab.self.ID, false)
for i := 0; i < 3; i++ {
var target NodeID
// 随机一个target,进行lookup
crand.Read(target[:])
tab.lookup(target, false)
}
}
2、lookup函数
task1:从k桶中查找16个离target最近的节点,保存到result切片中; task2:节点发现主循环(使用上一步中查找到的节点进行挨个询问最近的节点,更新result,保证result中的16个节点是最近的)
流程图

func (tab *Table) lookup(targetID NodeID, refreshIfEmpty bool) []*Node {
var (
target = crypto.Keccak256Hash(targetID[:])
asked = make(map[NodeID]bool) // 被访问过并接收到返回result切片的节点
seen = make(map[NodeID]bool) // 在result切片中但还没有访问的节点
reply = make(chan []*Node, alpha)
pendingQueries = 0
result *nodesByDistance
)
// 不需要询问自己,放在asked里就不用再访问
asked[tab.self.ID] = true
// ---------------------------------------------------------
// task1:从k桶中查找16个离targetId最近的点
// ---------------------------------------------------------
for {
tab.mutex.Lock()
// 初始化result切片,从k桶中最多取离目标最近的16个非初始节点
// closest采用最笨的办法,就是遍历table中的每一个节点,比较距离
result = tab.closest(target, bucketSize)
tab.mutex.Unlock()
// 如果从k桶中获取的节点数量大于0,或者上一次循环没有获取到初始节点,直接退出本次lookup
if len(result.entries) > 0 || !refreshIfEmpty {
break
}
// 如果一个都没找到,则发送刷新事件,从数据库中重新加载种子节点
<-tab.refresh()
refreshIfEmpty = false
}
// --------------------------------------------------------
// task2:对result中16个节点进行邻近节点查询
// 执行至此,说明result中不为空
// --------------------------------------------------------
for {
// 并发查询,同时最多3个goroutine并发请求(通过pendingQueries参数进行控制)
for i := 0; i < len(result.entries) && pendingQueries < alpha; i++ {
n := result.entries[i]
if !asked[n.ID] { // 只有未查询过的才能查询
asked[n.ID] = true
pendingQueries++
go tab.findnode(n, targetID, reply)
}
}
// 如果没有goroutine在请求,说明result中的节点都是最新的,且都询问过
if pendingQueries == 0 {
// we have asked all closest nodes, stop the search
break
}
// 上面启动的3个goroutine返回值为reply,检查如果reply非空且没有seen过
for _, n := range <-reply {
if n != nil && !seen[n.ID] {
seen[n.ID] = true
// push函数将节点放入result中,保证result数量不超过16
result.push(n, bucketSize)
}
}
// 到这里说明某个节点返回了结果,pendingQueries减少后又可以启动新的go程
pendingQueries--
}
return result.entries
}
看到这里,我们发现lookup只返回了一个result.entries,但是这些新找到的节点如何更新到K桶里呢?原来在lookup执行的过程中,就开启了go程,执行tab.findnode(),这个函数直接将找到的节点add进了K桶中。
func (tab *Table) findnode(n *Node, targetID NodeID, reply chan<- []*Node) {
// 查找失败的节点会储存在本地数据库中
fails := tab.db.findFails(n.ID)
r, err := tab.net.findnode(n.ID, n.addr(), targetID)
if err != nil || len(r) == 0 {
fails++
tab.db.updateFindFails(n.ID, fails)
log.Trace("Findnode failed", "id", n.ID, "failcount", fails, "err", err)
// 如果有5次以上fails,该节点会被抛弃,从表中删除
if fails >= maxFindnodeFailures {
log.Trace("Too many findnode failures, dropping", "id", n.ID, "failcount", fails)
tab.delete(n)
}
} else if fails > 0 {
tab.db.updateFindFails(n.ID, fails-1)
}
// Grab as many nodes as possible. Some of them might not be alive anymore, but we'll
// just remove those again during revalidation.
for _, n := range r {
tab.add(n)
}
reply <- r
}
所以上面提到的k-桶的维护中每30分钟就要刷新k桶,即调用doRefresh(),doRefresh首先对自身的节点查询,更新了最近的16个节点,然后又对随机的节点进行lookup查询,更新了相应k-桶中的16个节点。
一张图来回顾一下整个table新建和维护的过程:

四、节点查找的通信协议
节点的查找是基于UDP的通信协议:
| 分类 | 功能描述 | 构成 |
| PING | 探测一个节点,判断是否在线 | type ping struct {
Version uint
From, To rpcEndpoint
Expiration uint64
Rest []rlp.RawValue `rlp:"tail"`
} |
| PONG | PING命令响应 | type pong struct {
To rpcEndpoint
ReplyTok []byte
Expiration uint64
Rest []rlp.RawValue `rlp:"tail"`
} |
| FINDNODE | 向节点查询某个与目标节点ID距离接近的节点 | type findnode struct {
Target NodeID
Expiration uint64
Rest []rlp.RawValue `rlp:"tail"`
} |
| NEIGHBORS | FIND_NODE命令响应,发送与目标节点ID距离接近的K桶中的节点 | type neighbors struct {
Nodes []rpcNode
Expiration uint64
Rest []rlp.RawValue `rlp:"tail"`
} |
下一章:以太坊源码解读(9)以太坊P2P模块——底层网络构建和启动
以太坊的底层p2pServer,大约可以分为三层: 1、底层:table对象、node对象,它们分别定义了底层的路由表以及本地节点的数据结构、搜索和验证; 1)datab ...
AI 中文社