Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replication: fetch replication log in a loop #350

Closed
wants to merge 2 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 24 additions & 14 deletions crates/replication/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,26 +216,36 @@ impl Replicator {
pub async fn sync_from_http(&self) -> anyhow::Result<usize> {
tracing::trace!("Syncing frames from HTTP");

let frames = match self.fetch_log_entries(false).await {
Ok(frames) => Ok(frames),
Err(e) => {
if let Some(status) = e.downcast_ref::<tonic::Status>() {
if status.code() == tonic::Code::FailedPrecondition {
self.fetch_log_entries(true).await
let mut applied_frames = 0;
loop {
let frames = match self.fetch_log_entries(false).await {
Ok(frames) => Ok(frames),
Err(e) => {
if let Some(status) = e.downcast_ref::<tonic::Status>() {
if status.code() == tonic::Code::FailedPrecondition {
self.fetch_log_entries(true).await
} else {
Err(e)
}
} else {
Err(e)
}
} else {
Err(e)
}
}?;

if frames.is_empty() {
break;
}
}?;

let len = frames.len();
self.next_offset.fetch_add(len as u64, Ordering::Relaxed);
self.frames_sender.send(Frames::Vec(frames)).await?;
self.injector.step()?;
Ok(len)
let len = frames.len();
self.next_offset.fetch_add(len as u64, Ordering::Relaxed);
self.frames_sender.send(Frames::Vec(frames)).await?;
self.injector.step()?;

applied_frames += len;
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error path gets more complicated with a loop approach: what if we successfully applied 2048 frames, but then failed to replicate the next batch? Current approach just returns an error, but "next_offset" still points after the frames we partially applied. Maybe we should retry a few times, and only return an error if the attempts failed. And if we decide not to retry, which makes the code simpler, there's also another issue: sync_from_http already returns a Result<usize>, and that brings the question what to return if we applied some frames, but failed to apply some more recent ones. We should either return the number of partial frames and report "success", because some frames were applied after all, or create a new specialized error case, like PartialUpdate(usize). I think it's reasonable to return success on partial application, but let's discuss. /cc @penberg @LucioFranco @MarinPostma

Side note - there's a preexisting error here, because we bump next_offset in line 235 before sending frames. If sending fails, we end up with next_offset already updated, and the frames won't be fetched anymore. I'll fix that asap as a separate patch, but meanwhile, the new code should also only update next_offset after frames are successfully sent.
Back to error handling - I think we need to things:

  1. Retry if getting frames failed, but some previous frames were already applied in this call - to prevent partial updates.
  2. Clearly comment in the code that it's now possible to get a partial result
  3. Decide if we return an Ok(usize) on partial application of frames, or some special error code. I think Ok(usize) is fair

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved the next_offset update after injector.step() as in the patch you pushed.

The major downside I can see with returning Ok(usize) is that the user of the library would not be able to tell if it was a partial update. That's fine for sync because it runs in a loop anyway, but for sync_oneshot it would be nice to know it the sync was a partial, so the user can retry.

What do you think about returning something like an Ok(ReplicatedFrames) for sync_oneshot instead of an specialized error?

// Could be a struct with a boolean flag or enum
enum ReplicatedFrames {
    Partial(usize),
    Full(usize),
}

impl ReplicatedFrames {
    /// Number of frames that were synced successfully
    pub fn frames_synced(&self) -> usize {
        match self {
            ReplicatedFrames::Partial(frames) => *frames,
            ReplicatedFrames::Full(frames) => *frames,
        }
    }

    /// A partial sync indicates that more frames _might_ be available to fetch
    pub fn sync_partially(&self) -> bool {
        matches!(self, ReplicatedFrames::Partial(_))
    }
}

Ok(applied_frames)
}

async fn fetch_log_entries(&self, send_hello: bool) -> anyhow::Result<Vec<Frame>> {
Expand Down
Loading