Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use an unbounded channel to process transactions in indexer #69

Merged
merged 3 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 0 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion core/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub struct DbArgs {
pub connection_timeout: u64,
#[arg(long, env, default_value_t = 10)]
pub acquire_timeout: u64,
#[arg(long, env, default_value_t = 60)]
#[arg(long, env, default_value_t = 10)]
pub idle_timeout: u64,
#[arg(long, env)]
pub database_url: String,
Expand Down
2 changes: 0 additions & 2 deletions indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,10 @@ solana-program = "1.14"
anchor-lang = "0.26.0"
yellowstone-grpc-client = { git = "https://github.com/rpcpool/yellowstone-grpc", tag = "v1.7.1+solana.1.16.1" }
yellowstone-grpc-proto = { git = "https://github.com/rpcpool/yellowstone-grpc", tag = "v1.7.1+solana.1.16.1" }
dashmap = "5.4.0"
spl-token = "=3.5.0"
solana-client = "1.14"
backoff = { version = "0.4.0", features = ["tokio"] }


[dependencies.hub-core]
package = "holaplex-hub-core"
version = "0.5.6"
Expand Down
76 changes: 69 additions & 7 deletions indexer/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,30 @@ use hub_core::{
backon::{ExponentialBuilder, Retryable},
prelude::*,
producer::Producer,
tokio,
tokio::{
self,
sync::{
mpsc::{self, UnboundedReceiver, UnboundedSender},
Mutex,
},
task::{self, JoinSet},
},
};
use solana_client::rpc_client::RpcClient;
use yellowstone_grpc_client::GeyserGrpcClientError;
use yellowstone_grpc_proto::prelude::SubscribeRequest;
use yellowstone_grpc_proto::prelude::{
subscribe_update::UpdateOneof, SubscribeRequest, SubscribeUpdateTransaction,
};

use crate::{processor::Processor, Args, GeyserGrpcConnector};

#[derive(Clone)]
pub struct MessageHandler {
connector: GeyserGrpcConnector,
processor: Processor,
tx: UnboundedSender<SubscribeUpdateTransaction>,
rx: Arc<Mutex<UnboundedReceiver<SubscribeUpdateTransaction>>>,
parallelism: usize,
}

impl MessageHandler {
Expand All @@ -26,6 +38,7 @@ impl MessageHandler {
dragon_mouth_endpoint,
dragon_mouth_x_token,
solana_endpoint,
parallelism,
db,
} = args;

Expand All @@ -35,28 +48,45 @@ impl MessageHandler {

let rpc = Arc::new(RpcClient::new(solana_endpoint));
let connector = GeyserGrpcConnector::new(dragon_mouth_endpoint, dragon_mouth_x_token);

let (tx, rx) = mpsc::unbounded_channel();
let processor = Processor::new(db, rpc, producer);

Ok(Self {
connector,
processor,
tx,
rx: Arc::new(Mutex::new(rx)),
parallelism,
})
}

async fn connect(&self, request: SubscribeRequest) -> Result<()> {
(|| async {
let (mut subscribe_tx, mut stream) = self.connector.subscribe().await?;

let mut hashmap = std::collections::HashMap::new();
subscribe_tx
.send(request.clone())
.await
.map_err(GeyserGrpcClientError::SubscribeSendError)?;

while let Some(message) = stream.next().await {
self.processor.process(message).await?;
match message {
Ok(msg) => match msg.update_oneof {
Some(UpdateOneof::Transaction(tx)) => {
hashmap.entry(tx.slot).or_insert(Vec::new()).push(tx);
},
Some(UpdateOneof::Slot(slot)) => {
if let Some(transactions) = hashmap.remove(&slot.slot) {
for tx in transactions {
self.tx.send(tx)?;
}
}
},
_ => {},
},
Err(error) => bail!("stream error: {:?}", error),
};
}

Ok(())
})
.retry(
Expand All @@ -81,19 +111,51 @@ impl MessageHandler {
});

let mpl_bubblegum_stream = tokio::spawn({
let handler = self.clone();
async move {
self.connect(GeyserGrpcConnector::build_request(mpl_bubblegum::ID))
handler
.connect(GeyserGrpcConnector::build_request(mpl_bubblegum::ID))
.await
}
});

let processor = self.processor;

let process_task = task::spawn(async move {
let mut set = JoinSet::new();

loop {
let processor = processor.clone();
let mut rx = self.rx.lock().await;

while set.len() >= self.parallelism {
match set.join_next().await {
Some(Err(e)) => {
return Result::<(), Error>::Err(anyhow!(
"failed to join task {:?}",
e
));
},
Some(Ok(_)) | None => (),
}
}

if let Some(tx) = rx.recv().await {
set.spawn(processor.process_transaction(tx));
}
}
});

tokio::select! {
Err(e) = spl_token_stream => {
bail!("spl token stream error: {:?}", e)
},
Err(e) = mpl_bubblegum_stream => {
bail!("mpl bumblegum stream error: {:?}", e)
}
Err(e) = process_task => {
bail!("Receiver err: {:?}", e)
}
}
}
}
3 changes: 3 additions & 0 deletions indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ pub struct Args {
#[arg(long, env)]
pub solana_endpoint: String,

#[arg(long, short = 'p', env, default_value_t = 8)]
pub parallelism: usize,

#[command(flatten)]
pub db: db::DbArgs,
}
45 changes: 4 additions & 41 deletions indexer/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::{convert::TryInto, sync::Arc};

use anchor_lang::AnchorDeserialize;
use backoff::ExponentialBackoff;
use dashmap::DashMap;
use holaplex_hub_nfts_solana_core::{
db::Connection,
proto::{
Expand All @@ -13,25 +12,19 @@ use holaplex_hub_nfts_solana_core::{
CollectionMint, CompressionLeaf,
};
use holaplex_hub_nfts_solana_entity::compression_leafs;
use hub_core::{prelude::*, producer::Producer, tokio::task};
use hub_core::{prelude::*, producer::Producer};
use mpl_bubblegum::utils::get_asset_id;
use solana_client::rpc_client::RpcClient;
use solana_program::program_pack::Pack;
use solana_sdk::{pubkey::Pubkey, signature::Signature};
use spl_token::{instruction::TokenInstruction, state::Account};
use yellowstone_grpc_proto::{
prelude::{
subscribe_update::UpdateOneof, Message, SubscribeUpdate, SubscribeUpdateTransaction,
},
tonic::Status,
};
use yellowstone_grpc_proto::prelude::{Message, SubscribeUpdateTransaction};

#[derive(Clone)]
pub struct Processor {
db: Connection,
rpc: Arc<RpcClient>,
producer: Producer<SolanaNftEvents>,
dashmap: DashMap<u64, Vec<SubscribeUpdateTransaction>>,
}

impl Processor {
Expand All @@ -40,38 +33,7 @@ impl Processor {
rpc: Arc<RpcClient>,
producer: Producer<SolanaNftEvents>,
) -> Self {
Self {
db,
rpc,
producer,
dashmap: DashMap::new(),
}
}

pub(crate) async fn process(&self, message: Result<SubscribeUpdate, Status>) -> Result<()> {
match message {
Ok(msg) => match msg.update_oneof {
Some(UpdateOneof::Transaction(tx)) => {
self.dashmap.entry(tx.slot).or_insert(Vec::new()).push(tx);
},
Some(UpdateOneof::Slot(slot)) => {
if let Some((_, transactions)) = self.dashmap.remove(&slot.slot) {
let handles: Vec<_> = transactions
.into_iter()
.map(|tx| task::spawn(self.clone().process_transaction(tx)))
.collect();

for handle in handles {
handle.await??
}
}
},
_ => {},
},
Err(error) => bail!("stream error: {:?}", error),
};

Ok(())
Self { db, rpc, producer }
}

pub(crate) async fn process_transaction(self, tx: SubscribeUpdateTransaction) -> Result<()> {
Expand Down Expand Up @@ -140,6 +102,7 @@ impl Processor {
if compression_leaf.is_none() {
return Ok(());
}

let compression_leaf = compression_leaf.context("Compression leaf not found")?;

let collection_mint_id = compression_leaf.id;
Expand Down