以太坊源码分析 启动流程

以太坊入口代码位于 cmd/geth/main.go,先看一下 main 函数:

func main() {
    if err := app.Run(os.Args); err != nil {
        fmt.Fprintln(os.Stderr, err)
        os.Exit(1)
    }
}

这里有个疑问,为什么没有看到 app 的 flag 和 command 配置呢?我们了解一下Go语言的执行流程:

可以看到,main() 并不是真正意义上的入口,在初始化完常量和变量以后,会先调用模块的 init() 函数,然后才是 main() 函数,所以初始化的工作是在 init() 函数里完成的:

func init() {
    // Initialize the CLI app and start Geth
    app.Action = geth
    app.HideVersion = true // we have a command to print the version
    app.Copyright = "Copyright 2013-2018 The go-ethereum Authors"
    app.Commands = []cli.Command{
        // See chaincmd.go:
        initCommand,
        ……
    }
    sort.Sort(cli.CommandsByName(app.Commands))

    app.Flags = append(app.Flags, nodeFlags...)
    ……

    app.Before = func(ctx *cli.Context) error {
        runtime.GOMAXPROCS(runtime.NumCPU())
        if err := debug.Setup(ctx); err != nil {
            return err
        }
        // Start system runtime metrics collection
        go metrics.CollectProcessMetrics(3 * time.Second)

        utils.SetupNetwork(ctx)
        return nil
    }

    app.After = func(ctx *cli.Context) error {
        debug.Exit()
        console.Stdin.Close() // Resets terminal mode.
        return nil
    }
}

可以看到:app.Action=geth,如果没有添加任何command参数的话,主入口是geth()函数。

flag的配置代码位于cmd/utils/flags.go,command的配置代码跟main.go在同一个包中,分散在不同的文件里。

在进入主入口之前,app.Before 做了 3 件事情:

  • I. runtime.GOMAXPROCS():设置最大可用处理器数
  • II. metrics.CollectProcessMetrics():创建一个goroutine,每3秒监测一次系统的 ram 和 disk 状态
  • III. utils.SetupNetwork():配置gas limit值

下面开始看geth()函数:

func geth(ctx *cli.Context) error {
    node := makeFullNode(ctx)
    startNode(ctx, node)
    node.Wait()
    return nil
}

可以看到,主要做了3件事情:

  • I. 创建结点
  • II. 启动结点
  • III. 结点进入等待状态

1. 创建结点

func makeFullNode(ctx *cli.Context) *node.Node {
    stack, cfg := makeConfigNode(ctx)
    utils.RegisterEthService(stack, &cfg.Eth)
    ……
    return stack
}

中间一堆代码默认情况下不会走进去,先忽略~ 

所以这个函数就干了2件事:创建结点、注册EthService。

1.1 创建结点

先看makeConfigNode()函数:

func makeConfigNode(ctx *cli.Context) (*node.Node, gethConfig) {
    // Load defaults.
    cfg := gethConfig{
        Eth:       eth.DefaultConfig,
        Shh:       whisper.DefaultConfig,
        Node:      defaultNodeConfig(),
        Dashboard: dashboard.DefaultConfig,
    }

    // Load config file.
    if file := ctx.GlobalString(configFileFlag.Name); file != "" {
        if err := loadConfig(file, &cfg); err != nil {
            utils.Fatalf("%v", err)
        }
    }

    // Apply flags.
    utils.SetNodeConfig(ctx, &cfg.Node)
    stack, err := node.New(&cfg.Node)
    if err != nil {
        utils.Fatalf("Failed to create the protocol stack: %v", err)
    }
    utils.SetEthConfig(ctx, stack, &cfg.Eth)
    if ctx.GlobalIsSet(utils.EthStatsURLFlag.Name) {
        cfg.Ethstats.URL = ctx.GlobalString(utils.EthStatsURLFlag.Name)
    }

    utils.SetShhConfig(ctx, stack, &cfg.Shh)
    utils.SetDashboardConfig(ctx, &cfg.Dashboard)

    return stack, cfg
}

