以太坊源码解读(3)以太坊启动流程

启动命令:

geth --identity "TestNode1" --datadir "data0" --rpc --rpcapi "db,eth,net,web3" --port "30303" --networkid "29382" --ws --wsorigins="*" --rpccorsdomain="*" console

启动后,我们可以从日志来分析程序启动的流程。

INFO [10-29|12:27:11.737] Maximum peer count
INFO [10-29|12:27:11.747] Starting peer-to-peer node  // node/node.go
INFO [10-29|12:27:11.747] Allocated cache and file handles  // 
INFO [10-29|12:27:11.763] Initialised chain configuration   // eth/backend.go
INFO [10-29|12:27:11.763] Disk storage enabled for ethash caches // consensus/ethash/ethash.go
INFO [10-29|12:27:11.763] Disk storage enabled for ethash DAGs 
INFO [10-29|12:27:11.763] Initialising Ethereum protocol
WARN [10-29|12:27:11.765] Head state missing, repairing chain 
INFO [10-29|12:27:11.775] Rewound blockchain to past state
INFO [10-29|12:27:11.776] Loaded most recent local header
INFO [10-29|12:27:11.776] Loaded most recent local full block
INFO [10-29|12:27:11.776] Loaded most recent local fast block
INFO [10-29|12:27:11.776] Loaded local transaction journal
INFO [10-29|12:27:11.777] Regenerated local transaction journal
WARN [10-29|12:27:11.777] Blockchain not empty, fast sync disabled
INFO [10-29|12:27:11.777] Starting P2P networking
INFO [10-29|12:27:13.928] Mapped network port 
INFO [10-29|12:27:13.990] UDP listener up                         
INFO [10-29|12:27:13.991] RLPx listener up                        
INFO [10-29|12:27:13.995] IPC endpoint opened              
INFO [10-29|12:27:13.995] HTTP endpoint opened 
INFO [10-29|12:27:13.996] WebSocket endpoint opened     
INFO [10-29|12:27:14.001] Mapped network port    
Welcome to the Geth JavaScript console!

启动流程图

一、启动的main函数 cmd/geth/main.go

Go里面有两个保留的函数:init函数(能够应用于所有的package)和main函数(只能应用于package main)。这两个函数在定义时不能有任何的参数和返回值。

在cmd/geth/main.go中,首先定义app:

cmd/geth/main.go

// Package-level CLI state for the geth command. The flag groups declared
// here are appended to app.Flags in init(). The slice contents are elided
// ("...") in this excerpt.
var (
    // Git SHA1 commit hash of the release (set via linker flags)
    gitCommit = ""
    // The app that holds all commands and flags.
    app = utils.NewApp(gitCommit, "the go-ethereum command line interface")
    // flags
    nodeFlags = []cli.Flag{
	utils.IdentityFlag,    // all of these flags come from the cmd/utils package
	...
    }
    rpcFlags = []cli.Flag{...}
    consoleFlags = []cli.Flag{...}  // from cmd/geth/consolecmd.go
    whisperFlags = []cli.Flag{...}
    // Named metricsFlags (not "metricsFlafs") so it matches the identifier
    // that init() appends to app.Flags.
    metricsFlags = []cli.Flag{...}
)

然后通过init()函数来初始化app,其中app.Action表示如果用户没有输入其他的子命令的情况下,会调用这个字段指向的函数,即geth()。

geth的命令使用了urfave/cli这个库,这个库是go语言命令行程序常用的库,它把命令行解析的过程做了一下封装,抽象出flag/command/subcommand这些模块,用户只需要提供一些模块的配置,参数的解析和关联在库内部完成,帮助信息也可以自动生成。

app.Flags和app.Commands分别设置了geth支持的[option]和[command]:当用户输入的命令行被解析出相应的参数后,程序会跳转到对应的函数并执行。这里先不做介绍,后面以console的启动为例介绍这一部分的原理。

