Skip to content

Commit

Permalink
libsql: attach databases from other namespaces as readonly
Browse files Browse the repository at this point in the history
With this proof-of-concept patch, other namespaces hosted
on the same sqld machine are now attached in readonly mode,
so that users can read from other databases when connected
to a particular one.

TODO:
 * only attach databases that were explicitly requested
 * allow detaching
 * hide it as an opt-in flag: abuse risk
  • Loading branch information
psarna committed Dec 14, 2023
1 parent 1fbc2b1 commit 71c5a77
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 1 deletion.
1 change: 1 addition & 0 deletions libsql-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ http-body = "0.4"
url = { version = "2.3", features = ["serde"] }
uuid = { version = "1.3", features = ["v4", "serde"] }
zerocopy = { version = "0.7.28", features = ["derive"] }
arc-swap = "1.6.0"

[dev-dependencies]
arbitrary = { version = "1.3.0", features = ["derive_arbitrary"] }
Expand Down
75 changes: 74 additions & 1 deletion libsql-server/src/connection/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use arc_swap::ArcSwapOption;
use libsql_sys::wal::wrapper::{WalWrapper, WrapWal, WrappedWal};
use libsql_sys::wal::{Wal, WalManager};
use metrics::{histogram, increment_counter};
use parking_lot::{Mutex, RwLock};
use rusqlite::ffi::SQLITE_BUSY;
use rusqlite::{DatabaseName, ErrorCode, OpenFlags, StatementStatus, TransactionState};
use std::collections::HashSet;
use tokio::sync::{watch, Notify};
use tokio::time::{Duration, Instant};

Expand Down Expand Up @@ -40,6 +42,8 @@ pub struct MakeLibSqlConn<T: WalManager> {
auto_checkpoint: u32,
current_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
state: Arc<TxnState<T::Wal>>,
// Information whether database1 is attached to database2
attachments: ArcSwapOption<HashSet<(String, String)>>,
/// In wal mode, closing the last database takes time, and causes other databases creation to
/// return sqlite busy. To mitigate that, we hold on to one connection
_db: Option<LibSqlConnection<T::Wal>>,
Expand Down Expand Up @@ -74,6 +78,7 @@ where
_db: None,
state: Default::default(),
wal_manager,
attachments: ArcSwapOption::from(None),
};

let db = this.try_create_db().await?;
Expand Down Expand Up @@ -125,6 +130,7 @@ where
},
self.current_frame_no_receiver.clone(),
self.state.clone(),
self.attachments.load_full(),
)
.await
}
Expand All @@ -141,6 +147,30 @@ where
async fn create(&self) -> Result<Self::Connection, Error> {
self.make_connection().await
}

fn register_attached(&self, db_name: String, attached_db_name: String) {
self.attachments.rcu(|attachments| {
let mut attachments = if let Some(attachments) = attachments {
HashSet::clone(attachments)
} else {
HashSet::new()
};
attachments.insert((db_name.clone(), attached_db_name.clone()));
Some(Arc::new(attachments))
});
}

fn unregister_attached(&self, db_name: String, attached_db_name: String) {
self.attachments.rcu(|attachments| {
if let Some(attachments) = attachments {
let mut attachments = HashSet::clone(attachments);
attachments.remove(&(db_name.clone(), attached_db_name.clone()));
Some(Arc::new(attachments))
} else {
None
}
});
}
}

pub struct LibSqlConnection<T> {
Expand Down Expand Up @@ -180,7 +210,7 @@ impl<W: Wal> WrapWal<W> for InhibitCheckpointWalWrapper {
_buf: &mut [u8],
) -> libsql_sys::wal::Result<(u32, u32)> {
tracing::warn!(
"chackpoint inhibited: this connection is not allowed to perform checkpoints"
"checkpoint inhibited: this connection is not allowed to perform checkpoints"
);
Err(rusqlite::ffi::Error::new(SQLITE_BUSY))
}
Expand Down Expand Up @@ -245,6 +275,43 @@ where
libsql_sys::Connection::open(path.join("data"), flags, wal_manager, auto_checkpoint)
}

fn attach_databases<W: Wal + Send + 'static>(
conn: &libsql_sys::Connection<W>,
path: &Path,
attachments: &HashSet<(String, String)>,
) {
let db_name = path
.file_name()
.map(std::ffi::OsStr::to_str)
.flatten()
.unwrap_or("");
tracing::info!("DB name: {db_name}");

let dbs_path = path
.parent()
.unwrap_or_else(|| std::path::Path::new(".."))
.canonicalize()
.unwrap_or_else(|_| std::path::PathBuf::from(".."));
// Linear, under the assumption ATTACH is rare (it better be,
// or the amount of open descriptors will quickly blow up).
for (db1_name, db2_name) in attachments {
tracing::info!("attach entry: {db1_name} -> {db2_name}");
if db1_name == db_name {
let attached_path = dbs_path.join(&db2_name).join("data");
tracing::info!("Attaching {} to {db_name}", attached_path.display());
let query = format!(
"ATTACH DATABASE 'file:{}?mode=ro' AS \"{db2_name}\"",
attached_path.display()
);
if let Err(e) = conn.execute(&query, ()) {
tracing::warn!("Failed to attach database {}: {e}", attached_path.display(),);
} else {
tracing::debug!("Attached {} as {db2_name}", attached_path.display());
}
}
}
}

impl<W> LibSqlConnection<W>
where
W: Wal + Send + 'static,
Expand All @@ -258,6 +325,7 @@ where
builder_config: QueryBuilderConfig,
current_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
state: Arc<TxnState<W>>,
attachments: Option<Arc<HashSet<(String, String)>>>,
) -> crate::Result<Self>
where
T: WalManager<Wal = W> + Send + 'static,
Expand All @@ -276,6 +344,11 @@ where
)?;
conn.conn
.pragma_update(None, "max_page_count", max_db_size)?;

if let Some(attachments) = attachments.as_ref() {
attach_databases(&conn.conn, path.as_ref(), attachments);
}

Ok(conn)
})
.await
Expand Down
4 changes: 4 additions & 0 deletions libsql-server/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ pub trait MakeConnection: Send + Sync + 'static {
/// Create a new connection of type Self::Connection
async fn create(&self) -> Result<Self::Connection, Error>;

/// Update the information on attached databases
fn register_attached(&self, _db_name: String, _attached_db_name: String) {}
fn unregister_attached(&self, _db_name: String, _attached_db_name: String) {}

fn throttled(
self,
conccurency: usize,
Expand Down

0 comments on commit 71c5a77

Please sign in to comment.