diff --git a/core/state_processor.go b/core/state_processor.go index 220b54b371..4aede4c1e4 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -633,6 +633,190 @@ func applyTransaction(msg types.Message, parent *types.WorkObject, config *param return receipt, err } +func ValidateQiTxInputs(tx *types.Transaction, chain ChainContext, statedb *state.StateDB, currentHeader *types.WorkObject, signer types.Signer, location common.Location, chainId big.Int) (*big.Int, error) { + if tx.Type() != types.QiTxType { + return nil, fmt.Errorf("tx %032x is not a QiTx", tx.Hash()) + } + totalQitIn := big.NewInt(0) + addresses := make(map[common.AddressBytes]struct{}) + for _, txIn := range tx.TxIn() { + utxo := statedb.GetUTXO(txIn.PreviousOutPoint.TxHash, txIn.PreviousOutPoint.Index) + if utxo == nil { + return nil, fmt.Errorf("tx %032x spends non-existent UTXO %032x:%d", tx.Hash(), txIn.PreviousOutPoint.TxHash, txIn.PreviousOutPoint.Index) + } + if utxo.Lock != nil && utxo.Lock.Cmp(currentHeader.Number(location.Context())) > 0 { + return nil, fmt.Errorf("tx %032x spends locked UTXO %032x:%d locked until %s", tx.Hash(), txIn.PreviousOutPoint.TxHash, txIn.PreviousOutPoint.Index, utxo.Lock.String()) + } + address := crypto.PubkeyBytesToAddress(txIn.PubKey, location) + entryAddr := common.BytesToAddress(utxo.Address, location) + if !address.Equal(entryAddr) { + return nil, fmt.Errorf("tx %032x spends UTXO %032x:%d with invalid pubkey, have %s want %s", tx.Hash(), txIn.PreviousOutPoint.TxHash, txIn.PreviousOutPoint.Index, address.String(), entryAddr.String()) + } + // Check for duplicate addresses. This also checks for duplicate inputs. + if _, exists := addresses[common.AddressBytes(utxo.Address)]; exists { + return nil, errors.New("Duplicate address in QiTx inputs: " + common.AddressBytes(utxo.Address).String()) + } + addresses[common.AddressBytes(utxo.Address)] = struct{}{} + + // Perform some spend processing logic + denomination := utxo.Denomination + if denomination > types.MaxDenomination { + str := fmt.Sprintf("transaction output value of %v is "+ + "higher than max allowed value of %v", + denomination, + types.MaxDenomination) + return nil, errors.New(str) + } + totalQitIn.Add(totalQitIn, types.Denominations[denomination]) + } + return totalQitIn, nil + +} + +func ValidateQiTxOutputsAndSignature(tx *types.Transaction, chain ChainContext, totalQitIn *big.Int, currentHeader *types.WorkObject, signer types.Signer, location common.Location, chainId big.Int, etxRLimit, etxPLimit int) (*big.Int, error) { + + intrinsicGas := types.CalculateIntrinsicQiTxGas(tx) + usedGas := intrinsicGas + + var ETXRCount int + var ETXPCount int + numEtxs := uint64(0) + totalQitOut := big.NewInt(0) + totalConvertQitOut := big.NewInt(0) + conversion := false + pubKeys := make([]*btcec.PublicKey, 0, len(tx.TxIn())) + addresses := make(map[common.AddressBytes]struct{}) + for _, txIn := range tx.TxIn() { + pubKey, err := btcec.ParsePubKey(txIn.PubKey) + if err != nil { + return nil, err + } + pubKeys = append(pubKeys, pubKey) + addresses[crypto.PubkeyBytesToAddress(txIn.PubKey, location).Bytes20()] = struct{}{} + } + for txOutIdx, txOut := range tx.TxOut() { + // It would be impossible for a tx to have this many outputs based on block gas limit, but cap it here anyways + if txOutIdx > types.MaxOutputIndex { + return nil, fmt.Errorf("tx [%v] exceeds max output index of %d", tx.Hash().Hex(), types.MaxOutputIndex) + } + + if txOut.Denomination > types.MaxDenomination { + str := fmt.Sprintf("transaction output value of %v is "+ + "higher than max allowed value of %v", + txOut.Denomination, + types.MaxDenomination) + return nil, errors.New(str) + } + totalQitOut.Add(totalQitOut, types.Denominations[txOut.Denomination]) + + toAddr := common.BytesToAddress(txOut.Address, location) + + // Enforce no address reuse + if _, exists := addresses[toAddr.Bytes20()]; exists { + return nil, errors.New("Duplicate address in QiTx outputs: " + toAddr.String()) + } + addresses[toAddr.Bytes20()] = struct{}{} + + if toAddr.Location().Equal(location) && toAddr.IsInQuaiLedgerScope() { // Qi->Quai conversion + conversion = true + if txOut.Denomination < params.MinQiConversionDenomination { + return nil, fmt.Errorf("tx %v emits UTXO with value %d less than minimum denomination %d", tx.Hash().Hex(), txOut.Denomination, params.MinQiConversionDenomination) + } + totalConvertQitOut.Add(totalConvertQitOut, types.Denominations[txOut.Denomination]) // Add to total conversion output for aggregation + delete(addresses, toAddr.Bytes20()) + continue + } + + if !toAddr.Location().Equal(location) { // This output creates an ETX + // Cross-region? + if toAddr.Location().CommonDom(location).Context() == common.REGION_CTX { + ETXRCount++ + } + // Cross-prime? + if toAddr.Location().CommonDom(location).Context() == common.PRIME_CTX { + ETXPCount++ + } + if ETXRCount > etxRLimit { + return nil, fmt.Errorf("tx [%v] emits too many cross-region ETXs for block. emitted: %d, limit: %d", tx.Hash().Hex(), ETXRCount, etxRLimit) + } + if ETXPCount > etxPLimit { + return nil, fmt.Errorf("tx [%v] emits too many cross-prime ETXs for block. emitted: %d, limit: %d", tx.Hash().Hex(), ETXPCount, etxPLimit) + } + primeTerminus := currentHeader.PrimeTerminus() + primeTerminusHeader := chain.GetHeaderByHash(primeTerminus) + if primeTerminusHeader == nil { + return nil, fmt.Errorf("could not find prime terminus header %032x", primeTerminus) + } + if !toAddr.IsInQiLedgerScope() { + return nil, fmt.Errorf("tx [%v] emits UTXO with To address not in the Qi ledger scope", tx.Hash().Hex()) + } + if !chain.CheckIfEtxIsEligible(primeTerminusHeader.EtxEligibleSlices(), *toAddr.Location()) { + return nil, fmt.Errorf("etx emitted by tx [%v] going to a slice that is not eligible to receive etx %v", tx.Hash().Hex(), *toAddr.Location()) + } + + // We should require some kind of extra fee here + usedGas += params.ETXGas + numEtxs++ + } + } + // Ensure the transaction does not spend more than its inputs. + if totalQitOut.Cmp(totalQitIn) == 1 { + str := fmt.Sprintf("total value of all transaction inputs for "+ + "transaction %v is %v which is less than the amount "+ + "spent of %v", tx.Hash(), totalQitIn, totalQitOut) + return nil, errors.New(str) + } + + // the fee to pay the basefee/miner is the difference between inputs and outputs + txFeeInQit := new(big.Int).Sub(totalQitIn, totalQitOut) + // Check tx against required base fee and gas + requiredGas := intrinsicGas + (numEtxs * (params.TxGas + params.ETXGas)) // Each ETX costs extra gas that is paid in the origin + if requiredGas < intrinsicGas { + // Overflow + return nil, fmt.Errorf("tx %032x has too many ETXs to calculate required gas", tx.Hash()) + } + minimumFeeInQuai := new(big.Int).Mul(big.NewInt(int64(requiredGas)), currentHeader.BaseFee()) + minimumFee := misc.QuaiToQi(currentHeader, minimumFeeInQuai) + if txFeeInQit.Cmp(minimumFee) < 0 { + return nil, fmt.Errorf("tx %032x has insufficient fee for base fee, have %d want %d", tx.Hash(), txFeeInQit.Uint64(), minimumFee.Uint64()) + } + // Miner gets remainder of fee after base fee, except in the convert case + txFeeInQit.Sub(txFeeInQit, minimumFee) + if conversion { + ETXPCount++ + if ETXPCount > etxPLimit { + return nil, fmt.Errorf("tx [%v] emits too many cross-prime ETXs for block. emitted: %d, limit: %d", tx.Hash().Hex(), ETXPCount, etxPLimit) + } + usedGas += params.ETXGas + txFeeInQit.Sub(txFeeInQit, txFeeInQit) // Fee goes entirely to gas to pay for conversion + } + + if usedGas > currentHeader.GasLimit() { + return nil, fmt.Errorf("tx %032x uses too much gas, have used %d out of %d", tx.Hash(), usedGas, currentHeader.GasLimit()) + } + + // Ensure the transaction signature is valid + var finalKey *btcec.PublicKey + if len(tx.TxIn()) > 1 { + aggKey, _, _, err := musig2.AggregateKeys( + pubKeys, false, + ) + if err != nil { + return nil, err + } + finalKey = aggKey.FinalKey + } else { + finalKey = pubKeys[0] + } + + txDigestHash := signer.Hash(tx) + if !tx.GetSchnorrSignature().Verify(txDigestHash[:], finalKey) { + return nil, fmt.Errorf("invalid signature for tx %032x digest hash %032x", tx.Hash(), txDigestHash) + } + + return txFeeInQit, nil +} + // ProcessQiTx processes a QiTx by spending the inputs and creating the outputs. // Math is performed to verify the fee provided is sufficient to cover the gas cost. // updateState is set to update the statedb in the case of the state processor, but not in the case of the txpool. @@ -794,26 +978,6 @@ func ProcessQiTx(tx *types.Transaction, chain ChainContext, updateState bool, ch return nil, nil, errors.New(str) } - // Ensure the transaction signature is valid - if checkSig { - var finalKey *btcec.PublicKey - if len(tx.TxIn()) > 1 { - aggKey, _, _, err := musig2.AggregateKeys( - pubKeys, false, - ) - if err != nil { - return nil, nil, err - } - finalKey = aggKey.FinalKey - } else { - finalKey = pubKeys[0] - } - - txDigestHash := signer.Hash(tx) - if !tx.GetSchnorrSignature().Verify(txDigestHash[:], finalKey) { - return nil, nil, errors.New("invalid signature for digest hash " + txDigestHash.String()) - } - } // the fee to pay the basefee/miner is the difference between inputs and outputs txFeeInQit := new(big.Int).Sub(totalQitIn, totalQitOut) // Check tx against required base fee and gas @@ -851,6 +1015,27 @@ func ProcessQiTx(tx *types.Transaction, chain ChainContext, updateState bool, ch txFeeInQit.Sub(txFeeInQit, txFeeInQit) // Fee goes entirely to gas to pay for conversion } + // Ensure the transaction signature is valid + if checkSig { + var finalKey *btcec.PublicKey + if len(tx.TxIn()) > 1 { + aggKey, _, _, err := musig2.AggregateKeys( + pubKeys, false, + ) + if err != nil { + return nil, nil, err + } + finalKey = aggKey.FinalKey + } else { + finalKey = pubKeys[0] + } + + txDigestHash := signer.Hash(tx) + if !tx.GetSchnorrSignature().Verify(txDigestHash[:], finalKey) { + return nil, nil, errors.New("invalid signature for digest hash " + txDigestHash.String()) + } + } + *etxRLimit -= ETXRCount *etxPLimit -= ETXPCount return txFeeInQit, etxs, nil diff --git a/core/tx_pool.go b/core/tx_pool.go index 638f81a540..f3393edcdb 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -175,6 +175,7 @@ type TxPoolConfig struct { GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts QiPoolSize uint64 // Maximum number of Qi transactions to store Lifetime time.Duration // Maximum amount of time non-executable transaction are queued + ReorgFrequency time.Duration // Frequency of reorgs outside of new head events } // DefaultTxPoolConfig contains the default configurations for the transaction @@ -194,6 +195,7 @@ var DefaultTxPoolConfig = TxPoolConfig{ GlobalQueue: 2048, QiPoolSize: 10024, Lifetime: 3 * time.Hour, + ReorgFrequency: 2 * time.Second, } // sanitize checks the provided user configurations and changes anything that's @@ -263,6 +265,13 @@ func (config *TxPoolConfig) sanitize(logger *log.Logger) TxPoolConfig { }).Warn("Sanitizing invalid txpool lifetime") conf.Lifetime = DefaultTxPoolConfig.Lifetime } + if conf.ReorgFrequency < 1 { + logger.WithFields(log.Fields{ + "provided": conf.ReorgFrequency, + "updated": DefaultTxPoolConfig.ReorgFrequency, + }).Warn("Sanitizing invalid txpool reorg frequency") + conf.ReorgFrequency = DefaultTxPoolConfig.ReorgFrequency + } return conf } @@ -1128,78 +1137,15 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error { nilSlot++ } // Reorg the pool internals if needed and return - done := pool.requestPromoteExecutables(dirtyAddrs) - if sync { - <-done - } + pool.requestPromoteExecutables(dirtyAddrs) return errs } -// addQiTx adds a Qi transaction to the Qi pool. -// If the mempool lock is already held by the caller, the caller must set grabLock to false. -// If the mempool lock is not held by the caller, the caller must set grabLock to true. -func (pool *TxPool) addQiTx(tx *types.Transaction) error { - pool.qiMu.RLock() - if _, hasTx := pool.qiPool[tx.Hash()]; hasTx { - pool.qiMu.RUnlock() - return ErrAlreadyKnown - } - pool.qiMu.RUnlock() - - currentBlock := pool.chain.CurrentBlock() - gp := types.GasPool(currentBlock.GasLimit()) - etxRLimit := len(currentBlock.Transactions()) / params.ETXRegionMaxFraction - if etxRLimit < params.ETXRLimitMin { - etxRLimit = params.ETXRLimitMin - } - etxPLimit := len(currentBlock.Transactions()) / params.ETXPrimeMaxFraction - if etxPLimit < params.ETXPLimitMin { - etxPLimit = params.ETXPLimitMin - } - fee, _, err := ProcessQiTx(tx, pool.chain, false, true, pool.chain.CurrentBlock(), pool.currentState, &gp, new(uint64), pool.signer, pool.chainconfig.Location, *pool.chainconfig.ChainID, &etxRLimit, &etxPLimit) - if err != nil { - pool.logger.WithFields(logrus.Fields{ - "tx": tx.Hash().String(), - "err": err, - }).Debug("Invalid Qi transaction") - return err - } - txWithMinerFee, err := types.NewTxWithMinerFee(tx, nil, fee) - if err != nil { - return err - } - pool.qiMu.Lock() - if uint64(len(pool.qiPool))+1 > pool.config.QiPoolSize { - // If the pool is full, don't accept the transaction - pool.qiMu.Unlock() - pool.logger.WithFields(logrus.Fields{ - "tx": tx.Hash().String(), - "fee": fee, - }).Error("Qi tx pool is full") - return ErrTxPoolOverflow - } - pool.qiPool[tx.Hash()] = txWithMinerFee - pool.qiMu.Unlock() - pool.queueTxEvent(tx) - select { - case pool.sendersCh <- newSender{tx.Hash(), common.InternalAddress{}}: // There is no "sender" for Qi transactions, but the sig is good - default: - pool.logger.Error("sendersCh is full, skipping until there is room") - } - pool.logger.WithFields(logrus.Fields{ - "tx": tx.Hash().String(), - "fee": fee, - }).Info("Added qi tx to pool") - qiTxGauge.Add(1) - return nil -} - // addQiTx adds Qi transactions to the Qi pool. // The qiMu lock must be held by the caller. func (pool *TxPool) addQiTxsLocked(txs types.Transactions) []error { errs := make([]error, 0) currentBlock := pool.chain.CurrentBlock() - gp := types.GasPool(currentBlock.GasLimit()) etxRLimit := len(currentBlock.Transactions()) / params.ETXRegionMaxFraction if etxRLimit < params.ETXRLimitMin { etxRLimit = params.ETXRLimitMin @@ -1208,9 +1154,23 @@ func (pool *TxPool) addQiTxsLocked(txs types.Transactions) []error { if etxPLimit < params.ETXPLimitMin { etxPLimit = params.ETXPLimitMin } + transactionsWithoutErrors := make(types.Transactions, 0, len(txs)) + totalQitIns := make([]*big.Int, 0, len(txs)) for _, tx := range txs { - - fee, _, err := ProcessQiTx(tx, pool.chain, false, true, pool.chain.CurrentBlock(), pool.currentState, &gp, new(uint64), pool.signer, pool.chainconfig.Location, *pool.chainconfig.ChainID, &etxRLimit, &etxPLimit) + totalQitIn, err := ValidateQiTxInputs(tx, pool.chain, pool.currentState, currentBlock, pool.signer, pool.chainconfig.Location, *pool.chainconfig.ChainID) + if err != nil { + pool.logger.WithFields(logrus.Fields{ + "tx": tx.Hash().String(), + "err": err, + }).Debug("Invalid Qi transaction") + errs = append(errs, err) + continue + } + transactionsWithoutErrors = append(transactionsWithoutErrors, tx) + totalQitIns = append(totalQitIns, totalQitIn) + } + for i, tx := range transactionsWithoutErrors { + fee, err := ValidateQiTxOutputsAndSignature(tx, pool.chain, totalQitIns[i], currentBlock, pool.signer, pool.chainconfig.Location, *pool.chainconfig.ChainID, etxRLimit, etxPLimit) if err != nil { pool.logger.WithFields(logrus.Fields{ "tx": tx.Hash().String(), @@ -1406,12 +1366,10 @@ func (pool *TxPool) requestReset(oldHead *types.WorkObject, newHead *types.WorkO // requestPromoteExecutables requests transaction promotion checks for the given addresses. // The returned channel is closed when the promotion checks have occurred. -func (pool *TxPool) requestPromoteExecutables(set *accountSet) chan struct{} { +func (pool *TxPool) requestPromoteExecutables(set *accountSet) { select { case pool.reqPromoteCh <- set: - return <-pool.reorgDoneCh case <-pool.reorgShutdownCh: - return pool.reorgShutdownCh } } @@ -1438,16 +1396,16 @@ func (pool *TxPool) scheduleReorgLoop() { }).Error("Go-Quai Panicked") } }() - var ( - curDone chan struct{} // non-nil while runReorg is active - nextDone = make(chan struct{}) - launchNextRun bool - reset *txpoolResetRequest - dirtyAccounts *accountSet - queuedEvents = make(map[common.InternalAddress]*txSortedMap) - reorgCancelCh = make(chan struct{}) - queuedQiTxs = make([]*types.Transaction, 0) + curDone chan struct{} // non-nil while runReorg is active + nextDone = make(chan struct{}) + launchNextRun bool + reset *txpoolResetRequest + dirtyAccounts *accountSet + queuedEvents = make(map[common.InternalAddress]*txSortedMap) + reorgCancelCh = make(chan struct{}) + queuedQiTxs = make([]*types.Transaction, 0) + runReorgTicker = time.NewTicker(pool.config.ReorgFrequency) ) for { // Launch next background reorg if needed @@ -1485,8 +1443,10 @@ func (pool *TxPool) scheduleReorgLoop() { } else { dirtyAccounts.merge(req) } + + case <-runReorgTicker.C: + // Timer tick: launch the next reorg run launchNextRun = true - pool.reorgDoneCh <- nextDone case tx := <-pool.queueTxEventCh: // Queue up the event, but don't schedule a reorg. It's up to the caller to