Skip to content

Commit

Permalink
fix: db locked error
Browse files Browse the repository at this point in the history
  • Loading branch information
pete-eiger committed Apr 24, 2024
1 parent d84349b commit fdd73d5
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 35 deletions.
11 changes: 5 additions & 6 deletions subgraph-radio/benches/attestations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,11 @@ mod attestation {
.expect("Failed to connect to the in-memory database"),
);

black_box(
sqlx::migrate!("../migrations")
.run(&pool)
.await
.expect("Could not run migration"),
);
sqlx::migrate!("../migrations")
.run(&pool)
.await
.expect("Could not run migration");
black_box(());

let attestations = vec![
black_box(Attestation::new(
Expand Down
75 changes: 46 additions & 29 deletions subgraph-radio/src/operator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashSet;
use std::env;
use std::path::Path;

use std::str::FromStr;
use std::sync::{atomic::Ordering, mpsc::Receiver, Arc};
use std::time::Duration;

Expand All @@ -15,6 +15,7 @@ use graphcast_sdk::{
WakuMessage,
};

use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
use sqlx::SqlitePool;
use tokio::time::{interval, timeout};
use tracing::{debug, error, info, trace, warn};
Expand Down Expand Up @@ -58,47 +59,63 @@ pub struct RadioOperator {
pub db: SqlitePool,
}

impl RadioOperator {
/// Create a radio operator with radio configurations, persisted data,
/// graphcast agent, and control flow
pub async fn new(config: &Config, agent: GraphcastAgent) -> RadioOperator {
debug!("Connecting to database");
async fn setup_database_connection(db_url: &str) -> Result<SqlitePool, sqlx::Error> {
let connect_options = SqliteConnectOptions::from_str(db_url)?
.create_if_missing(true)
.journal_mode(SqliteJournalMode::Wal)
.busy_timeout(std::time::Duration::from_secs(5));

let db = match &config.radio_setup().sqlite_file_path {
Some(path) => {
let cwd = env::current_dir().unwrap();
let absolute_path = cwd.join(path);
let pool = SqlitePool::connect_with(connect_options).await?;

if !Path::new(&absolute_path).exists() {
std::fs::File::create(&absolute_path)
.expect("Failed to create the database file");
debug!("Database file created at {}", absolute_path.display());
}
Ok(pool)
}

let db_url = format!("sqlite://{}", absolute_path.display());
async fn connect_to_database(config: &Config) -> Result<SqlitePool, Box<dyn std::error::Error>> {
debug!("Connecting to database");

SqlitePool::connect(&db_url)
.await
.expect("Could not connect to the SQLite database")
let db_url = match &config.radio_setup().sqlite_file_path {
Some(path) => {
let cwd = std::env::current_dir().unwrap();
let absolute_path = cwd.join(path);

if !std::path::Path::new(&absolute_path).exists() {
std::fs::File::create(&absolute_path).expect("Failed to create the database file");
debug!("Database file created at {}", absolute_path.display());
}
None => SqlitePool::connect("sqlite::memory:")
.await
.expect("Failed to connect to the in-memory database"),
};

debug!("Check for database migration");
sqlx::migrate!("../migrations")
.run(&db)
format!("sqlite://{}", absolute_path.display())
}
None => String::from("sqlite::memory:"),
};

let db_pool = setup_database_connection(&db_url).await?;

debug!("Check for database migration");
sqlx::migrate!("../migrations")
.run(&db_pool)
.await
.expect("Could not run migration");

Ok(db_pool)
}

impl RadioOperator {
/// Create a radio operator with radio configurations, persisted data,
/// graphcast agent, and control flow
pub async fn new(config: &Config, agent: GraphcastAgent) -> RadioOperator {
debug!("Connecting to database");

let db = connect_to_database(config)
.await
.expect("Could not run migration");
.expect("Failed to connect to database");

debug!("Initializing Graphcast Agent");
let graphcast_agent = Arc::new(agent);

debug!("Set global static instance of graphcast_agent");
_ = GRAPHCAST_AGENT.set(graphcast_agent.clone());

config.validate_indexer_address().await;
//config.validate_indexer_address().await;

//TODO: Refactor indexer management server validation to SDK, similar to graph node status endpoint
if let Some(url) = &config.graph_stack.indexer_management_server_endpoint {
Expand Down

0 comments on commit fdd73d5

Please sign in to comment.