// init configures the package-level CLI app before main runs: it sets geth
// as the app action, registers the sub-commands (sorted by name), appends
// every flag group to app.Flags and installs the Before/After hooks.
// Parts of the original function are elided ("...") in this excerpt.
func init() {
	// Initialize the CLI app and start Geth
	app.Action = geth
	app.HideVersion = true // we have a command to print the version
	app.Copyright = "Copyright 2013-2018 The go-ethereum Authors"
        // All supported sub-commands
	app.Commands = []cli.Command{
		// See chaincmd.go:
		initCommand,
                ...
		// See monitorcmd.go:
		// See accountcmd.go:
		// See consolecmd.go:
		// See misccmd.go:
		// See config.go
	}
	sort.Sort(cli.CommandsByName(app.Commands))
    
        // All options (flags) that can be parsed
	app.Flags = append(app.Flags, nodeFlags...)
	app.Flags = append(app.Flags, rpcFlags...)
	app.Flags = append(app.Flags, consoleFlags...)
	app.Flags = append(app.Flags, debug.Flags...)
	app.Flags = append(app.Flags, whisperFlags...)
	app.Flags = append(app.Flags, metricsFlags...)

	// Hook assigned to app.Before (body elided in this excerpt).
	app.Before = func(ctx *cli.Context) error {
           ...
	}

	// Hook assigned to app.After: shuts down the debug facilities and closes
	// the console's stdin.
	app.After = func(ctx *cli.Context) error {
		debug.Exit()
		console.Stdin.Close() // Resets terminal mode.
		return nil
	}
}

通过上面的代码就把我们解析用户命令的对象设置完成了,下一步就是执行app.Run()。

// main hands the process arguments to the CLI app. If the app reports an
// error, it is printed to stderr and the process exits with status 1.
func main() {
	err := app.Run(os.Args)
	if err == nil {
		return
	}
	fmt.Fprintln(os.Stderr, err)
	os.Exit(1)
}

在以太坊客户端geth中,如果什么命令都不输入直接运行geth, 就会默认启动一个全节点模式的节点,连接到主网络。这时候就是按照上面所说的,启动了geth()函数:

cmd/geth/main.go

// geth is the action run when no sub-command is given. Any positional
// argument is rejected as an invalid command; otherwise it builds the full
// node, starts it and blocks in Wait before returning nil.
func geth(ctx *cli.Context) error {
	args := ctx.Args()
	if len(args) != 0 {
		return fmt.Errorf("invalid command: %q", args[0])
	}

	stack := makeFullNode(ctx) // build the full node object
	startNode(ctx, stack)      // start the full node
	stack.Wait()
	return nil
}

二、全节点配置

在cmd/geth/main.go的geth()函数中,startNode()用来启动全节点;但在此之前,geth()首先调用cmd/geth/config.go中的makeFullNode()函数来创建并配置节点:

// makeFullNode builds the node from the CLI context and registers the
// services on it: the Ethereum service always, the dashboard when its flag is
// true, Whisper when enabled explicitly or implicitly, and the Ethereum Stats
// daemon when a stats URL is configured. It returns the node.
func makeFullNode(ctx *cli.Context) *node.Node {
	stack, cfg := makeConfigNode(ctx) // configure the node

	utils.RegisterEthService(stack, &cfg.Eth) // register the eth service

	if ctx.GlobalBool(utils.DashboardEnabledFlag.Name) {
		utils.RegisterDashboardService(stack, &cfg.Dashboard, gitCommit)
	}

	// Whisper is registered either because enableWhisper says so, or because
	// the developer flag is set while the whisper flag itself is not.
	explicit := enableWhisper(ctx)
	whisperFlagSet := ctx.GlobalIsSet(utils.WhisperEnabledFlag.Name)
	devFlagSet := ctx.GlobalIsSet(utils.DeveloperFlag.Name)
	implicit := devFlagSet && !whisperFlagSet

	if explicit || implicit {
		if name := utils.WhisperMaxMessageSizeFlag.Name; ctx.GlobalIsSet(name) {
			cfg.Shh.MaxMessageSize = uint32(ctx.Int(name))
		}
		if name := utils.WhisperMinPOWFlag.Name; ctx.GlobalIsSet(name) {
			cfg.Shh.MinimumAcceptedPOW = ctx.Float64(name)
		}
		utils.RegisterShhService(stack, &cfg.Shh)
	}

	// The stats daemon is optional and keyed on a non-empty URL.
	if url := cfg.Ethstats.URL; url != "" {
		utils.RegisterEthStatsService(stack, url)
	}
	return stack
}

