Skip to content

Commit

Permalink
feat: implement filter
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega committed Apr 7, 2024
1 parent 7ec91c6 commit 50a7d34
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 9 deletions.
1 change: 1 addition & 0 deletions examples/into_json/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
output
24 changes: 24 additions & 0 deletions examples/into_json/daemon.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[source]
type = "N2N"
peers = ["relays-new.cardano-mainnet.iohk.io:3001"]

[intersect]
type = "Point"
value = [4493860, "ce7f821d2140419fea1a7900cf71b0c0a0e94afbb1f814a6717cff071c3b6afc"]

[[filters]]
type = "SplitBlock"

[[filters]]
type = "ParseCbor"

[[filters]]
type = "IntoJson"

[sink]
type = "FileRotate"
max_total_files = 5
output_format = "JSONL"
output_path = "./output/logs.jsonl"
max_bytes_per_file = 5_000_000
compress_files = true
5 changes: 3 additions & 2 deletions src/filters/json.rs → src/filters/into_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@

use gasket::framework::*;
use serde::Deserialize;
use serde_json::Value as JsonValue;

use crate::framework::*;

#[derive(Default, Stage)]
#[stage(name = "filter-json", unit = "ChainEvent", worker = "Worker")]
#[stage(name = "into-json", unit = "ChainEvent", worker = "Worker")]
pub struct Stage {
pub input: FilterInputPort,
pub output: FilterOutputPort,
Expand All @@ -25,7 +26,7 @@ impl From<&Stage> for Worker {
}

gasket::impl_mapper!(|_worker: Worker, stage: Stage, unit: ChainEvent| => {
let out = unit.clone();
let out = unit.clone().map_record(|r| Record::GenericJson(JsonValue::from(r)));
stage.ops_count.inc(1);
out
});
Expand Down
14 changes: 7 additions & 7 deletions src/filters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use serde::Deserialize;

use crate::framework::*;

pub mod json;
pub mod into_json;
pub mod legacy_v1;
pub mod noop;
pub mod parse_cbor;
Expand All @@ -19,7 +19,7 @@ pub mod deno;
pub enum Bootstrapper {
Noop(noop::Stage),
SplitBlock(split_block::Stage),
Json(json::Stage),
IntoJson(into_json::Stage),
LegacyV1(legacy_v1::Stage),
ParseCbor(parse_cbor::Stage),
Select(select::Stage),
Expand All @@ -36,7 +36,7 @@ impl Bootstrapper {
match self {
Bootstrapper::Noop(p) => &mut p.input,
Bootstrapper::SplitBlock(p) => &mut p.input,
Bootstrapper::Json(p) => &mut p.input,
Bootstrapper::IntoJson(p) => &mut p.input,
Bootstrapper::LegacyV1(p) => &mut p.input,
Bootstrapper::ParseCbor(p) => &mut p.input,
Bootstrapper::Select(p) => &mut p.input,
Expand All @@ -53,7 +53,7 @@ impl Bootstrapper {
match self {
Bootstrapper::Noop(p) => &mut p.output,
Bootstrapper::SplitBlock(p) => &mut p.output,
Bootstrapper::Json(p) => &mut p.output,
Bootstrapper::IntoJson(p) => &mut p.output,
Bootstrapper::LegacyV1(p) => &mut p.output,
Bootstrapper::ParseCbor(p) => &mut p.output,
Bootstrapper::Select(p) => &mut p.output,
Expand All @@ -70,7 +70,7 @@ impl Bootstrapper {
match self {
Bootstrapper::Noop(x) => gasket::runtime::spawn_stage(x, policy),
Bootstrapper::SplitBlock(x) => gasket::runtime::spawn_stage(x, policy),
Bootstrapper::Json(x) => gasket::runtime::spawn_stage(x, policy),
Bootstrapper::IntoJson(x) => gasket::runtime::spawn_stage(x, policy),
Bootstrapper::LegacyV1(x) => gasket::runtime::spawn_stage(x, policy),
Bootstrapper::ParseCbor(x) => gasket::runtime::spawn_stage(x, policy),
Bootstrapper::Select(x) => gasket::runtime::spawn_stage(x, policy),
Expand All @@ -89,7 +89,7 @@ impl Bootstrapper {
pub enum Config {
Noop(noop::Config),
SplitBlock(split_block::Config),
Json(json::Config),
IntoJson(into_json::Config),
LegacyV1(legacy_v1::Config),
ParseCbor(parse_cbor::Config),
Select(select::Config),
Expand All @@ -106,7 +106,7 @@ impl Config {
match self {
Config::Noop(c) => Ok(Bootstrapper::Noop(c.bootstrapper(ctx)?)),
Config::SplitBlock(c) => Ok(Bootstrapper::SplitBlock(c.bootstrapper(ctx)?)),
Config::Json(c) => Ok(Bootstrapper::Json(c.bootstrapper(ctx)?)),
Config::IntoJson(c) => Ok(Bootstrapper::IntoJson(c.bootstrapper(ctx)?)),
Config::LegacyV1(c) => Ok(Bootstrapper::LegacyV1(c.bootstrapper(ctx)?)),
Config::ParseCbor(c) => Ok(Bootstrapper::ParseCbor(c.bootstrapper(ctx)?)),
Config::Select(c) => Ok(Bootstrapper::Select(c.bootstrapper(ctx)?)),
Expand Down

0 comments on commit 50a7d34

Please sign in to comment.