diff --git a/Cargo.lock b/Cargo.lock index 4cbb377b..80179813 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1128,6 +1128,7 @@ dependencies = [ "android-tzdata", "iana-time-zone", "num-traits", + "serde", "winapi", ] @@ -1558,8 +1559,18 @@ version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a01d95850c592940db9b8194bc39f4bc0e89dee5c4265e4b1807c34a9aba453c" dependencies = [ - "darling_core", - "darling_macro", + "darling_core 0.13.4", + "darling_macro 0.13.4", +] + +[[package]] +name = "darling" +version = "0.20.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0209d94da627ab5605dcccf08bb18afa5009cfbef48d8a8b7d7bdbc79be25c5e" +dependencies = [ + "darling_core 0.20.3", + "darling_macro 0.20.3", ] [[package]] @@ -1576,17 +1587,42 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "darling_core" +version = "0.20.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "177e3443818124b357d8e76f53be906d60937f0d3a90773a664fa63fa253e621" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2 1.0.66", + "quote 1.0.32", + "strsim", + "syn 2.0.27", +] + [[package]] name = "darling_macro" version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c972679f83bdf9c42bd905396b6c3588a843a17f0f16dfcfa3e2c5d57441835" dependencies = [ - "darling_core", + "darling_core 0.13.4", "quote 1.0.32", "syn 1.0.109", ] +[[package]] +name = "darling_macro" +version = "0.20.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" +dependencies = [ + "darling_core 0.20.3", + "quote 1.0.32", + "syn 2.0.27", +] + [[package]] name = "dashmap" version = "5.4.0" @@ -2386,7 +2422,7 @@ dependencies = [ "rustc_version 0.2.3", "serde", "serde_json", - "serde_with", + "serde_with 1.14.0", "url", "void", ] @@ -3374,6 +3410,7 @@ checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d" dependencies = [ "equivalent", "hashbrown 0.14.0", + "serde", ] [[package]] @@ -4304,6 +4341,7 @@ dependencies = [ "reqwest", "serde", "serde_json", + "serde_with 3.3.0", "strum", "strum_macros", "thiserror", @@ -5719,7 +5757,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "678b5a069e50bf00ecd22d0cd8ddf7c236f68581b03db652061ed5eb13a312ff" dependencies = [ "serde", - "serde_with_macros", + "serde_with_macros 1.5.2", +] + +[[package]] +name = "serde_with" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ca3b16a3d82c4088f343b7480a93550b3eabe1a358569c2dfe38bbcead07237" +dependencies = [ + "base64 0.21.0", + "chrono", + "hex", + "indexmap 1.9.3", + "indexmap 2.0.0", + "serde", + "serde_json", + "serde_with_macros 3.3.0", + "time", ] [[package]] @@ -5728,12 +5783,24 @@ version = "1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e182d6ec6f05393cc0e5ed1bf81ad6db3a8feedf8ee515ecdd369809bcce8082" dependencies = [ - "darling", + "darling 0.13.4", "proc-macro2 1.0.66", "quote 1.0.32", "syn 1.0.109", ] +[[package]] +name = "serde_with_macros" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e6be15c453eb305019bfa438b1593c731f36a289a7853f7707ee29e870b3b3c" +dependencies = [ + "darling 0.20.3", + "proc-macro2 1.0.66", + "quote 1.0.32", + "syn 2.0.27", +] + [[package]] name = "sha-1" version = "0.10.0" diff --git a/Cargo.toml b/Cargo.toml index b6de593e..4955e901 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ sink-gcp-pubsub = ["google-cloud-pubsub", "google-cloud-googleapis", "google-clo sink-gcp-cloudfunction = ["reqwest", "jsonwebtoken"] sink-redis = ["r2d2_redis"] sink-elasticsearch = ["elasticsearch"] -source-utxorpc = ["tonic","futures"] +source-utxorpc = ["tonic", "futures"] [dependencies] pallas = "0.19.0-alpha.1" @@ -78,4 +78,5 @@ jsonwebtoken = { version = "8.3.0", optional = true } file-rotate = { version = "0.7.5", optional = true } tonic = { version = "0.9.2", features = ["tls", "tls-roots"], optional = true} futures = { version = "0.3.28", optional = true } +serde_with = { version = "3.3.0", features = ["macros"] } diff --git a/src/filters/match_pattern/eval.rs b/src/filters/match_pattern/eval.rs index 3fbe167f..ce5b54dc 100644 --- a/src/filters/match_pattern/eval.rs +++ b/src/filters/match_pattern/eval.rs @@ -1,12 +1,14 @@ -use pallas::ledger::addresses::Address; +use pallas::{ledger::addresses::Address, network::miniprotocols::Point}; use thiserror::Error; -use utxorpc::proto::cardano::v1::{Asset, Multiasset, PlutusData, Tx, TxInput, TxOutput}; +use utxorpc::proto::cardano::v1::{ + Asset, Multiasset, PlutusData, Tx, TxInput, TxOutput, Withdrawal, +}; use crate::framework::Record; use super::{ - AddressPattern, AssetPattern, DatumPattern, InputPattern, OutputPattern, QuantityPattern, - TxPredicate, UtxoRefPattern, + AddressPattern, AssetPattern, BlockPattern, DatumPattern, InputPattern, OutputPattern, + QuantityPattern, TxPredicate, UtxoRefPattern, WithdrawalPattern, }; fn eval_quantity_matches(value: u64, pattern: &QuantityPattern) -> EvalResult { @@ -14,7 +16,7 @@ fn eval_quantity_matches(value: u64, pattern: &QuantityPattern) -> EvalResult { QuantityPattern::Equals(expected) => value.eq(expected), QuantityPattern::RangeInclusive(a, b) => value.ge(a) && value.le(b), QuantityPattern::Greater(a) => value.gt(a), - QuantityPattern::GreaterOrEqual(_) => value.ge(a), + QuantityPattern::GreaterOrEqual(a) => value.ge(a), QuantityPattern::Lower(a) => value.lt(a), QuantityPattern::LowerOrEqual(b) => value.le(b), }; @@ -23,13 +25,13 @@ fn eval_quantity_matches(value: u64, pattern: &QuantityPattern) -> EvalResult { } fn eval_block_matches(point: &Point, pattern: &BlockPattern) -> EvalResult { - if let Some(slot_after) = block_pattern.slot_after { + if let Some(slot_after) = pattern.slot_after { if point.slot_or_default() <= slot_after { return Ok(false); } } - if let Some(slot_before) = block_pattern.slot_before { + if let Some(slot_before) = pattern.slot_before { if point.slot_or_default() >= slot_before { return Ok(false); } @@ -42,21 +44,23 @@ fn eval_address_matches(addr: &[u8], pattern: &AddressPattern) -> EvalResult { let addr = Address::from_bytes(addr).map_err(|_| Error::inconclusive("can't parse address bytes"))?; - match (pattern, addr) { - (AddressPattern::Exact(expected), _) => address.eq(expected), + let is_match = match (pattern, &addr) { + (AddressPattern::Exact(expected), _) => addr.eq(expected), (AddressPattern::Payment(expected), Address::Shelley(shelley)) => { - Ok(shelley.payment().eq(expected)) + shelley.payment().eq(expected) } (AddressPattern::Delegation(expected), Address::Shelley(shelley)) => { - Ok(shelley.delegation().eq(expected)) + shelley.delegation().eq(expected) } - _ => Ok(false), - } + _ => false, + }; + + Ok(is_match) } fn eval_datum_matches(datum_hash: &[u8], pattern: &DatumPattern) -> EvalResult { if let Some(expected) = pattern.hash { - let eval = datum_hash.eq(&expected); + let eval = datum_hash.as_ref().eq(expected.as_ref()); if !eval { return Ok(false); @@ -67,15 +71,15 @@ fn eval_datum_matches(datum_hash: &[u8], pattern: &DatumPattern) -> EvalResult { } fn eval_asset_matches(policy: &[u8], asset: &Asset, pattern: &AssetPattern) -> EvalResult { - if Some(pattern) = &pattern.policy { - let eval = policy.eq(&pattern); + if let Some(pattern) = &pattern.policy { + let eval = policy.eq(pattern.as_slice()); if !eval { return Ok(false); } } - if Some(pattern) = &pattern.quantity { + if let Some(pattern) = &pattern.quantity { let eval = eval_quantity_matches(asset.output_coin, pattern)?; if !eval { @@ -118,7 +122,7 @@ fn eval_output_matches(output: &TxOutput, pattern: &OutputPattern) -> EvalResult } if let Some(pattern) = &pattern.assets { - let eval = eval_some_asset_matches(&output.assets, patter)?; + let eval = eval_some_asset_matches(&output.assets, pattern)?; if !eval { return Ok(false); @@ -128,7 +132,7 @@ fn eval_output_matches(output: &TxOutput, pattern: &OutputPattern) -> EvalResult Ok(true) } -fn eval_some_output_matches(tx: &Tx, pattern: &OutputPattern) -> Result { +fn eval_some_output_matches(tx: &Tx, pattern: &OutputPattern) -> EvalResult { for output in tx.outputs.iter() { let eval = eval_output_matches(output, pattern)?; @@ -140,9 +144,21 @@ fn eval_some_output_matches(tx: &Tx, pattern: &OutputPattern) -> Result EvalResult { +fn eval_withdrawal_matches(withdrawal: &Withdrawal, pattern: &WithdrawalPattern) -> EvalResult { + if let Some(pattern) = &pattern.quantity { + let eval = eval_quantity_matches(withdrawal.coin, pattern)?; + + if eval { + return Ok(true); + } + } + + Ok(false) +} + +fn eval_some_withdrawal_matches(tx: &Tx, pattern: &WithdrawalPattern) -> EvalResult { for withdrawal in tx.withdrawals.iter() { - let eval = eval_output_matches(withdrawal, pattern)?; + let eval = eval_withdrawal_matches(withdrawal, pattern)?; if eval { return Ok(true); @@ -168,8 +184,8 @@ fn eval_some_collateral_matches(tx: &Tx, pattern: &InputPattern) -> EvalResult { fn eval_collateral_return_matches(tx: &Tx, pattern: &OutputPattern) -> EvalResult { if let Some(collateral) = &tx.collateral { - if let Some(return_) = collateral.collateral_return { - let eval = eval_output_matches(&return_, pattern)?; + if let Some(return_) = &collateral.collateral_return { + let eval = eval_output_matches(return_, pattern)?; if eval { return Ok(true); @@ -180,9 +196,21 @@ fn eval_collateral_return_matches(tx: &Tx, pattern: &OutputPattern) -> EvalResul Ok(false) } +fn eval_total_collateral_matches(tx: &Tx, pattern: &QuantityPattern) -> EvalResult { + if let Some(collateral) = &tx.collateral { + let eval = eval_quantity_matches(collateral.total_collateral, pattern)?; + + if eval { + return Ok(true); + } + } + + Ok(false) +} + fn eval_input_utxoref_matches(input: &TxInput, pattern: &UtxoRefPattern) -> EvalResult { if let Some(pattern) = &pattern.tx_hash { - let eval = input.tx_hash.as_ref().eq(&pattern); + let eval = input.tx_hash.as_ref().eq(pattern.as_ref()); if !eval { return Ok(false); @@ -201,7 +229,7 @@ fn eval_input_utxoref_matches(input: &TxInput, pattern: &UtxoRefPattern) -> Eval } fn eval_input_matches(input: &TxInput, pattern: &InputPattern) -> EvalResult { - if let Some(pattern) = pattern.utxo { + if let Some(pattern) = &pattern.utxo { let eval = eval_input_utxoref_matches(input, &pattern)?; if !eval { @@ -210,7 +238,7 @@ fn eval_input_matches(input: &TxInput, pattern: &InputPattern) -> EvalResult { } if let Some(pattern) = &pattern.from { - let output = input.as_output.ok_or(Error::no_input_ref())?; + let output = input.as_output.as_ref().ok_or(Error::no_input_ref())?; let eval = eval_address_matches(&output.address, &pattern)?; @@ -220,7 +248,7 @@ fn eval_input_matches(input: &TxInput, pattern: &InputPattern) -> EvalResult { } if let Some(pattern) = &pattern.datum { - let output = input.as_output.ok_or(Error::no_input_ref())?; + let output = input.as_output.as_ref().ok_or(Error::no_input_ref())?; let eval = eval_datum_matches(&output.datum_hash, &pattern)?; @@ -230,9 +258,9 @@ fn eval_input_matches(input: &TxInput, pattern: &InputPattern) -> EvalResult { } if let Some(pattern) = &pattern.assets { - let output = input.as_output.ok_or(Error::no_input_ref())?; + let output = input.as_output.as_ref().ok_or(Error::no_input_ref())?; - let eval = eval_some_asset_matches(&output.assets, patter)?; + let eval = eval_some_asset_matches(&output.assets, pattern)?; if !eval { return Ok(false); @@ -256,7 +284,7 @@ fn eval_some_input_matches(tx: &Tx, pattern: &InputPattern) -> EvalResult { fn eval_some_input_address_matches(tx: &Tx, pattern: &AddressPattern) -> EvalResult { for input in tx.inputs.iter() { - let output = input.as_output.ok_or(Error::no_input_ref())?; + let output = input.as_output.as_ref().ok_or(Error::no_input_ref())?; let eval = eval_address_matches(&output.address, pattern)?; @@ -270,13 +298,15 @@ fn eval_some_input_address_matches(tx: &Tx, pattern: &AddressPattern) -> EvalRes fn eval_some_input_asset_matches(tx: &Tx, pattern: &AssetPattern) -> EvalResult { for input in tx.inputs.iter() { - let output = input.as_output.ok_or(Error::no_input_ref())?; + let output = input.as_output.as_ref().ok_or(Error::no_input_ref())?; - for multiasset in output.assets { - let eval = eval_asset_matches(&multiasset.policy_id, asset, pattern)?; + for multiasset in output.assets.iter() { + for asset in multiasset.assets.iter() { + let eval = eval_asset_matches(&multiasset.policy_id, asset, pattern)?; - if eval { - return Ok(true); + if eval { + return Ok(true); + } } } } @@ -297,7 +327,7 @@ fn eval_some_output_address_matches(tx: &Tx, pattern: &AddressPattern) -> EvalRe } fn eval_tx_any_of(predicates: &[TxPredicate], point: &Point, tx: &Tx) -> EvalResult { - for p in x { + for p in predicates.iter() { if eval_tx(p, point, tx)? { return Ok(true); }; @@ -307,8 +337,8 @@ fn eval_tx_any_of(predicates: &[TxPredicate], point: &Point, tx: &Tx) -> EvalRes } fn eval_tx_all_of(predicates: &[TxPredicate], point: &Point, tx: &Tx) -> EvalResult { - for p in x { - if !p.tx_match(point, tx)? { + for p in predicates.iter() { + if !eval_tx(p, point, tx)? { return Ok(false); }; } @@ -354,13 +384,13 @@ fn eval_tx(predicate: &TxPredicate, point: &Point, tx: &Tx) -> EvalResult { #[derive(Error, Debug)] pub enum Error { - #[error("predicate evaluation is inconclusive {}")] + #[error("predicate evaluation is inconclusive {0}")] Inconclusive(String), } impl Error { pub fn inconclusive(msg: impl Into) -> Self { - Self::inconclusive(msg.into()) + Self::Inconclusive(msg.into()) } pub fn no_input_ref() -> Self { @@ -370,7 +400,7 @@ impl Error { pub type EvalResult = Result; -pub fn eval(predicate: &TxPredicate, point: &Point, record: Record) -> EvalResult { +pub fn eval(predicate: &TxPredicate, point: &Point, record: &Record) -> EvalResult { match record { Record::ParsedTx(tx) => eval_tx(predicate, point, &tx), _ => Err(Error::inconclusive( diff --git a/src/filters/match_pattern/mod.rs b/src/filters/match_pattern/mod.rs index 77d0c3a6..595bac3c 100644 --- a/src/filters/match_pattern/mod.rs +++ b/src/filters/match_pattern/mod.rs @@ -1,14 +1,14 @@ +use std::str::FromStr; + use gasket::framework::*; use pallas::{ crypto::hash::Hash, - ledger::addresses::{ - Address, PaymentKeyHash, ShelleyDelegationPart, ShelleyPaymentPart, StakeAddress, - StakeKeyHash, - }, + ledger::addresses::{Address, ShelleyDelegationPart, ShelleyPaymentPart}, network::miniprotocols::Point, }; use serde::Deserialize; -use tracing::error; +use serde_with::DeserializeFromStr; +use tracing::{error, warn}; mod eval; @@ -17,11 +17,20 @@ use crate::framework::*; #[derive(Stage)] #[stage(name = "filter-match-pattern", unit = "ChainEvent", worker = "Worker")] pub struct Stage { - predicate: Predicate, + predicate: TxPredicate, pub input: FilterInputPort, pub output: FilterOutputPort, + #[metric] + pass_count: gasket::metrics::Counter, + + #[metric] + drop_count: gasket::metrics::Counter, + + #[metric] + inconclusive_count: gasket::metrics::Counter, + #[metric] ops_count: gasket::metrics::Counter, } @@ -34,27 +43,36 @@ impl From<&Stage> for Worker { } } -gasket::impl_splitter!(|_worker: Worker, stage: Stage, unit: ChainEvent| => { - let out = match unit { - ChainEvent::Apply(point, record) => match record { - Record::ParsedTx(tx) => { - if stage.predicate.tx_match(point, tx)? { - Ok(Some(unit.to_owned())) - } else { - Ok(None) - } - }, - _ => { - error!("The MatchPattern filter is valid only with the ParsedTx record"); - Err(WorkerError::Panic) +fn eval_record(stage: &Stage, point: &Point, record: &Record) -> Option { + match eval::eval(&stage.predicate, point, record) { + Ok(pass) => { + if pass { + stage.pass_count.inc(1); + Some(record.clone()) + } else { + stage.drop_count.inc(1); + None } - }, - _ => Ok(Some(unit.to_owned())) - }?; + } + Err(eval::Error::Inconclusive(msg)) => { + warn!(msg); + stage.inconclusive_count.inc(1); + None + } + } +} +gasket::impl_splitter!(|_worker: Worker, stage: Stage, unit: ChainEvent| => { stage.ops_count.inc(1); - out + if let Some(record) = unit.record() { + eval_record(stage, unit.point(), record) + .map(|x| unit.new_record(x)) + .map(|x| vec![x]) + .unwrap_or(vec![]) + } else { + vec![unit.clone()] + } }); #[derive(Clone, Debug)] @@ -87,6 +105,12 @@ pub struct DatumPattern { hash: Option>, } +#[derive(Clone, Debug)] +pub struct WithdrawalPattern { + quantity: Option, + // reward account pattern? +} + #[derive(Clone, Debug)] pub struct AssetPattern { policy: Option>, @@ -119,12 +143,11 @@ pub struct OutputPattern { #[derive(Clone, Debug)] pub struct MetadataPattern { label: Option, - key: Option, - value: Option, + key: Option, + value: Option, } -#[derive(Clone, Debug)] -#[serde(rename_all = "snake_case")] +#[derive(Clone, Debug, DeserializeFromStr)] pub enum TxPredicate { HashEquals(Option>), IsValid(Option), @@ -145,13 +168,21 @@ pub enum TxPredicate { SomeCollateralMatches(InputPattern), CollateralReturnMatches(OutputPattern), TotalCollateralMatches(QuantityPattern), - SomeWithdrawalMatches(OutputPattern), + SomeWithdrawalMatches(WithdrawalPattern), SomeAddressMatches(AddressPattern), Not(Box), AnyOf(Vec), AllOf(Vec), } +impl FromStr for TxPredicate { + type Err = std::convert::Infallible; + + fn from_str(s: &str) -> Result { + todo!() + } +} + #[derive(Deserialize)] pub struct Config { pub predicate: TxPredicate, @@ -162,6 +193,9 @@ impl Config { let stage = Stage { predicate: self.predicate, ops_count: Default::default(), + pass_count: Default::default(), + drop_count: Default::default(), + inconclusive_count: Default::default(), input: Default::default(), output: Default::default(), }; diff --git a/src/framework/mod.rs b/src/framework/mod.rs index 6e8c2c71..f2ba6487 100644 --- a/src/framework/mod.rs +++ b/src/framework/mod.rs @@ -155,6 +155,14 @@ impl ChainEvent { Ok(out) } + + pub fn new_record(&self, new: Record) -> Self { + match self { + ChainEvent::Apply(p, _) => ChainEvent::Apply(p.clone(), new), + ChainEvent::Undo(p, _) => ChainEvent::Undo(p.clone(), new), + ChainEvent::Reset(p) => ChainEvent::Reset(p.clone()), + } + } } fn point_to_json(point: Point) -> JsonValue { diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs index ed4e79a6..52a7ad7a 100644 --- a/src/sinks/mod.rs +++ b/src/sinks/mod.rs @@ -4,11 +4,11 @@ use serde::Deserialize; use crate::framework::*; //pub mod assert; +mod assert; mod common; mod noop; mod stdout; mod terminal; -mod assert; #[cfg(feature = "sink-file-rotate")] mod file_rotate;