可以看出,makeFullNode首先通过makeConfigNode(ctx) 对节点进行配置包括Eth、Shh、Node、Dashboard,返回Node和geth配置,然后开启两条路线:1、通过Node.Start()——>Server.Start()启动p2p服务;2、通过RegisterEthService将Ethereum服务注册到Node的services map[reflect.Type]Service中,通过Node.Start()来启动Ethereum服务。

// makeConfigNode assembles the geth configuration in three layers: built-in
// defaults, then the optional config file, then command line flags. It
// creates the node from the resulting node config and returns both the node
// and the full configuration. Failures are reported through utils.Fatalf.
func makeConfigNode(ctx *cli.Context) (*node.Node, gethConfig) {
	// Layer 1: defaults.
	var cfg gethConfig
	cfg.Eth = eth.DefaultConfig
	cfg.Shh = whisper.DefaultConfig
	cfg.Node = defaultNodeConfig()
	cfg.Dashboard = dashboard.DefaultConfig

	// Layer 2: config file, only when one was named on the command line.
	if path := ctx.GlobalString(configFileFlag.Name); path != "" {
		err := loadConfig(path, &cfg)
		if err != nil {
			utils.Fatalf("%v", err)
		}
	}

	// Layer 3: flags. The node is created right after its own config is
	// final, because the eth and shh setters take the node as an argument.
	utils.SetNodeConfig(ctx, &cfg.Node)
	stack, err := node.New(&cfg.Node)
	if err != nil {
		utils.Fatalf("Failed to create the protocol stack: %v", err)
	}

	utils.SetEthConfig(ctx, stack, &cfg.Eth)
	if name := utils.EthStatsURLFlag.Name; ctx.GlobalIsSet(name) {
		cfg.Ethstats.URL = ctx.GlobalString(name)
	}
	utils.SetShhConfig(ctx, stack, &cfg.Shh)
	utils.SetDashboardConfig(ctx, &cfg.Dashboard)

	return stack, cfg
}

三、注册ETH服务

makeFullNode()函数里在做好节点配置之后,再调用cmd/utils/flags.go里RegisterEthService()函数注册eth服务,而这个函数是调用了node/node.go模块中的Register()方法,这个方法的参数是一个构造函数(constructor),正如下面的代码所示:

cmd/utils/flags.go

// RegisterEthService registers an Ethereum service constructor on the node.
// With cfg.SyncMode == downloader.LightSync the constructor calls les.New;
// otherwise it calls eth.New and, when cfg.LightServ > 0, also attaches a LES
// server to the resulting full node. A registration error is fatal.
func RegisterEthService(stack *node.Node, cfg *eth.Config) {
	var constructor func(ctx *node.ServiceContext) (node.Service, error)

	switch cfg.SyncMode {
	case downloader.LightSync:
		constructor = func(ctx *node.ServiceContext) (node.Service, error) {
			return les.New(ctx, cfg)
		}
	default:
		constructor = func(ctx *node.ServiceContext) (node.Service, error) {
			fullNode, err := eth.New(ctx, cfg)
			if fullNode != nil && cfg.LightServ > 0 {
				// NOTE(review): the error returned by NewLesServer is
				// discarded here — confirm this is intentional.
				ls, _ := les.NewLesServer(fullNode, cfg)
				fullNode.AddLesServer(ls)
			}
			return fullNode, err
		}
	}

	if err := stack.Register(constructor); err != nil {
		Fatalf("Failed to register the Ethereum service: %v", err)
	}
}

