feat: introduce sql db sink (#744)
scarmuega authored Jan 27, 2024
1 parent d1b4f33 commit ab9afa4
Showing 8 changed files with 580 additions and 8 deletions.
407 changes: 399 additions & 8 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
@@ -24,6 +24,7 @@ sink-gcp-cloudfunction = ["reqwest", "jsonwebtoken"]
sink-redis = ["r2d2_redis"]
sink-elasticsearch = ["elasticsearch"]
source-utxorpc = ["tonic", "futures"]
sql = ["sqlx"]

[dependencies]
pallas = "0.21.0"
@@ -73,10 +74,13 @@ file-rotate = { version = "0.7.5", optional = true }
tonic = { version = "0.9.2", features = ["tls", "tls-roots"], optional = true }
futures = { version = "0.3.28", optional = true }

# sql
sqlx = { version = "0.7", features = ["runtime-tokio", "tls-native-tls", "any", "sqlite"], optional = true }

# aws
aws-config = { version = "^1.1", optional = true }
aws-types = { version = "^1.1", optional = true }
aws-sdk-s3 = { version = "^1.1", optional = true }
aws-sdk-sqs = { version = "^1.1", optional = true }
aws-sdk-lambda = { version = "^1.1", optional = true }
handlebars = "5.1.1"
1 change: 1 addition & 0 deletions examples/sql_db/.gitignore
@@ -0,0 +1 @@
*.db
4 changes: 4 additions & 0 deletions examples/sql_db/Makefile
@@ -0,0 +1,4 @@
create_db:
sqlite3 mydatabase.db < init.sql

.PHONY: create_db
17 changes: 17 additions & 0 deletions examples/sql_db/daemon.toml
@@ -0,0 +1,17 @@
[source]
type = "N2N"
peers = ["relays-new.cardano-mainnet.iohk.io:3001"]

[intersect]
type = "Point"
value = [4493860, "ce7f821d2140419fea1a7900cf71b0c0a0e94afbb1f814a6717cff071c3b6afc"]

[[filters]]
type = "SplitBlock"

[sink]
type = "SqlDb"
connection = "sqlite:./mydatabase.db"
apply_template = "INSERT INTO txs (slot, cbor) VALUES ('{{point.slot}}', X'{{record.hex}}');"
undo_template = "DELETE FROM txs WHERE slot = {{point.slot}}"
reset_template = "DELETE FROM txs WHERE slot > {{point.slot}}"
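All three templates are rendered against the same Handlebars context the sink builds for each chain event: a point object carrying slot and hash, plus the mapped record for apply/undo events (see hbs_data in src/sinks/sql_db.rs further down). A minimal sketch of how the apply_template above expands, with a hand-written record payload standing in for the real SplitBlock output:

// Sketch: rendering apply_template against the sink's event context.
// The record payload below is illustrative; in the pipeline it comes from
// the SplitBlock filter and is exposed to the template as `record`.
fn main() {
    let mut templates = handlebars::Handlebars::new();
    templates
        .register_template_string(
            "apply",
            "INSERT INTO txs (slot, cbor) VALUES ('{{point.slot}}', X'{{record.hex}}');",
        )
        .unwrap();

    let data = serde_json::json!({
        "point": {
            "slot": 4493860,
            "hash": "ce7f821d2140419fea1a7900cf71b0c0a0e94afbb1f814a6717cff071c3b6afc",
        },
        "record": { "hex": "84a1a2" }, // placeholder CBOR hex
    });

    println!("{}", templates.render("apply", &data).unwrap());
    // INSERT INTO txs (slot, cbor) VALUES ('4493860', X'84a1a2');
}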
6 changes: 6 additions & 0 deletions examples/sql_db/init.sql
@@ -0,0 +1,6 @@
CREATE TABLE txs (
slot INTEGER NOT NULL,
cbor BLOB
);

CREATE INDEX idx_txs_slot ON txs(slot);
21 changes: 21 additions & 0 deletions src/sinks/mod.rs
@@ -42,6 +42,9 @@ mod redis;
#[cfg(feature = "sink-elasticsearch")]
mod elasticsearch;

#[cfg(feature = "sql")]
mod sql_db;

pub enum Bootstrapper {
Terminal(terminal::Stage),
Stdout(stdout::Stage),
@@ -80,6 +83,9 @@ pub enum Bootstrapper {

#[cfg(feature = "sink-elasticsearch")]
ElasticSearch(elasticsearch::Stage),

#[cfg(feature = "sql")]
SqlDb(sql_db::Stage),
}

impl Bootstrapper {
@@ -122,6 +128,9 @@ impl Bootstrapper {

#[cfg(feature = "sink-elasticsearch")]
Bootstrapper::ElasticSearch(p) => &mut p.input,

#[cfg(feature = "sql")]
Bootstrapper::SqlDb(p) => &mut p.input,
}
}

@@ -164,6 +173,9 @@ impl Bootstrapper {

#[cfg(feature = "sink-elasticsearch")]
Bootstrapper::ElasticSearch(p) => &mut p.cursor,

#[cfg(feature = "sql")]
Bootstrapper::SqlDb(p) => &mut p.cursor,
}
}

@@ -206,6 +218,9 @@ impl Bootstrapper {

#[cfg(feature = "sink-elasticsearch")]
Bootstrapper::ElasticSearch(x) => gasket::runtime::spawn_stage(x, policy),

#[cfg(feature = "sql")]
Bootstrapper::SqlDb(x) => gasket::runtime::spawn_stage(x, policy),
}
}
}
@@ -250,6 +265,9 @@ pub enum Config {

#[cfg(feature = "sink-elasticsearch")]
ElasticSearch(elasticsearch::Config),

#[cfg(feature = "sql")]
SqlDb(sql_db::Config),
}

impl Config {
@@ -292,6 +310,9 @@ impl Config {

#[cfg(feature = "sink-elasticsearch")]
Config::ElasticSearch(c) => Ok(Bootstrapper::ElasticSearch(c.bootstrapper(ctx)?)),

#[cfg(feature = "sql")]
Config::SqlDb(c) => Ok(Bootstrapper::SqlDb(c.bootstrapper(ctx)?)),
}
}
}
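The [sink] table in daemon.toml is deserialized into this Config enum by variant name, which is how type = "SqlDb" ends up selecting the new sql_db::Config. A rough, self-contained sketch of that dispatch, assuming the enum is internally tagged on a type field (the serde attribute sits outside this diff) and that the serde and toml crates are available:

use serde::Deserialize;

// Hypothetical, trimmed-down mirror of the sink Config enum in src/sinks/mod.rs.
#[derive(Debug, Deserialize)]
#[serde(tag = "type")]
enum SinkConfig {
    SqlDb(SqlDbConfig),
}

#[derive(Debug, Deserialize)]
struct SqlDbConfig {
    connection: String,
    apply_template: String,
    undo_template: String,
    reset_template: String,
}

fn main() {
    // Same shape as the [sink] table in examples/sql_db/daemon.toml.
    let raw = r#"
type = "SqlDb"
connection = "sqlite:./mydatabase.db"
apply_template = "INSERT INTO txs (slot, cbor) VALUES ('{{point.slot}}', X'{{record.hex}}');"
undo_template = "DELETE FROM txs WHERE slot = {{point.slot}}"
reset_template = "DELETE FROM txs WHERE slot > {{point.slot}}"
"#;

    let cfg: SinkConfig = toml::from_str(raw).unwrap();
    println!("{cfg:?}");
}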
128 changes: 128 additions & 0 deletions src/sinks/sql_db.rs
@@ -0,0 +1,128 @@
use gasket::framework::*;
use pallas::network::miniprotocols::Point;
use serde::Deserialize;
use tracing::debug;

use crate::framework::*;

pub struct Worker {
db: sqlx::Pool<sqlx::Any>,
}

fn hbs_data(point: Point, record: Option<Record>) -> serde_json::Value {
serde_json::json!({
"point": match point {
Point::Origin => serde_json::Value::Null,
Point::Specific(slot, hash) => serde_json::json!({
"slot": slot,
"hash": hex::encode(hash),
}),
},
"record": serde_json::Value::from(record),
})
}

#[async_trait::async_trait(?Send)]
impl gasket::framework::Worker<Stage> for Worker {
async fn bootstrap(stage: &Stage) -> Result<Self, WorkerError> {
let db = sqlx::AnyPool::connect(&stage.config.connection)
.await
.or_retry()?;

Ok(Self { db })
}

async fn schedule(
&mut self,
stage: &mut Stage,
) -> Result<WorkSchedule<ChainEvent>, WorkerError> {
let msg = stage.input.recv().await.or_panic()?;
Ok(WorkSchedule::Unit(msg.payload))
}

async fn execute(&mut self, unit: &ChainEvent, stage: &mut Stage) -> Result<(), WorkerError> {
let point = unit.point().clone();

let template = match unit {
ChainEvent::Apply(p, r) => {
let data = hbs_data(p.clone(), Some(r.clone()));
stage.templates.render("apply", &data)
}
ChainEvent::Undo(p, r) => {
let data = hbs_data(p.clone(), Some(r.clone()));
stage.templates.render("undo", &data)
}
ChainEvent::Reset(p) => {
let data = hbs_data(p.clone(), None);
stage.templates.render("reset", &data)
}
};

let statement = template.or_panic()?;

let result = sqlx::query(&statement).execute(&self.db).await.or_retry()?;
debug!(rows = result.rows_affected(), "sql statement executed");

stage.ops_count.inc(1);
stage.latest_block.set(point.slot_or_default() as i64);
stage.cursor.send(point.clone().into()).await.or_panic()?;

Ok(())
}
}

#[derive(Stage)]
#[stage(name = "sql", unit = "ChainEvent", worker = "Worker")]
pub struct Stage {
config: Config,
templates: handlebars::Handlebars<'static>,

pub input: MapperInputPort,
pub cursor: SinkCursorPort,

#[metric]
ops_count: gasket::metrics::Counter,

#[metric]
latest_block: gasket::metrics::Gauge,
}

#[derive(Default, Debug, Deserialize)]
pub struct Config {
/// e.g. sqlite::memory:
pub connection: String,
pub apply_template: String,
pub undo_template: String,
pub reset_template: String,
}

impl Config {
pub fn bootstrapper(self, _ctx: &Context) -> Result<Stage, Error> {
sqlx::any::install_default_drivers();

let mut templates = handlebars::Handlebars::new();

templates
.register_template_string("apply", &self.apply_template)
.map_err(Error::config)?;

templates
.register_template_string("undo", &self.undo_template)
.map_err(Error::config)?;

templates
.register_template_string("reset", &self.reset_template)
.map_err(Error::config)?;

let stage = Stage {
config: self,
templates,
ops_count: Default::default(),
latest_block: Default::default(),
input: Default::default(),
cursor: Default::default(),
};

Ok(stage)
}
}
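The worker above boils down to "connect an Any pool, run the rendered statement"; the same flow can be exercised outside the pipeline. A minimal sketch against an in-memory SQLite database, assuming sqlx 0.7 with the any, sqlite, and runtime-tokio features (as enabled in the Cargo.toml change above) plus tokio:

// Standalone sketch of the Worker's bootstrap/execute flow.
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    // Same call the bootstrapper makes before opening the pool.
    sqlx::any::install_default_drivers();

    // A single connection keeps the in-memory SQLite database alive and
    // shared across the statements below.
    let db = sqlx::any::AnyPoolOptions::new()
        .max_connections(1)
        .connect("sqlite::memory:")
        .await?;

    // Same schema as examples/sql_db/init.sql.
    sqlx::query("CREATE TABLE txs (slot INTEGER NOT NULL, cbor BLOB);")
        .execute(&db)
        .await?;

    // Stand-in for a rendered apply_template statement.
    let result = sqlx::query("INSERT INTO txs (slot, cbor) VALUES ('4493860', X'84a1a2');")
        .execute(&db)
        .await?;

    println!("rows affected: {}", result.rows_affected());
    Ok(())
}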
