以太坊源码解读(15)交易池升级、降级和重置
1、交易池降级:TxPool.demoteUnexecutables
主要功能是从pending中移除无效的交易,同时将后续的交易都移动到queue。 1)丢弃nonce值过低的交易:list.Foward(nonce) 2)删除账户余额已经不足以支付交易的交易:list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas) 3)将暂时无效的交易移动到queue: 4)如果前面有间隙,将后面的交易移到queue中。
func (pool *TxPool) demoteUnexecutables() {
// 遍历pending列表,获取每个addr的最新nonce值
for addr, list := range pool.pending {
nonce := pool.currentState.GetNonce(addr)
// 剔除nonce小于上面nonce值的交易,从all和priced中删除
for _, tx := range list.Forward(nonce) {
hash := tx.Hash()
log.Trace("Removed old pending transaction", "hash", hash)
pool.all.Remove(hash)
pool.priced.Removed()
}
// 返回账户余额已经不足以支付交易费用和一些暂时无效的交易
drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
for _, tx := range drops {
hash := tx.Hash()
log.Trace("Removed unpayable pending transaction", "hash", hash)
pool.all.Remove(hash)
pool.priced.Removed()
pendingNofundsCounter.Inc(1)
}
// 将暂时无效的交易放到queue中
for _, tx := range invalids {
hash := tx.Hash()
log.Trace("Demoting pending transaction", "hash", hash)
pool.enqueueTx(hash, tx)
}
// 如果有间隙,将后面的交易移动到queue列表中
if list.Len() > 0 && list.txs.Get(nonce) == nil {
for _, tx := range list.Cap(0) {
hash := tx.Hash()
log.Error("Demoting invalidated transaction", "hash", hash)
pool.enqueueTx(hash, tx)
}
}
// 如果经过上面的降级,pending里某个addr一个交易都没有,就把该账户给删除
if list.Empty() {
delete(pool.pending, addr)
delete(pool.beats, addr)
}
}
}
交易降级有三种可能的情况:
1、分叉导致Account的Nonce值降低:假如原规范链上某账户的交易序号m都已经上链,但分叉后新规范链上交易序号m没有上链,这就导致在规范链上的记录的账户的Nonce降低(由m+1变成了m),这样交易m就必须要回滚到交易池,放到queue中;