这个函数里会判断同步的方式 ,如果是LightSync则会使用les.New()创建轻节点,否则就使用eth.New()创建全节点,这里我们还是建立全节点,即调用eth.New()方法。

eth/backend.go

// New creates the Ethereum service object. From what this excerpt shows, it
// obtains the chain database, sets up the genesis block and chain config,
// fills in the Ethereum struct, and then creates the blockchain, transaction
// pool, protocol manager, miner and gas price oracle. It returns the
// assembled object, or nil and the error of the step that failed.
// Parts of the original function are elided ("...") in this excerpt.
func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
    ...
    // Obtain the chain database ("chaindata") through CreateDB.
    chainDb, err := CreateDB(ctx, config, "chaindata")
    ...
    chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis)
    // Any genesis error other than *params.ConfigCompatError aborts here; a
    // ConfigCompatError is tolerated and handled further down by rewinding.
    if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok {
	return nil, genesisErr
    }
    log.Info("Initialised chain configuration", "config", chainConfig)

    // Populate the service struct; the event mux and account manager are
    // taken from the service context.
    eth := &Ethereum{
		config:         config,
		chainDb:        chainDb,
		chainConfig:    chainConfig,
		eventMux:       ctx.EventMux,
		accountManager: ctx.AccountManager,
		engine:         CreateConsensusEngine(ctx, chainConfig, &config.Ethash, config.MinerNotify, chainDb),
		shutdownChan:   make(chan bool),
		networkID:      config.NetworkId,
		gasPrice:       config.GasPrice,
		etherbase:      config.Etherbase,
		bloomRequests:  make(chan chan *bloombits.Retrieval),
		bloomIndexer:   NewBloomIndexer(chainDb, params.BloomBitsBlocks, bloomConfirms),
    }

    log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId)
    ...
    // VM and trie-cache settings are taken from the service config.
    var (
	vmConfig    = vm.Config{EnablePreimageRecording: config.EnablePreimageRecording}
	cacheConfig = &core.CacheConfig{Disabled: config.NoPruning, TrieNodeLimit: config.TrieCache, TrieTimeLimit: config.TrieTimeout}
    )
    eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, eth.chainConfig, eth.engine, vmConfig)
    if err != nil {
	return nil, err
    }
    // Rewind the chain in case of an incompatible config upgrade.
    if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
	log.Warn("Rewinding chain to upgrade configuration", "err", compat)
	eth.blockchain.SetHead(compat.RewindTo)
	rawdb.WriteChainConfig(chainDb, genesisHash, chainConfig)
    }
    eth.bloomIndexer.Start(eth.blockchain)

    // When a transaction journal is configured, resolve its path through the
    // service context before creating the pool.
    if config.TxPool.Journal != "" {
	config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
    }
    eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain)

    if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil {
	return nil, err
    }

    eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine)
    eth.miner.SetExtra(makeExtraData(config.ExtraData))

    // The API backend is created with a nil oracle first, because the oracle
    // constructor below takes the backend itself as an argument.
    eth.APIBackend = &EthAPIBackend{eth, nil}
    gpoParams := config.GPO
    // Fall back to the configured gas price when the oracle has no default.
    if gpoParams.Default == nil {
	gpoParams.Default = config.GasPrice
    }
    eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, gpoParams)

    return eth, nil
}

总体上看来,这个函数主要是执行了以下几步:

