Skip to content

Commit

Permalink
Make pindexer more easily embeddable as a library (#4712)
Browse files Browse the repository at this point in the history
## Describe your changes

We don't expose cometindex as a library, but we do pindexer, and this
adds a few tweaks to make pindexer possible to run inside an external
piece of Rust code. This is useful, because it allows people to colocate
their indexer and their block explorer, for example.

## Checklist before requesting a review

- [x] If this code contains consensus-breaking changes, I have added the
"consensus-breaking" label. Otherwise, I declare my belief that there
are not consensus-breaking changes, for the following reason:

  > Doesn't touch the core protocol.
  • Loading branch information
cronokirby authored Jul 23, 2024
1 parent 2ff25a6 commit a2ffd8a
Show file tree
Hide file tree
Showing 16 changed files with 60 additions and 31 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 12 additions & 11 deletions crates/bin/pindexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@ publish = false
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = {workspace = true}
clap = {workspace = true}
cometindex = {workspace = true}
penumbra-shielded-pool = {workspace = true, default-features = false}
penumbra-stake = {workspace = true, default-features = false}
penumbra-app = {workspace = true, default-features = false}
penumbra-num = {workspace = true, default-features = false}
penumbra-asset = {workspace = true, default-features = false}
penumbra-proto = {workspace = true, default-features = false}
tokio = {workspace = true, features = ["full"]}
serde_json = {workspace = true}
sqlx = { workspace = true, features = ["chrono"] }
cometindex = { workspace = true }
penumbra-shielded-pool = { workspace = true, default-features = false }
penumbra-stake = { workspace = true, default-features = false }
penumbra-app = { workspace = true, default-features = false }
penumbra-num = { workspace = true, default-features = false }
penumbra-asset = { workspace = true, default-features = false }
penumbra-proto = { workspace = true, default-features = false }
tokio = { workspace = true, features = ["full"] }
anyhow = { workspace = true }
serde_json = { workspace = true }
tracing = { workspace = true }
tracing = {workspace = true}
3 changes: 2 additions & 1 deletion crates/bin/pindexer/src/block.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction};
use penumbra_proto::{core::component::sct::v1 as pb, event::ProtoEvent};
use sqlx::types::chrono::DateTime;
use sqlx::{types::chrono::DateTime, PgPool};

#[derive(Debug)]
pub struct Block {}
Expand Down Expand Up @@ -36,6 +36,7 @@ CREATE TABLE IF NOT EXISTS block_details (
&self,
dbtx: &mut PgTransaction,
event: &ContextualizedEvent,
_src_db: &PgPool,
) -> Result<(), anyhow::Error> {
let pe = pb::EventBlockRoot::from_event(event.as_ref())?;
let timestamp = pe.timestamp.expect("Block has no timestamp");
Expand Down
2 changes: 1 addition & 1 deletion crates/bin/pindexer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pub use cometindex::{AppView, Indexer};
pub use cometindex::{opt::Options, AppView, ContextualizedEvent, Indexer, PgPool, PgTransaction};

mod indexer_ext;
pub use indexer_ext::IndexerExt;
Expand Down
5 changes: 3 additions & 2 deletions crates/bin/pindexer/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use anyhow::Result;
use clap::Parser as _;
use pindexer::block::Block;
use pindexer::{Indexer, IndexerExt as _};
use pindexer::{Indexer, IndexerExt as _, Options};

#[tokio::main]
async fn main() -> Result<()> {
Indexer::new()
Indexer::new(Options::parse())
.with_default_tracing()
.with_default_penumbra_app_views()
.with_index(Block {})
Expand Down
3 changes: 2 additions & 1 deletion crates/bin/pindexer/src/shielded_pool/fmd.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction};
use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction};
use penumbra_proto::{core::component::shielded_pool::v1 as pb, event::ProtoEvent};

