Skip to content
This repository has been archived by the owner on Oct 18, 2023. It is now read-only.

Revert "Same-thread frame injection" #775

Merged
merged 1 commit into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 111 additions & 140 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions sqld/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ hyper = { version = "0.14.23", features = ["http2"] }
hyper-tungstenite = "0.10"
itertools = "0.10.5"
jsonwebtoken = "8.2.0"
memmap = "0.7.0"
mimalloc = { version = "0.1.36", default-features = false }
nix = { version = "0.26.2", features = ["fs"] }
once_cell = "1.17.0"
Expand Down
Binary file removed sqld/assets/test/simple_wallog
Binary file not shown.
2 changes: 1 addition & 1 deletion sqld/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
std::env::set_var("PROTOC", protobuf_src::protoc());

let mut config = Config::new();
config.bytes([".wal_log"]);
config.bytes([".wal_log", ".proxy.ProgramReq.namespace"]);
tonic_build::configure()
.protoc_arg("--experimental_allow_proto3_optional")
.type_attribute(".proxy", "#[cfg_attr(test, derive(arbitrary::Arbitrary))]")
Expand Down
7 changes: 6 additions & 1 deletion sqld/proto/replication_log.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@ message LogOffset {
message HelloRequest {}

message HelloResponse {
string log_id = 3;
/// Uuid of the current generation
string generation_id = 1;
/// First frame_no in the current generation
uint64 generation_start_index = 2;
/// Uuid of the database being replicated
string database_id = 3;
}

message Frame {
Expand Down
3 changes: 0 additions & 3 deletions sqld/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ pub enum Error {
ConflictingRestoreParameters,
#[error("Failed to fork database: {0}")]
Fork(#[from] ForkError),
#[error("Fatal replication error")]
FatalReplicationError,
}

trait ResponseError: std::error::Error {
Expand Down Expand Up @@ -134,7 +132,6 @@ impl IntoResponse for Error {
LoadDumpExistingDb => self.format_err(StatusCode::BAD_REQUEST),
ConflictingRestoreParameters => self.format_err(StatusCode::BAD_REQUEST),
Fork(e) => e.into_response(),
FatalReplicationError => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions sqld/src/namespace/fork.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tokio::time::Duration;
use tokio_stream::StreamExt;

use crate::database::PrimaryDatabase;
use crate::replication::frame::FrameBorrowed;
use crate::replication::frame::Frame;
use crate::replication::primary::frame_stream::FrameStream;
use crate::replication::{LogReadError, ReplicationLogger};
use crate::{BLOCKING_RT, LIBSQL_PAGE_SIZE};
Expand Down Expand Up @@ -41,7 +41,7 @@ impl From<tokio::task::JoinError> for ForkError {
}
}

async fn write_frame(frame: &FrameBorrowed, temp_file: &mut tokio::fs::File) -> Result<()> {
async fn write_frame(frame: Frame, temp_file: &mut tokio::fs::File) -> Result<()> {
let page_no = frame.header().page_no;
let page_pos = (page_no - 1) as usize * LIBSQL_PAGE_SIZE as usize;
temp_file.seek(SeekFrom::Start(page_pos as u64)).await?;
Expand Down Expand Up @@ -128,7 +128,7 @@ impl ForkTask<'_> {
match res {
Ok(frame) => {
next_frame_no = next_frame_no.max(frame.header().frame_no + 1);
write_frame(&frame, &mut data_file).await?;
write_frame(frame, &mut data_file).await?;
}
Err(LogReadError::SnapshotRequired) => {
let snapshot = loop {
Expand All @@ -147,7 +147,7 @@ impl ForkTask<'_> {
for frame in iter {
let frame = frame.map_err(ForkError::LogRead)?;
next_frame_no = next_frame_no.max(frame.header().frame_no + 1);
write_frame(&frame, &mut data_file).await?;
write_frame(frame, &mut data_file).await?;
}
}
Err(LogReadError::Ahead) => {
Expand Down
11 changes: 7 additions & 4 deletions sqld/src/namespace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,27 +561,30 @@ impl Namespace<ReplicaDatabase> {
DatabaseConfigStore::load(&db_path).context("Could not load database config")?,
);

let mut join_set = JoinSet::new();
let replicator = Replicator::new(
db_path.clone(),
config.channel.clone(),
config.uri.clone(),
name.clone(),
&mut join_set,
reset,
)
.await?;
let applied_frame_no_receiver = replicator.current_frame_no_notifier.subscribe();
let mut join_set = JoinSet::new();
join_set.spawn(replicator.run());

let applied_frame_no_receiver = replicator.current_frame_no_notifier.clone();

let stats = make_stats(
&db_path,
&mut join_set,
config.stats_sender.clone(),
name.clone(),
applied_frame_no_receiver.clone(),
replicator.current_frame_no_notifier.clone(),
)
.await?;

join_set.spawn(replicator.run());

let connection_maker = MakeWriteProxyConn::new(
db_path.clone(),
config.extensions.clone(),
Expand Down
149 changes: 38 additions & 111 deletions sqld/src/replication/frame.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::alloc::Layout;
use std::borrow::Cow;
use std::fmt;
use std::mem::size_of;
use std::ops::{Deref, DerefMut};
use std::mem::{size_of, transmute};
use std::ops::Deref;

use bytemuck::{bytes_of, from_bytes, Pod, Zeroable};
use bytes::Bytes;
use bytemuck::{bytes_of, pod_read_unaligned, try_from_bytes, Pod, Zeroable};
use bytes::{Bytes, BytesMut};

use crate::LIBSQL_PAGE_SIZE;

Expand All @@ -28,18 +28,10 @@ pub struct FrameHeader {
}

#[derive(Clone, serde::Serialize, serde::Deserialize)]
/// The shared version of a replication frame.
/// The owned version of a replication frame.
/// Cloning this is cheap.
pub struct Frame {
inner: Bytes,
}

impl TryFrom<&[u8]> for Frame {
type Error = anyhow::Error;

fn try_from(data: &[u8]) -> Result<Self, Self::Error> {
Ok(FrameMut::try_from(data)?.into())
}
data: Bytes,
}

impl fmt::Debug for Frame {
Expand All @@ -51,129 +43,64 @@ impl fmt::Debug for Frame {
}
}

/// Owned version of a frame, on the heap
pub struct FrameMut {
inner: Box<FrameBorrowed>,
}

impl TryFrom<&[u8]> for FrameMut {
type Error = anyhow::Error;

fn try_from(data: &[u8]) -> Result<Self, Self::Error> {
anyhow::ensure!(
data.len() == size_of::<FrameBorrowed>(),
"invalid frame size"
);
// Frames are relatively large (~4 KB), so we avoid building them on the stack and then
// copying them to the heap; instead the bytes are copied directly into a heap allocation.
let inner = unsafe {
let layout = Layout::new::<FrameBorrowed>();
let ptr = std::alloc::alloc(layout);
ptr.copy_from(data.as_ptr(), data.len());
Box::from_raw(ptr as *mut FrameBorrowed)
};

Ok(Self { inner })
}
}
impl Frame {
/// Size in bytes of a single frame: the frame header followed by one page of data.
pub const SIZE: usize = size_of::<FrameHeader>() + LIBSQL_PAGE_SIZE as usize;

impl From<FrameMut> for Frame {
fn from(value: FrameMut) -> Self {
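// Reinterpret the boxed FrameBorrowed allocation as a Vec<u8> with the same length and
// capacity. The stated rationale is that the alignment of u8 divides the alignment of
// FrameBorrowed. NOTE(review): confirm this against the allocation-alignment requirement
// documented for Vec::from_raw_parts.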
let data = unsafe {
Vec::from_raw_parts(
Box::into_raw(value.inner) as *mut u8,
size_of::<FrameBorrowed>(),
size_of::<FrameBorrowed>(),
)
};

Self {
inner: Bytes::from(data),
}
}
}
pub fn from_parts(header: &FrameHeader, data: &[u8]) -> Self {
assert_eq!(data.len(), LIBSQL_PAGE_SIZE as usize);
let mut buf = BytesMut::with_capacity(Self::SIZE);
buf.extend_from_slice(bytes_of(header));
buf.extend_from_slice(data);

impl From<FrameBorrowed> for FrameMut {
fn from(inner: FrameBorrowed) -> Self {
Self {
inner: Box::new(inner),
}
Self { data: buf.freeze() }
}
}

impl Frame {
pub fn from_parts(header: &FrameHeader, data: &[u8]) -> Self {
FrameBorrowed::from_parts(header, data).into()
pub fn try_from_bytes(data: Bytes) -> anyhow::Result<Self> {
anyhow::ensure!(data.len() == Self::SIZE, "invalid frame size");
Ok(Self { data })
}

pub fn bytes(&self) -> Bytes {
self.inner.clone()
}
}

impl From<FrameBorrowed> for Frame {
fn from(value: FrameBorrowed) -> Self {
FrameMut::from(value).into()
self.data.clone()
}
}

/// The borrowed version of Frame
#[repr(C)]
#[derive(Pod, Zeroable, Copy, Clone)]
#[repr(transparent)]
pub struct FrameBorrowed {
header: FrameHeader,
page: [u8; LIBSQL_PAGE_SIZE as usize],
data: [u8],
}

impl FrameBorrowed {
/// Returns the bytes for this frame. Includes the header bytes.
pub fn as_slice(&self) -> &[u8] {
bytes_of(self)
}

/// returns this frame's page data.
pub fn page(&self) -> &[u8] {
&self.page
pub fn header(&self) -> Cow<FrameHeader> {
let data = &self.data[..size_of::<FrameHeader>()];
try_from_bytes(data)
.map(Cow::Borrowed)
.unwrap_or_else(|_| Cow::Owned(pod_read_unaligned(data)))
}

pub fn header(&self) -> &FrameHeader {
&self.header
/// Returns the bytes for this frame. Includes the header bytes.
pub fn as_slice(&self) -> &[u8] {
&self.data
}

pub fn header_mut(&mut self) -> &mut FrameHeader {
&mut self.header
pub fn from_bytes(data: &[u8]) -> &Self {
assert_eq!(data.len(), Frame::SIZE);
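// SAFETY: FrameBorrowed is #[repr(transparent)] over [u8], so &FrameBorrowed has the same
// layout as &[u8]; the length was checked against Frame::SIZE above.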
unsafe { transmute(data) }
}

pub fn from_parts(header: &FrameHeader, page: &[u8]) -> Self {
assert_eq!(page.len(), LIBSQL_PAGE_SIZE as usize);

FrameBorrowed {
header: *header,
page: page.try_into().unwrap(),
}
/// returns this frame's page data.
pub fn page(&self) -> &[u8] {
&self.data[size_of::<FrameHeader>()..]
}
}

impl Deref for Frame {
type Target = FrameBorrowed;

fn deref(&self) -> &Self::Target {
from_bytes(&self.inner)
}
}

impl Deref for FrameMut {
type Target = FrameBorrowed;

fn deref(&self) -> &Self::Target {
self.inner.as_ref()
}
}

impl DerefMut for FrameMut {
fn deref_mut(&mut self) -> &mut Self::Target {
self.inner.as_mut()
FrameBorrowed::from_bytes(&self.data)
}
}
Loading
Loading