主要是初始化和加载一些配置,其中最重要的一行是通过node.New()创建结点,变量名叫stack,代表协议栈的含义。

先看一下结点的默认配置,代码位于node/defaults.go:

var DefaultConfig = Config{
    DataDir:          DefaultDataDir(),
    HTTPPort:         DefaultHTTPPort,
    HTTPModules:      []string{"net", "web3"},
    HTTPVirtualHosts: []string{"localhost"},
    WSPort:           DefaultWSPort,
    WSModules:        []string{"net", "web3"},
    P2P: p2p.Config{
        ListenAddr: ":30303",
        MaxPeers:   25,
        NAT:        nat.Any(),
    },
}

mac默认的datadir位于$HOME/Library/Ethereum,HTTP默认端口号8545,WebSocket默认端口号8546,P2P默认端口号30303,最大支持25个对等结点。

utils.SetNodeConfig()代码位于cmd/utils/flags.go,主要是检查有没有一些global的配置,如果有的话覆盖掉刚刚的默认配置。代码比较简单就不分析了。

接下来就是调用node.New()创建结点了,代码位于node/node.go:

func New(conf *Config) (*Node, error) {
    // Copy config and resolve the datadir so future changes to the current
    // working directory don't affect the node.
    confCopy := *conf
    conf = &confCopy
    if conf.DataDir != "" {
        absdatadir, err := filepath.Abs(conf.DataDir)
        if err != nil {
            return nil, err
        }
        conf.DataDir = absdatadir
    }

    ……

    // Ensure that the AccountManager method works before the node has started.
    // We rely on this in cmd/geth.
    am, ephemeralKeystore, err := makeAccountManager(conf)
    ……

    // Note: any interaction with Config that would create/touch files
    // in the data directory or instance directory is delayed until Start.
    return &Node{
        accman:            am,
        ephemeralKeystore: ephemeralKeystore,
        config:            conf,
        serviceFuncs:      []ServiceConstructor{},
        ipcEndpoint:       conf.IPCEndpoint(),
        httpEndpoint:      conf.HTTPEndpoint(),
        wsEndpoint:        conf.WSEndpoint(),
        eventmux:          new(event.TypeMux),
        log:               conf.Logger,
    }, nil
}

可以看出,主要做了3件事:

I. 把datadir转成绝对路径

II. 调用makeAccountManager()初始化账号管理器

III. 创建一个Node实例并返回

关于账号管理系统后面会专门写一篇文章分析,这里就先略过了。

至此,我们获得了一个Node实例,makeConfigNode()函数就分析完了。

看一下Node结构中的一些重要成员:

I. accman:刚刚创建的账号管理器实例

II. config:创建该结点使用的配置

III. serviceFuncs:一个函数指针数组,保存所有注册Service的构造函数

那么Service是一个什么样的概念呢?先看一下构造函数的原型:

type ServiceConstructor func(ctx *ServiceContext) (Service, error)

出现了两个新类型:ServiceContext和Service。先看一下ServiceContext的定义:

// ServiceContext is a collection of service independent options inherited from
// the protocol stack, that is passed to all constructors to be optionally used;
// as well as utility methods to operate on the service environment.
type ServiceContext struct {
    config         *Config
    services       map[reflect.Type]Service // Index of the already constructed services
    EventMux       *event.TypeMux           // Event multiplexer used for decoupled notifications
    AccountManager *accounts.Manager        // Account manager created by the node.
}

从注释可以看出,ServiceContext主要是存储了一些从结点(或者叫协议栈)那里继承过来的、和具体Service无关的一些信息,比如结点config、account manager等。其中有一个services字段保存了当前正在运行的所有Service,是一个map类型,key是Service的类型,value是Service实例。

接下来看一下Service的定义:

