以太坊源码解读(3)以太坊启动流程
启动命令:
geth --identity "TestNode1" --datadir "data0" --rpc --rpcapi "db,eth,net,web3" --port "30303" --networkid "29382" --ws --wsorigins="*" --rpccorsdomain="*" console
启动后,我们可以从日志来分析程序启动的流程。
INFO [10-29|12:27:11.737] Maximum peer count INFO [10-29|12:27:11.747] Starting peer-to-peer node // node/node.go INFO [10-29|12:27:11.747] Allocated cache and file handles // INFO [10-29|12:27:11.763] Initialised chain configuration // eth/backend.go INFO [10-29|12:27:11.763] Disk storage enabled for ethash caches // consensus/ethash/ethash.go INFO [10-29|12:27:11.763] Disk storage enabled for ethash DAGs INFO [10-29|12:27:11.763] Initialising Ethereum protocol WARN [10-29|12:27:11.765] Head state missing, repairing chain INFO [10-29|12:27:11.775] Rewound blockchain to past state INFO [10-29|12:27:11.776] Loaded most recent local header INFO [10-29|12:27:11.776] Loaded most recent local full block INFO [10-29|12:27:11.776] Loaded most recent local fast block INFO [10-29|12:27:11.776] Loaded local transaction journal INFO [10-29|12:27:11.777] Regenerated local transaction journal WARN [10-29|12:27:11.777] Blockchain not empty, fast sync disabled INFO [10-29|12:27:11.777] Starting P2P networking INFO [10-29|12:27:13.928] Mapped network port INFO [10-29|12:27:13.990] UDP listener up INFO [10-29|12:27:13.991] RLPx listener up INFO [10-29|12:27:13.995] IPC endpoint opened INFO [10-29|12:27:13.995] HTTP endpoint opened INFO [10-29|12:27:13.996] WebSocket endpoint opened INFO [10-29|12:27:14.001] Mapped network port Welcome to the Geth JavaScript console!
启动流程图
一、启动的main函数 cmd/geth/main.go
Go里面有两个保留的函数:init函数(能够应用于所有的package)和main函数(只能应用于package main)。这两个函数在定义时不能有任何的参数和返回值。
在cmd/geth/main.go中,首先定义app:
cmd/geth/main.go var ( // Git SHA1 commit hash of the release (set via linker flags) gitCommit = "" // The app that holds all commands and flags. app = utils.NewApp(gitCommit, "the go-ethereum command line interface") // flags nodeFlags = []cli.Flag{ utils.IdentityFlag, // 所有这些flag都来自cmd/utils模块中 ... } rpcFlags = []cli.Flag{...} consoleFlags = []cli.Flag{...} // 来自cmd/geth/consolecmd.go whisperFlags = []cli.Flag{...} metricsFlafs = []cli.Flag{...} )
然后通过init()函数来初始化app,其中app.Action表示如果用户没有输入其他的子命令的情况下,会调用这个字段指向的函数,即geth()。
geth的命令使用了urfave/cli这个库,这个库是go语言命令行程序常用的库,它把命令行解析的过程做了一下封装,抽象出flag/command/subcommand这些模块,用户只需要提供一些模块的配置,参数的解析和关联在库内部完成,帮助信息也可以自动生成。
app.Flags和app.Commands分别设置了支持的[option]和[command],是当从用户输入命令解析出相应的参数后指向特定的函数并执行。这里先不做介绍,后面以console的启动为例介绍这一部分的原理。
func init() { // Initialize the CLI app and start Geth app.Action = geth app.HideVersion = true // we have a command to print the version app.Copyright = "Copyright 2013-2018 The go-ethereum Authors" // 所有能够支持的子命令 app.Commands = []cli.Command{ // See chaincmd.go: initCommand, ... // See monitorcmd.go: // See accountcmd.go: // See consolecmd.go: // See misccmd.go: // See config.go } sort.Sort(cli.CommandsByName(app.Commands)) // 所有能够解析的Options app.Flags = append(app.Flags, nodeFlags...) app.Flags = append(app.Flags, rpcFlags...) app.Flags = append(app.Flags, consoleFlags...) app.Flags = append(app.Flags, debug.Flags...) app.Flags = append(app.Flags, whisperFlags...) app.Flags = append(app.Flags, metricsFlags...) app.Before = func(ctx *cli.Context) error { ... } app.After = func(ctx *cli.Context) error { debug.Exit() console.Stdin.Close() // Resets terminal mode. return nil } }
通过上面的代码就把我们解析用户命令的对象设置完成了,下一步就是执行app.Run()。
func main() { if err := app.Run(os.Args); err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) } }
在以太坊客户端geth中,如果什么命令都不输入直接运行geth, 就会默认启动一个全节点模式的节点,连接到主网络。这时候就是按照上面所说的,启动了geth()函数:
cmd/geth/main.go func geth(ctx *cli.Context) error { if args := ctx.Args(); len(args) > 0 { return fmt.Errorf("invalid command: %q", args[0]) } node := makeFullNode(ctx) // 定义全节点对象 startNode(ctx, node) // 启动全节点 node.Wait() return nil }
二、全节点配置
在cmd/geth/main.go中有一个startNode()函数用来启动全节点,首先调用cmd/geth/config.go中的makeFullNode()函数:
func makeFullNode(ctx *cli.Context) *node.Node { stack, cfg := makeConfigNode(ctx) // 进行节点配置 utils.RegisterEthService(stack, &cfg.Eth) // 注册eth服务 if ctx.GlobalBool(utils.DashboardEnabledFlag.Name) { utils.RegisterDashboardService(stack, &cfg.Dashboard, gitCommit) } // Whisper must be explicitly enabled by specifying at least 1 whisper flag or in dev mode shhEnabled := enableWhisper(ctx) shhAutoEnabled := !ctx.GlobalIsSet(utils.WhisperEnabledFlag.Name) && ctx.GlobalIsSet(utils.DeveloperFlag.Name) if shhEnabled || shhAutoEnabled { if ctx.GlobalIsSet(utils.WhisperMaxMessageSizeFlag.Name) { cfg.Shh.MaxMessageSize = uint32(ctx.Int(utils.WhisperMaxMessageSizeFlag.Name)) } if ctx.GlobalIsSet(utils.WhisperMinPOWFlag.Name) { cfg.Shh.MinimumAcceptedPOW = ctx.Float64(utils.WhisperMinPOWFlag.Name) } utils.RegisterShhService(stack, &cfg.Shh) } // Add the Ethereum Stats daemon if requested. if cfg.Ethstats.URL != "" { utils.RegisterEthStatsService(stack, cfg.Ethstats.URL) } return stack }
可以看出,makeFullNode首先通过makeConfigNode(ctx) 对节点进行配置,包括Eth、Shh、Node、Dashboard,返回Node和geth配置,然后开启两条路线:1、通过Node.Start()——>Server.Start()启动p2p服务;2、通过RegisterEthService将Ethereum服务注册到Node的services map[reflect.Type]Service中,通过Node.Start()来启动Ethereum服务。
func makeConfigNode(ctx *cli.Context) (*node.Node, gethConfig) { // Load defaults. cfg := gethConfig{ Eth: eth.DefaultConfig, Shh: whisper.DefaultConfig, Node: defaultNodeConfig(), Dashboard: dashboard.DefaultConfig, } // Load config file. if file := ctx.GlobalString(configFileFlag.Name); file != "" { if err := loadConfig(file, &cfg); err != nil { utils.Fatalf("%v", err) } } // Apply flags. utils.SetNodeConfig(ctx, &cfg.Node) stack, err := node.New(&cfg.Node) if err != nil { utils.Fatalf("Failed to create the protocol stack: %v", err) } utils.SetEthConfig(ctx, stack, &cfg.Eth) if ctx.GlobalIsSet(utils.EthStatsURLFlag.Name) { cfg.Ethstats.URL = ctx.GlobalString(utils.EthStatsURLFlag.Name) } utils.SetShhConfig(ctx, stack, &cfg.Shh) utils.SetDashboardConfig(ctx, &cfg.Dashboard) return stack, cfg }
三、注册ETH服务
makeFullNode()函数里在做好节点配置之后,再调用cmd/utils/flag.go里RegisterEthService()函数注册eth服务,而这个函数是调用了node/node.go模块中的Register()方法 ,这个方法的参数是一个构造函数(constructor),正如下面的代码所示:
cmd/utils/flag.go func RegisterEthService(stack *node.Node, cfg *eth.Config) { var err error if cfg.SyncMode == downloader.LightSync { err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { return les.New(ctx, cfg) }) } else { err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { fullNode, err := eth.New(ctx, cfg) if fullNode != nil && cfg.LightServ > 0 { ls, _ := les.NewLesServer(fullNode, cfg) fullNode.AddLesServer(ls) } return fullNode, err }) } if err != nil { Fatalf("Failed to register the Ethereum service: %v", err) } }
这个函数里会判断同步的方式 ,如果是LightSync则会使用les.New()创建轻节点,否则就使用eth.New()创建全节点,这里我们还是建立全节点,即调用eth.New()方法。
eth/backend.go func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { ... chainDb, err := CreateDB(ctx, config, "chaindata") ... chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis) if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok { return nil, genesisErr } log.Info("Initialised chain configuration", "config", chainConfig) eth := &Ethereum{ config: config, chainDb: chainDb, chainConfig: chainConfig, eventMux: ctx.EventMux, accountManager: ctx.AccountManager, engine: CreateConsensusEngine(ctx, chainConfig, &config.Ethash, config.MinerNotify, chainDb), shutdownChan: make(chan bool), networkID: config.NetworkId, gasPrice: config.GasPrice, etherbase: config.Etherbase, bloomRequests: make(chan chan *bloombits.Retrieval), bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, bloomConfirms), } log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId) ... var ( vmConfig = vm.Config{EnablePreimageRecording: config.EnablePreimageRecording} cacheConfig = &core.CacheConfig{Disabled: config.NoPruning, TrieNodeLimit: config.TrieCache, TrieTimeLimit: config.TrieTimeout} ) eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, eth.chainConfig, eth.engine, vmConfig) if err != nil { return nil, err } // Rewind the chain in case of an incompatible config upgrade. if compat, ok := genesisErr.(*params.ConfigCompatError); ok { log.Warn("Rewinding chain to upgrade configuration", "err", compat) eth.blockchain.SetHead(compat.RewindTo) rawdb.WriteChainConfig(chainDb, genesisHash, chainConfig) } eth.bloomIndexer.Start(eth.blockchain) if config.TxPool.Journal != "" { config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal) } eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain) if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil { return nil, err } eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine) eth.miner.SetExtra(makeExtraData(config.ExtraData)) eth.APIBackend = &EthAPIBackend{eth, nil} gpoParams := config.GPO if gpoParams.Default == nil { gpoParams.Default = config.GasPrice } eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, gpoParams) return eth, nil }
总体上看来,这个函数主要是执行了以下几步:
1. 如果config.SyncMode 是 downloader.LightSync,走的是les/backend.go的初始化方法。 2. chainDb, err := CreateDB(ctx, config, "chaindata")打开leveldb,leveldb是eth存储数据库。 3. stopDbUpgrade := upgradeDeduplicateData(chainDb) 检查chainDb版本,如果需要的话,启动后台进程进行升级。 4. chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis)装载创世区块。 根据节点条件判断是从数据库里面读取,还是从默认配置文件读取,还是从自定义配置文件读取,或者是从代码里面获取默认值。并返回区块链的config和创世块的hash。 5. 装载Etherum struct的各个成员。eventMux和accountManager 是Node 启动 eth service的时候传入的。eventMux可以认为是一个全局的事件多路复用器,accountManager认为是一个全局的账户管理器。engine创建共识引擎。etherbase 配置此Etherum的主账号地址。初始化bloomRequests 通道和bloom过滤器。 6. 判断客户端版本号和数据库版本号是否一致 7. eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, eth.chainConfig, eth.engine, vmConfig) 初始化eth的blockchain,也就是eth的区块链 8. eth.blockchain.SetHead(compat.RewindTo) 根据创始区块设置区块头 9. eth.bloomIndexer.Start(eth.blockchain)启动bloomIndexer 10. eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain) 初始化eth 区块链的交易池,存储本地生产的和P2P网络同步过来的交易。 11. eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb)初始化以太坊协议管理器,用于区块链P2P通讯 12. miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine) 初始化矿工 13. eth.ApiBackend.gpo = gasprice.NewOracle(eth.ApiBackend, gpoParams) 创建预言最新gasprice的预言机。
ethereum会作为protocol注册到node中,随着node.Start()的执行而启动ethereum.Start()。
func (s *Ethereum) Start(srvr *p2p.Server) error { // Start the bloom bits servicing goroutines s.startBloomHandlers() // Start the RPC service s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion()) // Figure out a max peers count based on the server limits maxPeers := srvr.MaxPeers if s.config.LightServ > 0 { if s.config.LightPeers >= srvr.MaxPeers { return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers) } maxPeers -= s.config.LightPeers } // Start the networking layer and the light server if requested s.protocolManager.Start(maxPeers) if s.lesServer != nil { s.lesServer.Start(srvr) } return nil }
四、启动P2P网络
回到cmd/geth/main.go,在节点配置完成后执行startNode()函数。startNode()的主要功能有:
1、启动node; 2、解锁账户; 3、开启钱包事件监听;
func startNode(ctx *cli.Context, stack *node.Node) { debug.Memsize.Add("node", stack) // Start up the node itself utils.StartNode(stack) // Unlock any account specifically requested ks := stack.AccountManager().Backends(keystore.KeyStoreType)[0].(*keystore.KeyStore) passwords := utils.MakePasswordList(ctx) unlocks := strings.Split(ctx.GlobalString(utils.UnlockedAccountFlag.Name), ",") for i, account := range unlocks { if trimmed := strings.TrimSpace(account); trimmed != "" { unlockAccount(ctx, ks, trimmed, i, passwords) } } // Register wallet event handlers to open and auto-derive wallets events := make(chan accounts.WalletEvent, 16) stack.AccountManager().Subscribe(events) go func() { // Create a chain state reader for self-derivation rpcClient, err := stack.Attach() if err != nil { utils.Fatalf("Failed to attach to self: %v", err) } stateReader := ethclient.NewClient(rpcClient) // Open any wallets already attached for _, wallet := range stack.AccountManager().Wallets() { if err := wallet.Open(""); err != nil { log.Warn("Failed to open wallet", "url", wallet.URL(), "err", err) } } // Listen for wallet event till termination for event := range events { switch event.Kind { case accounts.WalletArrived: if err := event.Wallet.Open(""); err != nil { log.Warn("New wallet appeared, failed to open", "url", event.Wallet.URL(), "err", err) } case accounts.WalletOpened: status, _ := event.Wallet.Status() log.Info("New wallet appeared", "url", event.Wallet.URL(), "status", status) derivationPath := accounts.DefaultBaseDerivationPath if event.Wallet.URL().Scheme == "ledger" { derivationPath = accounts.DefaultLedgerBaseDerivationPath } event.Wallet.SelfDerive(derivationPath, stateReader) case accounts.WalletDropped: log.Info("Old wallet dropped", "url", event.Wallet.URL()) event.Wallet.Close() } } }() // Start auxiliary services if enabled if ctx.GlobalBool(utils.MiningEnabledFlag.Name) || ctx.GlobalBool(utils.DeveloperFlag.Name) { // Mining only makes sense if a full Ethereum node is running if ctx.GlobalString(utils.SyncModeFlag.Name) == "light" { utils.Fatalf("Light clients do not support mining") } var ethereum *eth.Ethereum if err := stack.Service(ðereum); err != nil { utils.Fatalf("Ethereum service not running: %v", err) } // Use a reduced number of threads if requested threads := ctx.GlobalInt(utils.MinerLegacyThreadsFlag.Name) if ctx.GlobalIsSet(utils.MinerThreadsFlag.Name) { threads = ctx.GlobalInt(utils.MinerThreadsFlag.Name) } if threads > 0 { type threaded interface { SetThreads(threads int) } if th, ok := ethereum.Engine().(threaded); ok { th.SetThreads(threads) } } // Set the gas price to the limits from the CLI and start mining gasprice := utils.GlobalBig(ctx, utils.MinerLegacyGasPriceFlag.Name) if ctx.IsSet(utils.MinerGasPriceFlag.Name) { gasprice = utils.GlobalBig(ctx, utils.MinerGasPriceFlag.Name) } ethereum.TxPool().SetGasPrice(gasprice) if err := ethereum.StartMining(true); err != nil { utils.Fatalf("Failed to start mining: %v", err) } } }
在上面的代码中,通过cmd/utils/cmd.go中的StartNode()函数,调用node/node.go中的Start()方法,启动节点。在这个方法中,首先判断节点是否已经在运行,然后要对p2p服务进行初始化,最后构建p2p.Server对象,执行该对象的Start()方法,使p2p服务启动起来。
func (n *Node) Start() error { n.lock.Lock() defer n.lock.Unlock() // Short circuit if the node's already running if n.server != nil { return ErrNodeRunning } if err := n.openDataDir(); err != nil { return err } // 初始化p2p服务,配置serverConfig,并以此穿件p2p.Server实例 n.serverConfig = n.config.P2P n.serverConfig.PrivateKey = n.config.NodeKey() n.serverConfig.Name = n.config.NodeName() n.serverConfig.Logger = n.log if n.serverConfig.StaticNodes == nil { n.serverConfig.StaticNodes = n.config.StaticNodes() } if n.serverConfig.TrustedNodes == nil { n.serverConfig.TrustedNodes = n.config.TrustedNodes() } if n.serverConfig.NodeDatabase == "" { n.serverConfig.NodeDatabase = n.config.NodeDB() } running := &p2p.Server{Config: n.serverConfig} n.log.Info("Starting peer-to-peer node", "instance", n.serverConfig.Name) // Otherwise copy and specialize the P2P configuration services := make(map[reflect.Type]Service) for _, constructor := range n.serviceFuncs { // Create a new context for the particular service ctx := &ServiceContext{ config: n.config, services: make(map[reflect.Type]Service), EventMux: n.eventmux, AccountManager: n.accman, } for kind, s := range services { // copy needed for threaded access ctx.services[kind] = s } // Construct and save the service service, err := constructor(ctx) if err != nil { return err } kind := reflect.TypeOf(service) if _, exists := services[kind]; exists { return &DuplicateServiceError{Kind: kind} } services[kind] = service } // Gather the protocols and start the freshly assembled P2P server for _, service := range services { running.Protocols = append(running.Protocols, service.Protocols()...) } if err := running.Start(); err != nil { return convertFileLockError(err) } // Start each of the services started := []reflect.Type{} for kind, service := range services { // Start the next service, stopping all previous upon failure if err := service.Start(running); err != nil { for _, kind := range started { services[kind].Stop() } running.Stop() return err } // Mark the service started for potential cleanup started = append(started, kind) } // Lastly start the configured RPC interfaces if err := n.startRPC(services); err != nil { for _, service := range services { service.Stop() } running.Stop() return err } // Finish initializing the startup n.services = services n.server = running n.stop = make(chan struct{}) return nil }
至此,以太坊的启动流程就完成了。
下一章:以太坊源码解读(4)Block类和储存
一、Block类 type Block struct { /******header*******/ header *Header /******header*******/ / ...