diff --git a/Cargo.lock b/Cargo.lock index 1402bba95..159187f18 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4307,6 +4307,7 @@ dependencies = [ "bincode", "chrono", "dashmap", + "itertools", "log", "native-tls", "postgres-native-tls", diff --git a/core/src/traits/block_storage_interface.rs b/core/src/traits/block_storage_interface.rs index 1db772e2a..5aacd1841 100644 --- a/core/src/traits/block_storage_interface.rs +++ b/core/src/traits/block_storage_interface.rs @@ -1,4 +1,5 @@ use crate::structures::produced_block::ProducedBlock; +use anyhow::Result; use async_trait::async_trait; use solana_rpc_client_api::config::RpcBlockConfig; use solana_sdk::slot_history::Slot; @@ -7,11 +8,12 @@ use std::{ops::Range, sync::Arc}; #[async_trait] pub trait BlockStorageInterface: Send + Sync { // will save a block - async fn save(&self, block: ProducedBlock); + async fn save(&self, block: ProducedBlock) -> Result<()>; // will get a block - async fn get(&self, slot: Slot, config: RpcBlockConfig) -> Option; + async fn get(&self, slot: Slot, config: RpcBlockConfig) -> Result; // will get range of slots that are stored in the storage async fn get_slot_range(&self) -> Range; } pub type BlockStorageImpl = Arc; +pub const BLOCK_NOT_FOUND: &str = "Block not found"; diff --git a/history/Cargo.toml b/history/Cargo.toml index 4f3671e60..160dad054 100644 --- a/history/Cargo.toml +++ b/history/Cargo.toml @@ -25,4 +25,5 @@ log = {workspace = true} chrono = {workspace = true} bincode = {workspace = true} base64 = {workspace = true} +itertools = {workspace = true} tokio-postgres = { version = "0.7.8", features = ["with-chrono-0_4"] } \ No newline at end of file diff --git a/history/src/block_stores/inmemory_block_store.rs b/history/src/block_stores/inmemory_block_store.rs index d97b28c0a..bfacd419a 100644 --- a/history/src/block_stores/inmemory_block_store.rs +++ b/history/src/block_stores/inmemory_block_store.rs @@ -1,7 +1,8 @@ use async_trait::async_trait; use solana_lite_rpc_core::{ - commitment_utils::Commitment, structures::produced_block::ProducedBlock, - traits::block_storage_interface::BlockStorageInterface, + commitment_utils::Commitment, + structures::produced_block::ProducedBlock, + traits::block_storage_interface::{BlockStorageInterface, BLOCK_NOT_FOUND}, }; use solana_rpc_client_api::config::RpcBlockConfig; use solana_sdk::slot_history::Slot; @@ -52,12 +53,18 @@ impl InmemoryBlockStore { #[async_trait] impl BlockStorageInterface for InmemoryBlockStore { - async fn save(&self, block: ProducedBlock) { + async fn save(&self, block: ProducedBlock) -> anyhow::Result<()> { self.store(block).await; + Ok(()) } - async fn get(&self, slot: Slot, _: RpcBlockConfig) -> Option { - self.block_storage.read().await.get(&slot).cloned() + async fn get(&self, slot: Slot, _: RpcBlockConfig) -> anyhow::Result { + self.block_storage + .read() + .await + .get(&slot) + .cloned() + .ok_or(anyhow::Error::msg(BLOCK_NOT_FOUND)) } async fn get_slot_range(&self) -> Range { diff --git a/history/src/block_stores/multiple_strategy_block_store.rs b/history/src/block_stores/multiple_strategy_block_store.rs index 2e80dc124..5f9b7fb99 100644 --- a/history/src/block_stores/multiple_strategy_block_store.rs +++ b/history/src/block_stores/multiple_strategy_block_store.rs @@ -4,11 +4,12 @@ // Fetches legacy blocks from faithful use crate::block_stores::inmemory_block_store::InmemoryBlockStore; +use anyhow::{bail, Result}; use async_trait::async_trait; use solana_lite_rpc_core::{ commitment_utils::Commitment, structures::produced_block::ProducedBlock, - traits::block_storage_interface::{BlockStorageImpl, BlockStorageInterface}, + traits::block_storage_interface::{BlockStorageImpl, BlockStorageInterface, BLOCK_NOT_FOUND}, }; use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_rpc_client_api::config::RpcBlockConfig; @@ -42,7 +43,7 @@ impl MultipleStrategyBlockStorage { } } - pub async fn get_in_memory_block(&self, slot: Slot) -> Option { + pub async fn get_in_memory_block(&self, slot: Slot) -> anyhow::Result { self.inmemory_for_storage .get( slot, @@ -60,46 +61,47 @@ impl MultipleStrategyBlockStorage { #[async_trait] impl BlockStorageInterface for MultipleStrategyBlockStorage { - async fn save(&self, block: ProducedBlock) { + async fn save(&self, block: ProducedBlock) -> Result<()> { let slot = block.slot; let commitment = Commitment::from(block.commitment_config); match commitment { Commitment::Confirmed | Commitment::Processed => { - self.inmemory_for_storage.save(block).await; + self.inmemory_for_storage.save(block).await?; } Commitment::Finalized => { let block_in_mem = self.get_in_memory_block(block.slot).await; match block_in_mem { - Some(block_in_mem) => { + Ok(block_in_mem) => { // check if inmemory blockhash is same as finalized, update it if they are not // we can have two machines with same identity publishing two different blocks on same slot if block_in_mem.blockhash != block.blockhash { - self.inmemory_for_storage.save(block.clone()).await; + self.inmemory_for_storage.save(block.clone()).await?; } } - None => self.inmemory_for_storage.save(block.clone()).await, + Err(_) => self.inmemory_for_storage.save(block.clone()).await?, } - self.persistent_block_storage.save(block).await; + self.persistent_block_storage.save(block).await?; } }; if slot > self.last_confirmed_slot.load(Ordering::Relaxed) { self.last_confirmed_slot.store(slot, Ordering::Relaxed); } + Ok(()) } async fn get( &self, slot: solana_sdk::slot_history::Slot, config: RpcBlockConfig, - ) -> Option { + ) -> Result { let last_confirmed_slot = self.last_confirmed_slot.load(Ordering::Relaxed); if slot > last_confirmed_slot { - None + bail!(BLOCK_NOT_FOUND); } else { let range = self.inmemory_for_storage.get_slot_range().await; if range.contains(&slot) { let block = self.inmemory_for_storage.get(slot, config).await; - if block.is_some() { + if block.is_ok() { return block; } } @@ -113,15 +115,15 @@ impl BlockStorageInterface for MultipleStrategyBlockStorage { .get_block_with_config(slot, config) .await { - Ok(block) => Some(ProducedBlock::from_ui_block( + Ok(block) => Ok(ProducedBlock::from_ui_block( block, slot, CommitmentConfig::finalized(), )), - Err(_) => None, + Err(_) => bail!(BLOCK_NOT_FOUND), } } else { - None + bail!(BLOCK_NOT_FOUND); } } } diff --git a/history/src/block_stores/postgres_block_store.rs b/history/src/block_stores/postgres_block_store.rs index 8b1378917..a45edc3bd 100644 --- a/history/src/block_stores/postgres_block_store.rs +++ b/history/src/block_stores/postgres_block_store.rs @@ -1 +1,101 @@ +use std::sync::Arc; +use anyhow::Result; +use async_trait::async_trait; +use itertools::Itertools; +use solana_lite_rpc_core::{ + structures::{epoch::EpochCache, produced_block::ProducedBlock}, + traits::block_storage_interface::BlockStorageInterface, +}; +use solana_rpc_client_api::config::RpcBlockConfig; +use solana_sdk::{slot_history::Slot, stake_history::Epoch}; +use tokio::sync::RwLock; + +use crate::postgres::{ + postgres_block::PostgresBlock, postgres_session::PostgresSessionCache, + postgres_transaction::PostgresTransaction, +}; + +#[derive(Default, Clone, Copy)] +pub struct PostgresData { + from_slot: Slot, + to_slot: Slot, + current_epoch: Epoch, +} + +pub struct PostgresBlockStore { + session_cache: PostgresSessionCache, + epoch_cache: EpochCache, + postgres_data: Arc>, +} + +impl PostgresBlockStore { + pub async fn start_new_epoch(&self, schema: &String) -> Result<()> { + // create schema for new epoch + let session = self + .session_cache + .get_session() + .await + .expect("should get new postgres session"); + + let statement = format!("CREATE SCHEMA {};", schema); + session.execute(&statement, &[]).await?; + + // Create blocks table + let statement = PostgresBlock::create_statement(schema); + session.execute(&statement, &[]).await?; + + // create transaction table + let statement = PostgresTransaction::create_statement(schema); + session.execute(&statement, &[]).await?; + Ok(()) + } +} + +#[async_trait] +impl BlockStorageInterface for PostgresBlockStore { + async fn save(&self, block: ProducedBlock) -> Result<()> { + let PostgresData { current_epoch, .. } = { *self.postgres_data.read().await }; + + let slot = block.slot; + let transactions = block + .transactions + .iter() + .map(|x| PostgresTransaction::new(x, slot)) + .collect_vec(); + let postgres_block = PostgresBlock::from(&block); + + let epoch = self.epoch_cache.get_epoch_at_slot(slot); + let schema = format!("EPOCH_{}", epoch.epoch); + if current_epoch == 0 || current_epoch < epoch.epoch { + self.postgres_data.write().await.current_epoch = epoch.epoch; + self.start_new_epoch(&schema).await?; + } + + const NUMBER_OF_TRANSACTION: usize = 20; + + // save transaction + let chunks = transactions.chunks(NUMBER_OF_TRANSACTION); + let session = self + .session_cache + .get_session() + .await + .expect("should get new postgres session"); + for chunk in chunks { + PostgresTransaction::save_transactions(&session, &schema, chunk).await?; + } + postgres_block.save(&session, &schema).await?; + Ok(()) + } + + async fn get(&self, slot: Slot, _config: RpcBlockConfig) -> Result { + let range = self.get_slot_range().await; + if range.contains(&slot) {} + todo!() + } + + async fn get_slot_range(&self) -> std::ops::Range { + let lk = self.postgres_data.read().await; + lk.from_slot..lk.to_slot + 1 + } +} diff --git a/history/src/postgres/postgres_block.rs b/history/src/postgres/postgres_block.rs index 659bc280b..74bb3c475 100644 --- a/history/src/postgres/postgres_block.rs +++ b/history/src/postgres/postgres_block.rs @@ -1,29 +1,23 @@ -use super::postgres_session::SchemaSize; -use solana_lite_rpc_core::{ - commitment_utils::Commitment, encoding::BASE64, structures::produced_block::ProducedBlock, -}; +use solana_lite_rpc_core::{encoding::BASE64, structures::produced_block::ProducedBlock}; +use tokio_postgres::types::ToSql; + +use super::postgres_session::PostgresSession; #[derive(Debug)] pub struct PostgresBlock { - pub leader_id: Option, + pub slot: i64, pub blockhash: String, pub block_height: i64, - pub slot: i64, pub parent_slot: i64, pub block_time: i64, - pub commitment_config: i8, pub previous_blockhash: String, pub rewards: Option, } -impl SchemaSize for PostgresBlock { - const DEFAULT_SIZE: usize = 4 * 8; - const MAX_SIZE: usize = Self::DEFAULT_SIZE + 8; -} +const NB_ARUMENTS: usize = 7; -impl From for PostgresBlock { - fn from(value: ProducedBlock) -> Self { - let commitment = Commitment::from(&value.commitment_config); +impl From<&ProducedBlock> for PostgresBlock { + fn from(value: &ProducedBlock) -> Self { let rewards = value .rewards .as_ref() @@ -31,15 +25,59 @@ impl From for PostgresBlock { .unwrap_or(None); Self { - leader_id: value.leader_id, - blockhash: value.blockhash, + blockhash: value.blockhash.clone(), block_height: value.block_height as i64, slot: value.slot as i64, parent_slot: value.parent_slot as i64, block_time: value.block_time as i64, - commitment_config: commitment as i8, - previous_blockhash: value.previous_blockhash, + previous_blockhash: value.previous_blockhash.clone(), rewards, } } } + +impl PostgresBlock { + pub fn create_statement(schema: &String) -> String { + format!( + " + CREATE TABLE {}.BLOCKS ( + slot BIGINT PRIMARY KEY, + blockhash STRING NOT NULL, + leader_id STRING, + block_height BIGINT NOT NULL, + parent_slot BIGINT NOT NULL, + block_time BIGINT NOT NULL, + previous_blockhash STRING NOT NULL, + rewards STRING, + ); + ", + schema + ) + } + + pub async fn save( + &self, + postgres_session: &PostgresSession, + schema: &String, + ) -> anyhow::Result<()> { + let mut query = format!( + r#" + INSERT INTO {}.BLOCKS (slot, blockhash, block_height, parent_slot, block_time, previous_blockhash, rewards) VALUES + "#, + schema + ); + + let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NB_ARUMENTS); + args.push(&self.slot); + args.push(&self.blockhash); + args.push(&self.block_height); + args.push(&self.parent_slot); + args.push(&self.block_time); + args.push(&self.previous_blockhash); + args.push(&self.rewards); + + PostgresSession::multiline_query(&mut query, NB_ARUMENTS, 1, &[]); + postgres_session.execute(&query, &args).await?; + Ok(()) + } +} diff --git a/history/src/postgres/postgres_transaction.rs b/history/src/postgres/postgres_transaction.rs index 935760ff5..b559c4a5c 100644 --- a/history/src/postgres/postgres_transaction.rs +++ b/history/src/postgres/postgres_transaction.rs @@ -1,10 +1,13 @@ use solana_lite_rpc_core::{encoding::BASE64, structures::produced_block::TransactionInfo}; +use solana_sdk::slot_history::Slot; +use tokio_postgres::types::ToSql; -use super::postgres_session::SchemaSize; +use super::postgres_session::PostgresSession; #[derive(Debug)] pub struct PostgresTransaction { pub signature: String, + pub slot: i64, pub err: Option, pub cu_requested: Option, pub prioritization_fees: Option, @@ -13,13 +16,10 @@ pub struct PostgresTransaction { pub message: String, } -impl SchemaSize for PostgresTransaction { - const DEFAULT_SIZE: usize = 88 + (3 * 8) + 2; - const MAX_SIZE: usize = Self::DEFAULT_SIZE + (3 * 8); -} +const NB_ARUMENTS: usize = 8; -impl From<&TransactionInfo> for PostgresTransaction { - fn from(value: &TransactionInfo) -> Self { +impl PostgresTransaction { + pub fn new(value: &TransactionInfo, slot: Slot) -> Self { Self { signature: value.signature.clone(), err: value @@ -32,6 +32,81 @@ impl From<&TransactionInfo> for PostgresTransaction { cu_consumed: value.cu_consumed.map(|x| x as i64), recent_blockhash: value.recent_blockhash.clone(), message: value.message.clone(), + slot: slot as i64, + } + } + + pub fn create_statement(schema: &String) -> String { + format!( + "\ + CREATE TABLE {}.TRANSACTIONS ( + signature CHAR(88) NOT NULL, + slot BIGINT, + err STRING, + cu_requested BIGINT, + prioritization_fees BIGINT, + cu_consumed BIGINT, + recent_blockhash STRING NOT NULL, + message STRING NOT NULL, + PRIMARY KEY (signature) + CONSTRAINT fk_transactions FOREIGN KEY (slot) REFERENCES {}.BLOCKS(slot); + ); + ", + schema, schema + ) + } + + pub async fn save_transactions( + postgres_session: &PostgresSession, + schema: &String, + transactions: &[Self], + ) -> anyhow::Result<()> { + let mut args: Vec<&(dyn ToSql + Sync)> = + Vec::with_capacity(NB_ARUMENTS * transactions.len()); + + for tx in transactions.iter() { + let PostgresTransaction { + signature, + slot, + err, + cu_requested, + prioritization_fees, + cu_consumed, + recent_blockhash, + message, + } = tx; + + args.push(signature); + args.push(slot); + args.push(err); + args.push(cu_requested); + args.push(prioritization_fees); + args.push(cu_consumed); + args.push(recent_blockhash); + args.push(message); } + + let mut query = format!( + r#" + INSERT INTO {}.TRANSACTIONS + (signature, slot, err, cu_requested, prioritization_fees, cu_consumed, recent_blockhash, message) + VALUES + "#, + schema + ); + + PostgresSession::multiline_query(&mut query, NB_ARUMENTS, transactions.len(), &[]); + postgres_session.execute(&query, &args).await?; + Ok(()) + } + + pub async fn get( + postgres_session: PostgresSession, + schema: &String, + slot: Slot, + ) -> Vec { + let statement = format!("SELECT signature, err, cu_requested, prioritization_fees, cu_consumed, recent_blockhash, message FROM {}.TRANSACTIONS WHERE SLOT = {}", schema, slot); + let _ = postgres_session.client.query(&statement, &[]).await; + todo!() } } diff --git a/history/tests/inmemory_block_store_tests.rs b/history/tests/inmemory_block_store_tests.rs index 543d39a83..8e2dbeb56 100644 --- a/history/tests/inmemory_block_store_tests.rs +++ b/history/tests/inmemory_block_store_tests.rs @@ -31,26 +31,33 @@ async fn inmemory_block_store_tests() { for i in 1..11 { store .save(create_test_block(i, CommitmentConfig::finalized())) - .await; + .await + .unwrap(); } // check if 10 blocks are added for i in 1..11 { - assert!(store.get(i, RpcBlockConfig::default()).await.is_some()); + assert!(store.get(i, RpcBlockConfig::default()).await.ok().is_some()); } // add 11th block store .save(create_test_block(11, CommitmentConfig::finalized())) - .await; + .await + .unwrap(); // can get 11th block - assert!(store.get(11, RpcBlockConfig::default()).await.is_some()); + assert!(store + .get(11, RpcBlockConfig::default()) + .await + .ok() + .is_some()); // first block is removed - assert!(store.get(1, RpcBlockConfig::default()).await.is_none()); + assert!(store.get(1, RpcBlockConfig::default()).await.ok().is_none()); // cannot add old blocks store .save(create_test_block(1, CommitmentConfig::finalized())) - .await; - assert!(store.get(1, RpcBlockConfig::default()).await.is_none()); + .await + .unwrap(); + assert!(store.get(1, RpcBlockConfig::default()).await.ok().is_none()); } diff --git a/history/tests/multiple_strategy_block_store_tests.rs b/history/tests/multiple_strategy_block_store_tests.rs index 6a9085013..3e0304ebc 100644 --- a/history/tests/multiple_strategy_block_store_tests.rs +++ b/history/tests/multiple_strategy_block_store_tests.rs @@ -37,63 +37,78 @@ async fn test_in_multiple_stategy_block_store() { block_storage .save(create_test_block(1235, CommitmentConfig::confirmed())) - .await; + .await + .unwrap(); block_storage .save(create_test_block(1236, CommitmentConfig::confirmed())) - .await; + .await + .unwrap(); assert!(block_storage .get(1235, RpcBlockConfig::default()) .await + .ok() .is_some()); assert!(block_storage .get(1236, RpcBlockConfig::default()) .await + .ok() .is_some()); assert!(persistent_store .get(1235, RpcBlockConfig::default()) .await + .ok() .is_none()); assert!(persistent_store .get(1236, RpcBlockConfig::default()) .await + .ok() .is_none()); block_storage .save(create_test_block(1235, CommitmentConfig::finalized())) - .await; + .await + .unwrap(); block_storage .save(create_test_block(1236, CommitmentConfig::finalized())) - .await; + .await + .unwrap(); block_storage .save(create_test_block(1237, CommitmentConfig::finalized())) - .await; + .await + .unwrap(); assert!(block_storage .get(1235, RpcBlockConfig::default()) .await + .ok() .is_some()); assert!(block_storage .get(1236, RpcBlockConfig::default()) .await + .ok() .is_some()); assert!(block_storage .get(1237, RpcBlockConfig::default()) .await + .ok() .is_some()); assert!(persistent_store .get(1235, RpcBlockConfig::default()) .await + .ok() .is_some()); assert!(persistent_store .get(1236, RpcBlockConfig::default()) .await + .ok() .is_some()); assert!(persistent_store .get(1237, RpcBlockConfig::default()) .await + .ok() .is_some()); - assert!(block_storage.get_in_memory_block(1237).await.is_some()); + assert!(block_storage.get_in_memory_block(1237).await.ok().is_some()); // blocks are replaced by finalized blocks assert_eq!( @@ -137,5 +152,6 @@ async fn test_in_multiple_stategy_block_store() { assert!(block_storage .get(1238, RpcBlockConfig::default()) .await + .ok() .is_none()); } diff --git a/lite-rpc/src/bridge.rs b/lite-rpc/src/bridge.rs index a41d2cb06..6497d4ff7 100644 --- a/lite-rpc/src/bridge.rs +++ b/lite-rpc/src/bridge.rs @@ -329,7 +329,7 @@ impl LiteRpcServer for LiteBridge { ) -> crate::rpc::Result> { let config = config.map_or(RpcBlockConfig::default(), |x| x.convert_to_current()); let block = self.history.block_storage.get(slot, config).await; - if block.is_some() { + if block.is_ok() { // TO DO Convert to UIConfirmed Block Err(jsonrpsee::core::Error::HttpNotImplemented) } else { diff --git a/lite-rpc/src/postgres_logger.rs b/lite-rpc/src/postgres_logger.rs index 986d1f73e..ae0bd7332 100644 --- a/lite-rpc/src/postgres_logger.rs +++ b/lite-rpc/src/postgres_logger.rs @@ -5,7 +5,7 @@ use log::{info, warn}; use prometheus::{core::GenericGauge, opts, register_int_gauge}; use solana_lite_rpc_core::{ structures::notifications::{ - BlockNotification, NotificationMsg, NotificationReciever, TransactionNotification, + NotificationMsg, NotificationReciever, TransactionNotification, TransactionUpdateNotification, }, AnyhowJoinHandle, @@ -86,32 +86,6 @@ impl From<&TransactionUpdateNotification> for PostgresTxUpdate { } } -#[derive(Debug)] -pub struct PostgresBlock { - pub slot: i64, // 8 bytes - pub leader_id: i64, // 8 bytes - pub parent_slot: i64, // 8 bytes - pub cluster_time: DateTime, // 8 bytes - pub local_time: Option>, -} - -impl SchemaSize for PostgresBlock { - const DEFAULT_SIZE: usize = 4 * 8; - const MAX_SIZE: usize = Self::DEFAULT_SIZE + 8; -} - -impl From for PostgresBlock { - fn from(value: BlockNotification) -> Self { - Self { - slot: value.slot as i64, - leader_id: 0, // TODO - cluster_time: value.cluster_time, - local_time: value.local_time, - parent_slot: value.parent_slot as i64, - } - } -} - #[derive(Debug)] pub struct AccountAddr { pub id: u32, @@ -180,59 +154,6 @@ async fn send_txs(postgres_session: &PostgresSession, txs: &[PostgresTx]) -> any Ok(()) } -async fn send_blocks( - postgres_session: &PostgresSession, - blocks: &[PostgresBlock], -) -> anyhow::Result<()> { - const NUMBER_OF_ARGS: usize = 5; - - if blocks.is_empty() { - return Ok(()); - } - - let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS * blocks.len()); - - for block in blocks.iter() { - let PostgresBlock { - slot, - leader_id, - parent_slot, - cluster_time, - local_time, - } = block; - - args.push(slot); - args.push(leader_id); - args.push(parent_slot); - args.push(cluster_time); - args.push(local_time); - } - - let mut query = String::from( - r#" - INSERT INTO lite_rpc.Blocks - (slot, leader_id, parent_slot, cluster_time, local_time) - VALUES - "#, - ); - - PostgresSession::multiline_query(&mut query, NUMBER_OF_ARGS, blocks.len(), &[]); - - query.push_str( - r#" - ON CONFLICT (slot) DO UPDATE SET - leader_id = EXCLUDED.leader_id, - parent_slot = EXCLUDED.parent_slot, - cluster_time = EXCLUDED.cluster_time, - local_time = EXCLUDED.local_time - "#, - ); - - postgres_session.execute(&query, &args).await?; - - Ok(()) -} - async fn update_txs( postgres_session: &PostgresSession, txs: &[PostgresTxUpdate], @@ -302,11 +223,9 @@ impl PostgresLogger { info!("start postgres worker"); const TX_MAX_CAPACITY: usize = get_max_safe_inserts::(); - const BLOCK_MAX_CAPACITY: usize = get_max_safe_inserts::(); const UPDATE_MAX_CAPACITY: usize = get_max_safe_updates::(); let mut tx_batch: Vec = Vec::with_capacity(TX_MAX_CAPACITY); - let mut block_batch: Vec = Vec::with_capacity(BLOCK_MAX_CAPACITY); let mut update_batch = Vec::::with_capacity(UPDATE_MAX_CAPACITY); let mut session_establish_error = false; @@ -320,7 +239,6 @@ impl PostgresLogger { // check for capacity if tx_batch.len() >= TX_MAX_CAPACITY - || block_batch.len() >= BLOCK_MAX_CAPACITY || update_batch.len() >= UPDATE_MAX_CAPACITY { break; @@ -335,8 +253,9 @@ impl PostgresLogger { let mut tx = tx.iter().map(|x| x.into()).collect::>(); tx_batch.append(&mut tx) } - NotificationMsg::BlockNotificationMsg(block) => { - block_batch.push(block.into()) + NotificationMsg::BlockNotificationMsg(_) => { + // ignore block storage as it has been moved to persistant history. + continue; } NotificationMsg::UpdateTransactionMsg(update) => { let mut update = update.iter().map(|x| x.into()).collect(); @@ -355,7 +274,7 @@ impl PostgresLogger { } // if there's nothing to do, yield for a brief time - if tx_batch.is_empty() && block_batch.is_empty() && update_batch.is_empty() { + if tx_batch.is_empty() && update_batch.is_empty() { tokio::time::sleep(Duration::from_millis(10)).await; continue; } @@ -377,9 +296,8 @@ impl PostgresLogger { POSTGRES_SESSION_ERRORS.set(0); // write to database when a successful connection is made - let (res_txs, res_blocks, res_update) = join!( + let (res_txs, res_update) = join!( send_txs(&session, &tx_batch), - send_blocks(&session, &block_batch), update_txs(&session, &update_batch) ); @@ -392,14 +310,6 @@ impl PostgresLogger { } else { tx_batch.clear(); } - if let Err(err) = res_blocks { - warn!( - "Error sending block batch ({:?}) to postgres {err:?}", - block_batch.len() - ); - } else { - block_batch.clear(); - } if let Err(err) = res_update { warn!( "Error sending update batch ({:?}) to postgres {err:?}",