type Service interface {
    // Protocols retrieves the P2P protocols the service wishes to start.
    Protocols() []p2p.Protocol

    // APIs retrieves the list of RPC descriptors the service provides
    APIs() []rpc.API

    // Start is called after all services have been constructed and the networking
    // layer was also initialized to spawn any goroutines required by the service.
    Start(server *p2p.Server) error

    // Stop terminates all goroutines belonging to the service, blocking until they
    // are all terminated.
    Stop() error
}

Service是一个接口,定义了4个需要实现的函数。换句话说,任何实现了这4个方法的类型,都可以称之为一个Service。

1.2 注册eth Service

接下来分析utils.RegisterEthService()方法:(cmd/utils/flags.go)

func RegisterEthService(stack *node.Node, cfg *eth.Config) {
    var err error
    if cfg.SyncMode == downloader.LightSync {
        err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
            return les.New(ctx, cfg)
        })
    } else {
        err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
            fullNode, err := eth.New(ctx, cfg)
            if fullNode != nil && cfg.LightServ > 0 {
                ls, _ := les.NewLesServer(fullNode, cfg)
                fullNode.AddLesServer(ls)
            }
            return fullNode, err
        })
    }
    if err != nil {
        Fatalf("Failed to register the Ethereum service: %v", err)
    }
}

这里有2个分支,如果是配置成轻量级结点会进上面那个分支,如果是全结点会进else分支。然后调用Node的Register()方法注册Service:(node/node.go)

func (n *Node) Register(constructor ServiceConstructor) error {
    n.lock.Lock()
    defer n.lock.Unlock()

    if n.server != nil {
        return ErrNodeRunning
    }
    n.serviceFuncs = append(n.serviceFuncs, constructor)
    return nil
}

看到了吧?所谓的注册,其实就是把Service的构造函数放进结点的serviceFuncs数组。

这里只是注册,具体要等到启动结点的时候才真正调用构造函数创建Service。

2. 启动结点

我们回到cmd/geth/main.go分析下一个函数startNode():

func startNode(ctx *cli.Context, stack *node.Node) {
    // Start up the node itself
    utils.StartNode(stack)

    // Subscribe and process account related events
    ……
}

后面一部分订阅和处理账号event相关的代码先忽略,看一下utils.StartNode()函数,代码位于cmd/utils/cmd.go:

func StartNode(stack *node.Node) {
    if err := stack.Start(); err != nil {
        Fatalf("Error starting protocol stack: %v", err)
    }
    go func() {
        sigc := make(chan os.Signal, 1)
        signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM)
        defer signal.Stop(sigc)
        <-sigc
        log.Info("Got interrupt, shutting down...")
        go stack.Stop()
        for i := 10; i > 0; i-- {
            <-sigc
            if i > 1 {
                log.Warn("Already shutting down, interrupt more to panic.", "times", i-1)
            }
        }
        debug.Exit() // ensure trace and CPU profile data is flushed.
        debug.LoudPanic("boom")
    }()
}

后半段的goroutine主要是为了捕获中断信号以停止结点运行的,所以这里实际就是调用了Node的Start()函数。这个函数比较长,我们分成几段来看:

I. 创建P2P server

II. 创建Service

III. 启动P2P server

IV. 启动Service

V. 启动RPC server

2.1 创建P2P server

以太坊是一个去中心化的平台,所以首要任务是创建P2P server:

func (n *Node) Start() error {
    ……
    n.lock.Lock()
    defer n.lock.Unlock()

    // Short circuit if the node's already running
    if n.server != nil {
        return ErrNodeRunning
    }
    if err := n.openDataDir(); err != nil {
        return err
    }

    // Initialize the p2p server. This creates the node key and
    // discovery databases.
    n.serverConfig = n.config.P2P
    n.serverConfig.PrivateKey = n.config.NodeKey()
    n.serverConfig.Name = n.config.NodeName()
    n.serverConfig.Logger = n.log
    if n.serverConfig.StaticNodes == nil {
        n.serverConfig.StaticNodes = n.config.StaticNodes()
    }
    if n.serverConfig.TrustedNodes == nil {
        n.serverConfig.TrustedNodes = n.config.TrustedNodes()
    }
    if n.serverConfig.NodeDatabase == "" {
        n.serverConfig.NodeDatabase = n.config.NodeDB()
    }
    running := &p2p.Server{Config: n.serverConfig}
    n.log.Info("Starting peer-to-peer node", "instance", n.serverConfig.Name)

    ……
}

