以太坊源码解读(15)交易池升级、降级和重置
1、交易池降级:TxPool.demoteUnexecutables
主要功能是从pending中移除无效的交易,同时将后续的交易都移动到queue。 1)丢弃nonce值过低的交易:list.Foward(nonce) 2)删除账户余额已经不足以支付交易的交易:list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas) 3)将暂时无效的交易移动到queue: 4)如果前面有间隙,将后面的交易移到queue中。
func (pool *TxPool) demoteUnexecutables() { // 遍历pending列表,获取每个addr的最新nonce值 for addr, list := range pool.pending { nonce := pool.currentState.GetNonce(addr) // 剔除nonce小于上面nonce值的交易,从all和priced中删除 for _, tx := range list.Forward(nonce) { hash := tx.Hash() log.Trace("Removed old pending transaction", "hash", hash) pool.all.Remove(hash) pool.priced.Removed() } // 返回账户余额已经不足以支付交易费用和一些暂时无效的交易 drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas) for _, tx := range drops { hash := tx.Hash() log.Trace("Removed unpayable pending transaction", "hash", hash) pool.all.Remove(hash) pool.priced.Removed() pendingNofundsCounter.Inc(1) } // 将暂时无效的交易放到queue中 for _, tx := range invalids { hash := tx.Hash() log.Trace("Demoting pending transaction", "hash", hash) pool.enqueueTx(hash, tx) } // 如果有间隙,将后面的交易移动到queue列表中 if list.Len() > 0 && list.txs.Get(nonce) == nil { for _, tx := range list.Cap(0) { hash := tx.Hash() log.Error("Demoting invalidated transaction", "hash", hash) pool.enqueueTx(hash, tx) } } // 如果经过上面的降级,pending里某个addr一个交易都没有,就把该账户给删除 if list.Empty() { delete(pool.pending, addr) delete(pool.beats, addr) } } }
交易降级有三种可能的情况:
1、分叉导致Account的Nonce值降低:假如原规范链上某账户的交易序号m都已经上链,但分叉后新规范链上交易序号m没有上链,这就导致在规范链上的记录的账户的Nonce降低(由m+1变成了m),这样交易m就必须要回滚到交易池,放到queue中;
2、分叉后出现间隙:间隙的出现通常是因为交易余额问题导致的。假如原规范链上交易m花费100,分叉后该账户又发出一个交易m花费200,这就导致该账户余额本来可以支付原来规范链上的某笔交易,但在新的规范链上可能就不够了。这个余额不足的交易如果是m+3,那么在m+2,m+4号交易之间就出现了空隙,这就导致从m+3开始往后所有的交易都要降级;
3、分叉导致pending最前一个交易的nonce值与状态的nonce值不等
2、交易池升级:TxPool.promoteTx
主要功能是将交易放入pending列表中。这个方法与add的不同之处在于,add是获得到的新交易插入pending,而promoteTx是将queue列表中的Txs放入pending。
1)将交易插入pending中,如果待插入的交易序号在pending列表中已经有一个交易了,那么待插入的交易价值大于或等于原交易价值的110%时,替换原交易; 2)如果新交易替换了某个交易,从all列表中删除这个交易; 3)更新all列表 4)发送交易池更新事件(只有pending列表更新才会有这个事件,queue更新不会,因为queue列表中的交易最终是放入pending的)
func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) bool { // Try to insert the transaction into the pending queue if pool.pending[addr] == nil { pool.pending[addr] = newTxList(true) } list := pool.pending[addr] inserted, old := list.Add(tx, pool.config.PriceBump) if !inserted { // An older transaction was better, discard this pool.all.Remove(hash) pool.priced.Removed() pendingDiscardCounter.Inc(1) return false } // Otherwise discard any previous transaction and mark this if old != nil { pool.all.Remove(old.Hash()) pool.priced.Removed() pendingReplaceCounter.Inc(1) } // Failsafe to work around direct pending inserts (tests) if pool.all.Get(hash) == nil { pool.all.Add(tx) pool.priced.Put(tx) } // Set the potentially new pending nonce and notify any subsystems of the new tx pool.beats[addr] = time.Now() pool.pendingState.SetNonce(addr, tx.Nonce()+1) return true }
这个函数的调用时在交易池重新整理(TxPool.reset)的过程中,例如当有新的区块产生时,一部分交易会被打包进入区块,从而从交易池中删除,此时就要调用reset去更新交易池。更新交易池时必须将一部分queue中的交易移入pending,判断哪些交易是可以放入pending里的,这些过程是TxPool.promoteExecuables中实现的,该方法在判断交易合法后调用promoteTx将通过判断的交易放入pending。
promoteExecuables()的主要任务是把给定的账号地址列表中已经变得可以执行的交易从queue列表中插入到pending中,同时检查一些失效的交易,然后发送交易池更新事件。
func (pool *TxPool) promoteExecutables(accounts []common.Address) { // Track the promoted transactions to broadcast them at once var promoted []*types.Transaction // 首先从queue中取出要升级交易的账户 if accounts == nil { accounts = make([]common.Address, 0, len(pool.queue)) for addr := range pool.queue { accounts = append(accounts, addr) } } // 从账户中取出交易 for _, addr := range accounts { list := pool.queue[addr] if list == nil { continue // Just in case someone calls with a non existing account } // 类似于降级中的方法,删除低于账户Nonce值的交易 for _, tx := range list.Forward(pool.currentState.GetNonce(addr)) { hash := tx.Hash() log.Trace("Removed old queued transaction", "hash", hash) pool.all.Remove(hash) pool.priced.Removed() } // 过滤交易余额不足的交易或gas过大的交易,然后从all交易池中删除 drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas) for _, tx := range drops { hash := tx.Hash() log.Trace("Removed unpayable queued transaction", "hash", hash) pool.all.Remove(hash) pool.priced.Removed() queuedNofundsCounter.Inc(1) } // 收集所有可以被加入pending列表的交易,依据是交易序号连续且递增 for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) { hash := tx.Hash() if pool.promoteTx(addr, hash, tx) { log.Trace("Promoting queued transaction", "hash", hash) promoted = append(promoted, tx) } } // 如果某个账户下的交易数量超过了阈值(默认64) if !pool.locals.contains(addr) { // list.Cap返回超过数量限制的交易,然后从all中删除 for _, tx := range list.Cap(int(pool.config.AccountQueue)) { hash := tx.Hash() pool.all.Remove(hash) pool.priced.Removed() queuedRateLimitCounter.Inc(1) log.Trace("Removed cap-exceeding queued transaction", "hash", hash) } } // 如果当前账户为空,则删除该账户 if list.Empty() { delete(pool.queue, addr) } }
经过上面的处理,将queue中的交易移到pending中后,即可发布交易池更新事件:
// Notify subsystem for new promoted transactions. if len(promoted) > 0 { go pool.txFeed.Send(NewTxsEvent{promoted}) }
值得注意的是,在升级和降级的过程中都调用了list.Filter()方法,但是在升级的时候并没有得到invalids,原因在于pending列表的strict属性为true,而queue列表的strict属性为false,所以在执行Filter方法的时候,queue并不能筛选掉所有invalid的交易。
func (l *txList) Filter(costLimit *big.Int, gasLimit uint64) (types.Transactions, types.Transactions) { // If all transactions are below the threshold, short circuit if l.costcap.Cmp(costLimit) <= 0 && l.gascap <= gasLimit { return nil, nil } l.costcap = new(big.Int).Set(costLimit) // Lower the caps to the thresholds l.gascap = gasLimit // Filter out all the transactions above the account's funds removed := l.txs.Filter(func(tx *types.Transaction) bool { return tx.Cost().Cmp(costLimit) > 0 || tx.Gas() > gasLimit }) // If the list was strict, filter anything above the lowest nonce var invalids types.Transactions if l.strict && len(removed) > 0 { lowest := uint64(math.MaxUint64) for _, tx := range removed { if nonce := tx.Nonce(); lowest > nonce { lowest = nonce } } invalids = l.txs.Filter(func(tx *types.Transaction) bool { return tx.Nonce() > lowest }) } return removed, invalids }
三、重置TxPool.reset()
主要功能是由于规范链的更新,重新整理交易池。
1)找到由于规范链更新而作废的交易; 2)给交易池设置最新的世界状态; 3)把旧链退回的交易重新放入交易池; 4)由于分叉引起的降级,将pending中部分交易移到queue里面; 5)由于规范链更新,将queue里面部分交易升级移到pending里。
func (pool *TxPool) reset(oldHead, newHead *types.Header) { // If we're reorging an old state, reinject all dropped transactions var reinject types.Transactions // 第一,找到由于规范链更新而作废的交易 // 新区快头的父区块不等于老区块,说明新老区块不在同一条链 if oldHead != nil && oldHead.Hash() != newHead.ParentHash { // If the reorg is too deep, avoid doing it (will happen during fast sync) oldNum := oldHead.Number.Uint64() newNum := newHead.Number.Uint64() // 如果新头区块和旧头区块相差大于64,则所有交易不必回退到交易池 if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 { log.Debug("Skipping deep transaction reorg", "depth", depth) } else { // Reorg seems shallow enough to pull in all transactions into memory var discarded, included types.Transactions var ( rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64()) add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64()) ) // 如果旧链的头区块大于新链的头区块高度,旧链向后退并回收所有回退的交易 for rem.NumberU64() > add.NumberU64() { discarded = append(discarded, rem.Transactions()...) if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil { log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash()) return } } // 如果新链的头区块大于旧链的头区块,新链后退并回收交易 for add.NumberU64() > rem.NumberU64() { included = append(included, add.Transactions()...) if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil { log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash()) return } } // 当新旧链到达同一高度的时候同时回退,知道找到共同的父节点 for rem.Hash() != add.Hash() { discarded = append(discarded, rem.Transactions()...) if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil { log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash()) return } included = append(included, add.Transactions()...) if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil { log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash()) return } } // 找到discarded中那些不存在于included中的交易 reinject = types.TxDifference(discarded, included) } } // Initialize the internal state to the current head if newHead == nil { newHead = pool.chain.CurrentBlock().Header() // Special case during testing } // 第二,给交易池设置最新的世界状态 statedb, err := pool.chain.StateAt(newHead.Root) if err != nil { log.Error("Failed to reset txpool state", "err", err) return } // 设置新链头区块的状态 pool.currentState = statedb pool.pendingState = state.ManageState(statedb) pool.currentMaxGas = newHead.GasLimit // 第三,把旧链回退的交易放入交易池 log.Debug("Reinjecting stale transactions", "count", len(reinject)) senderCacher.recover(pool.signer, reinject) pool.addTxsLocked(reinject, false) // 第四,交易降级 pool.demoteUnexecutables() // 把所有的accounts的nonce更新到最新的pending Nouce for addr, list := range pool.pending { txs := list.Flatten() // Heavy but will be cached and is needed by the miner anyway pool.pendingState.SetNonce(addr, txs[len(txs)-1].Nonce()+1) } // 第五,交易升级 pool.promoteExecutables(nil) }
reset的调用时机: 1、TxPool初始化的过程:NewTxPool; 2、TxPool事件监听go程收到规范链更新事件。
下一章:以太坊源码解读(16)miner模块和Worker模块
最新的以太坊源码Miner模块有较大的改动,取消了原来的agent模块以及work对象,但是基本上的逻辑还是一样的。Miner模块的主要执行部分在worker中,Miner对象及其方 ...