以太坊源码解读(15)交易池升级、降级和重置

1、交易池降级:TxPool.demoteUnexecutables

主要功能是从pending中移除无效的交易,同时将后续的交易都移动到queue。 1)丢弃nonce值过低的交易:list.Foward(nonce) 2)删除账户余额已经不足以支付交易的交易:list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas) 3)将暂时无效的交易移动到queue: 4)如果前面有间隙,将后面的交易移到queue中。

func (pool *TxPool) demoteUnexecutables() {
	// 遍历pending列表,获取每个addr的最新nonce值
	for addr, list := range pool.pending {
		nonce := pool.currentState.GetNonce(addr)

		// 剔除nonce小于上面nonce值的交易,从all和priced中删除
		for _, tx := range list.Forward(nonce) {
			hash := tx.Hash()
			log.Trace("Removed old pending transaction", "hash", hash)
			pool.all.Remove(hash)
			pool.priced.Removed()
		}
		// 返回账户余额已经不足以支付交易费用和一些暂时无效的交易
		drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
		for _, tx := range drops {
			hash := tx.Hash()
			log.Trace("Removed unpayable pending transaction", "hash", hash)
			pool.all.Remove(hash)
			pool.priced.Removed()
			pendingNofundsCounter.Inc(1)
		}
                // 将暂时无效的交易放到queue中
		for _, tx := range invalids {
			hash := tx.Hash()
			log.Trace("Demoting pending transaction", "hash", hash)
			pool.enqueueTx(hash, tx)
		}
		// 如果有间隙,将后面的交易移动到queue列表中
		if list.Len() > 0 && list.txs.Get(nonce) == nil {
			for _, tx := range list.Cap(0) {
				hash := tx.Hash()
				log.Error("Demoting invalidated transaction", "hash", hash)
				pool.enqueueTx(hash, tx)
			}
		}
		// 如果经过上面的降级,pending里某个addr一个交易都没有,就把该账户给删除
		if list.Empty() {
			delete(pool.pending, addr)
			delete(pool.beats, addr)
		}
	}
}

交易降级有三种可能的情况:

1、分叉导致Account的Nonce值降低:假如原规范链上某账户的交易序号m都已经上链,但分叉后新规范链上交易序号m没有上链,这就导致在规范链上的记录的账户的Nonce降低(由m+1变成了m),这样交易m就必须要回滚到交易池,放到queue中;

2、分叉后出现间隙:间隙的出现通常是因为交易余额问题导致的。假如原规范链上交易m花费100,分叉后该账户又发出一个交易m花费200,这就导致该账户余额本来可以支付原来规范链上的某笔交易,但在新的规范链上可能就不够了。这个余额不足的交易如果是m+3,那么在m+2,m+4号交易之间就出现了空隙,这就导致从m+3开始往后所有的交易都要降级;

3、分叉导致pending最前一个交易的nonce值与状态的nonce值不等

2、交易池升级:TxPool.promoteTx

主要功能是将交易放入pending列表中。这个方法与add的不同之处在于,add是获得到的新交易插入pending,而promoteTx是将queue列表中的Txs放入pending。

1)将交易插入pending中,如果待插入的交易序号在pending列表中已经有一个交易了,那么待插入的交易价值大于或等于原交易价值的110%时,替换原交易; 2)如果新交易替换了某个交易,从all列表中删除这个交易; 3)更新all列表 4)发送交易池更新事件(只有pending列表更新才会有这个事件,queue更新不会,因为queue列表中的交易最终是放入pending的)

func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) bool {
	// Try to insert the transaction into the pending queue
	if pool.pending[addr] == nil {
		pool.pending[addr] = newTxList(true)
	}
	list := pool.pending[addr]

	inserted, old := list.Add(tx, pool.config.PriceBump)
	if !inserted {
		// An older transaction was better, discard this
		pool.all.Remove(hash)
		pool.priced.Removed()

		pendingDiscardCounter.Inc(1)
		return false
	}
	// Otherwise discard any previous transaction and mark this
	if old != nil {
		pool.all.Remove(old.Hash())
		pool.priced.Removed()

		pendingReplaceCounter.Inc(1)
	}
	// Failsafe to work around direct pending inserts (tests)
	if pool.all.Get(hash) == nil {
		pool.all.Add(tx)
		pool.priced.Put(tx)
	}
	// Set the potentially new pending nonce and notify any subsystems of the new tx
	pool.beats[addr] = time.Now()
	pool.pendingState.SetNonce(addr, tx.Nonce()+1)

	return true
}