代码首先做了一些检查工作:加锁、判断结点是否已经运行、检查datadir是否可以打开,然后初始化P2P server配置,最后用该配置创建了一个p2p.Server实例。

2.2 创建Service

    // Otherwise copy and specialize the P2P configuration
    services := make(map[reflect.Type]Service)
    for _, constructor := range n.serviceFuncs {
        // Create a new context for the particular service
        ctx := &ServiceContext{
            config:         n.config,
            services:       make(map[reflect.Type]Service),
            EventMux:       n.eventmux,
            AccountManager: n.accman,
        }
        for kind, s := range services { // copy needed for threaded access
            ctx.services[kind] = s
        }
        // Construct and save the service
        service, err := constructor(ctx)
        if err != nil {
            return err
        }
        kind := reflect.TypeOf(service)
        if _, exists := services[kind]; exists {
            return &DuplicateServiceError{Kind: kind}
        }
        services[kind] = service
    }

首先初始化Node中的services字段,然后遍历serviceFuncs,也就是之前注册的所有Service的构造函数列表。在创建Service实例之前,先为每个Service创建一个ServiceContext,之前提到过,ServiceContext里存储的是从Node继承过来的一些信息。接着通过构造函数创建Service实例,然后加入到service这个map中。

2.3 启动P2P server

    // Gather the protocols and start the freshly assembled P2P server
    for _, service := range services {
        running.Protocols = append(running.Protocols, service.Protocols()...)
    }
    if err := running.Start(); err != nil {
        return convertFileLockError(err)
    }

首先把所有Service支持的协议集合到一起,然后调用p2p.Server的Start()方法启动P2P server(代码位于p2p/server.go)。P2P server会绑定一个UDP端口和一个TCP端口,端口号是相同的(默认30303)。UDP端口主要用于结点发现,TCP端口主要用于业务数据传输,基于RLPx加密传输协议。所以具体来说,Start()方法做了以下几件事情:

I. 侦听UDP端口:用于结点发现

II. 发起UDP请求获取结点表:内部会启动goroutine来完成

III. 侦听TCP端口:用于业务数据传输,基于RLPx协议

IV. 发起TCP请求连接到其他结点:也是启动goroutine完成

相对应的代码如下所示:

func (srv *Server) Start() (err error) {
    ……
    srv.running = true
    ……

    // 侦听UDP端口(用于结点发现)
    if !srv.NoDiscovery || srv.DiscoveryV5 {
        addr, err := net.ResolveUDPAddr("udp", srv.ListenAddr)
        if err != nil {
            return err
        }
        conn, err = net.ListenUDP("udp", addr)
        if err != nil {
            return err
        }
        realaddr = conn.LocalAddr().(*net.UDPAddr)
        if srv.NAT != nil {
            if !realaddr.IP.IsLoopback() {
                go nat.Map(srv.NAT, srv.quit, "udp", realaddr.Port, realaddr.Port, "ethereum discovery")
            }
            // TODO: react to external IP changes over time.
            if ext, err := srv.NAT.ExternalIP(); err == nil {
                realaddr = &net.UDPAddr{IP: ext, Port: realaddr.Port}
            }
        }
    }

    ……
    // 发起UDP请求获取结点表(内部会启动goroutine)
    if !srv.NoDiscovery {
        cfg := discover.Config{
            PrivateKey:   srv.PrivateKey,
            AnnounceAddr: realaddr,
            NodeDBPath:   srv.NodeDatabase,
            NetRestrict:  srv.NetRestrict,
            Bootnodes:    srv.BootstrapNodes,
            Unhandled:    unhandled,
        }
        ntab, err := discover.ListenUDP(conn, cfg)
        if err != nil {
            return err
        }
        srv.ntab = ntab
    }

    ……
    dynPeers := srv.maxDialedConns()
    dialer := newDialState(srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)

    ……
    // 侦听TCP端口(用于业务数据传输,基于RLPx协议)
    if srv.ListenAddr != "" {
        if err := srv.startListening(); err != nil {
            return err
        }
    }

    ……
    // 启动新线程发起TCP连接请求
    go srv.run(dialer)
    ……
}

