Skip to content

Commit

Permalink
Merge branch 'main' of github.com:txpipe/oura
Browse files Browse the repository at this point in the history
  • Loading branch information
paulobressan committed Sep 9, 2023
2 parents 6b4ffc6 + aba91d0 commit 1b23688
Show file tree
Hide file tree
Showing 20 changed files with 80 additions and 53 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ jobs:
profile: minimal
target: ${{ matrix.target }}

- name: Install Protoc
uses: arduino/setup-protoc@v2

- name: Build | Build
uses: actions-rs/[email protected]
with:
Expand Down
13 changes: 11 additions & 2 deletions .github/workflows/validate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
on:
push:
branches:
- '**'
- "**"
tags-ignore:
- v*
- v*

name: Validate

Expand All @@ -29,6 +29,9 @@ jobs:
toolchain: ${{ matrix.rust }}
override: true

- name: Install Protoc
uses: arduino/setup-protoc@v2

- name: Run cargo check
uses: actions-rs/cargo@v1
with:
Expand All @@ -49,6 +52,9 @@ jobs:
toolchain: stable
override: true

- name: Install Protoc
uses: arduino/setup-protoc@v2

- name: Run cargo test
uses: actions-rs/cargo@v1
with:
Expand All @@ -70,6 +76,9 @@ jobs:
override: true
components: rustfmt, clippy

- name: Install Protoc
uses: arduino/setup-protoc@v2

- name: Run cargo fmt
uses: actions-rs/cargo@v1
with:
Expand Down
6 changes: 3 additions & 3 deletions src/filters/deno/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ pub struct Worker {
runtime: WrappedRuntime,
}

const SYNC_CALL_SNIPPET: &'static str = r#"Deno[Deno.internal].core.ops.op_put_record(mapEvent(Deno[Deno.internal].core.ops.op_pop_record()));"#;
const SYNC_CALL_SNIPPET: &str = r#"Deno[Deno.internal].core.ops.op_put_record(mapEvent(Deno[Deno.internal].core.ops.op_pop_record()));"#;

const ASYNC_CALL_SNIPPET: &'static str = r#"mapEvent(Deno[Deno.internal].core.ops.op_pop_record()).then(x => Deno[Deno.internal].core.ops.op_put_record(x));"#;
const ASYNC_CALL_SNIPPET: &str = r#"mapEvent(Deno[Deno.internal].core.ops.op_pop_record()).then(x => Deno[Deno.internal].core.ops.op_put_record(x));"#;