这个函数的调用时在交易池重新整理(TxPool.reset)的过程中,例如当有新的区块产生时,一部分交易会被打包进入区块,从而从交易池中删除,此时就要调用reset去更新交易池。更新交易池时必须将一部分queue中的交易移入pending,判断哪些交易是可以放入pending里的,这些过程是TxPool.promoteExecuables中实现的,该方法在判断交易合法后调用promoteTx将通过判断的交易放入pending。

promoteExecuables()的主要任务是把给定的账号地址列表中已经变得可以执行的交易从queue列表中插入到pending中,同时检查一些失效的交易,然后发送交易池更新事件。

func (pool *TxPool) promoteExecutables(accounts []common.Address) {
	// Track the promoted transactions to broadcast them at once
	var promoted []*types.Transaction

	// 首先从queue中取出要升级交易的账户
	if accounts == nil {
		accounts = make([]common.Address, 0, len(pool.queue))
		for addr := range pool.queue {
			accounts = append(accounts, addr)
		}
	}
	// 从账户中取出交易
	for _, addr := range accounts {
		list := pool.queue[addr]
		if list == nil {
			continue // Just in case someone calls with a non existing account
		}
		// 类似于降级中的方法,删除低于账户Nonce值的交易
		for _, tx := range list.Forward(pool.currentState.GetNonce(addr)) {
			hash := tx.Hash()
			log.Trace("Removed old queued transaction", "hash", hash)
			pool.all.Remove(hash)
			pool.priced.Removed()
		}
		// 过滤交易余额不足的交易或gas过大的交易,然后从all交易池中删除
		drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
		for _, tx := range drops {
			hash := tx.Hash()
			log.Trace("Removed unpayable queued transaction", "hash", hash)
			pool.all.Remove(hash)
			pool.priced.Removed()
			queuedNofundsCounter.Inc(1)
		}
		// 收集所有可以被加入pending列表的交易,依据是交易序号连续且递增
		for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) {
			hash := tx.Hash()
			if pool.promoteTx(addr, hash, tx) {
				log.Trace("Promoting queued transaction", "hash", hash)
				promoted = append(promoted, tx)
			}
		}
		// 如果某个账户下的交易数量超过了阈值(默认64)
		if !pool.locals.contains(addr) {
                        // list.Cap返回超过数量限制的交易,然后从all中删除
			for _, tx := range list.Cap(int(pool.config.AccountQueue)) {
				hash := tx.Hash()
				pool.all.Remove(hash)
				pool.priced.Removed()
				queuedRateLimitCounter.Inc(1)
				log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
			}
		}
		// 如果当前账户为空,则删除该账户
		if list.Empty() {
			delete(pool.queue, addr)
		}
	}

经过上面的处理,将queue中的交易移到pending中后,即可发布交易池更新事件:

	// Notify subsystem for new promoted transactions.
	if len(promoted) > 0 {
		go pool.txFeed.Send(NewTxsEvent{promoted})
	}

值得注意的是,在升级和降级的过程中都调用了list.Filter()方法,但是在升级的时候并没有得到invalids,原因在于pending列表的strict属性为true,而queue列表的strict属性为false,所以在执行Filter方法的时候,queue并不能筛选掉所有invalid的交易。

func (l *txList) Filter(costLimit *big.Int, gasLimit uint64) (types.Transactions, types.Transactions) {
	// If all transactions are below the threshold, short circuit
	if l.costcap.Cmp(costLimit) <= 0 && l.gascap <= gasLimit {
		return nil, nil
	}
	l.costcap = new(big.Int).Set(costLimit) // Lower the caps to the thresholds
	l.gascap = gasLimit

	// Filter out all the transactions above the account's funds
	removed := l.txs.Filter(func(tx *types.Transaction) bool { return tx.Cost().Cmp(costLimit) > 0 || tx.Gas() > gasLimit })

	// If the list was strict, filter anything above the lowest nonce
	var invalids types.Transactions

	if l.strict && len(removed) > 0 {
		lowest := uint64(math.MaxUint64)
		for _, tx := range removed {
			if nonce := tx.Nonce(); lowest > nonce {
				lowest = nonce
			}
		}
		invalids = l.txs.Filter(func(tx *types.Transaction) bool { return tx.Nonce() > lowest })
	}
	return removed, invalids
}