2.4 启动Service

    // Start each of the services
    started := []reflect.Type{}
    for kind, service := range services {
        // Start the next service, stopping all previous upon failure
        if err := service.Start(running); err != nil {
            for _, kind := range started {
                services[kind].Stop()
            }
            running.Stop()

            return err
        }
        // Mark the service started for potential cleanup
        started = append(started, kind)
    }

主要就是依次调用每个Service的Start()方法,然后把启动的Service的类型存储到started表中。另外如果启动过程中发现某个Service之前已经启动过了,则返回错误。

2.5 启动RPC server

    // Lastly start the configured RPC interfaces
    if err := n.startRPC(services); err != nil {
        for _, service := range services {
            service.Stop()
        }
        running.Stop()
        return err
    }

RPC即远程调用接口,也就是Service对外暴露出来的API。具体调用方式可以分为以下几种:

I. InProc:进程内调用,严格来说这种不能算是RPC,不过出于架构上的统一,以太坊也为这种调用方式配置了一个handler

II. IPC:进程间调用,通过Unix Domain Socket(datadir/geth.ipc)

III. HTTP:通过HTTP协议调用

IV. WS:通过WebSocket调用

接下来看下startRPC()函数就比较清楚了,主要就是启动这几项RPC服务。每种RPC服务都需要提供一个handler,另外除了InProc之外,其他3种服务还需要启动一个server来监听外部连接请求。RPC具体实现细节留待后面的文章分析。代码如下:

func (n *Node) startRPC(services map[reflect.Type]Service) error {
    // Gather all the possible APIs to surface
    apis := n.apis()
    for _, service := range services {
        apis = append(apis, service.APIs()...)
    }
    // Start the various API endpoints, terminating all in case of errors
    if err := n.startInProc(apis); err != nil {
        return err
    }
    if err := n.startIPC(apis); err != nil {
        n.stopInProc()
        return err
    }
    if err := n.startHTTP(n.httpEndpoint, apis, n.config.HTTPModules, n.config.HTTPCors, n.config.HTTPVirtualHosts); err != nil {
        n.stopIPC()
        n.stopInProc()
        return err
    }
    if err := n.startWS(n.wsEndpoint, apis, n.config.WSModules, n.config.WSOrigins, n.config.WSExposeAll); err != nil {
        n.stopHTTP()
        n.stopIPC()
        n.stopInProc()
        return err
    }
    // All API endpoints started successfully
    n.rpcAPIs = apis
    return nil
}

到这里结点启动的流程就分析完了。

3. 结点进入等待状态

其实就是让主线程进入阻塞状态,保持进程不退出,直到从channel中收到stop消息。

具体就是调用Node的Wait()函数:

func (n *Node) Wait() {
    n.lock.RLock()
    if n.server == nil {
        n.lock.RUnlock()
        return
    }
    stop := n.stop
    n.lock.RUnlock()

    <-stop
}

至此,以太坊的启动代码就走读完了,总结一下:

  • I. 以太坊启动主要做了3件事:创建结点、启动结点、结点进入等待状态
  • II. 创建结点过程主要做了2件事:根据配置创建Node实例、注册eth Service
  • III. 启动结点过程主要做了5件事:创建P2P server、创建Service、启动P2P server、启动Service、启动RPC server
  • IV. 最后结点进入等待状态,等待退出

下一章:以太坊命令行库 urfave/cli

使用 Go 语言编写命令行程序经常会使用了urfave/cli这个库,比如以太坊软件 geth。在 C 语言中,我们需要根据 argc/argv 解析命令行参数,调用不同的函数,最后还要写一个 usag ...