1. 如果config.SyncMode 是 downloader.LightSync,走的是les/backend.go的初始化方法。
2. chainDb, err := CreateDB(ctx, config, "chaindata")打开leveldb,leveldb是eth存储数据库。
3. stopDbUpgrade := upgradeDeduplicateData(chainDb) 检查chainDb版本,如果需要的话,启动后台进程进行升级。
4. chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis)装载创世区块。根据节点条件判断是从数据库里面读取,还是从默认配置文件读取,还是从自定义配置文件读取,或者是从代码里面获取默认值。并返回区块链的config和创世块的hash。
5. 装载Ethereum struct的各个成员。eventMux和accountManager 是Node 启动 eth service的时候传入的。eventMux可以认为是一个全局的事件多路复用器,accountManager可以认为是一个全局的账户管理器。engine创建共识引擎。etherbase 配置此Ethereum的主账号地址。初始化bloomRequests 通道和bloom过滤器。
6. 判断客户端版本号和数据库版本号是否一致。
7. eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, eth.chainConfig, eth.engine, vmConfig) 初始化eth的blockchain,也就是eth的区块链。
8. eth.blockchain.SetHead(compat.RewindTo) 如果链配置升级不兼容(genesisErr是ConfigCompatError),则将区块链回滚到compat.RewindTo指定的位置,并重新写入链配置。
9. eth.bloomIndexer.Start(eth.blockchain)启动bloomIndexer。
10. eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain) 初始化eth 区块链的交易池,存储本地生产的和P2P网络同步过来的交易。
11. eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb)初始化以太坊协议管理器,用于区块链P2P通讯。
12. miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine) 初始化矿工。
13. eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, gpoParams) 创建用于预测最新gasprice的预言机。

ethereum会作为service注册到node中,随着node.Start()的执行而启动ethereum.Start()。

// Start brings up the Ethereum service on top of the given p2p server: the
// bloom handlers, the net RPC API, the protocol manager and, if one is
// attached, the LES server. It returns an error when light serving is
// enabled and the light peer count is not below the server's peer limit.
func (s *Ethereum) Start(srvr *p2p.Server) error {
	// Bloom bits servicing goroutines come first.
	s.startBloomHandlers()

	// Net RPC API bound to the p2p server.
	s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion())

	// Work out how many peers the protocol manager may use: the server's
	// limit, minus the light peer count when light serving is enabled.
	total := srvr.MaxPeers
	ethPeers := total
	if s.config.LightServ > 0 {
		light := s.config.LightPeers
		if light >= total {
			return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", light, total)
		}
		ethPeers = total - light
	}

	// Networking layer, then the light server if one was attached.
	s.protocolManager.Start(ethPeers)
	if s.lesServer != nil {
		s.lesServer.Start(srvr)
	}
	return nil
}

四、启动P2P网络

回到cmd/geth/main.go,在节点配置完成后执行startNode()函数。startNode()的主要功能有:

1、启动node; 2、解锁账户; 3、开启钱包事件监听; 4、如果设置了挖矿或开发者模式的flag,则启动挖矿。

