Skip to content

Commit

Permalink
Merge pull request #1878 from tursodatabase/improve-wal-sync
Browse files Browse the repository at this point in the history
Improve WAL sync logic
  • Loading branch information
penberg authored Dec 13, 2024
2 parents c8a5006 + 19e7800 commit f8954db
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 80 deletions.
3 changes: 3 additions & 0 deletions libsql/examples/offline_writes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ async fn main() {

let conn = db.connect().unwrap();

println!("Syncing database from remote...");
db.sync().await.unwrap();

conn.execute(
r#"
CREATE TABLE IF NOT EXISTS guest_book_entries (
Expand Down
55 changes: 0 additions & 55 deletions libsql/examples/offline_writes_pull.rs

This file was deleted.

54 changes: 29 additions & 25 deletions libsql/src/local/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,29 +391,36 @@ impl Database {
use crate::sync::SyncError;
use crate::Error;

match self.try_push().await {
Ok(rep) => Ok(rep),
Err(Error::Sync(err)) => {
// Retry the sync because we are ahead of the server and we need to push some older
// frames.
if let Some(SyncError::InvalidPushFrameNoLow(_, _)) =
err.downcast_ref::<SyncError>()
{
tracing::debug!("got InvalidPushFrameNo, retrying push");
self.try_push().await
} else {
Err(Error::Sync(err))
let mut sync_ctx = self.sync_ctx.as_ref().unwrap().lock().await;
let conn = self.connect()?;

let durable_frame_no = sync_ctx.durable_frame_num();
let max_frame_no = conn.wal_frame_count();

if max_frame_no > durable_frame_no {
match self.try_push(&mut sync_ctx, &conn).await {
Ok(rep) => Ok(rep),
Err(Error::Sync(err)) => {
// Retry the sync because we are ahead of the server and we need to push some older
// frames.
if let Some(SyncError::InvalidPushFrameNoLow(_, _)) =
err.downcast_ref::<SyncError>()
{
tracing::debug!("got InvalidPushFrameNo, retrying push");
self.try_push(&mut sync_ctx, &conn).await
} else {
Err(Error::Sync(err))
}
}
Err(e) => Err(e),
}
Err(e) => Err(e),
} else {
self.try_pull(&mut sync_ctx, &conn).await
}
}

#[cfg(feature = "sync")]
async fn try_push(&self) -> Result<crate::database::Replicated> {
let mut sync_ctx = self.sync_ctx.as_ref().unwrap().lock().await;
let conn = self.connect()?;

async fn try_push(&self, sync_ctx: &mut SyncContext, conn: &Connection) -> Result<crate::database::Replicated> {
let page_size = {
let rows = conn
.query("PRAGMA page_size", crate::params::Params::None)?
Expand All @@ -424,9 +431,11 @@ impl Database {
};

let max_frame_no = conn.wal_frame_count();

if max_frame_no == 0 {
return self.try_pull(&mut sync_ctx).await;
return Ok(crate::database::Replicated {
frame_no: None,
frames_synced: 0,
});
}

let generation = sync_ctx.generation(); // TODO: Probe from WAL.
Expand All @@ -452,10 +461,6 @@ impl Database {

sync_ctx.write_metadata().await?;

if start_frame_no > end_frame_no {
return self.try_pull(&mut sync_ctx).await;
}

// TODO(lucio): this can underflow if the server previously returned a higher max_frame_no
// than what we have stored here.
let frame_count = end_frame_no - start_frame_no + 1;
Expand All @@ -466,10 +471,9 @@ impl Database {
}

#[cfg(feature = "sync")]
async fn try_pull(&self, sync_ctx: &mut SyncContext) -> Result<crate::database::Replicated> {
async fn try_pull(&self, sync_ctx: &mut SyncContext, conn: &Connection) -> Result<crate::database::Replicated> {
let generation = sync_ctx.generation();
let mut frame_no = sync_ctx.durable_frame_num() + 1;
let conn = self.connect()?;
conn.wal_insert_begin()?;

let mut err = None;
Expand Down

0 comments on commit f8954db

Please sign in to comment.