diff --git a/src/filters/deno/mod.rs b/src/filters/deno/mod.rs index 28402f5b..46c320fc 100644 --- a/src/filters/deno/mod.rs +++ b/src/filters/deno/mod.rs @@ -69,9 +69,9 @@ pub struct Worker { runtime: WrappedRuntime, } -const SYNC_CALL_SNIPPET: &'static str = r#"Deno[Deno.internal].core.ops.op_put_record(mapEvent(Deno[Deno.internal].core.ops.op_pop_record()));"#; +const SYNC_CALL_SNIPPET: &str = r#"Deno[Deno.internal].core.ops.op_put_record(mapEvent(Deno[Deno.internal].core.ops.op_pop_record()));"#; -const ASYNC_CALL_SNIPPET: &'static str = r#"mapEvent(Deno[Deno.internal].core.ops.op_pop_record()).then(x => Deno[Deno.internal].core.ops.op_put_record(x));"#; +const ASYNC_CALL_SNIPPET: &str = r#"mapEvent(Deno[Deno.internal].core.ops.op_pop_record()).then(x => Deno[Deno.internal].core.ops.op_put_record(x));"#; impl Worker { async fn map_record( diff --git a/src/filters/dsl.rs b/src/filters/dsl.rs index 76edd76a..76bb5fce 100644 --- a/src/filters/dsl.rs +++ b/src/filters/dsl.rs @@ -231,7 +231,7 @@ pub struct Worker; impl From<&Stage> for Worker { fn from(_: &Stage) -> Self { - Worker::default() + Self } } diff --git a/src/filters/json.rs b/src/filters/json.rs index 29078ee7..1a7aa0ab 100644 --- a/src/filters/json.rs +++ b/src/filters/json.rs @@ -20,7 +20,7 @@ pub struct Worker; impl From<&Stage> for Worker { fn from(_: &Stage) -> Self { - Worker::default() + Self } } diff --git a/src/filters/legacy_v1/map.rs b/src/filters/legacy_v1/map.rs index bd2513e1..eff7791e 100644 --- a/src/filters/legacy_v1/map.rs +++ b/src/filters/legacy_v1/map.rs @@ -194,7 +194,7 @@ impl EventWriter<'_> { record.output_count = outputs.len(); record.total_output = outputs.iter().map(|o| o.amount).sum(); - let inputs: Vec<_> = tx.inputs().iter().map(|x| TxInputRecord::from(x)).collect(); + let inputs: Vec<_> = tx.inputs().iter().map(TxInputRecord::from).collect(); record.input_count = inputs.len(); @@ -207,11 +207,7 @@ impl EventWriter<'_> { record.mint_count = mints.len(); - let collateral_inputs: Vec<_> = tx - .collateral() - .iter() - .map(|x| TxInputRecord::from(x)) - .collect(); + let collateral_inputs: Vec<_> = tx.collateral().iter().map(TxInputRecord::from).collect(); record.collateral_input_count = collateral_inputs.len(); @@ -280,7 +276,7 @@ impl EventWriter<'_> { record.plutus_data = tx .plutus_data() .iter() - .map(|x| PlutusDatumRecord::from(x)) + .map(PlutusDatumRecord::from) .collect::>() .into(); diff --git a/src/filters/legacy_v1/mod.rs b/src/filters/legacy_v1/mod.rs index 4163454b..d8d5247c 100644 --- a/src/filters/legacy_v1/mod.rs +++ b/src/filters/legacy_v1/mod.rs @@ -31,7 +31,7 @@ pub struct Worker; impl From<&Stage> for Worker { fn from(_: &Stage) -> Self { - Worker::default() + Self } } @@ -48,7 +48,7 @@ gasket::impl_splitter!(|_worker: Worker, stage: Stage, unit: ChainEvent| => { &mut buffer, ); - writer.crawl_cbor(&cbor)?; + writer.crawl_cbor(cbor)?; } ChainEvent::Reset(point) => { let mut writer = EventWriter::new( diff --git a/src/filters/noop.rs b/src/filters/noop.rs index e6ff472e..a15ff0f3 100644 --- a/src/filters/noop.rs +++ b/src/filters/noop.rs @@ -20,7 +20,7 @@ pub struct Worker; impl From<&Stage> for Worker { fn from(_: &Stage) -> Self { - Worker::default() + Self } } diff --git a/src/filters/parse_cbor.rs b/src/filters/parse_cbor.rs index e4776316..1df9b0f0 100644 --- a/src/filters/parse_cbor.rs +++ b/src/filters/parse_cbor.rs @@ -33,7 +33,7 @@ pub struct Worker; impl From<&Stage> for Worker { fn from(_: &Stage) -> Self { - Worker::default() + Self } } diff --git a/src/filters/split_block.rs b/src/filters/split_block.rs index 413b841f..f15cb166 100644 --- a/src/filters/split_block.rs +++ b/src/filters/split_block.rs @@ -39,7 +39,7 @@ pub struct Worker; impl From<&Stage> for Worker { fn from(_: &Stage) -> Self { - Worker::default() + Self } } diff --git a/src/filters/wasm.rs b/src/filters/wasm.rs index d1cd650d..48c9beb0 100644 --- a/src/filters/wasm.rs +++ b/src/filters/wasm.rs @@ -20,7 +20,7 @@ pub struct Worker; impl From<&Stage> for Worker { fn from(_: &Stage) -> Self { - Worker::default() + Self } } diff --git a/src/sinks/file_rotate.rs b/src/sinks/file_rotate.rs index 98d96d79..8eb527d9 100644 --- a/src/sinks/file_rotate.rs +++ b/src/sinks/file_rotate.rs @@ -97,7 +97,8 @@ pub struct Stage { #[derive(Debug, Deserialize, Clone)] pub enum Format { - JSONL, + #[serde(rename = "JSONL")] + Jsonl, } #[derive(Default, Debug, Deserialize)] diff --git a/src/sinks/gcp_cloudfunction.rs b/src/sinks/gcp_cloudfunction.rs index 2045f705..f3483cbf 100644 --- a/src/sinks/gcp_cloudfunction.rs +++ b/src/sinks/gcp_cloudfunction.rs @@ -23,14 +23,16 @@ struct Claims { pub iat: u64, pub exp: u64, } + impl Claims { - pub fn new(audience: &String, credentials: &Credentials) -> Self { + pub fn new(audience: &str, credentials: &Credentials) -> Self { let iat = jsonwebtoken::get_current_timestamp(); let exp = iat + 60; + Self { iss: credentials.client_email.clone(), aud: credentials.token_uri.clone(), - target_audience: audience.clone(), + target_audience: audience.to_owned(), iat, exp, } @@ -47,6 +49,7 @@ struct Credentials { pub token_uri: String, pub private_key: String, } + impl TryFrom for Credentials { type Error = Error; @@ -77,6 +80,7 @@ pub struct GCPAuth { audience: String, token: Option, } + impl GCPAuth { pub fn try_new(audience: String) -> Result { let client = reqwest::ClientBuilder::new() @@ -267,21 +271,25 @@ impl From for Error { Error::Config(value.to_string()) } } + impl From for Error { fn from(value: std::io::Error) -> Self { Error::Config(value.to_string()) } } + impl From for Error { fn from(value: serde_json::Error) -> Self { Error::Config(value.to_string()) } } + impl From for Error { fn from(value: reqwest::Error) -> Self { Error::Custom(value.to_string()) } } + impl From for Error { fn from(value: jsonwebtoken::errors::Error) -> Self { Error::Custom(value.to_string()) diff --git a/src/sinks/rabbitmq.rs b/src/sinks/rabbitmq.rs index 841a911c..60bfef76 100644 --- a/src/sinks/rabbitmq.rs +++ b/src/sinks/rabbitmq.rs @@ -43,7 +43,7 @@ impl gasket::framework::Worker for Worker { self.channel .basic_publish( &stage.config.exchange, - &stage.config.routing_key.clone().unwrap_or(String::new()), + &stage.config.routing_key.clone().unwrap_or_default(), BasicPublishOptions::default(), &payload, BasicProperties::default(), diff --git a/src/sinks/stdout.rs b/src/sinks/stdout.rs index 10fdb55f..4cad7861 100644 --- a/src/sinks/stdout.rs +++ b/src/sinks/stdout.rs @@ -48,7 +48,6 @@ impl gasket::framework::Worker for Worker { #[derive(Stage)] #[stage(name = "sink-stdout", unit = "ChainEvent", worker = "Worker")] pub struct Stage { - config: Config, cursor: Cursor, pub input: MapperInputPort, @@ -66,7 +65,6 @@ pub struct Config; impl Config { pub fn bootstrapper(self, ctx: &Context) -> Result { let stage = Stage { - config: self, cursor: ctx.cursor.clone(), ops_count: Default::default(), latest_block: Default::default(), diff --git a/src/sinks/terminal/mod.rs b/src/sinks/terminal/mod.rs index d5cf81a0..1ec7050c 100644 --- a/src/sinks/terminal/mod.rs +++ b/src/sinks/terminal/mod.rs @@ -65,10 +65,10 @@ impl gasket::framework::Worker for Worker { let line = match unit { ChainEvent::Apply(_, record) => { - LogLine::new_apply(&record, width, &stage.config.adahandle_policy) + LogLine::new_apply(record, width, &stage.config.adahandle_policy) } ChainEvent::Undo(_, record) => { - LogLine::new_undo(&record, width, &stage.config.adahandle_policy) + LogLine::new_undo(record, width, &stage.config.adahandle_policy) } ChainEvent::Reset(point) => LogLine::new_reset(point.clone()), }; diff --git a/src/sources/n2c.rs b/src/sources/n2c.rs index 7c4844bf..d823e6fc 100644 --- a/src/sources/n2c.rs +++ b/src/sources/n2c.rs @@ -89,7 +89,7 @@ impl Worker { ) -> Result<(), WorkerError> { match next { NextResponse::RollForward(cbor, tip) => { - let block = MultiEraBlock::decode(&cbor).or_panic()?; + let block = MultiEraBlock::decode(cbor).or_panic()?; let slot = block.slot(); let hash = block.hash(); @@ -114,7 +114,7 @@ impl Worker { stage .output - .send(ChainEvent::reset(point.clone()).into()) + .send(ChainEvent::reset(point.clone())) .await .or_panic()?; diff --git a/src/sources/n2n.rs b/src/sources/n2n.rs index 31d375a0..f46bb2f3 100644 --- a/src/sources/n2n.rs +++ b/src/sources/n2n.rs @@ -128,7 +128,7 @@ impl Worker { stage .output - .send(ChainEvent::reset(point.clone()).into()) + .send(ChainEvent::reset(point.clone())) .await .or_panic()?; diff --git a/src/sources/utxorpc.rs b/src/sources/utxorpc.rs index 9c9c51a4..f72d913a 100644 --- a/src/sources/utxorpc.rs +++ b/src/sources/utxorpc.rs @@ -142,9 +142,11 @@ impl Worker { } async fn next_dump_history(&mut self) -> Result>, WorkerError> { - let mut dump_history_request = DumpHistoryRequest::default(); - dump_history_request.start_token = self.block_ref.clone(); - dump_history_request.max_items = self.max_items_per_page; + let dump_history_request = DumpHistoryRequest { + start_token: self.block_ref.clone(), + max_items: self.max_items_per_page, + ..Default::default() + }; let result = self .client @@ -156,11 +158,11 @@ impl Worker { self.block_ref = result.next_token; if !result.block.is_empty() { - let actions: Vec = result.block.into_iter().map(|b| Action::Apply(b)).collect(); + let actions: Vec = result.block.into_iter().map(Action::Apply).collect(); return Ok(WorkSchedule::Unit(actions)); } - return Ok(WorkSchedule::Idle); + Ok(WorkSchedule::Idle) } } @@ -185,14 +187,10 @@ impl gasket::framework::Worker for Worker { }; } - let block_ref = if let Some((slot, hash)) = point { - let mut block_ref = BlockRef::default(); - block_ref.index = slot; - block_ref.hash = hash.into(); - Some(block_ref) - } else { - None - }; + let block_ref = point.map(|(slot, hash)| BlockRef { + index: slot, + hash: hash.into(), + }); let max_items_per_page = stage.config.max_items_per_page.unwrap_or(20); @@ -206,10 +204,10 @@ impl gasket::framework::Worker for Worker { async fn schedule(&mut self, _: &mut Stage) -> Result>, WorkerError> { if self.block_ref.is_some() { - return Ok(self.next_dump_history().await?); + return self.next_dump_history().await; } - Ok(self.next_stream().await?) + self.next_stream().await } async fn execute(&mut self, unit: &Vec, stage: &mut Stage) -> Result<(), WorkerError> {