#[derive(Debug)]
Expand Down Expand Up @@ -34,6 +34,7 @@ CREATE TABLE IF NOT EXISTS shielded_pool_fmd_clue_set (
&self,
dbtx: &mut PgTransaction,
event: &ContextualizedEvent,
_src_db: &PgPool,
) -> Result<(), anyhow::Error> {
let pe = pb::EventBroadcastClue::from_event(event.as_ref())?;

Expand Down
3 changes: 2 additions & 1 deletion crates/bin/pindexer/src/stake/delegation_txs.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::{anyhow, Result};
use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction};
use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction};
use penumbra_num::Amount;
use penumbra_proto::{core::component::stake::v1 as pb, event::ProtoEvent};

Expand Down Expand Up @@ -54,6 +54,7 @@ impl AppView for DelegationTxs {
&self,
dbtx: &mut PgTransaction,
event: &ContextualizedEvent,
_src_db: &PgPool,
) -> Result<()> {
let pe = pb::EventDelegate::from_event(event.as_ref())?;

Expand Down
3 changes: 2 additions & 1 deletion crates/bin/pindexer/src/stake/missed_blocks.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::Result;
use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction};
use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction};

use penumbra_proto::{core::component::stake::v1 as pb, event::ProtoEvent};

Expand Down Expand Up @@ -52,6 +52,7 @@ impl AppView for MissedBlocks {
&self,
dbtx: &mut PgTransaction,
event: &ContextualizedEvent,
_src_db: &PgPool,
) -> Result<(), anyhow::Error> {
let pe = pb::EventValidatorMissedBlock::from_event(event.as_ref())?;
let ik_bytes = pe
Expand Down
3 changes: 2 additions & 1 deletion crates/bin/pindexer/src/stake/slashings.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::{anyhow, Result};
use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction};
use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction};

use penumbra_proto::{core::component::stake::v1 as pb, event::ProtoEvent};
use penumbra_stake::IdentityKey;
Expand Down Expand Up @@ -52,6 +52,7 @@ impl AppView for Slashings {
&self,
dbtx: &mut PgTransaction,
event: &ContextualizedEvent,
_src_db: &PgPool,
) -> Result<(), anyhow::Error> {
let pe = pb::EventSlashingPenaltyApplied::from_event(event.as_ref())?;
let ik = IdentityKey::try_from(
Expand Down
3 changes: 2 additions & 1 deletion crates/bin/pindexer/src/stake/undelegation_txs.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::{anyhow, Result};
use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction};
use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction};
use penumbra_num::Amount;
use penumbra_proto::{core::component::stake::v1 as pb, event::ProtoEvent};

Expand Down Expand Up @@ -54,6 +54,7 @@ impl AppView for UndelegationTxs {
&self,
dbtx: &mut PgTransaction,
event: &ContextualizedEvent,
_src_db: &PgPool,
) -> Result<()> {
let pe = pb::EventUndelegate::from_event(event.as_ref())?;

Expand Down
3 changes: 2 additions & 1 deletion crates/bin/pindexer/src/stake/validator_set.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::BTreeMap;

use anyhow::{anyhow, Context, Result};
use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction};
use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction};