三、重置TxPool.reset()

主要功能是由于规范链的更新,重新整理交易池。

1)找到由于规范链更新而作废的交易; 2)给交易池设置最新的世界状态; 3)把旧链退回的交易重新放入交易池; 4)由于分叉引起的降级,将pending中部分交易移到queue里面; 5)由于规范链更新,将queue里面部分交易升级移到pending里。

func (pool *TxPool) reset(oldHead, newHead *types.Header) {
	// If we're reorging an old state, reinject all dropped transactions
	var reinject types.Transactions

        // 第一,找到由于规范链更新而作废的交易
        // 新区快头的父区块不等于老区块,说明新老区块不在同一条链
	if oldHead != nil && oldHead.Hash() != newHead.ParentHash {
		// If the reorg is too deep, avoid doing it (will happen during fast sync)
		oldNum := oldHead.Number.Uint64()
		newNum := newHead.Number.Uint64()
      
                // 如果新头区块和旧头区块相差大于64,则所有交易不必回退到交易池
		if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 {
			log.Debug("Skipping deep transaction reorg", "depth", depth)
		} else {
			// Reorg seems shallow enough to pull in all transactions into memory
			var discarded, included types.Transactions

			var (
				rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64())
				add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64())
			)
                        // 如果旧链的头区块大于新链的头区块高度,旧链向后退并回收所有回退的交易
			for rem.NumberU64() > add.NumberU64() {
				discarded = append(discarded, rem.Transactions()...)
				if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
					log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
					return
				}
			}
                        // 如果新链的头区块大于旧链的头区块,新链后退并回收交易
			for add.NumberU64() > rem.NumberU64() {
				included = append(included, add.Transactions()...)
				if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
					log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
					return
				}
			}
                        // 当新旧链到达同一高度的时候同时回退,知道找到共同的父节点
			for rem.Hash() != add.Hash() {
				discarded = append(discarded, rem.Transactions()...)
				if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
					log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
					return
				}
				included = append(included, add.Transactions()...)
				if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
					log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
					return
				}
			}
                        // 找到discarded中那些不存在于included中的交易
			reinject = types.TxDifference(discarded, included)
		}
	}
	// Initialize the internal state to the current head
	if newHead == nil {
		newHead = pool.chain.CurrentBlock().Header() // Special case during testing
	}

        // 第二,给交易池设置最新的世界状态
	statedb, err := pool.chain.StateAt(newHead.Root)
	if err != nil {
		log.Error("Failed to reset txpool state", "err", err)
		return
	}
        // 设置新链头区块的状态
	pool.currentState = statedb
	pool.pendingState = state.ManageState(statedb)
	pool.currentMaxGas = newHead.GasLimit

	// 第三,把旧链回退的交易放入交易池
	log.Debug("Reinjecting stale transactions", "count", len(reinject))
	senderCacher.recover(pool.signer, reinject)
	pool.addTxsLocked(reinject, false)

	// 第四,交易降级
	pool.demoteUnexecutables()

	// 把所有的accounts的nonce更新到最新的pending Nouce
	for addr, list := range pool.pending {
		txs := list.Flatten() // Heavy but will be cached and is needed by the miner anyway
		pool.pendingState.SetNonce(addr, txs[len(txs)-1].Nonce()+1)
	}
	// 第五,交易升级
	pool.promoteExecutables(nil)
}

reset的调用时机: 1、TxPool初始化的过程:NewTxPool; 2、TxPool事件监听go程收到规范链更新事件。

下一章:以太坊源码解读(16)miner模块和Worker模块

 最新的以太坊源码Miner模块有较大的改动,取消了原来的agent模块以及work对象,但是基本上的逻辑还是一样的。Miner模块的主要执行部分在worker中,Miner对象及其方 ...