impl Worker {
async fn map_record(
Expand Down Expand Up @@ -119,7 +119,7 @@ impl gasket::framework::Worker<Stage> for Worker {
match unit {
ChainEvent::Apply(p, r) => {
let mapped = self
.map_record(stage.call_snippet.clone(), r.clone())
.map_record(stage.call_snippet, r.clone())
.await
.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion src/filters/dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ pub struct Worker;

impl From<&Stage> for Worker {
fn from(_: &Stage) -> Self {
Worker::default()
Self
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/filters/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub struct Worker;

impl From<&Stage> for Worker {
fn from(_: &Stage) -> Self {
Worker::default()
Self
}
}

Expand Down
10 changes: 3 additions & 7 deletions src/filters/legacy_v1/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ impl EventWriter<'_> {
record.output_count = outputs.len();
record.total_output = outputs.iter().map(|o| o.amount).sum();

let inputs: Vec<_> = tx.inputs().iter().map(|x| TxInputRecord::from(x)).collect();
let inputs: Vec<_> = tx.inputs().iter().map(TxInputRecord::from).collect();

record.input_count = inputs.len();

Expand All @@ -207,11 +207,7 @@ impl EventWriter<'_> {

record.mint_count = mints.len();

let collateral_inputs: Vec<_> = tx
.collateral()
.iter()
.map(|x| TxInputRecord::from(x))
.collect();
let collateral_inputs: Vec<_> = tx.collateral().iter().map(TxInputRecord::from).collect();

record.collateral_input_count = collateral_inputs.len();

Expand Down Expand Up @@ -280,7 +276,7 @@ impl EventWriter<'_> {
record.plutus_data = tx
.plutus_data()
.iter()
.map(|x| PlutusDatumRecord::from(x))
.map(PlutusDatumRecord::from)
.collect::<Vec<_>>()
.into();

Expand Down
4 changes: 2 additions & 2 deletions src/filters/legacy_v1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub struct Worker;

impl From<&Stage> for Worker {
fn from(_: &Stage) -> Self {
Worker::default()
Self
}
}

Expand All @@ -48,7 +48,7 @@ gasket::impl_splitter!(|_worker: Worker, stage: Stage, unit: ChainEvent| => {
&mut buffer,
);

writer.crawl_cbor(&cbor)?;
writer.crawl_cbor(cbor)?;
}
ChainEvent::Reset(point) => {
let mut writer = EventWriter::new(
Expand Down
28 changes: 21 additions & 7 deletions src/filters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use serde::Deserialize;

use crate::framework::*;

pub mod deno;
pub mod dsl;
pub mod json;
pub mod legacy_v1;
Expand All @@ -16,16 +15,21 @@ pub mod parse_cbor;
pub mod split_block;
pub mod wasm;

#[cfg(feature = "deno")]
pub mod deno;

pub enum Bootstrapper {
Noop(noop::Stage),
SplitBlock(split_block::Stage),
Dsl(dsl::Stage),
Json(json::Stage),
LegacyV1(legacy_v1::Stage),
Wasm(wasm::Stage),
Deno(deno::Stage),
ParseCbor(parse_cbor::Stage),
MatchPattern(match_pattern::Stage),

#[cfg(feature = "deno")]
Deno(deno::Stage),
}

impl StageBootstrapper for Bootstrapper {
Expand All @@ -37,9 +41,11 @@ impl StageBootstrapper for Bootstrapper {
Bootstrapper::Json(p) => p.input.connect(adapter),
Bootstrapper::LegacyV1(p) => p.input.connect(adapter),
Bootstrapper::Wasm(p) => p.input.connect(adapter),
Bootstrapper::Deno(p) => p.input.connect(adapter),
Bootstrapper::ParseCbor(p) => p.input.connect(adapter),
Bootstrapper::MatchPattern(p) => p.input.connect(adapter),

#[cfg(feature = "deno")]
Bootstrapper::Deno(p) => p.input.connect(adapter),
}
}

Expand All @@ -51,9 +57,11 @@ impl StageBootstrapper for Bootstrapper {
Bootstrapper::Json(p) => p.output.connect(adapter),
Bootstrapper::LegacyV1(p) => p.output.connect(adapter),
Bootstrapper::Wasm(p) => p.output.connect(adapter),
Bootstrapper::Deno(p) => p.output.connect(adapter),
Bootstrapper::ParseCbor(p) => p.output.connect(adapter),
Bootstrapper::MatchPattern(p) => p.output.connect(adapter),

#[cfg(feature = "deno")]
Bootstrapper::Deno(p) => p.output.connect(adapter),
}
}

Expand All @@ -65,9 +73,11 @@ impl StageBootstrapper for Bootstrapper {
Bootstrapper::Json(x) => gasket::runtime::spawn_stage(x, policy),
Bootstrapper::LegacyV1(x) => gasket::runtime::spawn_stage(x, policy),
Bootstrapper::Wasm(x) => gasket::runtime::spawn_stage(x, policy),
Bootstrapper::Deno(x) => gasket::runtime::spawn_stage(x, policy),
Bootstrapper::ParseCbor(x) => gasket::runtime::spawn_stage(x, policy),
Bootstrapper::MatchPattern(x) => gasket::runtime::spawn_stage(x, policy),

#[cfg(feature = "deno")]
Bootstrapper::Deno(x) => gasket::runtime::spawn_stage(x, policy),
}
}
}
Expand All @@ -81,9 +91,11 @@ pub enum Config {
Json(json::Config),
LegacyV1(legacy_v1::Config),
Wasm(wasm::Config),
Deno(deno::Config),
ParseCbor(parse_cbor::Config),
MatchPattern(match_pattern::Config),

#[cfg(feature = "deno")]
Deno(deno::Config),
}

impl Config {
Expand All @@ -95,9 +107,11 @@ impl Config {
Config::Json(c) => Ok(Bootstrapper::Json(c.bootstrapper(ctx)?)),
Config::LegacyV1(c) => Ok(Bootstrapper::LegacyV1(c.bootstrapper(ctx)?)),
Config::Wasm(c) => Ok(Bootstrapper::Wasm(c.bootstrapper(ctx)?)),
Config::Deno(c) => Ok(Bootstrapper::Deno(c.bootstrapper(ctx)?)),
Config::ParseCbor(c) => Ok(Bootstrapper::ParseCbor(c.bootstrapper(ctx)?)),
Config::MatchPattern(c) => Ok(Bootstrapper::MatchPattern(c.bootstrapper(ctx)?)),

#[cfg(feature = "deno")]
Config::Deno(c) => Ok(Bootstrapper::Deno(c.bootstrapper(ctx)?)),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/filters/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub struct Worker;

impl From<&Stage> for Worker {
fn from(_: &Stage) -> Self {
Worker::default()
Self
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/filters/parse_cbor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub struct Worker;

impl From<&Stage> for Worker {
fn from(_: &Stage) -> Self {
Worker::default()
Self
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/filters/split_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub struct Worker;

impl From<&Stage> for Worker {
fn from(_: &Stage) -> Self {
Worker::default()
Self
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/filters/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub struct Worker;

impl From<&Stage> for Worker {
fn from(_: &Stage) -> Self {
Worker::default()
Self
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/sinks/file_rotate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ pub struct Stage {

#[derive(Debug, Deserialize, Clone)]
pub enum Format {
JSONL,
#[serde(rename = "JSONL")]
Jsonl,
}

#[derive(Default, Debug, Deserialize)]
Expand Down
12 changes: 10 additions & 2 deletions src/sinks/gcp_cloudfunction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@ struct Claims {
pub iat: u64,
pub exp: u64,
}

impl Claims {
pub fn new(audience: &String, credentials: &Credentials) -> Self {
pub fn new(audience: &str, credentials: &Credentials) -> Self {
let iat = jsonwebtoken::get_current_timestamp();
let exp = iat + 60;

Self {
iss: credentials.client_email.clone(),
aud: credentials.token_uri.clone(),
target_audience: audience.clone(),
target_audience: audience.to_owned(),
iat,
exp,
}
Expand All @@ -47,6 +49,7 @@ struct Credentials {
pub token_uri: String,
pub private_key: String,
}

impl TryFrom<serde_json::Value> for Credentials {
type Error = Error;

Expand Down Expand Up @@ -77,6 +80,7 @@ pub struct GCPAuth {
audience: String,
token: Option<String>,
}

impl GCPAuth {
pub fn try_new(audience: String) -> Result<Self, Error> {
let client = reqwest::ClientBuilder::new()
Expand Down Expand Up @@ -267,21 +271,25 @@ impl From<std::env::VarError> for Error {
Error::Config(value.to_string())
}
}

impl From<std::io::Error> for Error {
fn from(value: std::io::Error) -> Self {
Error::Config(value.to_string())
}
}

impl From<serde_json::Error> for Error {
fn from(value: serde_json::Error) -> Self {
Error::Config(value.to_string())
}
}

impl From<reqwest::Error> for Error {
fn from(value: reqwest::Error) -> Self {
Error::Custom(value.to_string())
}
}

impl From<jsonwebtoken::errors::Error> for Error {
fn from(value: jsonwebtoken::errors::Error) -> Self {
Error::Custom(value.to_string())
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/rabbitmq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl gasket::framework::Worker<Stage> for Worker {
self.channel
.basic_publish(
&stage.config.exchange,
&stage.config.routing_key.clone().unwrap_or(String::new()),
&stage.config.routing_key.clone().unwrap_or_default(),
BasicPublishOptions::default(),
&payload,
BasicProperties::default(),
Expand Down
2 changes: 0 additions & 2 deletions src/sinks/stdout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ impl gasket::framework::Worker<Stage> for Worker {
#[derive(Stage)]
#[stage(name = "sink-stdout", unit = "ChainEvent", worker = "Worker")]
pub struct Stage {
config: Config,
cursor: Cursor,

pub input: MapperInputPort,
Expand All @@ -66,7 +65,6 @@ pub struct Config;
impl Config {
pub fn bootstrapper(self, ctx: &Context) -> Result<Stage, Error> {
let stage = Stage {
config: self,
cursor: ctx.cursor.clone(),
ops_count: Default::default(),
latest_block: Default::default(),
Expand Down
4 changes: 2 additions & 2 deletions src/sinks/terminal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ impl gasket::framework::Worker<Stage> for Worker {

let line = match unit {
ChainEvent::Apply(_, record) => {
LogLine::new_apply(&record, width, &stage.config.adahandle_policy)
LogLine::new_apply(record, width, &stage.config.adahandle_policy)
}
ChainEvent::Undo(_, record) => {
LogLine::new_undo(&record, width, &stage.config.adahandle_policy)
LogLine::new_undo(record, width, &stage.config.adahandle_policy)
}
ChainEvent::Reset(point) => LogLine::new_reset(point.clone()),
};
Expand Down
4 changes: 2 additions & 2 deletions src/sources/n2c.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl Worker {
) -> Result<(), WorkerError> {
match next {
NextResponse::RollForward(cbor, tip) => {
let block = MultiEraBlock::decode(&cbor).or_panic()?;
let block = MultiEraBlock::decode(cbor).or_panic()?;
let slot = block.slot();
let hash = block.hash();

Expand All @@ -114,7 +114,7 @@ impl Worker {

stage
.output
.send(ChainEvent::reset(point.clone()).into())
.send(ChainEvent::reset(point.clone()))
.await
.or_panic()?;

Expand Down
Loading

0 comments on commit 1b23688

Please sign in to comment.