// startNode boots the given node and its auxiliary pieces: it starts the
// node, unlocks the accounts named by the unlock flag, subscribes to wallet
// events and handles them in a background goroutine, and starts mining when
// the mining or developer flag is set. Unrecoverable problems are reported
// through utils.Fatalf.
func startNode(ctx *cli.Context, stack *node.Node) {
	debug.Memsize.Add("node", stack)

	// Start up the node itself
	utils.StartNode(stack)

	// Unlock any account specifically requested
	// NOTE(review): indexes the first keystore backend without a length
	// check — presumably one is always registered; confirm.
	ks := stack.AccountManager().Backends(keystore.KeyStoreType)[0].(*keystore.KeyStore)

	passwords := utils.MakePasswordList(ctx)
	// The unlock flag holds a comma-separated list; blank entries are skipped.
	unlocks := strings.Split(ctx.GlobalString(utils.UnlockedAccountFlag.Name), ",")
	for i, account := range unlocks {
		if trimmed := strings.TrimSpace(account); trimmed != "" {
			unlockAccount(ctx, ks, trimmed, i, passwords)
		}
	}
	// Register wallet event handlers to open and auto-derive wallets
	events := make(chan accounts.WalletEvent, 16)
	stack.AccountManager().Subscribe(events)

	// The wallet handling runs in its own goroutine; it loops until the
	// events channel is closed.
	go func() {
		// Create a chain state reader for self-derivation
		rpcClient, err := stack.Attach()
		if err != nil {
			utils.Fatalf("Failed to attach to self: %v", err)
		}
		stateReader := ethclient.NewClient(rpcClient)

		// Open any wallets already attached
		for _, wallet := range stack.AccountManager().Wallets() {
			if err := wallet.Open(""); err != nil {
				log.Warn("Failed to open wallet", "url", wallet.URL(), "err", err)
			}
		}
		// Listen for wallet event till termination
		for event := range events {
			switch event.Kind {
			case accounts.WalletArrived:
				// A new wallet showed up: try to open it with an empty passphrase.
				if err := event.Wallet.Open(""); err != nil {
					log.Warn("New wallet appeared, failed to open", "url", event.Wallet.URL(), "err", err)
				}
			case accounts.WalletOpened:
				// The status error is ignored; status is only used for logging.
				status, _ := event.Wallet.Status()
				log.Info("New wallet appeared", "url", event.Wallet.URL(), "status", status)

				// Wallets with the "ledger" URL scheme use the Ledger base
				// derivation path; all others use the default one.
				derivationPath := accounts.DefaultBaseDerivationPath
				if event.Wallet.URL().Scheme == "ledger" {
					derivationPath = accounts.DefaultLedgerBaseDerivationPath
				}
				event.Wallet.SelfDerive(derivationPath, stateReader)

			case accounts.WalletDropped:
				log.Info("Old wallet dropped", "url", event.Wallet.URL())
				event.Wallet.Close()
			}
		}
	}()
	// Start auxiliary services if enabled
	if ctx.GlobalBool(utils.MiningEnabledFlag.Name) || ctx.GlobalBool(utils.DeveloperFlag.Name) {
		// Mining only makes sense if a full Ethereum node is running
		if ctx.GlobalString(utils.SyncModeFlag.Name) == "light" {
			utils.Fatalf("Light clients do not support mining")
		}
		// Look up the registered Ethereum service on the node.
		var ethereum *eth.Ethereum
		if err := stack.Service(&ethereum); err != nil {
			utils.Fatalf("Ethereum service not running: %v", err)
		}
		// Use a reduced number of threads if requested
		// The legacy flag supplies the initial value; the newer flag
		// overrides it when set.
		threads := ctx.GlobalInt(utils.MinerLegacyThreadsFlag.Name)
		if ctx.GlobalIsSet(utils.MinerThreadsFlag.Name) {
			threads = ctx.GlobalInt(utils.MinerThreadsFlag.Name)
		}
		if threads > 0 {
			// Only engines that expose SetThreads are adjusted.
			type threaded interface {
				SetThreads(threads int)
			}
			if th, ok := ethereum.Engine().(threaded); ok {
				th.SetThreads(threads)
			}
		}
		// Set the gas price to the limits from the CLI and start mining
		// NOTE(review): this check uses ctx.IsSet while the thread-count check
		// above uses ctx.GlobalIsSet — confirm the difference is intended.
		gasprice := utils.GlobalBig(ctx, utils.MinerLegacyGasPriceFlag.Name)
		if ctx.IsSet(utils.MinerGasPriceFlag.Name) {
			gasprice = utils.GlobalBig(ctx, utils.MinerGasPriceFlag.Name)
		}
		ethereum.TxPool().SetGasPrice(gasprice)
		if err := ethereum.StartMining(true); err != nil {
			utils.Fatalf("Failed to start mining: %v", err)
		}
	}
}

