以太坊源码解读(14)交易池流程

一、交易池的概念和原理

首先了解一下交易池的工作概况:

1、交易池中交易来源:本地的交易和远端节点广播的交易; 2、交易池中交易去向:被Miner模块获取并验证,用于挖矿;挖矿成功后写进区块并被广播; 3、Miner取走交易是复制,交易池中的交易并不减少。直到交易被写进规范链后才从交易池删除; 4、交易如果被写进分叉,交易池中的交易也不减少,等待重新打包。

交易池模块

type TxPool struct {
	config       TxPoolConfig
	chainconfig  *params.ChainConfig
	chain        blockChain
	gasPrice     *big.Int
	txFeed       event.Feed
	scope        event.SubscriptionScope
	chainHeadCh  chan ChainHeadEvent  // txpool订阅区块头的消息
	chainHeadSub event.Subscription  //  区块头消息订阅器,通过它可以取消消息
	signer       types.Signer    //  封装了事务签名处理,椭圆算法
	mu           sync.RWMutex

	currentState  *state.StateDB      // 当前头区块对应的状态
	pendingState  *state.ManagedState // 假设pending列表最后一个交易执行完后应该的状态
	currentMaxGas uint64              // Current gas limit for transaction caps

	locals  *accountSet // Set of local transaction to exempt from eviction rules
	journal *txJournal  // Journal of local transaction to back up to disk

	pending map[common.Address]*txList   // 当前可以处理的交易,key为交易发起者地址,交易按nonce排序
	queue   map[common.Address]*txList   // 当前暂时不能处理的交易
	beats   map[common.Address]time.Time // 每个账号最后一次交易时间
	all     *txLookup                    // 全局交易列表
	priced  *txPricedList                // 所有交易按价格排序

	wg sync.WaitGroup // for shutdown sync

	homestead bool
}

这里值得注意的是两个map,一个是pending,一个是queue,分别是当前待处理的交易列表和暂时不处理的交易,其中key是交易发起者的地址,交易按照nonce进行排序。这两个列表的维护对于整个交易池的运行至关重要。

pendingaddress Atx3tx4tx5  
queueaddress Atx7tx8   

加入某个节点交易池中pending和queue中分别都持有address A发起的一些交易。可以看到: 1、pending中有tx3、4、5,这意味着address A 发起的交易tx0、1、2都已经被插入规范链了。 2、queue中有tx7、tx8是因为该节点没有收到tx5的交易,所以tx7、8暂时不处理; 3、tx7、8等待处理的时间是有限的,如果超过30分钟(当前时间-beats)还没有收到tx6,则会将7、8抛弃; 4、如果收到一笔新交易,交易池先把该交易往pending中进行比对和替换,如果pending中有则替换,没有则放入queue。所以如果此时收到了tx6,会先对pending进行比较,发现没有tx6,然后放到queue中。 5、如果又收到另一笔tx4,交易池会对原有tx4进行替换,替换的条件是新交易的价值要超过原交易的110%。

交易池初始化 1、初始化Txpool类,并且从本地文件中加载local交易; 2、订阅规范链更新事件; 3、启动事件监听;

func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
	// Sanitize the input to ensure no vulnerable gas prices are set
	config = (&config).sanitize()

	// Create the transaction pool with its initial settings
	pool := &TxPool{
		config:      config,
		chainconfig: chainconfig,
		chain:       chain,
		signer:      types.NewEIP155Signer(chainconfig.ChainID),
		pending:     make(map[common.Address]*txList),
		queue:       make(map[common.Address]*txList),
		beats:       make(map[common.Address]time.Time),
		all:         newTxLookup(),
		chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
		gasPrice:    new(big.Int).SetUint64(config.PriceLimit),
	}
	pool.locals = newAccountSet(pool.signer)
	pool.priced = newTxPricedList(pool.all)
	pool.reset(nil, chain.CurrentBlock().Header())

	// If local transactions and journaling is enabled, load from disk
	if !config.NoLocals && config.Journal != "" {
		pool.journal = newTxJournal(config.Journal)

		if err := pool.journal.load(pool.AddLocals); err != nil {
			log.Warn("Failed to load transaction journal", "err", err)
		}
		if err := pool.journal.rotate(pool.local()); err != nil {
			log.Warn("Failed to rotate transaction journal", "err", err)
		}
	}
	// Subscribe events from blockchain
	pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)

	// Start the event loop and return
	pool.wg.Add(1)
	go pool.loop()

	return pool
}

