Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: db locked error #143

Merged
merged 1 commit into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions subgraph-radio/benches/attestations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,11 @@ mod attestation {
.expect("Failed to connect to the in-memory database"),
);

black_box(
sqlx::migrate!("../migrations")
.run(&pool)
.await
.expect("Could not run migration"),
);
sqlx::migrate!("../migrations")
.run(&pool)
.await
.expect("Could not run migration");
black_box(());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would you explain what motivated this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was clippy's doing (cargo clippy --fix)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure why that is. probably remove black_box(()); because it is useless if nothing is contained inside


let attestations = vec![
black_box(Attestation::new(
Expand Down
73 changes: 45 additions & 28 deletions subgraph-radio/src/operator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashSet;
use std::env;
use std::path::Path;

use std::str::FromStr;
use std::sync::{atomic::Ordering, mpsc::Receiver, Arc};
use std::time::Duration;

Expand All @@ -15,6 +15,7 @@ use graphcast_sdk::{
WakuMessage,
};

use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode};
use sqlx::SqlitePool;
use tokio::time::{interval, timeout};
use tracing::{debug, error, info, trace, warn};
Expand Down Expand Up @@ -58,39 +59,55 @@ pub struct RadioOperator {
pub db: SqlitePool,
}

impl RadioOperator {
/// Create a radio operator with radio configurations, persisted data,
/// graphcast agent, and control flow
pub async fn new(config: &Config, agent: GraphcastAgent) -> RadioOperator {
debug!("Connecting to database");
async fn setup_database_connection(db_url: &str) -> Result<SqlitePool, sqlx::Error> {
let connect_options = SqliteConnectOptions::from_str(db_url)?
.create_if_missing(true)
.journal_mode(SqliteJournalMode::Wal)
.busy_timeout(std::time::Duration::from_secs(5));

let db = match &config.radio_setup().sqlite_file_path {
Some(path) => {
let cwd = env::current_dir().unwrap();
let absolute_path = cwd.join(path);
let pool = SqlitePool::connect_with(connect_options).await?;

if !Path::new(&absolute_path).exists() {
std::fs::File::create(&absolute_path)
.expect("Failed to create the database file");
debug!("Database file created at {}", absolute_path.display());
}
Ok(pool)
}

let db_url = format!("sqlite://{}", absolute_path.display());
async fn connect_to_database(config: &Config) -> Result<SqlitePool, Box<dyn std::error::Error>> {
debug!("Connecting to database");

SqlitePool::connect(&db_url)
.await
.expect("Could not connect to the SQLite database")
let db_url = match &config.radio_setup().sqlite_file_path {
Some(path) => {
let cwd = std::env::current_dir().unwrap();
let absolute_path = cwd.join(path);

if !std::path::Path::new(&absolute_path).exists() {
std::fs::File::create(&absolute_path).expect("Failed to create the database file");
debug!("Database file created at {}", absolute_path.display());
}
None => SqlitePool::connect("sqlite::memory:")
.await
.expect("Failed to connect to the in-memory database"),
};

debug!("Check for database migration");
sqlx::migrate!("../migrations")
.run(&db)
format!("sqlite://{}", absolute_path.display())
}
None => String::from("sqlite::memory:"),
};

let db_pool = setup_database_connection(&db_url).await?;

debug!("Check for database migration");
sqlx::migrate!("../migrations")
.run(&db_pool)
.await
.expect("Could not run migration");

Ok(db_pool)
}

impl RadioOperator {
/// Create a radio operator with radio configurations, persisted data,
/// graphcast agent, and control flow
pub async fn new(config: &Config, agent: GraphcastAgent) -> RadioOperator {
debug!("Connecting to database");

let db = connect_to_database(config)
.await
.expect("Could not run migration");
.expect("Failed to connect to database");

debug!("Initializing Graphcast Agent");
let graphcast_agent = Arc::new(agent);
Expand Down
Loading