use penumbra_app::genesis::AppState;
use penumbra_asset::asset;
Expand Down Expand Up @@ -68,6 +68,7 @@ impl AppView for ValidatorSet {
&self,
dbtx: &mut PgTransaction,
event: &ContextualizedEvent,
_src_db: &PgPool,
) -> Result<(), anyhow::Error> {
match event.event.kind.as_str() {
"penumbra.core.component.stake.v1.EventValidatorDefinitionUpload" => {
Expand Down
7 changes: 5 additions & 2 deletions crates/util/cometindex/examples/fmd_clues.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use anyhow::Result;
use cometindex::{async_trait, AppView, ContextualizedEvent, Indexer, PgTransaction};
use clap::Parser;
use cometindex::{async_trait, opt::Options, AppView, ContextualizedEvent, Indexer, PgTransaction};
use sqlx::PgPool;

// This example is silly because it doesn't do any "compilation" of the raw
// events, so it's only useful as an example of exercising the harness and the
Expand Down Expand Up @@ -39,6 +41,7 @@ CREATE TABLE IF NOT EXISTS fmd_clues_example (
&self,
dbtx: &mut PgTransaction,
event: &ContextualizedEvent,
_src_db: &PgPool,
) -> Result<(), anyhow::Error> {
// this is just an example in the integration tests, so we don't want to do any
// - queries against existing table state
Expand Down Expand Up @@ -72,7 +75,7 @@ CREATE TABLE IF NOT EXISTS fmd_clues_example (

#[tokio::main]
async fn main() -> Result<()> {
Indexer::new()
Indexer::new(Options::parse())
.with_default_tracing()
// add as many indexers as you want
.with_index(FmdCluesExample {})
Expand Down
2 changes: 2 additions & 0 deletions crates/util/cometindex/src/index.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use async_trait::async_trait;
pub use sqlx::PgPool;
use sqlx::{Postgres, Transaction};

use crate::ContextualizedEvent;
Expand All @@ -20,5 +21,6 @@ pub trait AppView: std::fmt::Debug {
&self,
dbtx: &mut PgTransaction,
event: &ContextualizedEvent,
src_db: &PgPool,
) -> Result<(), anyhow::Error>;
}
26 changes: 20 additions & 6 deletions crates/util/cometindex/src/indexer.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use std::pin::Pin;

use anyhow::{Context as _, Result};
use clap::Parser;
use futures::{Stream, StreamExt, TryStreamExt};
use sqlx::PgPool;
use sqlx::{postgres::PgPoolOptions, PgPool};
use tap::{Tap, TapFallible, TapOptional};
use tendermint::abci;
use tracing::{debug, info};
Expand All @@ -16,9 +15,9 @@ pub struct Indexer {
}

impl Indexer {
pub fn new() -> Self {
pub fn new(opts: Options) -> Self {
Self {
opts: Options::parse(),
opts,
indexes: Vec::new(),
}
}
Expand Down Expand Up @@ -60,7 +59,22 @@ impl Indexer {
indexes,
} = self;

let src_db = PgPool::connect(&src_database_url).await?;
// Create a source db, with, for sanity, some read only settings.
// These will be overrideable by a consumer who knows what they're doing,
// but prevents basic mistakes.
// c.f. https://github.com/launchbadge/sqlx/issues/481#issuecomment-727011811
let src_db = PgPoolOptions::new()
.after_connect(|conn, _| {
Box::pin(async move {
sqlx::query("SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY;")
.execute(conn)
.await?;
Ok(())
})
})
.connect(&src_database_url)
.await?;

let dst_db = PgPool::connect(&dst_database_url).await?;

// Check if the destination db is initialized
Expand Down Expand Up @@ -169,7 +183,7 @@ impl Indexer {
for index in indexes {
if index.is_relevant(&event.as_ref().kind) {
tracing::debug!(?event, ?index, "relevant to index");
index.index_event(&mut dbtx, &event).await?;
index.index_event(&mut dbtx, &event, &src_db).await?;
}
}
// Mark that we got to at least this event
Expand Down
2 changes: 1 addition & 1 deletion crates/util/cometindex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pub mod indexer;
pub mod opt;

pub use contextualized::ContextualizedEvent;
pub use index::{AppView, PgTransaction};
pub use index::{AppView, PgPool, PgTransaction};
pub use indexer::Indexer;

pub use async_trait::async_trait;
Expand Down
2 changes: 1 addition & 1 deletion crates/util/cometindex/src/opt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use anyhow::{Error, Result};
use clap::Parser;

/// This struct represents the command-line options
#[derive(Debug, Parser)]
#[derive(Clone, Debug, Parser)]
#[clap(
name = "cometindex",
about = "processes raw events emitted by cometbft applications",
Expand Down

0 comments on commit a2ffd8a

Please sign in to comment.