以太坊源码解读(5)BlockChain类和NewBlockChain
一、blockchain的数据结构
type BlockChain struct { chainConfig *params.ChainConfig // 初始化配置 cacheConfig *CacheConfig // 缓存配置 db ethdb.Database // 底层的数据储存 triegc *prque.Prque gcproc time.Duration hc *HeaderChain // 区块头组成的链 rmLogsFeed event.Feed chainFeed event.Feed chainSideFeed event.Feed chainHeadFeed event.Feed logsFeed event.Feed scope event.SubscriptionScope genesisBlock *types.Block // 创始区块 mu sync.RWMutex // 锁 chainmu sync.RWMutex procmu sync.RWMutex checkpoint int // checkpoint counts towards the new checkpoint currentBlock atomic.Value // 主链的头区块 currentFastBlock atomic.Value // 快速同步模式下链的头区块,这种情况下可能比主链长 stateCache state.Database // State database to reuse between imports (contains state cache) bodyCache *lru.Cache // 最近区块体的缓存 bodyRLPCache *lru.Cache // 最近区块体RLP格式的缓存 blockCache *lru.Cache // 最近完整区块的缓存 futureBlocks *lru.Cache // 待上链的区块的缓存 quit chan struct{} running int32 // running must be called atomically // procInterrupt must be atomically called procInterrupt int32 // interrupt signaler for block processing wg sync.WaitGroup // chain processing wait group for shutting down engine consensus.Engine // 用来验证区块的接口 processor Processor // 执行区块交易的接口 validator Validator // 验证数据有效性的接口 vmConfig vm.Config badBlocks *lru.Cache // Bad block cache,来自DAO事件 }
以太坊blockchain有这么几个关键的元素:
1)db:连接到底层数据储存,即leveldb; 2)hc:headerchain区块头链,由blockchain额外维护的另一条链,由于Header和Block的储存空间是有很大差别的,但同时Block的Hash值就是Header(RLP)的Hash值,所以维护一个headerchain可以用于快速延长链,验证通过后再下载blockchain,或者可以与blockchain进行相互验证; 3)genesisBlock:创始区块; 4)currentBlock:当前区块,blockchain中并不是储存链所有的block,而是通过currentBlock向前回溯直到genesisBlock,这样就构成了区块链。 5)bodyCache、bodyRLPCache、blockCache、futureBlocks:区块链中的缓存结构,用于加快区块链的读取和构建; 6)engine:是consensus模块中的接口,用来验证block的接口; 7)processor:执行区块链交易的接口,收到一个新的区块时,要对区块中的所有交易执行一遍,一方面是验证,一方面是更新worldState; 8)validator:验证数据有效性的接口 9)futureBlocks:收到的区块时间大于当前头区块时间15s而小于30s的区块,可作为当前节点待处理的区块。
blockchain.go中的部分函数和方法:
// NewBlockChain使用数据库中可用的信息返回完全初始化的块链。 它初始化默认的以太坊Validitor和Processor。 func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config) (*BlockChain, error) {} // BadBlocks 处理客户端从网络上获取的最近的bad block列表 func (bc *BlockChain) BadBlocks() []*types.Block {} // addBadBlock 把bad block放入缓存 func (bc *BlockChain) addBadBlock(block *types.Block) {} // CurrentBlock取回主链的当前头区块,这个区块是从blockchian的内部缓存中取得 func (bc *BlockChain) CurrentBlock() *types.Block {} // CurrentHeader检索规范链的当前头区块header。从HeaderChain的内部缓存中检索标头。 func (bc *BlockChain) CurrentHeader() *types.Header{} // CurrentFastBlock取回主链的当前fast-sync头区块,这个区块是从blockchian的内部缓存中取得 func (bc *BlockChain) CurrentFastBlock() *types.Block {} // 将活动链或其子集写入给定的编写器. func (bc *BlockChain) Export(w io.Writer) error {} func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error {} // FastSyncCommitHead快速同步,将当前头块设置为特定hash的区块。 func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error {} // GasLimit返回当前头区块的gas limit func (bc *BlockChain) GasLimit() uint64 {} // Genesis 取回genesis区块 func (bc *BlockChain) Genesis() *types.Block {} // 通过hash从数据库或缓存中取到一个区块体(transactions and uncles)或RLP数据 func (bc *BlockChain) GetBody(hash common.Hash) *types.Body {} func (bc *BlockChain) GetBodyRLP(hash common.Hash) rlp.RawValue {} // GetBlock 通过hash和number取到区块 func (bc *BlockChain) GetBlock(hash common.Hash, number uint64) *types.Block {} // GetBlockByHash 通过hash取到区块 func (bc *BlockChain) GetBlockByHash(hash common.Hash) *types.Block {} // GetBlockByNumber 通过number取到区块 func (bc *BlockChain) GetBlockByNumber(number uint64) *types.Block {} // 获取给定hash的区块的总难度 func (bc *BlockChain) GetTd(hash common.Hash, number uint64) *big.Int{} // 获取给定hash和number区块的header func (bc *BlockChain) GetHeader(hash common.Hash, number uint64) *types.Header{} // 获取给定hash的区块header func (bc *BlockChain) GetHeaderByHash(hash common.Hash) *types.Header{} // 获取给定number的区块header func (bc *BlockChain) GetHeaderByNumber(number uint64) *types.Header{} // 获取从给定hash的区块到genesis区块的所有hash func (bc *BlockChain) GetBlockHashesFromHash(hash common.Hash, max uint64) []common.Hash{} // GetReceiptsByHash 在特定的区块中取到所有交易的收据 func (bc *BlockChain) GetReceiptsByHash(hash common.Hash) types.Receipts {} // GetBlocksFromHash 取到特定hash的区块及其n-1个父区块 func (bc *BlockChain) GetBlocksFromHash(hash common.Hash, n int) (blocks []*types.Block) {} // GetUnclesInChain 取回从给定区块到向前回溯特定距离到区块上的所有叔区块 func (bc *BlockChain) GetUnclesInChain(block *types.Block, length int) []*types.Header {} // HasBlock检验hash对应的区块是否完全存在数据库中 func (bc *BlockChain) HasBlock(hash common.Hash, number uint64) bool {} // 检查给定hash和number的区块的区块头是否存在数据库 func (bc *BlockChain) HasHeader(hash common.Hash, number uint64) bool{} // HasState检验state trie是否完全存在数据库中 func (bc *BlockChain) HasState(hash common.Hash) bool {} // HasBlockAndState检验hash对应的block和state trie是否完全存在数据库中 func (bc *BlockChain) HasBlockAndState(hash common.Hash, number uint64) bool {} // insert 将新的头块注入当前块链。 该方法假设该块确实是真正的头。 // 如果它们较旧或者它们位于不同的侧链上,它还会将头部标题和头部快速同步块重置为同一个块。 func (bc *BlockChain) insert(block *types.Block) {} // InsertChain尝试将给定批量的block插入到规范链中,否则,创建一个分叉。 如果返回错误,它将返回失败块的索引号以及描述错误的错误。 //插入完成后,将触发所有累积的事件。 func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error){} // insertChain将执行实际的链插入和事件聚合。 // 此方法作为单独方法存在的唯一原因是使用延迟语句使锁定更清晰。 func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*types.Log, error){} // InsertHeaderChain尝试将给定的headerchain插入到本地链中,可能会创建一个重组 func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error){} // InsertReceiptChain 使用交易和收据数据来完成已经存在的headerchain func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {} //loadLastState从数据库加载最后一个已知的链状态。 func (bc *BlockChain) loadLastState() error {} // Processor 返回当前current processor. func (bc *BlockChain) Processor() Processor {} // Reset重置清除整个区块链,将其恢复到genesis state. func (bc *BlockChain) Reset() error {} // ResetWithGenesisBlock 清除整个区块链, 用特定的genesis state重塑,被Reset所引用 func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {} // repair尝试通过回滚当前块来修复当前的区块链,直到找到具有关联状态的块。 // 用于修复由崩溃/断电或简单的非提交尝试导致的不完整的数据库写入。 //此方法仅回滚当前块。 当前标头和当前快速块保持不变。 func (bc *BlockChain) repair(head **types.Block) error {} // reorgs需要两个块、一个旧链以及一个新链,并将重新构建块并将它们插入到新的规范链中,并累积潜在的缺失事务并发布有关它们的事件 func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error{} // Rollback 旨在从数据库中删除不确定有效的链片段 func (bc *BlockChain) Rollback(chain []common.Hash) {} // SetReceiptsData 计算收据的所有非共识字段 func SetReceiptsData(config *params.ChainConfig, block *types.Block, receipts types.Receipts) error {} // SetHead将本地链回滚到指定的头部。 // 通常可用于处理分叉时重选主链。对于Header,新Header上方的所有内容都将被删除,新的头部将被设置。 // 但如果块体丢失,则会进一步回退(快速同步后的非归档节点)。 func (bc *BlockChain) SetHead(head uint64) error {} // SetProcessor设置状态修改所需要的processor func (bc *BlockChain) SetProcessor(processor Processor) {} // SetValidator 设置用于验证未来区块的validator func (bc *BlockChain) SetValidator(validator Validator) {} // State 根据当前头区块返回一个可修改的状态 func (bc *BlockChain) State() (*state.StateDB, error) {} // StateAt 根据特定时间点返回新的可变状态 func (bc *BlockChain) StateAt(root common.Hash) (*state.StateDB, error) {} // Stop 停止区块链服务,如果有正在import的进程,它会使用procInterrupt来取消。 // it will abort them using the procInterrupt. func (bc *BlockChain) Stop() {} // TrieNode从memory缓存或storage中检索与trie节点hash相关联的数据。 func (bc *BlockChain) TrieNode(hash common.Hash) ([]byte, error) {} // Validator返回当前validator. func (bc *BlockChain) Validator() Validator {} // WriteBlockWithoutState仅将块及其元数据写入数据库,但不写入任何状态。 这用于构建竞争方叉,直到超过规范总难度。 func (bc *BlockChain) WriteBlockWithoutState(block *types.Block, td *big.Int) (err error){} // WriteBlockWithState将块和所有关联状态写入数据库。 func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) {} // writeHeader将标头写入本地链,因为它的父节点已知。 如果新插入的报头的总难度变得大于当前已知的TD,则重新路由规范链 func (bc *BlockChain) writeHeader(header *types.Header) error{} // 处理未来区块链 func (bc *BlockChain) update() {}
二、HeaderChain
如上所述,HeaderChain是BlockChain额外维护的另一条链,其结构与后者有很大的相似性,包括要连接数据库、genesisHeader、currentHeader、currentHeaderHash以及缓存和engine。注意:consensus.Engine是一个很有用的接口,以太坊中凡是需要用到数据验证的地方都要用到这个接口。
不同之处在于,HeaderChain中由于不存在block的body部分,所以也就用不到processor和validator这两个接口。
type HeaderChain struct { config *params.ChainConfig chainDb ethdb.Database genesisHeader *types.Header currentHeader atomic.Value // 当前区块的区块头 currentHeaderHash common.Hash // 当前区块的区块头hash值(禁止重计算) headerCache *lru.Cache tdCache *lru.Cache numberCache *lru.Cache procInterrupt func() bool rand *mrand.Rand engine consensus.Engine }
headerchain的方法与blockchain类似,其中很多都是被blockchain中相应的方法调用的,所以我们都是通过Blockchain来管理HeaderChain的,这里就不一一列举了。
从这些方法我们可以看到 blockchain和headerchain的行为方式:
1、blockchain模块初始化 2、blockchain模块插入校验分析 3、blockchain模块区块链分叉处理 4、blockchian模块规范链更新
三、详解blockchain模块初始化的方法和函数
区块链本身是一个用来记录数据的数据库,那么维护它的方法无非就是增删查(没有“改”,因为区块链最大的特点之一就是数据不可更改)。
创建:NewBlockChain 增加、插入:InsertChain—>insertChain—>WriteBlockWithState 删除:Reset —> ResetWithGenesisBlock 查询:getBlockByNumber等
初始化blockchain:NewBlockChain()
func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config) (*BlockChain, error) { if cacheConfig == nil { cacheConfig = &CacheConfig{ TrieNodeLimit: 256 * 1024 * 1024, TrieTimeLimit: 5 * time.Minute, } } bodyCache, _ := lru.New(bodyCacheLimit) bodyRLPCache, _ := lru.New(bodyCacheLimit) blockCache, _ := lru.New(blockCacheLimit) futureBlocks, _ := lru.New(maxFutureBlocks) badBlocks, _ := lru.New(badBlockLimit) bc := &BlockChain{ chainConfig: chainConfig, cacheConfig: cacheConfig, db: db, triegc: prque.New(), stateCache: state.NewDatabase(db), quit: make(chan struct{}), bodyCache: bodyCache, bodyRLPCache: bodyRLPCache, blockCache: blockCache, futureBlocks: futureBlocks, engine: engine, vmConfig: vmConfig, badBlocks: badBlocks, } bc.SetValidator(NewBlockValidator(chainConfig, bc, engine)) bc.SetProcessor(NewStateProcessor(chainConfig, bc, engine)) var err error bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.getProcInterrupt) if err != nil { return nil, err } bc.genesisBlock = bc.GetBlockByNumber(0) if bc.genesisBlock == nil { return nil, ErrNoGenesis } // 加载规范链的头区块到blockchain中 if err := bc.loadLastState(); err != nil { return nil, err } // 遍历badHash列表,如果发现规范链中存在badHash列表中的错误区块,则将规范链回滚到错误区块的父区块 for hash := range BadHashes { if header := bc.GetHeaderByHash(hash); header != nil { // get the canonical block corresponding to the offending header's number headerByNumber := bc.GetHeaderByNumber(header.Number.Uint64()) // make sure the headerByNumber (if present) is in our current canonical chain if headerByNumber != nil && headerByNumber.Hash() == header.Hash() { log.Error("Found bad hash, rewinding chain", "number", header.Number, "hash", header.ParentHash) bc.SetHead(header.Number.Uint64() - 1) log.Error("Chain rewind was successful, resuming normal operation") } } } // 开启处理未来区块的go线程 go bc.update() return bc, nil }
NewBlockChain函数的调用时机:从上一篇介绍以太坊启动的流程中可以知道,该函数的调用流程是
geth ——> makeFullNode ——> RegisterEthService ——> eth.New ——> core.NewBlockChain
NewBlockChain函数的执行过程:
1. 配置cacheConfig,创建各种lru缓存 2. 初始化triegc(按优先级映射排列Block number用以gc):prque.New() 3. 初始化stateDb:state.NewDatabase(db) 4. 初始化区块和状态验证:NewBlockValidator() 5. 初始化状态处理器:NewStateProcessor() 6. 初始化区块头部链:NewHeaderChain() 7. 查找创世区块:bc.genesisBlock = bc.GetBlockByNumber(0) 8. 加载最新的状态数据:bc.loadLastState() 9. 检查本地区块链上是否有bad block,如果有调用bc.SetHead回到硬分叉之前的区块头 10. go bc.update() 定时处理future block
这里细说一下第9步,这涉及到如何查找一个区块以及如何写数据库:
for hash := range BadHashes { if header := bc.GetHeaderByHash(hash); header != nil { // get the canonical block corresponding to the offending header's number headerByNumber := bc.GetHeaderByNumber(header.Number.Uint64()) // make sure the headerByNumber (if present) is in our current canonical chain if headerByNumber != nil && headerByNumber.Hash() == header.Hash() { log.Error("Found bad hash, rewinding chain", "number", header.Number, "hash", header.ParentHash) bc.SetHead(header.Number.Uint64() - 1) log.Error("Chain rewind was successful, resuming normal operation") } } }
首先遍历BadHash列表,通过hash来取得header:
bc.GetHeaderByHash(hash) —> bc.hc.GetHeaderByHash(hash) —> hc.GetBlockNumber(hash) // 通过hash来找到这个区块的number,即用‘H’+hash为key在数据库中查找 —> hc.GetHeader(hash, *number) // 通过hash+number来找到header —> hc.headerCache.Get(hash) // 先从缓存里找,找不到再去数据库找 —> rawdb.ReadHeader(hc.chainDb, hash, number) // 在数据库中,通过'h'+num+hash为key来找到header的RLP编码值
然后判断该badHash对应的区块头是否在规范链上:
bc.GetHeaderByNumber(number) —> hc.GetHeaderByNumber(number) —> raw.ReadCanonicalHash(hc.chainDb, number) // 在规范链上以‘h’+num+‘n’为key查找区块的hash, // 如果找到了,说明区块链上确实有该badblock // 如果找不到,则说明该bad block只存在数据库,没有上规范链 —> hc.GetHeader(hash,number) // 如果规范链上有这个badblock,则返回该block header
如果规范链上存在坏区块,则回滚到坏区块的父区块。
SetHead(head uint64)
func (bc *BlockChain) SetHead(head uint64) error { log.Warn("Rewinding blockchain", "target", head) bc.mu.Lock() defer bc.mu.Unlock() // Rewind the header chain, deleting all block bodies until then delFn := func(db rawdb.DatabaseDeleter, hash common.Hash, num uint64) { rawdb.DeleteBody(db, hash, num) } bc.hc.SetHead(head, delFn) currentHeader := bc.hc.CurrentHeader() // Clear out any stale content from the caches bc.bodyCache.Purge() bc.bodyRLPCache.Purge() bc.blockCache.Purge() bc.futureBlocks.Purge() // Rewind the block chain, ensuring we don't end up with a stateless head block if currentBlock := bc.CurrentBlock(); currentBlock != nil && currentHeader.Number.Uint64() < currentBlock.NumberU64() { bc.currentBlock.Store(bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64())) } if currentBlock := bc.CurrentBlock(); currentBlock != nil { if _, err := state.New(currentBlock.Root(), bc.stateCache); err != nil { // Rewound state missing, rolled back to before pivot, reset to genesis bc.currentBlock.Store(bc.genesisBlock) } } // Rewind the fast block in a simpleton way to the target head if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && currentHeader.Number.Uint64() < currentFastBlock.NumberU64() { bc.currentFastBlock.Store(bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64())) } // If either blocks reached nil, reset to the genesis state if currentBlock := bc.CurrentBlock(); currentBlock == nil { bc.currentBlock.Store(bc.genesisBlock) } if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock == nil { bc.currentFastBlock.Store(bc.genesisBlock) } currentBlock := bc.CurrentBlock() currentFastBlock := bc.CurrentFastBlock() rawdb.WriteHeadBlockHash(bc.db, currentBlock.Hash()) rawdb.WriteHeadFastBlockHash(bc.db, currentFastBlock.Hash()) return bc.loadLastState() }
1. 首先调用bc.hc.SetHead(head, delFn),回滚head对应的区块头。并清除中间区块头所有的数据和缓存。设置head为新的currentHeadr。 2. 重新设置bc.currentBlock,bc.currentFastBlock 3. 在数据库中记录新的headBlockHash和HeadFastBlockHash 4. 调用bc.loadLastState(),重新加载本地的最新状态
loadLastState()加载本地最新状态
func (bc *BlockChain) loadLastState() error { // Restore the last known head block head := rawdb.ReadHeadBlockHash(bc.db) if head == (common.Hash{}) { // Corrupt or empty database, init from scratch log.Warn("Empty database, resetting chain") return bc.Reset() } // Make sure the entire head block is available currentBlock := bc.GetBlockByHash(head) if currentBlock == nil { // Corrupt or empty database, init from scratch log.Warn("Head block missing, resetting chain", "hash", head) return bc.Reset() } // Make sure the state associated with the block is available if _, err := state.New(currentBlock.Root(), bc.stateCache); err != nil { // Dangling block without a state associated, init from scratch log.Warn("Head state missing, repairing chain", "number", currentBlock.Number(), "hash", currentBlock.Hash()) if err := bc.repair(¤tBlock); err != nil { return err } } // Everything seems to be fine, set as the head block bc.currentBlock.Store(currentBlock) // Restore the last known head header currentHeader := currentBlock.Header() if head := rawdb.ReadHeadHeaderHash(bc.db); head != (common.Hash{}) { if header := bc.GetHeaderByHash(head); header != nil { currentHeader = header } } bc.hc.SetCurrentHeader(currentHeader) // Restore the last known head fast block bc.currentFastBlock.Store(currentBlock) if head := rawdb.ReadHeadFastBlockHash(bc.db); head != (common.Hash{}) { if block := bc.GetBlockByHash(head); block != nil { bc.currentFastBlock.Store(block) } } // Issue a status log for the user currentFastBlock := bc.CurrentFastBlock() headerTd := bc.GetTd(currentHeader.Hash(), currentHeader.Number.Uint64()) blockTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64()) fastTd := bc.GetTd(currentFastBlock.Hash(), currentFastBlock.NumberU64()) log.Info("Loaded most recent local header", "number", currentHeader.Number, "hash", currentHeader.Hash(), "td", headerTd) log.Info("Loaded most recent local full block", "number", currentBlock.Number(), "hash", currentBlock.Hash(), "td", blockTd) log.Info("Loaded most recent local fast block", "number", currentFastBlock.Number(), "hash", currentFastBlock.Hash(), "td", fastTd) return nil }
所谓加载最新状态,就是要找到最新的区块头,然后设置currentBlock、currentHeader和currentFastBlock,因此:
1. 获取到最新区块以及它的hash 2 . 从stateDb中打开最新区块的状态trie,如果打开失败调用bc.repair(¤tBlock)方法进行修复。修复方法就是从当前区块一个个的往前面找,直到找到好的区块,然后赋值给currentBlock。 3 . 获取到最新的区块头 4 .找到最新的fast模式下的block,并设置bc.currentFastBlock
下一章:以太坊源码解读(6)BlockChain 区块插入和校验
以太坊blockchain的管理事务:1、blockchain模块初始化2、blockchain模块插入校验分析3、blockchain模块区块链分叉处理4、blockchian模块 ...