Skip to content

Commit

Permalink
stream replication frames
Browse files Browse the repository at this point in the history
  • Loading branch information
neubaner committed Sep 2, 2023
1 parent 160b965 commit 957e42f
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 5 deletions.
13 changes: 13 additions & 0 deletions crates/replication/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,19 @@ impl Client {
Ok(frames)
}

pub async fn log_entries(
&self,
next_offset: u64,
) -> anyhow::Result<impl futures::Stream<Item = Frame> + Unpin> {
let mut client = self.replication.clone();
let frames = client
.log_entries(pb::LogOffset { next_offset })
.await?
.into_inner();

Ok(frames)
}

pub async fn execute(&self, sql: &str) -> anyhow::Result<()> {
let mut proxy = self.proxy.clone();

Expand Down
46 changes: 41 additions & 5 deletions crates/replication/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub const WAL_MAGIC: u64 = u64::from_le_bytes(*b"SQLDWAL\0");
pub type FrameNo = u64;
use anyhow::Context;
pub use frame::{Frame, FrameHeader};
use futures::{Stream, StreamExt};
pub use replica::hook::{Frames, InjectorHookCtx};
use replica::snapshot::SnapshotFileHeader;
pub use replica::snapshot::TempSnapshot;
Expand Down Expand Up @@ -206,7 +207,7 @@ impl Replicator {
pub async fn sync_from_http(&self) -> anyhow::Result<usize> {
tracing::trace!("Syncing frames from HTTP");

let frames = match self.fetch_log_entries(false).await {
let mut frames = match self.fetch_log_entries(false).await {
Ok(frames) => Ok(frames),
Err(e) => {
if let Some(status) = e.downcast_ref::<tonic::Status>() {
Expand All @@ -221,14 +222,49 @@ impl Replicator {
}
}?;

// Based on tonic's default message limit
const FLUSH_BUFFER_THRESHOLD: usize = 8 * 1024 * 1024;

const FRAME_BUFFER_INITIAL_CAPACITY: usize = 64;

let mut frame_buffer = Vec::with_capacity(FRAME_BUFFER_INITIAL_CAPACITY);
let mut buffer_size = 0;
let mut len = 0;

while let Some(frame) = frames.next().await {
buffer_size += frame.as_slice().len();
frame_buffer.push(frame);

if buffer_size >= FLUSH_BUFFER_THRESHOLD {
let old_frame_buffer = core::mem::replace(
&mut frame_buffer,
Vec::with_capacity(FRAME_BUFFER_INITIAL_CAPACITY),
);
buffer_size = 0;

len += self.send_frames(old_frame_buffer).await?;
}
}

if !frame_buffer.is_empty() {
len += self.send_frames(frame_buffer).await?;
}

Ok(len)
}

async fn send_frames(&self, frames: Vec<Frame>) -> anyhow::Result<usize> {
let len = frames.len();
self.next_offset.fetch_add(len as u64, Ordering::Relaxed);
self.frames_sender.send(Frames::Vec(frames)).await?;
self.injector.step()?;
Ok(len)
}

async fn fetch_log_entries(&self, send_hello: bool) -> anyhow::Result<Vec<Frame>> {
async fn fetch_log_entries(
&self,
send_hello: bool,
) -> anyhow::Result<impl Stream<Item = Frame> + Unpin> {
let client = self
.client
.clone()
Expand All @@ -239,8 +275,8 @@ impl Replicator {
let _res = client.hello().await?;
}

client
.batch_log_entries(self.next_offset.load(Ordering::Relaxed))
.await
Ok(client
.log_entries(self.next_offset.load(Ordering::Relaxed))
.await?)
}
}

0 comments on commit 957e42f

Please sign in to comment.