注意:local交易比remote交易具有更高的权限,一是不轻易被替换;二是持久化,即通过一个本地的journal文件保存尚未打包的local交易。所以在节点启动的时候,优先从本地加载local交易。

本地地址会被加入白名单,凡由此地址发送的交易均被认为是local交易,不论是从本地递交还是从远端发送来的。

loop监听哪些事件?

首先,pool.loop()定义了三个定时器: 1、report为每8秒钟报告一次状态; 2、evict为每分钟检测不活动account的交易移除(超过3小时不活动的账户交易要被移除) 3、journal为每小时更新一下本地的local transaction journal。

然后开启监听循环: 1、监听规范链更新事件,重置交易池:pool.reset(); 2、监听report.C,定时打印最新pending和queue状态; 3、监听evict.C,定时删除超时交易; 4、监听hournal.C,定时本地储存尚未额打包的local交易;

func (pool *TxPool) loop() {
	defer pool.wg.Done()

	// Start the stats reporting and transaction eviction tickers
	var prevPending, prevQueued, prevStales int

	report := time.NewTicker(statsReportInterval)
	defer report.Stop()

	evict := time.NewTicker(evictionInterval)
	defer evict.Stop()

	journal := time.NewTicker(pool.config.Rejournal)
	defer journal.Stop()

	// Track the previous head headers for transaction reorgs
	head := pool.chain.CurrentBlock()

	// Keep waiting for and reacting to the various events
	for {
		select {
		// Handle ChainHeadEvent
		case ev := <-pool.chainHeadCh:
			if ev.Block != nil {
				pool.mu.Lock()
				if pool.chainconfig.IsHomestead(ev.Block.Number()) {
					pool.homestead = true
				}
				pool.reset(head.Header(), ev.Block.Header())
				head = ev.Block

				pool.mu.Unlock()
			}
		// Be unsubscribed due to system stopped
		case <-pool.chainHeadSub.Err():
			return

		// Handle stats reporting ticks
		case <-report.C:
			pool.mu.RLock()
			pending, queued := pool.stats()
			stales := pool.priced.stales
			pool.mu.RUnlock()

			if pending != prevPending || queued != prevQueued || stales != prevStales {
				log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales)
				prevPending, prevQueued, prevStales = pending, queued, stales
			}

		// Handle inactive account transaction eviction
		case <-evict.C:
			pool.mu.Lock()
			for addr := range pool.queue {
				// Skip local transactions from the eviction mechanism
				if pool.locals.contains(addr) {
					continue
				}
				// Any non-locals old enough should be removed
				if time.Since(pool.beats[addr]) > pool.config.Lifetime {
					for _, tx := range pool.queue[addr].Flatten() {
						pool.removeTx(tx.Hash(), true)
					}
				}
			}
			pool.mu.Unlock()

		// Handle local transaction journal rotation
		case <-journal.C:
			if pool.journal != nil {
				pool.mu.Lock()
				if err := pool.journal.rotate(pool.local()); err != nil {
					log.Warn("Failed to rotate local tx journal", "err", err)
				}
				pool.mu.Unlock()
			}
		}
	}
}

二、交易入池验证:validateTx

主要功能是验证一个交易的合法性: 1、交易的size不能过大; 2、交易转账值不能为负; 3、交易的gas值超过了当前规范链头区块的gas值; 4、确保交易签名的正确性; 5、在没有指定local参数或本节点白名单中没有包含这个交易地址的情况下,交易的gas不能小于txpool下限; 6、如果本地节点的最新状态中交易发起方的余额不能小于交易gas总消耗(value+tx.gasLimit*tx.gasPrice); 7、如果固定gas消耗不能大于交易设置的gasLimit;