2、分叉后出现间隙:间隙的出现通常是因为交易余额问题导致的。假如原规范链上交易m花费100,分叉后该账户又发出一个交易m花费200,这就导致该账户余额本来可以支付原来规范链上的某笔交易,但在新的规范链上可能就不够了。这个余额不足的交易如果是m+3,那么在m+2,m+4号交易之间就出现了空隙,这就导致从m+3开始往后所有的交易都要降级;
3、分叉导致pending最前一个交易的nonce值与状态的nonce值不等
2、交易池升级:TxPool.promoteTx
主要功能是将交易放入pending列表中。这个方法与add的不同之处在于,add是获得到的新交易插入pending,而promoteTx是将queue列表中的Txs放入pending。
1)将交易插入pending中,如果待插入的交易序号在pending列表中已经有一个交易了,那么待插入的交易价值大于或等于原交易价值的110%时,替换原交易; 2)如果新交易替换了某个交易,从all列表中删除这个交易; 3)更新all列表 4)发送交易池更新事件(只有pending列表更新才会有这个事件,queue更新不会,因为queue列表中的交易最终是放入pending的)
func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) bool {
// Try to insert the transaction into the pending queue
if pool.pending[addr] == nil {
pool.pending[addr] = newTxList(true)
}
list := pool.pending[addr]
inserted, old := list.Add(tx, pool.config.PriceBump)
if !inserted {
// An older transaction was better, discard this
pool.all.Remove(hash)
pool.priced.Removed()
pendingDiscardCounter.Inc(1)
return false
}
// Otherwise discard any previous transaction and mark this
if old != nil {
pool.all.Remove(old.Hash())
pool.priced.Removed()
pendingReplaceCounter.Inc(1)
}
// Failsafe to work around direct pending inserts (tests)
if pool.all.Get(hash) == nil {
pool.all.Add(tx)
pool.priced.Put(tx)
}
// Set the potentially new pending nonce and notify any subsystems of the new tx
pool.beats[addr] = time.Now()
pool.pendingState.SetNonce(addr, tx.Nonce()+1)
return true
}
这个函数的调用时在交易池重新整理(TxPool.reset)的过程中,例如当有新的区块产生时,一部分交易会被打包进入区块,从而从交易池中删除,此时就要调用reset去更新交易池。更新交易池时必须将一部分queue中的交易移入pending,判断哪些交易是可以放入pending里的,这些过程是TxPool.promoteExecuables中实现的,该方法在判断交易合法后调用promoteTx将通过判断的交易放入pending。
promoteExecuables()的主要任务是把给定的账号地址列表中已经变得可以执行的交易从queue列表中插入到pending中,同时检查一些失效的交易,然后发送交易池更新事件。
func (pool *TxPool) promoteExecutables(accounts []common.Address) {
// Track the promoted transactions to broadcast them at once
var promoted []*types.Transaction
// 首先从queue中取出要升级交易的账户
if accounts == nil {
accounts = make([]common.Address, 0, len(pool.queue))
for addr := range pool.queue {
accounts = append(accounts, addr)
}
}
// 从账户中取出交易
for _, addr := range accounts {
list := pool.queue[addr]
if list == nil {
continue // Just in case someone calls with a non existing account
}
// 类似于降级中的方法,删除低于账户Nonce值的交易
for _, tx := range list.Forward(pool.currentState.GetNonce(addr)) {
hash := tx.Hash()
log.Trace("Removed old queued transaction", "hash", hash)
pool.all.Remove(hash)
pool.priced.Removed()
}
// 过滤交易余额不足的交易或gas过大的交易,然后从all交易池中删除
drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
for _, tx := range drops {
hash := tx.Hash()
log.Trace("Removed unpayable queued transaction", "hash", hash)
pool.all.Remove(hash)
pool.priced.Removed()
queuedNofundsCounter.Inc(1)
}
// 收集所有可以被加入pending列表的交易,依据是交易序号连续且递增
for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) {
hash := tx.Hash()
if pool.promoteTx(addr, hash, tx) {
log.Trace("Promoting queued transaction", "hash", hash)
promoted = append(promoted, tx)
}
}
// 如果某个账户下的交易数量超过了阈值(默认64)
if !pool.locals.contains(addr) {
// list.Cap返回超过数量限制的交易,然后从all中删除
for _, tx := range list.Cap(int(pool.config.AccountQueue)) {
hash := tx.Hash()
pool.all.Remove(hash)
pool.priced.Removed()
queuedRateLimitCounter.Inc(1)
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
}
}
// 如果当前账户为空,则删除该账户
if list.Empty() {
delete(pool.queue, addr)
}
}
经过上面的处理,将queue中的交易移到pending中后,即可发布交易池更新事件:
// Notify subsystem for new promoted transactions.
if len(promoted) > 0 {
go pool.txFeed.Send(NewTxsEvent{promoted})
}
值得注意的是,在升级和降级的过程中都调用了list.Filter()方法,但是在升级的时候并没有得到invalids,原因在于pending列表的strict属性为true,而queue列表的strict属性为false,所以在执行Filter方法的时候,queue并不能筛选掉所有invalid的交易。
func (l *txList) Filter(costLimit *big.Int, gasLimit uint64) (types.Transactions, types.Transactions) {
// If all transactions are below the threshold, short circuit
if l.costcap.Cmp(costLimit) <= 0 && l.gascap <= gasLimit {
return nil, nil
}
l.costcap = new(big.Int).Set(costLimit) // Lower the caps to the thresholds
l.gascap = gasLimit
// Filter out all the transactions above the account's funds
removed := l.txs.Filter(func(tx *types.Transaction) bool { return tx.Cost().Cmp(costLimit) > 0 || tx.Gas() > gasLimit })
// If the list was strict, filter anything above the lowest nonce
var invalids types.Transactions
if l.strict && len(removed) > 0 {
lowest := uint64(math.MaxUint64)
for _, tx := range removed {
if nonce := tx.Nonce(); lowest > nonce {
lowest = nonce
}
}
invalids = l.txs.Filter(func(tx *types.Transaction) bool { return tx.Nonce() > lowest })
}
return removed, invalids
}
三、重置TxPool.reset()
主要功能是由于规范链的更新,重新整理交易池。
1)找到由于规范链更新而作废的交易; 2)给交易池设置最新的世界状态; 3)把旧链退回的交易重新放入交易池; 4)由于分叉引起的降级,将pending中部分交易移到queue里面; 5)由于规范链更新,将queue里面部分交易升级移到pending里。
func (pool *TxPool) reset(oldHead, newHead *types.Header) {
// If we're reorging an old state, reinject all dropped transactions
var reinject types.Transactions
// 第一,找到由于规范链更新而作废的交易
// 新区快头的父区块不等于老区块,说明新老区块不在同一条链
if oldHead != nil && oldHead.Hash() != newHead.ParentHash {
// If the reorg is too deep, avoid doing it (will happen during fast sync)
oldNum := oldHead.Number.Uint64()
newNum := newHead.Number.Uint64()
// 如果新头区块和旧头区块相差大于64,则所有交易不必回退到交易池
if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 {
log.Debug("Skipping deep transaction reorg", "depth", depth)
} else {
// Reorg seems shallow enough to pull in all transactions into memory
var discarded, included types.Transactions
var (
rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64())
add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64())
)
// 如果旧链的头区块大于新链的头区块高度,旧链向后退并回收所有回退的交易
for rem.NumberU64() > add.NumberU64() {
discarded = append(discarded, rem.Transactions()...)
if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
return
}
}
// 如果新链的头区块大于旧链的头区块,新链后退并回收交易
for add.NumberU64() > rem.NumberU64() {
included = append(included, add.Transactions()...)
if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
return
}
}
// 当新旧链到达同一高度的时候同时回退,知道找到共同的父节点
for rem.Hash() != add.Hash() {
discarded = append(discarded, rem.Transactions()...)
if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
return
}
included = append(included, add.Transactions()...)
if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
return
}
}
// 找到discarded中那些不存在于included中的交易
reinject = types.TxDifference(discarded, included)
}
}
// Initialize the internal state to the current head
if newHead == nil {
newHead = pool.chain.CurrentBlock().Header() // Special case during testing
}
// 第二,给交易池设置最新的世界状态
statedb, err := pool.chain.StateAt(newHead.Root)
if err != nil {
log.Error("Failed to reset txpool state", "err", err)
return
}
// 设置新链头区块的状态
pool.currentState = statedb
pool.pendingState = state.ManageState(statedb)
pool.currentMaxGas = newHead.GasLimit
// 第三,把旧链回退的交易放入交易池
log.Debug("Reinjecting stale transactions", "count", len(reinject))
senderCacher.recover(pool.signer, reinject)
pool.addTxsLocked(reinject, false)
// 第四,交易降级
pool.demoteUnexecutables()
// 把所有的accounts的nonce更新到最新的pending Nouce
for addr, list := range pool.pending {
txs := list.Flatten() // Heavy but will be cached and is needed by the miner anyway
pool.pendingState.SetNonce(addr, txs[len(txs)-1].Nonce()+1)
}
// 第五,交易升级
pool.promoteExecutables(nil)
}
reset的调用时机: 1、TxPool初始化的过程:NewTxPool; 2、TxPool事件监听go程收到规范链更新事件。
下一章:以太坊源码解读(16)miner模块和Worker模块
最新的以太坊源码Miner模块有较大的改动,取消了原来的agent模块以及work对象,但是基本上的逻辑还是一样的。Miner模块的主要执行部分在worker中,Miner对象及其方 ...
AI 中文社