以太坊源码解读(14)交易池流程
一、交易池的概念和原理
首先了解一下交易池的工作概况:
1、交易池中交易来源:本地的交易和远端节点广播的交易; 2、交易池中交易去向:被Miner模块获取并验证,用于挖矿;挖矿成功后写进区块并被广播; 3、Miner取走交易是复制,交易池中的交易并不减少。直到交易被写进规范链后才从交易池删除; 4、交易如果被写进分叉,交易池中的交易也不减少,等待重新打包。
交易池模块
type TxPool struct { config TxPoolConfig chainconfig *params.ChainConfig chain blockChain gasPrice *big.Int txFeed event.Feed scope event.SubscriptionScope chainHeadCh chan ChainHeadEvent // txpool订阅区块头的消息 chainHeadSub event.Subscription // 区块头消息订阅器,通过它可以取消消息 signer types.Signer // 封装了事务签名处理,椭圆算法 mu sync.RWMutex currentState *state.StateDB // 当前头区块对应的状态 pendingState *state.ManagedState // 假设pending列表最后一个交易执行完后应该的状态 currentMaxGas uint64 // Current gas limit for transaction caps locals *accountSet // Set of local transaction to exempt from eviction rules journal *txJournal // Journal of local transaction to back up to disk pending map[common.Address]*txList // 当前可以处理的交易,key为交易发起者地址,交易按nonce排序 queue map[common.Address]*txList // 当前暂时不能处理的交易 beats map[common.Address]time.Time // 每个账号最后一次交易时间 all *txLookup // 全局交易列表 priced *txPricedList // 所有交易按价格排序 wg sync.WaitGroup // for shutdown sync homestead bool }
这里值得注意的是两个map,一个是pending,一个是queue,分别是当前待处理的交易列表和暂时不处理的交易,其中key是交易发起者的地址,交易按照nonce进行排序。这两个列表的维护对于整个交易池的运行至关重要。
pending | address A | tx3 | tx4 | tx5 |
queue | address A | tx7 | tx8 |
加入某个节点交易池中pending和queue中分别都持有address A发起的一些交易。可以看到: 1、pending中有tx3、4、5,这意味着address A 发起的交易tx0、1、2都已经被插入规范链了。 2、queue中有tx7、tx8是因为该节点没有收到tx5的交易,所以tx7、8暂时不处理; 3、tx7、8等待处理的时间是有限的,如果超过30分钟(当前时间-beats)还没有收到tx6,则会将7、8抛弃; 4、如果收到一笔新交易,交易池先把该交易往pending中进行比对和替换,如果pending中有则替换,没有则放入queue。所以如果此时收到了tx6,会先对pending进行比较,发现没有tx6,然后放到queue中。 5、如果又收到另一笔tx4,交易池会对原有tx4进行替换,替换的条件是新交易的价值要超过原交易的110%。
交易池初始化 1、初始化Txpool类,并且从本地文件中加载local交易; 2、订阅规范链更新事件; 3、启动事件监听;
func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool { // Sanitize the input to ensure no vulnerable gas prices are set config = (&config).sanitize() // Create the transaction pool with its initial settings pool := &TxPool{ config: config, chainconfig: chainconfig, chain: chain, signer: types.NewEIP155Signer(chainconfig.ChainID), pending: make(map[common.Address]*txList), queue: make(map[common.Address]*txList), beats: make(map[common.Address]time.Time), all: newTxLookup(), chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), gasPrice: new(big.Int).SetUint64(config.PriceLimit), } pool.locals = newAccountSet(pool.signer) pool.priced = newTxPricedList(pool.all) pool.reset(nil, chain.CurrentBlock().Header()) // If local transactions and journaling is enabled, load from disk if !config.NoLocals && config.Journal != "" { pool.journal = newTxJournal(config.Journal) if err := pool.journal.load(pool.AddLocals); err != nil { log.Warn("Failed to load transaction journal", "err", err) } if err := pool.journal.rotate(pool.local()); err != nil { log.Warn("Failed to rotate transaction journal", "err", err) } } // Subscribe events from blockchain pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh) // Start the event loop and return pool.wg.Add(1) go pool.loop() return pool }
注意:local交易比remote交易具有更高的权限,一是不轻易被替换;二是持久化,即通过一个本地的journal文件保存尚未打包的local交易。所以在节点启动的时候,优先从本地加载local交易。
本地地址会被加入白名单,凡由此地址发送的交易均被认为是local交易,不论是从本地递交还是从远端发送来的。
loop监听哪些事件?
首先,pool.loop()定义了三个定时器: 1、report为每8秒钟报告一次状态; 2、evict为每分钟检测不活动account的交易移除(超过3小时不活动的账户交易要被移除) 3、journal为每小时更新一下本地的local transaction journal。
然后开启监听循环: 1、监听规范链更新事件,重置交易池:pool.reset(); 2、监听report.C,定时打印最新pending和queue状态; 3、监听evict.C,定时删除超时交易; 4、监听hournal.C,定时本地储存尚未额打包的local交易;
func (pool *TxPool) loop() { defer pool.wg.Done() // Start the stats reporting and transaction eviction tickers var prevPending, prevQueued, prevStales int report := time.NewTicker(statsReportInterval) defer report.Stop() evict := time.NewTicker(evictionInterval) defer evict.Stop() journal := time.NewTicker(pool.config.Rejournal) defer journal.Stop() // Track the previous head headers for transaction reorgs head := pool.chain.CurrentBlock() // Keep waiting for and reacting to the various events for { select { // Handle ChainHeadEvent case ev := <-pool.chainHeadCh: if ev.Block != nil { pool.mu.Lock() if pool.chainconfig.IsHomestead(ev.Block.Number()) { pool.homestead = true } pool.reset(head.Header(), ev.Block.Header()) head = ev.Block pool.mu.Unlock() } // Be unsubscribed due to system stopped case <-pool.chainHeadSub.Err(): return // Handle stats reporting ticks case <-report.C: pool.mu.RLock() pending, queued := pool.stats() stales := pool.priced.stales pool.mu.RUnlock() if pending != prevPending || queued != prevQueued || stales != prevStales { log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales) prevPending, prevQueued, prevStales = pending, queued, stales } // Handle inactive account transaction eviction case <-evict.C: pool.mu.Lock() for addr := range pool.queue { // Skip local transactions from the eviction mechanism if pool.locals.contains(addr) { continue } // Any non-locals old enough should be removed if time.Since(pool.beats[addr]) > pool.config.Lifetime { for _, tx := range pool.queue[addr].Flatten() { pool.removeTx(tx.Hash(), true) } } } pool.mu.Unlock() // Handle local transaction journal rotation case <-journal.C: if pool.journal != nil { pool.mu.Lock() if err := pool.journal.rotate(pool.local()); err != nil { log.Warn("Failed to rotate local tx journal", "err", err) } pool.mu.Unlock() } } } }
二、交易入池验证:validateTx
主要功能是验证一个交易的合法性: 1、交易的size不能过大; 2、交易转账值不能为负; 3、交易的gas值超过了当前规范链头区块的gas值; 4、确保交易签名的正确性; 5、在没有指定local参数或本节点白名单中没有包含这个交易地址的情况下,交易的gas不能小于txpool下限; 6、如果本地节点的最新状态中交易发起方的余额不能小于交易gas总消耗(value+tx.gasLimit*tx.gasPrice); 7、如果固定gas消耗不能大于交易设置的gasLimit;
func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { // Heuristic limit, reject transactions over 32KB to prevent DOS attacks if tx.Size() > 32*1024 { return ErrOversizedData } // Transactions can't be negative. This may never happen using RLP decoded // transactions but may occur if you create a transaction using the RPC. if tx.Value().Sign() < 0 { return ErrNegativeValue } // Ensure the transaction doesn't exceed the current block limit gas. if pool.currentMaxGas < tx.Gas() { return ErrGasLimit } // Make sure the transaction is signed properly from, err := types.Sender(pool.signer, tx) if err != nil { return ErrInvalidSender } // Drop non-local transactions under our own minimal accepted gas price local = local || pool.locals.contains(from) // account may be local even if the transaction arrived from the network if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 { return ErrUnderpriced } // Ensure the transaction adheres to nonce ordering if pool.currentState.GetNonce(from) > tx.Nonce() { return ErrNonceTooLow } // Transactor should have enough funds to cover the costs // cost == V + GP * GL if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 { return ErrInsufficientFunds } intrGas, err := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead) if err != nil { return err } if tx.Gas() < intrGas { return ErrIntrinsicGas } return nil }
在以太坊中一个message的gas的计算方式是被规定好的: 1、如果是合约创建且是家园版本,固定消耗为53000gas; 2、如果是普通交易,固定消耗是21000gas; 3、非0值数据消耗:固定消耗+非0值数量*68,以64位能表示的最大值为封顶,超过则报错; 4、0值数据消耗:固定消耗+0值数量*4,同样以64位能表示的最大值为封顶,超过则报错。
func IntrinsicGas(data []byte, contractCreation, homestead bool) (uint64, error) { // Set the starting gas for the raw transaction var gas uint64 if contractCreation && homestead { gas = params.TxGasContractCreation } else { gas = params.TxGas } // Bump the required gas by the amount of transactional data if len(data) > 0 { // Zero and non-zero bytes are priced differently var nz uint64 for _, byt := range data { if byt != 0 { nz++ } } // Make sure we don't exceed uint64 for all data combinations if (math.MaxUint64-gas)/params.TxDataNonZeroGas < nz { return 0, vm.ErrOutOfGas } gas += nz * params.TxDataNonZeroGas z := uint64(len(data)) - nz if (math.MaxUint64-gas)/params.TxDataZeroGas < z { return 0, vm.ErrOutOfGas } gas += z * params.TxDataZeroGas } return gas, nil }
三、向交易池添加交易
1、添加交易TxPool.add() add()方法用于将本地或远端的交易加入到交易池,这个方法的基本逻辑是: 1)检查交易是否收到过,重复接受的交易直接丢弃; 2)验证交易是否有效; 3)如果交易池满了,待插入的交易的价值比交易池中任意一个都低,则直接丢弃; 4)如果待插入的交易序号在pending列表中已经存在,且待插入的交易价值大于或等于原交易的110%,则替换原交易; 5)如果待插入的交易序号在pending列表中没有,则直接放入queue列表。如果对应的序号已经有交易了,则如果新交易的价值大于或等于原交易的110%,替换原交易;
注意:这里pool.config.GlobalSlots为所有可执行交易的总数,即pending列表总数,默认4096;pool.config.GlobalQueue为不可执行交易总数,即queue列表总数,默认1024;
func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { // 如果交易已经获取,则直接丢弃 hash := tx.Hash() if pool.all.Get(hash) != nil { log.Trace("Discarding already known transaction", "hash", hash) return false, fmt.Errorf("known transaction: %x", hash) } // 如果交易无法通过验证,则直接丢弃 if err := pool.validateTx(tx, local); err != nil { log.Trace("Discarding invalid transaction", "hash", hash, "err", err) invalidTxCounter.Inc(1) return false, err } // 如果交易池满了,待插入的交易的价值比交易池中任意一个都低,则丢弃没有价值的交易 if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue { //如果待插入的交易价值比当前最便宜的还要低,则直接丢弃 if !local && pool.priced.Underpriced(tx, pool.locals) { log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice()) underpricedTxCounter.Inc(1) return false, ErrUnderpriced } // 如果待插入的交易价值不是最差的,则腾出空间,返回即将丢弃的交易并删除 drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals) for _, tx := range drop { log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice()) underpricedTxCounter.Inc(1) pool.removeTx(tx.Hash(), false) } } // 如果插入pending已有的交易,必须交易价值大于或等于原交易的110%,方可替换 from, _ := types.Sender(pool.signer, tx) // already validated if list := pool.pending[from]; list != nil && list.Overlaps(tx) { // Nonce already pending, check if required price bump is met inserted, old := list.Add(tx, pool.config.PriceBump) if !inserted { // 丢弃计数器加1 pendingDiscardCounter.Inc(1) return false, ErrReplaceUnderpriced } // New transaction is better, replace old one if old != nil { pool.all.Remove(old.Hash()) pool.priced.Removed() // pending列表替换计数器加1 pendingReplaceCounter.Inc(1) } pool.all.Add(tx) pool.priced.Put(tx) pool.journalTx(from, tx) log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To()) // 通知其他队交易池增加新交易感兴趣的子系统:广播和矿工 go pool.txFeed.Send(NewTxsEvent{types.Transactions{tx}}) return old != nil, nil } // 新交易无法替换,则加入queue列表 replace, err := pool.enqueueTx(hash, tx) if err != nil { return false, err } // Mark local addresses and journal local transactions if local { pool.locals.add(from) } pool.journalTx(from, tx) log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To()) return replace, nil }
TxPool.add()的调用时机: 1)命令行发送交易
EthApiBackend.SendTx ——> TxPool.AddLocal ——> TxPool.addTx ——> TxPool.add
2)交易池重新整理的过程中
TxPool.reset ——> TxPool.addTxLocked ——> TxPool.add
3)收到远程节点广播的交易时
AddRemotes ——> TxPool.addRemotes ——> TxPool.addTxs ——> TxPool.addTxLocked ——> TxPool.add
这里有addLocal和addRemote的区别,其中有第二个参数来设定该交易是local还是remote。local的交易在打包时有优先权,在删除时有豁免权,还会以文件的形式保存在磁盘上。
func (pool *TxPool) AddLocal(tx *types.Transaction) error { return pool.addTx(tx, !pool.config.NoLocals) } func (pool *TxPool) AddRemote(tx *types.Transaction) error { return pool.addTx(tx, false) }
2、交易加入queue列表:TxPool.enqueueTx
主要流程: 1)将交易插入queue中,如果待插入的交易序号在queue列表中已经有一个交易,那么待插入的交易价值大于原交易价值的110%,则替换原交易; 2)如果新交易替换成功,则从all列表中删除这个被替换的交易 3)更新all列表
func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, error) { // Try to insert the transaction into the future queue from, _ := types.Sender(pool.signer, tx) // already validated if pool.queue[from] == nil { pool.queue[from] = newTxList(false) } inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump) if !inserted { // An older transaction was better, discard this queuedDiscardCounter.Inc(1) return false, ErrReplaceUnderpriced } // Discard any previous transaction and mark this if old != nil { pool.all.Remove(old.Hash()) pool.priced.Removed() queuedReplaceCounter.Inc(1) } if pool.all.Get(hash) == nil { pool.all.Add(tx) pool.priced.Put(tx) } return old != nil, nil }
调用时机: 1、调用TxPool.add函数,新交易没有加入pending而直接放入queue
TxPool.add ——> TxPool.enqueueTx
2、调用TxPool.removeTx,删除pending列表中的交易,将后面的交易移入queue列表;
TxPool.removeTx ——> TxPool.enqueueTx
3、调用TxPool.demoteYnexecuteables删除无效交易,同时将后续的交易移入queue列表;
TxPool.demoteUnexpectables ——> TxPool.enqueueTx
下一章:以太坊源码解读(15)交易池升级、降级和重置
1、交易池降级:TxPool.demoteUnexecutables 主要功能是从pending中移除无效的交易,同时将后续的交易都移动到queue。 1)丢弃nonce值过低的交易:list. ...