func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
	// Heuristic limit, reject transactions over 32KB to prevent DOS attacks
	if tx.Size() > 32*1024 {
		return ErrOversizedData
	}
	// Transactions can't be negative. This may never happen using RLP decoded
	// transactions but may occur if you create a transaction using the RPC.
	if tx.Value().Sign() < 0 {
		return ErrNegativeValue
	}
	// Ensure the transaction doesn't exceed the current block limit gas.
	if pool.currentMaxGas < tx.Gas() {
		return ErrGasLimit
	}
	// Make sure the transaction is signed properly
	from, err := types.Sender(pool.signer, tx)
	if err != nil {
		return ErrInvalidSender
	}
	// Drop non-local transactions under our own minimal accepted gas price
	local = local || pool.locals.contains(from) // account may be local even if the transaction arrived from the network
	if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 {
		return ErrUnderpriced
	}
	// Ensure the transaction adheres to nonce ordering
	if pool.currentState.GetNonce(from) > tx.Nonce() {
		return ErrNonceTooLow
	}
	// Transactor should have enough funds to cover the costs
	// cost == V + GP * GL
	if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
		return ErrInsufficientFunds
	}
	intrGas, err := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead)
	if err != nil {
		return err
	}
	if tx.Gas() < intrGas {
		return ErrIntrinsicGas
	}
	return nil
}

在以太坊中一个message的gas的计算方式是被规定好的: 1、如果是合约创建且是家园版本,固定消耗为53000gas; 2、如果是普通交易,固定消耗是21000gas; 3、非0值数据消耗:固定消耗+非0值数量*68,以64位能表示的最大值为封顶,超过则报错; 4、0值数据消耗:固定消耗+0值数量*4,同样以64位能表示的最大值为封顶,超过则报错。

func IntrinsicGas(data []byte, contractCreation, homestead bool) (uint64, error) {
	// Set the starting gas for the raw transaction
	var gas uint64
	if contractCreation && homestead {
		gas = params.TxGasContractCreation
	} else {
		gas = params.TxGas
	}
	// Bump the required gas by the amount of transactional data
	if len(data) > 0 {
		// Zero and non-zero bytes are priced differently
		var nz uint64
		for _, byt := range data {
			if byt != 0 {
				nz++
			}
		}
		// Make sure we don't exceed uint64 for all data combinations
		if (math.MaxUint64-gas)/params.TxDataNonZeroGas < nz {
			return 0, vm.ErrOutOfGas
		}
		gas += nz * params.TxDataNonZeroGas

		z := uint64(len(data)) - nz
		if (math.MaxUint64-gas)/params.TxDataZeroGas < z {
			return 0, vm.ErrOutOfGas
		}
		gas += z * params.TxDataZeroGas
	}
	return gas, nil
}

三、向交易池添加交易

1、添加交易TxPool.add() add()方法用于将本地或远端的交易加入到交易池,这个方法的基本逻辑是: 1)检查交易是否收到过,重复接受的交易直接丢弃; 2)验证交易是否有效; 3)如果交易池满了,待插入的交易的价值比交易池中任意一个都低,则直接丢弃; 4)如果待插入的交易序号在pending列表中已经存在,且待插入的交易价值大于或等于原交易的110%,则替换原交易; 5)如果待插入的交易序号在pending列表中没有,则直接放入queue列表。如果对应的序号已经有交易了,则如果新交易的价值大于或等于原交易的110%,替换原交易;

注意:这里pool.config.GlobalSlots为所有可执行交易的总数,即pending列表总数,默认4096;pool.config.GlobalQueue为不可执行交易总数,即queue列表总数,默认1024;

