以太坊源码解读(9)以太坊P2P模块——底层网络构建和启动
以太坊的底层p2pServer,大约可以分为三层:
1、底层:table对象、node对象,它们分别定义了底层的路由表以及本地节点的数据结构、搜索和验证; 1)database.go //封装node数据库相关操作 2)node.go //节点数据结构 3)ntp.go //同步时间 4)table.go //路由表 5)udp.go //网络相关操作
2、中层:peer对象定义了远端节点、message对象开放发送接口、server对象则提供peer节点的检测、初始化、事件订阅、状态查询、启动和停止等功能; 1)dial.go //封装一个任务生成处理结构以及三种任务结构中(此处命名不太精确) 2)message.go //定义一些数据的读写接口,以及对外的Send/SendItem函数 3)peer.go //封装了Peer 包括消息读取 4)rlpx.go //内部的握手协议 5)server.go //初始化,维护Peer网络,还有一些对外的接口
3、顶层:在eth/peer.go中对p2p/peer.go的peer再封装,包含了对该节点广播的更多区块链的信息,如交易、交易hash、区块以及区块头hash等。peer最终会被收集在peerset中使用。 1)eth/peer.go // 封装了peer和peerset两个结构体以及一些广播数据的方法 2)eth/handler.go // 封装了很多协议管理工具
一、p2p.Server基本结构
Server用于管理所有的peer连接:
type Server struct { // Config包含了所有Server的配置选项,Service第一启动的时候Config未必初始化 Config // 用于测试的hooks newTransport func(net.Conn) transport newPeerHook func(*Peer) lock sync.Mutex // protects running running bool ntab discoverTable // 包括Self()、Close()、Resolve()、Lookup()、ReadRandomNodes()等函数的接口 listener net.Listener ourHandshake *protoHandshake // 协议握手的RLP结构 lastLookup time.Time DiscV5 *discv5.Network // 基于V5发现协议的topic-discovery网络 // 下面是关于peer的操作 peerOp chan peerOpFunc peerOpDone chan struct{} quit chan struct{} addstatic chan *discover.Node removestatic chan *discover.Node addtrusted chan *discover.Node removetrusted chan *discover.Node posthandshake chan *conn addpeer chan *conn delpeer chan peerDrop loopWG sync.WaitGroup // loop, listenLoop peerFeed event.Feed log log.Logger }
再看看Server的配置:
type Config struct { PrivateKey *ecdsa.PrivateKey `toml:"-"` // 本地节点的秘钥 MaxPeers int // 可连接的节点最大数值 // maxpendingpeer是在握手阶段中可以挂起的最大对等点数量,分别计算入站连接和出站连接。预设值的默认值为零 MaxPendingPeers int `toml:",omitempty"` // 拨号比率控制入站与拨号连接的比率。例如:拨号比率为2允许1/2的连接被拨号。设置拨号比率为零默认为3。 DialRatio int `toml:",omitempty"` NoDiscovery bool // NoDiscovery 用来禁用节点发现机制,常用于协议debugging DiscoveryV5 bool `toml:",omitempty"` // DiscoveryV5 决定了是否启用topic-discovery协议 Name string `toml:"-"` // 设置节点名称 BootstrapNodes []*discover.Node // BootstrapNodes 用于建立与网络其余部分的连接。 BootstrapNodesV5 []*discv5.Node `toml:",omitempty"` // BootstrapNodesV5 用于建立与网络其余部分的V5连接。 StaticNodes []*discover.Node // 静态节点用作预先配置的连接,在断开连接时总是维护和重新连接。 TrustedNodes []*discover.Node // 可信节点被用作预先配置的连接,这些连接总是允许连接的,甚至超过对等限制。 // 连接可以被限制到某些IP网络。如果将此选项设置为非nil值,则只考虑与列表中包含的IP网络之一匹配的主机。 NetRestrict *netutil.Netlist `toml:",omitempty"` // NodeDatabase是到数据库的路径,其中包含以前在网络中看到的活动节点 NodeDatabase string `toml:",omitempty"` // 协议应该包含服务器支持的协议。为每个对等点启动匹配协议。 Protocols []Protocol `toml:"-"` // 如果ListenAddr设置为非nil地址,服务器将侦听传入的连接。 // 如果端口为零,操作系统将选择一个端口。当服务器启动时,ListenAddr字段将更新实际地址。 ListenAddr string // 如果设置为非nil值,则使用给定的NAT端口映射器使侦听端口对Internet可用。 NAT nat.Interface `toml:",omitempty"` // 如果拨号器设置为非空值,则使用给定的拨号器拨号出站peer连接。 Dialer NodeDialer `toml:"-"` // 如果NoDial是真的,服务器将不会拨任何节点。 NoDial bool `toml:",omitempty"` // 如果EnableMsgEvents被设置,那么当消息发送到peer或从peer接收到时,服务器将发出PeerEvents EnableMsgEvents bool // 日志记录器是一个自定义日志记录器 Logger log.Logger `toml:",omitempty"` }
配置里包括了服务可连接的节点(静态节点、信任节点、剩余节点)、节点发现机制、协议列表(包括ethereum协议)、端口控制。
二、p2p.Server的启动:sever.Start()
以太坊的启动流程可以参考之前的文章:以太坊源码解读(3)以太坊启动流程简析
-> geth -> startNode // 首先要启动节点 -> utils.StartNode -> Node.Start -> eth.Start // 启动以太坊对象 -> protocolManager.start() // 开启协议管理器 -> go txBroadCastLoop -> go minedBroadCastLoop -> go txsyncLoop -> go syncer -> server.Start // 启动服务 -> ListenUDP // 监听UDP端口,以太坊节点之间通信使用的是UDP协议 -> newUDP // 新建UDP -> newTable // 新建路由表 -> utils.RegisterEthService // 注册以太坊服务 -> eth.New // 新建以太坊对象 -> core.SetupGenesisBlock -> core.NewBlockChian -> core.NewTxPool -> protocol.Manger
之前分析以太坊启动流程的时候提到,启动节点后有两条路线,一个是注册并启动以太坊服务,另一个就是启动p2p server。Start()函数主要做了三件事:
a. 生成路由表于建立底层网络; b. 生成DialState用于驱动维护本地peer的更新与死亡; c. 监听本地接口用于信息应答。
现在来看看server.Start()的源码:
1)首先要设置Server的基本静态属性
func (srv *Server) Start() (err error) { srv.lock.Lock() defer srv.lock.Unlock() if srv.running { return errors.New("server already running") } srv.running = true srv.log = srv.Config.Logger if srv.log == nil { srv.log = log.New() } srv.log.Info("Starting P2P networking") // 配置p2p服务 if srv.PrivateKey == nil { return fmt.Errorf("Server.PrivateKey must be set to a non-nil key") } if srv.newTransport == nil { srv.newTransport = newRLPX } if srv.Dialer == nil { srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}} } srv.quit = make(chan struct{}) srv.addpeer = make(chan *conn) srv.delpeer = make(chan peerDrop) srv.posthandshake = make(chan *conn) srv.addstatic = make(chan *discover.Node) srv.removestatic = make(chan *discover.Node) srv.addtrusted = make(chan *discover.Node) srv.removetrusted = make(chan *discover.Node) srv.peerOp = make(chan peerOpFunc) srv.peerOpDone = make(chan struct{})
2)task1:配置Server的网络,生成路由表
var ( conn *net.UDPConn sconn *sharedUDPConn realaddr *net.UDPAddr unhandled chan discover.ReadPacket )
其中,sharedUDPConn实现一个共享连接。Write将消息发送到基础连接,而read将返回发现不可处理的消息,并由主侦听器发送到未处理的通道。
// 解析本地地址,开启UDP端口 if !srv.NoDiscovery || srv.DiscoveryV5 { addr, err := net.ResolveUDPAddr("udp", srv.ListenAddr) if err != nil { return err } conn, err = net.ListenUDP("udp", addr) if err != nil { return err } // 将UDP的端口注册(map)到NAT网络,使内网程序获得真实的外网IP地址 realaddr = conn.LocalAddr().(*net.UDPAddr) if srv.NAT != nil { if !realaddr.IP.IsLoopback() { go nat.Map(srv.NAT, srv.quit, "udp", realaddr.Port, realaddr.Port, "ethereum discovery") } // TODO: react to external IP changes over time. if ext, err := srv.NAT.ExternalIP(); err == nil { realaddr = &net.UDPAddr{IP: ext, Port: realaddr.Port} } } } // 设置unhandled通道以及共享的UDP网络 if !srv.NoDiscovery && srv.DiscoveryV5 { unhandled = make(chan discover.ReadPacket, 100) sconn = &sharedUDPConn{conn, unhandled} } // 配置一个discoverTable接口,用以配置并生成节点路由表table if !srv.NoDiscovery { cfg := discover.Config{ PrivateKey: srv.PrivateKey, AnnounceAddr: realaddr, NodeDBPath: srv.NodeDatabase, NetRestrict: srv.NetRestrict, Bootnodes: srv.BootstrapNodes, Unhandled: unhandled, } ntab, err := discover.ListenUDP(conn, cfg) if err != nil { return err } srv.ntab = ntab } // 配置一个DiscoveryV5网络协议,生成节点路由表table if srv.DiscoveryV5 { var ( ntab *discv5.Network err error ) if sconn != nil { ntab, err = discv5.ListenUDP(srv.PrivateKey, sconn, realaddr, "", srv.NetRestrict) //srv.NodeDatabase) } else { ntab, err = discv5.ListenUDP(srv.PrivateKey, conn, realaddr, "", srv.NetRestrict) //srv.NodeDatabase) } if err != nil { return err } if err := ntab.SetFallbackNodes(srv.BootstrapNodesV5); err != nil { return err } srv.DiscV5 = ntab }
discover.ListenUDP和discv5.ListenUDP均返回一个新表,侦听laddr上的UDP数据包。discV5还是一个实验性质的网络协议,我们先重点看普通的discover.ListenUDP。
func ListenUDP(c conn, cfg Config) (*Table, error) { tab, _, err := newUDP(c, cfg) if err != nil { return nil, err } log.Info("UDP listener up", "self", tab.self) return tab, nil }
newUDP(c,cfg)函数配置了udp对象,调用newTable()生成了路由表,并将其返回,同时开启了两个协程,分别是开启等待Pending reply协程和网络数据获取协程(处理收到的UDP packet)。
func newUDP(c conn, cfg Config) (*Table, *udp, error) { udp := &udp{ conn: c, priv: cfg.PrivateKey, netrestrict: cfg.NetRestrict, closing: make(chan struct{}), gotreply: make(chan reply), addpending: make(chan *pending), } realaddr := c.LocalAddr().(*net.UDPAddr) if cfg.AnnounceAddr != nil { realaddr = cfg.AnnounceAddr } // TODO: separate TCP port udp.ourEndpoint = makeEndpoint(realaddr, uint16(realaddr.Port)) tab, err := newTable(udp, PubkeyID(&cfg.PrivateKey.PublicKey), realaddr, cfg.NodeDBPath, cfg.Bootnodes) if err != nil { return nil, nil, err } udp.Table = tab go udp.loop() go udp.readLoop(cfg.Unhandled) return udp.Table, udp, nil }
3)task2:生成DialState用于驱动维护本地peer的更新与死亡
dynPeers := srv.maxDialedConns() //newDialState 对象生成,这个对象包含Peer的实际维护代码 dialer := newDialState(srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict) ... //重要的一句,开个协程,在其中做peer的维护 go srv.run(dialer) srv.running = true return nil }
4)task3:启动TCP端口,监听本地接口用于信息应答
// handshake 协议加载 srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: discover.PubkeyID(&srv.PrivateKey.PublicKey)} for _, p := range srv.Protocols { srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap()) } // listen/dial //监听本地端口 if srv.ListenAddr != "" { if err := srv.startListening(); err != nil { return err } } if srv.NoDial && srv.ListenAddr == "" { srv.log.Warn("P2P server will be useless, neither dialing nor listening") }
三、server.Run()
Server.Start()中通过Server.startListening()启动一个单独线程(listenLoop())去监听某个端口有无主动发来的IP连接;另外再启动一个单独线程调用run()函数,在无限循环里处理接收到的任何新消息新对象。在run()函数中,如果有远端peer发来连接请求(新的p2p.conn{}),则调用Server.newPeer()生成新的peer对象,并把Server.Protocols全交给peer。
Server.run()函数接受的是一个dialstate对象,它控制着dial事务和discover lookup事务。
type dialstate struct { maxDynDials int ntab discoverTable // 该接口包含Lookup()方法 netrestrict *netutil.Netlist lookupRunning bool dialing map[discover.NodeID]connFlag lookupBuf []*discover.Node // 当前lookup的结果 randomNodes []*discover.Node // 从table中随机选出的Node static map[discover.NodeID]*dialTask hist *dialHistory start time.Time // time when the dialer was first used bootnodes []*discover.Node // 当没有peers的时候默认dial的节点 }
下面我们看看Server.run(dialer)执行的过程。首先,函数启动后,在内存中定义下面几个变量:
var ( peers = make(map[discover.NodeID]*Peer) inboundCount = 0 trusted = make(map[discover.NodeID]bool, len(srv.TrustedNodes)) taskdone = make(chan task, maxActiveDialTasks) runningTasks []task queuedTasks []task // tasks that can't run yet )
peers:所有建立了连接的peer; trusted:信任的节点,因为dial其他节点的时候都要进行验证,对信任的节点可以加速验证过程; runningTasks和queuedTasks分别是正在执行的任务和待执行的任务。
task是什么?
在dial.go中定义了三类task:dialTask,discoverTask和waitExpireTask,以及一个task接口:
type task interface { Do(*Server) } // 每向一个节点发起连接就会生成一个dialTask type dialTask struct { flags connFlag dest *discover.Node lastResolved time.Time resolveDelay time.Duration } // 有时候动态连接的节点不足时,必须要到table中查找新的节点 type discoverTask struct { results []*discover.Node } // waitExpireTask用来保证当没有其他任务时Server.run()的持续执行 type waitExpireTask struct { time.Duration }至于这三种任务何时生成,则属于newTasks()的控制范围。
然后,Server.run()中定义了对tasks的调度函数,用来管理上述的两个[]task,同时对task进行执行:task.Do()。
// removes t from runningTasks delTask := func(t task) { for i := range runningTasks { if runningTasks[i] == t { runningTasks = append(runningTasks[:i], runningTasks[i+1:]...) break } } } // starts until max number of active tasks is satisfied startTasks := func(ts []task) (rest []task) { i := 0 for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ { t := ts[i] srv.log.Trace("New dial task", "task", t) go func() { t.Do(srv); taskdone <- t }() runningTasks = append(runningTasks, t) } return ts[i:] } scheduleTasks := func() { // 先从队列中开始执行task queuedTasks = append(queuedTasks[:0], startTasks(queuedTasks)...) // 尽可能多的启动task,所以会新建task进行执行 if len(runningTasks) < maxActiveDialTasks { nt := dialstate.newTasks(len(runningTasks)+len(queuedTasks), peers, time.Now()) queuedTasks = append(queuedTasks, startTasks(nt)...) } }
当这些都准备完后,Server.run()的主体就要正式执行了,一个无限执行的for循环,监听Server各通道的传值,然后执行相应的任务:
for { scheduleTasks() select { case <-srv.quit: break running case n := <-srv.addstatic: srv.log.Trace("Adding static node", "node", n) dialstate.addStatic(n) case n := <-srv.removestatic: srv.log.Trace("Removing static node", "node", n) dialstate.removeStatic(n) if p, ok := peers[n.ID]; ok { p.Disconnect(DiscRequested) } case n := <-srv.addtrusted: srv.log.Trace("Adding trusted node", "node", n) trusted[n.ID] = true if p, ok := peers[n.ID]; ok { p.rw.set(trustedConn, true) } case n := <-srv.removetrusted: srv.log.Trace("Removing trusted node", "node", n) if _, ok := trusted[n.ID]; ok { delete(trusted, n.ID) } if p, ok := peers[n.ID]; ok { p.rw.set(trustedConn, false) } case op := <-srv.peerOp: op(peers) srv.peerOpDone <- struct{}{} case t := <-taskdone: srv.log.Trace("Dial task done", "task", t) dialstate.taskDone(t, time.Now()) delTask(t) case c := <-srv.posthandshake: if trusted[c.id] { c.flags |= trustedConn } select { case c.cont <- srv.encHandshakeChecks(peers, inboundCount, c): case <-srv.quit: break running } case c := <-srv.addpeer: err := srv.protoHandshakeChecks(peers, inboundCount, c) if err == nil { p := newPeer(c, srv.Protocols) if srv.EnableMsgEvents { p.events = &srv.peerFeed } name := truncateName(c.name) srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1) go srv.runPeer(p) peers[c.id] = p if p.Inbound() { inboundCount++ } } select { case c.cont <- err: case <-srv.quit: break running } case pd := <-srv.delpeer: d := common.PrettyDuration(mclock.Now() - pd.created) pd.log.Debug("Removing p2p peer", "duration", d, "peers", len(peers)-1, "req", pd.requested, "err", pd.err) delete(peers, pd.ID()) if pd.Inbound() { inboundCount-- } } }
每次循环都先执行scheduleTasks(),然后通过StartTasks()开启一个协程执行task.Do(),然后下面监听各通道的传值。多数情况下,task就是dialTask,同一时间内discoverTask只能存在一个。task.Do()所做的事情是调用dial()方法,向task.dest中的节点发起连接,后者的执行路径是:
task.dial() ——> Server.SetupConn() ——> Server.checkpoint(c, srv.posthandshake) ——>Server.checkpoint(c, srv.addpeer)
按照for循环,Server的addpeer通道传出conn对象的时候,就会执行newPeer(),然后启动一个协程执行peer.Run(),从而实现两个节点之间的连接。
整个过程如下图所示:
从上图来看,p2p启动的TCP协议就是两条goroutine,分别是ser.listenLoop和srv.run(dialer)。
listenLoop:接收外部的请求 run:主动发起连接来连接外部节点的流程以及处理checkpoint队列信息的流程
p2p |--discover |--discv5 |--enr |--nat |--netutil |--protocols |--simulations |--testing dial.go message.go metrics.go peer.go protocol.go rlpx.go sever.go
TCP数据解析采用Rlpx协议,Tcp节点挑选和连接的处理主要在dial,节点之间的数据逻辑主要在peer。
1. rlpx.go:RLPx协议就定义了TCP链接过程的的加密过程
RLPX:Perfect Forward Secrecy, 链接的两方生成生成随机的私钥,通过随机的私钥得到公钥。 然后双方交换各自的公钥, 这样双方都可以通过自己随机的私钥和对方的公钥来生成一个同样的共享密钥(shared-secret)。后续的通讯使用这个共享密钥作为对称加密算法的密钥。 这样来说。如果有一天一方的私钥被泄露,也只会影响泄露之后的消息的安全性, 对于之前的通讯是安全的(因为通讯的密钥是随机生成的,用完后就消失了)。
2. dial.go:p2p里面主要负责建立链接的部分工作。 比如发现建立链接的节点。 与节点建立链接。 通过discover来查找指定节点的地址等功能
3. peer.go:peer代表了一条创建好的网络链路。在一条链路上可能运行着多个协议,负责发送和接受message。
四、peer&message
peer.run
peer.run会开启两个协程,一个是pingLoop(),调用SendItems(p.rw, pingMsg)来发起ping请求;另一个是ReadLoop(),其中的for循环不断地调用ReadMsg()读取msg,然后调用peer.handle(msg)来处理msg。如果msg是pingMsg,则发送一个pong回应;如果是其他的信息,则将msg交给proto.in这个通道,等待protocolManager.handleMsg()从通道中取出。
func (p *Peer) run() (remoteRequested bool, err error) { var ( writeStart = make(chan struct{}, 1) writeErr = make(chan error, 1) readErr = make(chan error, 1) reason DiscReason // sent to the peer ) p.wg.Add(2) go p.readLoop(readErr) go p.pingLoop() // Start all protocol handlers. writeStart <- struct{}{} p.startProtocols(writeStart, writeErr) // Wait for an error or disconnect. loop: for { ... } }
readLoop()和handle()
func (p *Peer) readLoop(errc chan<- error) { defer p.wg.Done() for { msg, err := p.rw.ReadMsg() if err != nil { errc <- err return } msg.ReceivedAt = time.Now() if err = p.handle(msg); err != nil { errc <- err return } } } func (p *Peer) handle(msg Msg) error { switch { case msg.Code == pingMsg: msg.Discard() go SendItems(p.rw, pongMsg) case msg.Code == discMsg: var reason [1]DiscReason // This is the last message. We don't need to discard or // check errors because, the connection will be closed after it. rlp.Decode(msg.Payload, &reason) return reason[0] case msg.Code < baseProtocolLength: // ignore other base protocol messages return msg.Discard() default: // it's a subprotocol message proto, err := p.getProto(msg.Code) if err != nil { return fmt.Errorf("msg code out of range: %v", msg.Code) } select { case proto.in <- msg: return nil case <-p.closed: return io.EOF } } return nil }
最后,peer.run调用starProtocols()函数让协议运行起来,proto.Run()在protocolManager初始化的时候定义了它将会调用protocolManager.handle(peer)进而调用handleMsg()来读取msg,并执行相应的操作。
handleMsg()是protocolManager中专门用来处理远端消息的,例如如果从远端得到了一个NewBlockMsg,就会紧接着通过Fetcher模块从本地取出block信息,返回给远端节点。因此,handleMsg()本身就是一个回调函数。
总结
通过这两篇文章,我们基本上了解了以太坊p2p网络的基本模块和工作方式: 1、以太坊网络分为三层:一是底层的k桶和储存节点的数据库模块;二是p2p网络服务的逻辑层,包括Server模块、message模块、dail模块、rplx模块和peer模块;最后一层是顶层eth中对逻辑层方法的包装(eth.peer)以及ProtocolManger(连接逻辑层和顶层的桥梁)。
2、p2p网络的启动从Node模块开始,Node.Start()函数开启了两个任务,一个是初始化Ethereum对象,同时初始化ProtocolManager,最后将Ethereum服务注册到node中;另一个是启动p2p.Server,新建并刷新K桶,开启UDP端口监听,同时监听TCP端口,处理从远端节点发来的message。
3、UDP用于K桶维护过程中的节点查找,而TCP则是用于实际实现peer的连接。
4、peer的连接通过dialTask来实现,dialTask会将连接配置发送到Server.run的循环中,调用newPeer生成新的peer,同时让对这个peer开启pingLoop和readLoop两个协程在两个节点之间读取和发送请求。
5、最后handleManager将会验证这两个节点(handshake),将可用的peer设置为可信的,并放在peerset中。
下一章:以太坊源码解读(10)广播和同步 protocolManager及其handle()方法
前面提到了ProtocolManager,从字面上看是协议管理器,负责着p2p通信协议的管理。它连接了p2p的逻辑层peer与顶层peer之间的调用, ...