在上面的代码中,通过cmd/utils/cmd.go中的StartNode()函数,调用node/node.go中的Start()方法,启动节点。在这个方法中,首先判断节点是否已经在运行,然后要对p2p服务进行初始化,最后构建p2p.Server对象,执行该对象的Start()方法,使p2p服务启动起来。

// Start brings the node up. Holding n.lock for the whole call, it returns
// ErrNodeRunning if a server is already set, opens the data directory,
// builds the p2p server from the node configuration, constructs every
// registered service, starts the p2p server, starts each service, and
// finally starts the RPC interfaces. If a service or the RPC layer fails to
// start, the services started so far and the p2p server are stopped and the
// error is returned. The node's fields are only updated on full success.
func (n *Node) Start() error {
	n.lock.Lock()
	defer n.lock.Unlock()

	// Short circuit if the node's already running
	if n.server != nil {
		return ErrNodeRunning
	}
	if err := n.openDataDir(); err != nil {
		return err
	}

	// Initialise the p2p service: fill in serverConfig and use it to create
	// the p2p.Server instance
	n.serverConfig = n.config.P2P
	n.serverConfig.PrivateKey = n.config.NodeKey()
	n.serverConfig.Name = n.config.NodeName()
	n.serverConfig.Logger = n.log
	// Static nodes, trusted nodes and the node database path fall back to
	// the node config only when the P2P config left them unset.
	if n.serverConfig.StaticNodes == nil {
		n.serverConfig.StaticNodes = n.config.StaticNodes()
	}
	if n.serverConfig.TrustedNodes == nil {
		n.serverConfig.TrustedNodes = n.config.TrustedNodes()
	}
	if n.serverConfig.NodeDatabase == "" {
		n.serverConfig.NodeDatabase = n.config.NodeDB()
	}
	running := &p2p.Server{Config: n.serverConfig}
	n.log.Info("Starting peer-to-peer node", "instance", n.serverConfig.Name)

	// Instantiate every registered service through its constructor
	services := make(map[reflect.Type]Service)
	for _, constructor := range n.serviceFuncs {
		// Create a new context for the particular service
		ctx := &ServiceContext{
			config:         n.config,
			services:       make(map[reflect.Type]Service),
			EventMux:       n.eventmux,
			AccountManager: n.accman,
		}
		// Each context gets its own copy of the services constructed so far.
		for kind, s := range services { // copy needed for threaded access
			ctx.services[kind] = s
		}
		// Construct and save the service
		service, err := constructor(ctx)
		if err != nil {
			return err
		}
		// Services are keyed by their concrete type; a second service of the
		// same type is rejected.
		kind := reflect.TypeOf(service)
		if _, exists := services[kind]; exists {
			return &DuplicateServiceError{Kind: kind}
		}
		services[kind] = service
	}
	// Gather the protocols and start the freshly assembled P2P server
	for _, service := range services {
		running.Protocols = append(running.Protocols, service.Protocols()...)
	}
	if err := running.Start(); err != nil {
		return convertFileLockError(err)
	}
	// Start each of the services
	started := []reflect.Type{}
	for kind, service := range services {
		// Start the next service, stopping all previous upon failure
		if err := service.Start(running); err != nil {
			for _, kind := range started {
				services[kind].Stop()
			}
			running.Stop()

			return err
		}
		// Mark the service started for potential cleanup
		started = append(started, kind)
	}
	// Lastly start the configured RPC interfaces
	if err := n.startRPC(services); err != nil {
		// Every service has been started by now, so all of them are stopped.
		for _, service := range services {
			service.Stop()
		}
		running.Stop()
		return err
	}
	// Finish initializing the startup
	n.services = services
	n.server = running
	n.stop = make(chan struct{})

	return nil
}

至此,以太坊的启动流程就完成了。

下一章:以太坊源码解读(4)Block类和储存

一、Block类 type Block struct { /******header*******/ header *Header /******header*******/ / ...