func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
	// 如果交易已经获取,则直接丢弃
	hash := tx.Hash()
	if pool.all.Get(hash) != nil {
		log.Trace("Discarding already known transaction", "hash", hash)
		return false, fmt.Errorf("known transaction: %x", hash)
	}
	// 如果交易无法通过验证,则直接丢弃
	if err := pool.validateTx(tx, local); err != nil {
		log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
		invalidTxCounter.Inc(1)
		return false, err
	}
	// 如果交易池满了,待插入的交易的价值比交易池中任意一个都低,则丢弃没有价值的交易
	if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
		//如果待插入的交易价值比当前最便宜的还要低,则直接丢弃
		if !local && pool.priced.Underpriced(tx, pool.locals) {
			log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice())
			underpricedTxCounter.Inc(1)
			return false, ErrUnderpriced
		}
		// 如果待插入的交易价值不是最差的,则腾出空间,返回即将丢弃的交易并删除
		drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
		for _, tx := range drop {
			log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
			underpricedTxCounter.Inc(1)
			pool.removeTx(tx.Hash(), false)
		}
	}
	// 如果插入pending已有的交易,必须交易价值大于或等于原交易的110%,方可替换
	from, _ := types.Sender(pool.signer, tx) // already validated
	if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
		// Nonce already pending, check if required price bump is met
		inserted, old := list.Add(tx, pool.config.PriceBump)
		if !inserted {
                        // 丢弃计数器加1
			pendingDiscardCounter.Inc(1)
			return false, ErrReplaceUnderpriced
		}
		// New transaction is better, replace old one
		if old != nil {
			pool.all.Remove(old.Hash())
			pool.priced.Removed()
                        // pending列表替换计数器加1
			pendingReplaceCounter.Inc(1)
		}
		pool.all.Add(tx)
		pool.priced.Put(tx)
		pool.journalTx(from, tx)

		log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())

		// 通知其他队交易池增加新交易感兴趣的子系统:广播和矿工
		go pool.txFeed.Send(NewTxsEvent{types.Transactions{tx}})

		return old != nil, nil
	}
	// 新交易无法替换,则加入queue列表
	replace, err := pool.enqueueTx(hash, tx)
	if err != nil {
		return false, err
	}
	// Mark local addresses and journal local transactions
	if local {
		pool.locals.add(from)
	}
	pool.journalTx(from, tx)

	log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
	return replace, nil
}

TxPool.add()的调用时机: 1)命令行发送交易

EthApiBackend.SendTx
    ——> TxPool.AddLocal
        ——> TxPool.addTx
            ——> TxPool.add

2)交易池重新整理的过程中

TxPool.reset
    ——> TxPool.addTxLocked
        ——> TxPool.add

3)收到远程节点广播的交易时

AddRemotes
    ——> TxPool.addRemotes
        ——> TxPool.addTxs
            ——> TxPool.addTxLocked
                ——> TxPool.add

这里有addLocal和addRemote的区别,其中有第二个参数来设定该交易是local还是remote。local的交易在打包时有优先权,在删除时有豁免权,还会以文件的形式保存在磁盘上。

func (pool *TxPool) AddLocal(tx *types.Transaction) error {
	return pool.addTx(tx, !pool.config.NoLocals)
}

func (pool *TxPool) AddRemote(tx *types.Transaction) error {
	return pool.addTx(tx, false)
}

2、交易加入queue列表:TxPool.enqueueTx

主要流程: 1)将交易插入queue中,如果待插入的交易序号在queue列表中已经有一个交易,那么待插入的交易价值大于原交易价值的110%,则替换原交易; 2)如果新交易替换成功,则从all列表中删除这个被替换的交易 3)更新all列表

func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, error) {
	// Try to insert the transaction into the future queue
	from, _ := types.Sender(pool.signer, tx) // already validated
	if pool.queue[from] == nil {
		pool.queue[from] = newTxList(false)
	}
	inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump)
	if !inserted {
		// An older transaction was better, discard this
		queuedDiscardCounter.Inc(1)
		return false, ErrReplaceUnderpriced
	}
	// Discard any previous transaction and mark this
	if old != nil {
		pool.all.Remove(old.Hash())
		pool.priced.Removed()
		queuedReplaceCounter.Inc(1)
	}
	if pool.all.Get(hash) == nil {
		pool.all.Add(tx)
		pool.priced.Put(tx)
	}
	return old != nil, nil
}

调用时机: 1、调用TxPool.add函数,新交易没有加入pending而直接放入queue

TxPool.add
    ——> TxPool.enqueueTx

2、调用TxPool.removeTx,删除pending列表中的交易,将后面的交易移入queue列表;

TxPool.removeTx
    ——> TxPool.enqueueTx

3、调用TxPool.demoteYnexecuteables删除无效交易,同时将后续的交易移入queue列表;

TxPool.demoteUnexpectables
    ——> TxPool.enqueueTx

 

下一章:以太坊源码解读(15)交易池升级、降级和重置

1、交易池降级:TxPool.demoteUnexecutables 主要功能是从pending中移除无效的交易,同时将后续的交易都移动到queue。 1)丢弃nonce值过低的交易:list. ...