From 1c5948daec652f00fb52a8eb9d4a2cdf2def2e4e Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Fri, 13 Oct 2023 10:29:49 -0700 Subject: [PATCH] Revert "Same-thread frame injection (#695)" This reverts commit ebb7ae932ae0ed50433c9ba90c2cdc0101cff6ba. --- Cargo.lock | 251 ++++++++---------- sqld/Cargo.toml | 1 + sqld/assets/test/simple_wallog | Bin 28904 -> 0 bytes sqld/build.rs | 2 +- sqld/proto/replication_log.proto | 7 +- sqld/src/error.rs | 3 - sqld/src/namespace/fork.rs | 8 +- sqld/src/namespace/mod.rs | 11 +- sqld/src/replication/frame.rs | 149 +++-------- sqld/src/replication/primary/logger.rs | 47 ++-- sqld/src/replication/replica/error.rs | 6 +- sqld/src/replication/replica/injector.rs | 51 ++++ .../replication/replica/injector/headers.rs | 47 ---- sqld/src/replication/replica/injector/hook.rs | 162 ----------- sqld/src/replication/replica/injector/mod.rs | 251 ------------------ sqld/src/replication/replica/meta.rs | 160 +++++------ sqld/src/replication/replica/mod.rs | 2 + sqld/src/replication/replica/replicator.rs | 198 +++++++++----- sqld/src/replication/replica/snapshot.rs | 50 ++++ sqld/src/replication/snapshot.rs | 82 +++--- sqld/src/rpc/replication_log.rs | 16 +- ...es__dumps__load_namespace_from_no_txn.snap | 2 +- 22 files changed, 527 insertions(+), 979 deletions(-) delete mode 100644 sqld/assets/test/simple_wallog create mode 100644 sqld/src/replication/replica/injector.rs delete mode 100644 sqld/src/replication/replica/injector/headers.rs delete mode 100644 sqld/src/replication/replica/injector/hook.rs delete mode 100644 sqld/src/replication/replica/injector/mod.rs create mode 100644 sqld/src/replication/replica/snapshot.rs diff --git a/Cargo.lock b/Cargo.lock index cdb03085..44e946e3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -50,9 +50,9 @@ dependencies = [ [[package]] name = "aho-corasick" -version = "1.1.2" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2969dcb958b36655471fc61f7e416fa76033bdd4bfed0678d8fee1e2d07a1f0" +checksum = "ea5d730647d4fadd988536d06fecce94b7b4f2a7efdae548f1cf4b63205518ab" dependencies = [ "memchr", ] @@ -155,9 +155,9 @@ checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" [[package]] name = "arbitrary" -version = "1.3.1" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2e1373abdaa212b704512ec2bd8b26bd0b7d5c3f70117411a5d9a451383c859" +checksum = "e2d098ff73c1ca148721f37baad5ea6a465a13f9573aba8641fbbbae8164a54e" dependencies = [ "derive_arbitrary", ] @@ -214,7 +214,7 @@ version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ - "proc-macro2 1.0.69", + "proc-macro2 1.0.68", "quote 1.0.33", "syn 2.0.38", ] @@ -225,7 +225,7 @@ version = "0.1.73" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" dependencies = [ - "proc-macro2 1.0.69", + "proc-macro2 1.0.68", "quote 1.0.33", "syn 2.0.38", ] @@ -278,7 +278,7 @@ dependencies = [ "hex", "http", "hyper", - "ring 0.16.20", + "ring", "time", "tokio", "tower", @@ -747,7 +747,7 @@ dependencies = [ "lazycell", "peeking_take_while", "prettyplease", - "proc-macro2 1.0.69", + "proc-macro2 1.0.68", "quote 1.0.33", "regex", "rustc-hash", @@ -769,7 +769,7 @@ dependencies = [ "log", "peeking_take_while", "prettyplease", - "proc-macro2 1.0.69", + "proc-macro2 1.0.68", "quote 1.0.33", 
"regex", "rustc-hash", @@ -896,7 +896,7 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "965ab7eb5f8f97d2a083c799f3a1b994fc397b2fe2da5d1da1626ce15a39f2b1" dependencies = [ - "proc-macro2 1.0.69", + "proc-macro2 1.0.68", "quote 1.0.33", "syn 2.0.38", ] @@ -959,7 +959,7 @@ dependencies = [ "io-lifetimes", "ipnet", "maybe-owned", - "rustix 0.35.15", + "rustix 0.35.14", "winapi-util", "windows-sys 0.36.1", "winx", @@ -985,7 +985,7 @@ dependencies = [ "io-extras", "io-lifetimes", "ipnet", - "rustix 0.35.15", + "rustix 0.35.14", ] [[package]] @@ -996,7 +996,7 @@ checksum = "c3a0524f7c4cff2ea547ae2b652bf7a348fd3e48f76556dc928d8b45ab2f1d50" dependencies = [ "cap-primitives", "once_cell", - "rustix 0.35.15", + "rustix 0.35.14", "winx", ] @@ -1080,7 +1080,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0862016ff20d69b84ef8247369fabf5c008a7417002411897d40ee1f4532b873" dependencies = [ "heck", - "proc-macro2 1.0.69", + "proc-macro2 1.0.68", "quote 1.0.33", "syn 2.0.38", ] @@ -1430,7 +1430,7 @@ version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53e0efad4403bfc52dc201159c4b842a246a14b98c64b55dfd0f2d89729dfeb8" dependencies = [ - "proc-macro2 1.0.69", + "proc-macro2 1.0.68", "quote 1.0.33", "syn 2.0.38", ] @@ -1546,10 +1546,11 @@ dependencies = [ [[package]] name = "errno" -version = "0.3.5" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac3e13f66a2f95e32a39eaa81f6b95d42878ca0e1db0c7543723dfe12557e860" +checksum = "add4f07d43996f76ef320709726a556a9d4f965d9410d8d0271132d2f8293480" dependencies = [ + "errno-dragonfly", "libc", "windows-sys 0.48.0", ] @@ -1651,7 +1652,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a267b6a9304912e018610d53fe07115d8b530b160e85db4d2d3a59f3ddde1aec" dependencies = [ "io-lifetimes", - "rustix 0.35.15", + "rustix 0.35.14", "windows-sys 0.36.1", ] @@ -1709,7 +1710,7 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ - "proc-macro2 1.0.69", + "proc-macro2 1.0.68", "quote 1.0.33", "syn 2.0.38", ] @@ -2154,9 +2155,9 @@ dependencies = [ [[package]] name = "insta" -version = "1.34.0" +version = "1.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d64600be34b2fcfc267740a243fa7744441bb4947a619ac4e5bb6507f35fbfc" +checksum = "1aa511b2e298cd49b1856746f6bb73e17036bcd66b25f5e92cdcdbec9bd75686" dependencies = [ "console", "lazy_static", @@ -2209,7 +2210,7 @@ checksum = "0d508111813f9af3afd2f92758f77e4ed2cc9371b642112c6a48d22eb73105c5" dependencies = [ "hermit-abi 0.2.6", "io-lifetimes", - "rustix 0.35.15", + "rustix 0.35.14", "windows-sys 0.36.1", ] @@ -2220,7 +2221,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" dependencies = [ "hermit-abi 0.3.3", - "rustix 0.38.19", + "rustix 0.38.17", "windows-sys 0.48.0", ] @@ -2270,9 +2271,9 @@ dependencies = [ [[package]] name = "jobserver" -version = "0.1.27" +version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c37f63953c4c63420ed5fd3d6d398c719489b9f872b9fa683262f8edd363c7d" +checksum = "936cfd212a0155903bcbc060e316fb6cc7cbf2e1907329391ebadc1fe0ce77c2" dependencies = [ "libc", ] @@ -2294,7 +2295,7 @@ checksum = 
"6971da4d9c3aa03c3d8f3ff0f4155b534aad021292003895a469716b2a230378" dependencies = [ "base64 0.21.4", "pem", - "ring 0.16.20", + "ring", "serde", "serde_json", "simple_asn1", @@ -2320,9 +2321,9 @@ checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" [[package]] name = "libc" -version = "0.2.149" +version = "0.2.148" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" +checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b" [[package]] name = "libloading" @@ -2336,9 +2337,9 @@ dependencies = [ [[package]] name = "libm" -version = "0.2.8" +version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" +checksum = "f7012b1bbb0719e1097c47611d3898568c546d597c2e74d66f6087edd5233ff4" [[package]] name = "libmimalloc-sys" @@ -2456,9 +2457,9 @@ checksum = "d4d2456c373231a208ad294c33dc5bff30051eafd954cd4caae83a712b12854d" [[package]] name = "linux-raw-sys" -version = "0.4.10" +version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da2479e8c062e40bf0066ffa0bc823de0a9368974af99c9f6df941d2c231e03f" +checksum = "3852614a3bd9ca9804678ba6be5e3b8ce76dfc902cae004e3e0c44051b6e88db" [[package]] name = "lock_api" @@ -2537,7 +2538,7 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b2cffa4ad52c6f791f4f8b15f0c05f9824b2ced1160e88cc393d64fff9a8ac64" dependencies = [ - "rustix 0.38.19", + "rustix 0.38.17", ] [[package]] @@ -2612,7 +2613,7 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ddece26afd34c31585c74a4db0630c376df271c285d682d1e55012197830b6df" dependencies = [ - "proc-macro2 1.0.69", + "proc-macro2 1.0.68", "quote 1.0.33", "syn 2.0.38", ] @@ -2735,9 +2736,9 @@ dependencies = [ [[package]] name = "num-traits" -version = "0.2.17" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" +checksum = "f30b0abd723be7e2ffca1272140fac1a2f084c77ec3e123c192b66af1ee9e6c2" dependencies = [ "autocfg", "libm", @@ -2921,7 +2922,7 @@ version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ - "proc-macro2 1.0.69", + "proc-macro2 1.0.68", "quote 1.0.33", "syn 2.0.38", ] @@ -2962,7 +2963,7 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae005bd773ab59b4725093fd7df83fd7892f7d8eafb48dbd7de6e024e4215f9d" dependencies = [ - "proc-macro2 1.0.69", + "proc-macro2 1.0.68", "syn 2.0.38", ] @@ -2987,9 +2988,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.69" +version = "1.0.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "134c189feb4956b20f6f547d2cf727d4c0fe06722b20a0eec87ed445a97f92da" +checksum = "5b1106fec09662ec6dd98ccac0f81cef56984d0b49f75c92d8cbad76e20c005c" dependencies = [ "unicode-ident", ] @@ -3054,7 +3055,7 @@ checksum = "265baba7fabd416cf5078179f7d2cbeca4ce7a9041111900675ea7c4cb8a4c32" dependencies = [ "anyhow", "itertools 0.11.0", - "proc-macro2 1.0.69", + "proc-macro2 1.0.68", "quote 1.0.33", "syn 2.0.38", ] @@ -3123,7 +3124,7 @@ version = "1.0.33" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" dependencies = [ - "proc-macro2 1.0.69", + "proc-macro2 1.0.68", ] [[package]] @@ -3247,14 +3248,14 @@ dependencies = [ [[package]] name = "regex" -version = "1.10.0" +version = "1.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d119d7c7ca818f8a53c300863d4f87566aac09943aef5b355bb83969dae75d87" +checksum = "ebee201405406dbf528b8b672104ae6d6d63e6d118cb10e4d51abbc7b58044ff" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.4.1", - "regex-syntax 0.8.1", + "regex-automata 0.3.9", + "regex-syntax 0.7.5", ] [[package]] @@ -3268,13 +3269,13 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.1" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "465c6fc0621e4abc4187a2bda0937bfd4f722c2730b29562e19689ea796c9a4b" +checksum = "59b23e92ee4318893fa3fe3e6fb365258efbfe6ac6ab30f090cdcbb7aa37efa9" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.8.1", + "regex-syntax 0.7.5", ] [[package]] @@ -3289,12 +3290,6 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" -[[package]] -name = "regex-syntax" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56d84fdd47036b038fc80dd333d10b6aab10d5d31f4a366e20014def75328d33" - [[package]] name = "reqwest" version = "0.11.22" @@ -3344,26 +3339,12 @@ dependencies = [ "cc", "libc", "once_cell", - "spin 0.5.2", - "untrusted 0.7.1", + "spin", + "untrusted", "web-sys", "winapi", ] -[[package]] -name = "ring" -version = "0.17.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9babe80d5c16becf6594aa32ad2be8fe08498e7ae60b77de8df700e67f191d7e" -dependencies = [ - "cc", - "getrandom", - "libc", - "spin 0.9.8", - "untrusted 0.9.0", - "windows-sys 0.48.0", -] - [[package]] name = "rusqlite" version = "0.29.0" @@ -3400,9 +3381,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.35.15" +version = "0.35.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "413c4d41e2f1b0814c63567d11618483de0bd64f451b452f2ca43896579486ba" +checksum = "6380889b07a03b5ecf1d44dc9ede6fd2145d84b502a2a9ca0b03c48e0cc3220f" dependencies = [ "bitflags 1.3.2", "errno 0.2.8", @@ -3416,14 +3397,14 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.19" +version = "0.38.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "745ecfa778e66b2b63c88a61cb36e0eea109e803b0b86bf9879fbc77c70e86ed" +checksum = "f25469e9ae0f3d0047ca8b93fc56843f38e6774f0914a107ff8b41be8be8e0b7" dependencies = [ "bitflags 2.4.0", - "errno 0.3.5", + "errno 0.3.4", "libc", - "linux-raw-sys 0.4.10", + "linux-raw-sys 0.4.8", "windows-sys 0.48.0", ] @@ -3434,7 +3415,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b80e3dec595989ea8510028f30c408a4630db12c9cbb8de34203b89d6577e99" dependencies = [ "log", - "ring 0.16.20", + "ring", "sct", "webpki", ] @@ -3446,7 +3427,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd8d6c9f025a446bc4d18ad9632e69aec8f287aa84499ee335599fabd20c3fd8" dependencies = [ "log", - "ring 0.16.20", + "ring", "rustls-webpki 0.101.6", "sct", ] @@ -3478,8 +3459,8 @@ version = "0.100.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"5f6a5fc258f1c1276dfe3016516945546e2d5383911efc0fc4f1cdc5df3a4ae3" dependencies = [ - "ring 0.16.20", - "untrusted 0.7.1", + "ring", + "untrusted", ] [[package]] @@ -3488,8 +3469,8 @@ version = "0.101.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c7d5dece342910d9ba34d259310cae3e0154b873b35408b787b59bce53d34fe" dependencies = [ - "ring 0.16.20", - "untrusted 0.7.1", + "ring", + "untrusted", ] [[package]] @@ -3543,8 +3524,8 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" dependencies = [ - "ring 0.16.20", - "untrusted 0.7.1", + "ring", + "untrusted", ] [[package]] @@ -3572,26 +3553,26 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.20" +version = "1.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "836fa6a3e1e547f9a2c4040802ec865b5d85f4014efe00555d7090a3dcaa1090" +checksum = "ad977052201c6de01a8ef2aa3378c4bd23217a056337d1d6da40468d267a4fb0" [[package]] name = "serde" -version = "1.0.189" +version = "1.0.188" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e422a44e74ad4001bdc8eede9a4570ab52f71190e9c076d14369f38b9200537" +checksum = "cf9e0fcba69a370eed61bcf2b728575f726b50b55cba78064753d708ddc7549e" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.189" +version = "1.0.188" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e48d1f918009ce3145511378cf68d613e3b3d9137d67272562080d68a2b32d5" +checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2" dependencies = [ - "proc-macro2 1.0.69", + "proc-macro2 1.0.68", "quote 1.0.33", "syn 2.0.38", ] @@ -3700,9 +3681,9 @@ dependencies = [ [[package]] name = "similar" -version = "2.3.0" +version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2aeaf503862c419d66959f5d7ca015337d864e9c49485d771b732e2a20453597" +checksum = "420acb44afdae038210c99e69aae24109f32f15500aa708e81d46c9f29d55fcf" [[package]] name = "simple_asn1" @@ -3775,12 +3756,6 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" -[[package]] -name = "spin" -version = "0.9.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" - [[package]] name = "sqld" version = "0.21.9" @@ -3819,6 +3794,7 @@ dependencies = [ "jsonwebtoken", "libsql", "libsql-client", + "memmap", "metrics", "metrics-exporter-prometheus", "mimalloc", @@ -3927,7 +3903,7 @@ version = "1.0.109" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" dependencies = [ - "proc-macro2 1.0.69", + "proc-macro2 1.0.68", "quote 1.0.33", "unicode-ident", ] @@ -3938,7 +3914,7 @@ version = "2.0.38" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e96b79aaa137db8f61e26363a0c9b47d8b4ec75da28b7d1d614c2303e232408b" dependencies = [ - "proc-macro2 1.0.69", + "proc-macro2 1.0.68", "quote 1.0.33", "unicode-ident", ] @@ -3981,7 +3957,7 @@ dependencies = [ "cap-fs-ext", "cap-std", "io-lifetimes", - "rustix 0.35.15", + "rustix 0.35.14", "windows-sys 0.36.1", "winx", ] @@ -4001,7 +3977,7 @@ dependencies = [ "cfg-if", "fastrand 2.0.1", "redox_syscall 0.3.5", - "rustix 0.38.19", + "rustix 
0.38.17", "windows-sys 0.48.0", ] @@ -4029,7 +4005,7 @@ version = "1.0.49" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "10712f02019e9288794769fba95cd6847df9874d49d871d062172f9dd41bc4cc" dependencies = [ - "proc-macro2 1.0.69", + "proc-macro2 1.0.68", "quote 1.0.33", "syn 2.0.38", ] @@ -4091,9 +4067,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.33.0" +version = "1.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653" +checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" dependencies = [ "backtrace", "bytes", @@ -4125,7 +4101,7 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ - "proc-macro2 1.0.69", + "proc-macro2 1.0.68", "quote 1.0.33", "syn 2.0.38", ] @@ -4249,7 +4225,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d021fc044c18582b9a2408cd0dd05b1596e3ecdb5c4df822bb0183545683889" dependencies = [ "prettyplease", - "proc-macro2 1.0.69", + "proc-macro2 1.0.68", "prost-build", "quote 1.0.33", "syn 2.0.38", @@ -4368,7 +4344,7 @@ version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ - "proc-macro2 1.0.69", + "proc-macro2 1.0.68", "quote 1.0.33", "syn 2.0.38", ] @@ -4527,12 +4503,6 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" -[[package]] -name = "untrusted" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" - [[package]] name = "url" version = "2.4.1" @@ -4649,7 +4619,7 @@ dependencies = [ "io-lifetimes", "is-terminal 0.3.0", "once_cell", - "rustix 0.35.15", + "rustix 0.35.14", "system-interface", "tracing", "wasi-common", @@ -4667,7 +4637,7 @@ dependencies = [ "cap-rand", "cap-std", "io-extras", - "rustix 0.35.15", + "rustix 0.35.14", "thiserror", "tracing", "wasmtime", @@ -4694,7 +4664,7 @@ dependencies = [ "bumpalo", "log", "once_cell", - "proc-macro2 1.0.69", + "proc-macro2 1.0.68", "quote 1.0.33", "syn 2.0.38", "wasm-bindgen-shared", @@ -4728,7 +4698,7 @@ version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ - "proc-macro2 1.0.69", + "proc-macro2 1.0.68", "quote 1.0.33", "syn 2.0.38", "wasm-bindgen-backend", @@ -4743,9 +4713,9 @@ checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" [[package]] name = "wasm-encoder" -version = "0.34.1" +version = "0.33.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f14a94e06a3e2ed1af4e80cac712fed883142019ebe33c3899fd1b5e8550df9d" +checksum = "34180c89672b3e4825c3a8db4b61a674f1447afd5fe2445b2d22c3d8b6ea086c" dependencies = [ "leb128", ] @@ -4811,7 +4781,7 @@ dependencies = [ "directories-next", "file-per-thread-logger", "log", - "rustix 0.35.15", + "rustix 0.35.14", "serde", "sha2", "toml", @@ -4867,7 +4837,7 @@ checksum = "e5f54abc960b4a055ba16b942cbbd1da641e0ad44cc97a7608f3d43c069b120e" dependencies = [ "cc", "cfg-if", - "rustix 
0.35.15", + "rustix 0.35.14", "wasmtime-asm-macros", "windows-sys 0.36.1", ] @@ -4906,7 +4876,7 @@ checksum = "fe057012a0ba6cee3685af1e923d6e0a6cb9baf15fb3ffa4be3d7f712c7dec42" dependencies = [ "object 0.29.0", "once_cell", - "rustix 0.35.15", + "rustix 0.35.14", ] [[package]] @@ -4937,7 +4907,7 @@ dependencies = [ "memoffset 0.6.5", "paste", "rand", - "rustix 0.35.15", + "rustix 0.35.14", "thiserror", "wasmtime-asm-macros", "wasmtime-environ", @@ -4982,9 +4952,9 @@ dependencies = [ [[package]] name = "wast" -version = "66.0.1" +version = "66.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49d1457e95d4b8e1f72bd50f5ed804931f94cf1b5449697255aef466e46fa4b0" +checksum = "0da7529bb848d58ab8bf32230fc065b363baee2bd338d5e58c589a1e7d83ad07" dependencies = [ "leb128", "memchr", @@ -4994,11 +4964,11 @@ dependencies = [ [[package]] name = "wat" -version = "1.0.76" +version = "1.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "964639e3c731f12b7bf6be78f0b2c3e646321acab18e7cb9f18e44c6720bb4fa" +checksum = "4780374047c65b6b6e86019093fe80c18b66825eb684df778a4e068282a780e7" dependencies = [ - "wast 66.0.1", + "wast 66.0.0", ] [[package]] @@ -5013,12 +4983,12 @@ dependencies = [ [[package]] name = "webpki" -version = "0.22.4" +version = "0.22.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed63aea5ce73d0ff405984102c42de94fc55a6b75765d621c65262469b3c9b53" +checksum = "07ecc0cd7cac091bf682ec5efa18b1cff79d617b84181f38b3951dbe135f607f" dependencies = [ - "ring 0.17.3", - "untrusted 0.9.0", + "ring", + "untrusted", ] [[package]] @@ -5045,7 +5015,7 @@ dependencies = [ "either", "home", "once_cell", - "rustix 0.38.19", + "rustix 0.38.17", ] [[package]] @@ -5071,7 +5041,7 @@ checksum = "ba5796f53b429df7d44cfdaae8f6d9cd981d82aec3516561352ca9c5e73ee185" dependencies = [ "anyhow", "heck", - "proc-macro2 1.0.69", + "proc-macro2 1.0.68", "quote 1.0.33", "shellexpand", "syn 1.0.109", @@ -5084,7 +5054,7 @@ version = "3.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b830eb7203d48942fb8bc8bb105f76e7d09c33a082d638e990e02143bb2facd" dependencies = [ - "proc-macro2 1.0.69", + "proc-macro2 1.0.68", "quote 1.0.33", "syn 1.0.109", "wiggle-generate", @@ -5395,10 +5365,11 @@ dependencies = [ [[package]] name = "zstd-sys" -version = "2.0.9+zstd.1.5.5" +version = "2.0.8+zstd.1.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e16efa8a874a0481a574084d34cc26fdb3b99627480f785888deb6386506656" +checksum = "5556e6ee25d32df2586c098bbfa278803692a20d0ab9565e049480d52707ec8c" dependencies = [ "cc", + "libc", "pkg-config", ] diff --git a/sqld/Cargo.toml b/sqld/Cargo.toml index 9ef5e1d2..1c9f9d49 100644 --- a/sqld/Cargo.toml +++ b/sqld/Cargo.toml @@ -30,6 +30,7 @@ hyper = { version = "0.14.23", features = ["http2"] } hyper-tungstenite = "0.10" itertools = "0.10.5" jsonwebtoken = "8.2.0" +memmap = "0.7.0" mimalloc = { version = "0.1.36", default-features = false } nix = { version = "0.26.2", features = ["fs"] } once_cell = "1.17.0" diff --git a/sqld/assets/test/simple_wallog b/sqld/assets/test/simple_wallog deleted file mode 100644 index 42e5b3a914a0ad8f9cb3600fb11e4c2af98990a7..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 28904 zcmeI)O(+Cm9LMpQXLd-^xCkfXpzOh0ialEFZ4TZdN_lCO@>0yfLCRSi+;EZ{U7WZo zB@&U79F(_<(jK@-o@YIy5yw5)?fY+LpJ$$(d79^QdwyHn-RZUW#XTda<~wJg;q_?u zwDq=f`SD^W7|+)oq1xiF3rCNsX6$Nc;c25TW$Ll+PH)^#x?|Hblf%B-C?65RXmMTn 
zwVEP5jlNYq648Ct1XF~DeszUWJGED&Zg#E=c~A%-fB*srAb#I|=btU3G3AZQS>Lpw?;#SW^gH3$l@x%Pbb>()j=~sL9AV0{| zYcUT32q1s}0tg_000IagfB*sr{4asvhyTN}>y`b2%ad>}U^?%HJSYSZKmY**5I_I{ z1Q0*~0R#}pK>=Zk0!y!}=6toOd?~gb&IN??CUXJ#K_P$u0tg_000IagfB*srAb>zF z30Ojg0xA~>{up4eqkb{Ib#oui1uQ3(&IM#cA%Fk^2q1s}0tg_000IagfIv#? y(OQ4+%RyCtBAg4@&eL};AiER-2q1s}0tg_000IagfB*srWT}8<3)B817x)B5eNwRi diff --git a/sqld/build.rs b/sqld/build.rs index b4519612..e719001c 100644 --- a/sqld/build.rs +++ b/sqld/build.rs @@ -7,7 +7,7 @@ fn main() -> Result<(), Box> { std::env::set_var("PROTOC", protobuf_src::protoc()); let mut config = Config::new(); - config.bytes([".wal_log"]); + config.bytes([".wal_log", ".proxy.ProgramReq.namespace"]); tonic_build::configure() .protoc_arg("--experimental_allow_proto3_optional") .type_attribute(".proxy", "#[cfg_attr(test, derive(arbitrary::Arbitrary))]") diff --git a/sqld/proto/replication_log.proto b/sqld/proto/replication_log.proto index 8ac5db45..b7f2ef89 100644 --- a/sqld/proto/replication_log.proto +++ b/sqld/proto/replication_log.proto @@ -8,7 +8,12 @@ message LogOffset { message HelloRequest {} message HelloResponse { - string log_id = 3; + /// Uuid of the current generation + string generation_id = 1; + /// First frame_no in the current generation + uint64 generation_start_index = 2; + /// Uuid of the database being replicated + string database_id = 3; } message Frame { diff --git a/sqld/src/error.rs b/sqld/src/error.rs index 84aae8cb..aab36bed 100644 --- a/sqld/src/error.rs +++ b/sqld/src/error.rs @@ -81,8 +81,6 @@ pub enum Error { ConflictingRestoreParameters, #[error("Failed to fork database: {0}")] Fork(#[from] ForkError), - #[error("Fatal replication error")] - FatalReplicationError, } trait ResponseError: std::error::Error { @@ -134,7 +132,6 @@ impl IntoResponse for Error { LoadDumpExistingDb => self.format_err(StatusCode::BAD_REQUEST), ConflictingRestoreParameters => self.format_err(StatusCode::BAD_REQUEST), Fork(e) => e.into_response(), - FatalReplicationError => self.format_err(StatusCode::INTERNAL_SERVER_ERROR), } } } diff --git a/sqld/src/namespace/fork.rs b/sqld/src/namespace/fork.rs index b49a6eba..b45d385c 100644 --- a/sqld/src/namespace/fork.rs +++ b/sqld/src/namespace/fork.rs @@ -10,7 +10,7 @@ use tokio::time::Duration; use tokio_stream::StreamExt; use crate::database::PrimaryDatabase; -use crate::replication::frame::FrameBorrowed; +use crate::replication::frame::Frame; use crate::replication::primary::frame_stream::FrameStream; use crate::replication::{LogReadError, ReplicationLogger}; use crate::{BLOCKING_RT, LIBSQL_PAGE_SIZE}; @@ -41,7 +41,7 @@ impl From for ForkError { } } -async fn write_frame(frame: &FrameBorrowed, temp_file: &mut tokio::fs::File) -> Result<()> { +async fn write_frame(frame: Frame, temp_file: &mut tokio::fs::File) -> Result<()> { let page_no = frame.header().page_no; let page_pos = (page_no - 1) as usize * LIBSQL_PAGE_SIZE as usize; temp_file.seek(SeekFrom::Start(page_pos as u64)).await?; @@ -128,7 +128,7 @@ impl ForkTask<'_> { match res { Ok(frame) => { next_frame_no = next_frame_no.max(frame.header().frame_no + 1); - write_frame(&frame, &mut data_file).await?; + write_frame(frame, &mut data_file).await?; } Err(LogReadError::SnapshotRequired) => { let snapshot = loop { @@ -147,7 +147,7 @@ impl ForkTask<'_> { for frame in iter { let frame = frame.map_err(ForkError::LogRead)?; next_frame_no = next_frame_no.max(frame.header().frame_no + 1); - write_frame(&frame, &mut data_file).await?; + write_frame(frame, &mut data_file).await?; } } Err(LogReadError::Ahead) 
=> { diff --git a/sqld/src/namespace/mod.rs b/sqld/src/namespace/mod.rs index 647a9e3c..c5e76ee3 100644 --- a/sqld/src/namespace/mod.rs +++ b/sqld/src/namespace/mod.rs @@ -561,27 +561,30 @@ impl Namespace { DatabaseConfigStore::load(&db_path).context("Could not load database config")?, ); + let mut join_set = JoinSet::new(); let replicator = Replicator::new( db_path.clone(), config.channel.clone(), config.uri.clone(), name.clone(), + &mut join_set, reset, ) .await?; - let applied_frame_no_receiver = replicator.current_frame_no_notifier.subscribe(); - let mut join_set = JoinSet::new(); - join_set.spawn(replicator.run()); + + let applied_frame_no_receiver = replicator.current_frame_no_notifier.clone(); let stats = make_stats( &db_path, &mut join_set, config.stats_sender.clone(), name.clone(), - applied_frame_no_receiver.clone(), + replicator.current_frame_no_notifier.clone(), ) .await?; + join_set.spawn(replicator.run()); + let connection_maker = MakeWriteProxyConn::new( db_path.clone(), config.extensions.clone(), diff --git a/sqld/src/replication/frame.rs b/sqld/src/replication/frame.rs index 938635b3..c0f68ce2 100644 --- a/sqld/src/replication/frame.rs +++ b/sqld/src/replication/frame.rs @@ -1,10 +1,10 @@ -use std::alloc::Layout; +use std::borrow::Cow; use std::fmt; -use std::mem::size_of; -use std::ops::{Deref, DerefMut}; +use std::mem::{size_of, transmute}; +use std::ops::Deref; -use bytemuck::{bytes_of, from_bytes, Pod, Zeroable}; -use bytes::Bytes; +use bytemuck::{bytes_of, pod_read_unaligned, try_from_bytes, Pod, Zeroable}; +use bytes::{Bytes, BytesMut}; use crate::LIBSQL_PAGE_SIZE; @@ -28,18 +28,10 @@ pub struct FrameHeader { } #[derive(Clone, serde::Serialize, serde::Deserialize)] -/// The shared version of a replication frame. +/// The owned version of a replication frame. /// Cloning this is cheap. pub struct Frame { - inner: Bytes, -} - -impl TryFrom<&[u8]> for Frame { - type Error = anyhow::Error; - - fn try_from(data: &[u8]) -> Result { - Ok(FrameMut::try_from(data)?.into()) - } + data: Bytes, } impl fmt::Debug for Frame { @@ -51,108 +43,57 @@ impl fmt::Debug for Frame { } } -/// Owned version of a frame, on the heap -pub struct FrameMut { - inner: Box, -} - -impl TryFrom<&[u8]> for FrameMut { - type Error = anyhow::Error; - - fn try_from(data: &[u8]) -> Result { - anyhow::ensure!( - data.len() == size_of::(), - "invalid frame size" - ); - // frames are relatively large (~4ko), we want to avoid allocating them on the stack and - // then copying them to the heap, and instead copy them to the heap directly. - let inner = unsafe { - let layout = Layout::new::(); - let ptr = std::alloc::alloc(layout); - ptr.copy_from(data.as_ptr(), data.len()); - Box::from_raw(ptr as *mut FrameBorrowed) - }; - - Ok(Self { inner }) - } -} +impl Frame { + /// size of a single frame + pub const SIZE: usize = size_of::() + LIBSQL_PAGE_SIZE as usize; -impl From for Frame { - fn from(value: FrameMut) -> Self { - // transmute the FrameBorrowed into a Box<[u8; _]>. 
This is safe because the alignment of - // [u8] divides the alignment of FrameBorrowed - let data = unsafe { - Vec::from_raw_parts( - Box::into_raw(value.inner) as *mut u8, - size_of::(), - size_of::(), - ) - }; - - Self { - inner: Bytes::from(data), - } - } -} + pub fn from_parts(header: &FrameHeader, data: &[u8]) -> Self { + assert_eq!(data.len(), LIBSQL_PAGE_SIZE as usize); + let mut buf = BytesMut::with_capacity(Self::SIZE); + buf.extend_from_slice(bytes_of(header)); + buf.extend_from_slice(data); -impl From for FrameMut { - fn from(inner: FrameBorrowed) -> Self { - Self { - inner: Box::new(inner), - } + Self { data: buf.freeze() } } -} -impl Frame { - pub fn from_parts(header: &FrameHeader, data: &[u8]) -> Self { - FrameBorrowed::from_parts(header, data).into() + pub fn try_from_bytes(data: Bytes) -> anyhow::Result { + anyhow::ensure!(data.len() == Self::SIZE, "invalid frame size"); + Ok(Self { data }) } pub fn bytes(&self) -> Bytes { - self.inner.clone() - } -} - -impl From for Frame { - fn from(value: FrameBorrowed) -> Self { - FrameMut::from(value).into() + self.data.clone() } } /// The borrowed version of Frame -#[repr(C)] -#[derive(Pod, Zeroable, Copy, Clone)] +#[repr(transparent)] pub struct FrameBorrowed { - header: FrameHeader, - page: [u8; LIBSQL_PAGE_SIZE as usize], + data: [u8], } impl FrameBorrowed { - /// Returns the bytes for this frame. Includes the header bytes. - pub fn as_slice(&self) -> &[u8] { - bytes_of(self) - } - - /// returns this frame's page data. - pub fn page(&self) -> &[u8] { - &self.page + pub fn header(&self) -> Cow { + let data = &self.data[..size_of::()]; + try_from_bytes(data) + .map(Cow::Borrowed) + .unwrap_or_else(|_| Cow::Owned(pod_read_unaligned(data))) } - pub fn header(&self) -> &FrameHeader { - &self.header + /// Returns the bytes for this frame. Includes the header bytes. + pub fn as_slice(&self) -> &[u8] { + &self.data } - pub fn header_mut(&mut self) -> &mut FrameHeader { - &mut self.header + pub fn from_bytes(data: &[u8]) -> &Self { + assert_eq!(data.len(), Frame::SIZE); + // SAFETY: &FrameBorrowed is equivalent to &[u8] + unsafe { transmute(data) } } - pub fn from_parts(header: &FrameHeader, page: &[u8]) -> Self { - assert_eq!(page.len(), LIBSQL_PAGE_SIZE as usize); - - FrameBorrowed { - header: *header, - page: page.try_into().unwrap(), - } + /// returns this frame's page data. + pub fn page(&self) -> &[u8] { + &self.data[size_of::()..] 
} } @@ -160,20 +101,6 @@ impl Deref for Frame { type Target = FrameBorrowed; fn deref(&self) -> &Self::Target { - from_bytes(&self.inner) - } -} - -impl Deref for FrameMut { - type Target = FrameBorrowed; - - fn deref(&self) -> &Self::Target { - self.inner.as_ref() - } -} - -impl DerefMut for FrameMut { - fn deref_mut(&mut self) -> &mut Self::Target { - self.inner.as_mut() + FrameBorrowed::from_bytes(&self.data) } } diff --git a/sqld/src/replication/primary/logger.rs b/sqld/src/replication/primary/logger.rs index bcf0c6f4..b191e164 100644 --- a/sqld/src/replication/primary/logger.rs +++ b/sqld/src/replication/primary/logger.rs @@ -23,7 +23,7 @@ use crate::libsql_bindings::ffi::{ PageHdrIter, PgHdr, Wal, SQLITE_CHECKPOINT_TRUNCATE, SQLITE_IOERR, SQLITE_OK, }; use crate::libsql_bindings::wal_hook::WalHook; -use crate::replication::frame::{Frame, FrameHeader, FrameMut}; +use crate::replication::frame::{Frame, FrameHeader}; use crate::replication::snapshot::{find_snapshot_file, LogCompactor, SnapshotFile}; use crate::replication::{FrameNo, SnapshotCallback, CRC_64_GO_ISO, WAL_MAGIC}; use crate::LIBSQL_PAGE_SIZE; @@ -388,14 +388,14 @@ impl LogFile { let file_end = file.metadata()?.len(); let header = if file_end == 0 { - let log_id = Uuid::new_v4(); + let db_id = Uuid::new_v4(); LogFileHeader { version: 2, start_frame_no: 0, magic: WAL_MAGIC, page_size: LIBSQL_PAGE_SIZE as i32, start_checksum: 0, - log_id: log_id.as_u128(), + db_id: db_id.as_u128(), frame_count: 0, sqld_version: Version::current().0, } @@ -467,9 +467,8 @@ impl LogFile { } /// Returns an iterator over the WAL frame headers - pub(crate) fn frames_iter( - &self, - ) -> anyhow::Result> + '_> { + #[allow(dead_code)] + fn frames_iter(&self) -> anyhow::Result> + '_> { let mut current_frame_offset = 0; Ok(std::iter::from_fn(move || { if current_frame_offset >= self.header.frame_count { @@ -477,17 +476,14 @@ impl LogFile { } let read_byte_offset = Self::absolute_byte_offset(current_frame_offset); current_frame_offset += 1; - Some( - self.read_frame_byte_offset_mut(read_byte_offset) - .map(|f| f.into()), - ) + Some(self.read_frame_byte_offset(read_byte_offset)) })) } /// Returns an iterator over the WAL frame headers - pub fn rev_frames_iter_mut( + pub fn rev_frames_iter( &self, - ) -> anyhow::Result> + '_> { + ) -> anyhow::Result> + '_> { let mut current_frame_offset = self.header.frame_count; Ok(std::iter::from_fn(move || { @@ -496,7 +492,7 @@ impl LogFile { } current_frame_offset -= 1; let read_byte_offset = Self::absolute_byte_offset(current_frame_offset); - let frame = self.read_frame_byte_offset_mut(read_byte_offset); + let frame = self.read_frame_byte_offset(read_byte_offset); Some(frame) })) } @@ -568,9 +564,9 @@ impl LogFile { return Err(LogReadError::Ahead); } - let frame = self.read_frame_byte_offset_mut(self.byte_offset(frame_no)?.unwrap())?; + let frame = self.read_frame_byte_offset(self.byte_offset(frame_no)?.unwrap())?; - Ok(frame.into()) + Ok(frame) } fn should_compact(&self) -> bool { @@ -634,11 +630,12 @@ impl LogFile { Ok(()) } - fn read_frame_byte_offset_mut(&self, offset: u64) -> anyhow::Result { + fn read_frame_byte_offset(&self, offset: u64) -> anyhow::Result { let mut buffer = BytesMut::zeroed(LogFile::FRAME_SIZE); self.file.read_exact_at(&mut buffer, offset)?; + let buffer = buffer.freeze(); - FrameMut::try_from(&*buffer) + Frame::try_from_bytes(buffer) } fn last_commited_frame_no(&self) -> Option { @@ -707,8 +704,8 @@ pub struct LogFileHeader { /// Initial checksum value for the rolling CRC checksum /// computed 
with the 64 bits CRC_64_GO_ISO pub start_checksum: u64, - /// Uuid of the this log. - pub log_id: u128, + /// Uuid of the database associated with this log. + pub db_id: u128, /// Frame_no of the first frame in the log pub start_frame_no: FrameNo, /// entry count in file @@ -836,11 +833,7 @@ impl ReplicationLogger { Ok(Self { generation: Generation::new(generation_start_frame_no.unwrap_or(0)), - compactor: LogCompactor::new( - &db_path, - Uuid::from_u128(log_file.header.log_id), - callback, - )?, + compactor: LogCompactor::new(&db_path, log_file.header.db_id, callback)?, log_file: RwLock::new(log_file), db_path, closed_signal, @@ -889,8 +882,8 @@ impl ReplicationLogger { Self::from_log_file(data_path, log_file, callback, auto_checkpoint) } - pub fn log_id(&self) -> Uuid { - Uuid::from_u128((self.log_file.read()).header().log_id) + pub fn database_id(&self) -> anyhow::Result { + Ok(Uuid::from_u128((self.log_file.read()).header().db_id)) } /// Write pages to the log, without updating the file header. @@ -944,7 +937,7 @@ impl ReplicationLogger { } let last_frame = { - let mut frames_iter = log_file.rev_frames_iter_mut()?; + let mut frames_iter = log_file.rev_frames_iter()?; let Some(last_frame_res) = frames_iter.next() else { // the log file is empty, nothing to compact return Ok(false); diff --git a/sqld/src/replication/replica/error.rs b/sqld/src/replication/replica/error.rs index fc5ce521..dbcf7644 100644 --- a/sqld/src/replication/replica/error.rs +++ b/sqld/src/replication/replica/error.rs @@ -1,7 +1,9 @@ #[derive(Debug, thiserror::Error)] pub enum ReplicationError { - #[error("Primary has incompatible log")] - LogIncompatible, + #[error("Replica is ahead of primary")] + Lagging, + #[error("Trying to replicate incompatible databases")] + DbIncompatible, #[error("{0}")] Other(#[from] anyhow::Error), } diff --git a/sqld/src/replication/replica/injector.rs b/sqld/src/replication/replica/injector.rs new file mode 100644 index 00000000..28bdd333 --- /dev/null +++ b/sqld/src/replication/replica/injector.rs @@ -0,0 +1,51 @@ +use std::path::Path; + +use crate::DEFAULT_AUTO_CHECKPOINT; +use rusqlite::OpenFlags; + +use crate::replication::replica::hook::{SQLITE_CONTINUE_REPLICATION, SQLITE_EXIT_REPLICATION}; + +use super::hook::{InjectorHook, InjectorHookCtx, INJECTOR_METHODS}; + +pub struct FrameInjector { + conn: sqld_libsql_bindings::Connection, +} + +impl FrameInjector { + pub fn new(db_path: &Path, hook_ctx: InjectorHookCtx) -> anyhow::Result { + let conn = sqld_libsql_bindings::Connection::open( + db_path, + OpenFlags::SQLITE_OPEN_READ_WRITE + | OpenFlags::SQLITE_OPEN_CREATE + | OpenFlags::SQLITE_OPEN_URI + | OpenFlags::SQLITE_OPEN_NO_MUTEX, + &INJECTOR_METHODS, + hook_ctx, + // It's ok to leave auto-checkpoint to default, since replicas don't use bottomless. 
+ DEFAULT_AUTO_CHECKPOINT, + )?; + + Ok(Self { conn }) + } + + pub fn step(&mut self) -> anyhow::Result { + self.conn.pragma_update(None, "writable_schema", "on")?; + let res = self.conn.execute("create table __dummy__ (dummy);", ()); + + match res { + Ok(_) => panic!("replication hook was not called"), + Err(e) => { + if let Some(e) = e.sqlite_error() { + if e.extended_code == SQLITE_EXIT_REPLICATION { + self.conn.pragma_update(None, "writable_schema", "reset")?; + return Ok(false); + } + if e.extended_code == SQLITE_CONTINUE_REPLICATION { + return Ok(true); + } + } + anyhow::bail!(e); + } + } + } +} diff --git a/sqld/src/replication/replica/injector/headers.rs b/sqld/src/replication/replica/injector/headers.rs deleted file mode 100644 index 0973d65b..00000000 --- a/sqld/src/replication/replica/injector/headers.rs +++ /dev/null @@ -1,47 +0,0 @@ -use std::marker::PhantomData; - -use rusqlite::ffi::PgHdr; - -pub struct Headers<'a> { - ptr: *mut PgHdr, - _pth: PhantomData<&'a ()>, -} - -impl<'a> Headers<'a> { - // safety: ptr is guaranteed to be valid for 'a - pub(crate) unsafe fn new(ptr: *mut PgHdr) -> Self { - Self { - ptr, - _pth: PhantomData, - } - } - - pub(crate) fn as_ptr(&mut self) -> *mut PgHdr { - self.ptr - } - - pub(crate) fn all_applied(&self) -> bool { - let mut current = self.ptr; - while !current.is_null() { - unsafe { - // WAL appended - if (*current).flags & 0x040 == 0 { - return false; - } - current = (*current).pDirty; - } - } - - true - } -} - -impl Drop for Headers<'_> { - fn drop(&mut self) { - let mut current = self.ptr; - while !current.is_null() { - let h: Box = unsafe { Box::from_raw(current as _) }; - current = h.pDirty; - } - } -} diff --git a/sqld/src/replication/replica/injector/hook.rs b/sqld/src/replication/replica/injector/hook.rs deleted file mode 100644 index 273cb5bd..00000000 --- a/sqld/src/replication/replica/injector/hook.rs +++ /dev/null @@ -1,162 +0,0 @@ -use std::ffi::{c_int, CStr}; - -use rusqlite::ffi::{libsql_wal as Wal, PgHdr}; -use sqld_libsql_bindings::ffi::types::XWalFrameFn; -use sqld_libsql_bindings::init_static_wal_method; -use sqld_libsql_bindings::wal_hook::WalHook; - -use crate::replication::frame::FrameBorrowed; -use crate::LIBSQL_PAGE_SIZE; - -use super::headers::Headers; -use super::FrameBuffer; - -// Those are custom error codes returned by the replicator hook. -pub const LIBSQL_INJECT_FATAL: c_int = 200; -/// Injection succeeded, left on a open txn state -pub const LIBSQL_INJECT_OK_TXN: c_int = 201; -/// Injection succeeded -pub const LIBSQL_INJECT_OK: c_int = 202; - -pub struct InjectorHookCtx { - /// shared frame buffer - buffer: FrameBuffer, - /// currently in a txn - is_txn: bool, -} - -impl InjectorHookCtx { - pub fn new(buffer: FrameBuffer) -> Self { - Self { - buffer, - is_txn: false, - } - } - - fn inject_pages( - &mut self, - sync_flags: i32, - orig: XWalFrameFn, - wal: *mut Wal, - ) -> anyhow::Result<()> { - self.is_txn = true; - let buffer = self.buffer.lock(); - let (mut headers, size_after) = make_page_header(buffer.iter().map(|f| &**f)); - - let ret = unsafe { - orig( - wal, - LIBSQL_PAGE_SIZE as _, - headers.as_ptr(), - size_after, - (size_after != 0) as _, - sync_flags, - ) - }; - - if ret == 0 { - debug_assert!(headers.all_applied()); - if size_after != 0 { - self.is_txn = false; - } - tracing::trace!("applied frame batch"); - - Ok(()) - } else { - anyhow::bail!("failed to apply pages"); - } - } -} - -/// Turn a list of `WalFrame` into a list of PgHdr. 
-/// The caller has the responsibility to free the returned headers. -/// return (headers, last_frame_no, size_after) -fn make_page_header<'a>(frames: impl Iterator) -> (Headers<'a>, u32) { - let mut first_pg: *mut PgHdr = std::ptr::null_mut(); - let mut current_pg; - let mut size_after = 0; - - let mut headers_count = 0; - let mut prev_pg: *mut PgHdr = std::ptr::null_mut(); - let mut frames = frames.peekable(); - while let Some(frame) = frames.next() { - // the last frame in a batch marks the end of the txn - if frames.peek().is_none() { - size_after = frame.header().size_after; - } - - let page = PgHdr { - pPage: std::ptr::null_mut(), - pData: frame.page().as_ptr() as _, - pExtra: std::ptr::null_mut(), - pCache: std::ptr::null_mut(), - pDirty: std::ptr::null_mut(), - pPager: std::ptr::null_mut(), - pgno: frame.header().page_no, - pageHash: 0, - flags: 0x02, // PGHDR_DIRTY - it works without the flag, but why risk it - nRef: 0, - pDirtyNext: std::ptr::null_mut(), - pDirtyPrev: std::ptr::null_mut(), - }; - headers_count += 1; - current_pg = Box::into_raw(Box::new(page)); - if first_pg.is_null() { - first_pg = current_pg; - } - if !prev_pg.is_null() { - unsafe { - (*prev_pg).pDirty = current_pg; - } - } - prev_pg = current_pg; - } - - tracing::trace!("built {headers_count} page headers"); - - let headers = unsafe { Headers::new(first_pg) }; - (headers, size_after) -} - -init_static_wal_method!(INJECTOR_METHODS, InjectorHook); - -/// The injector hook hijacks a call to xframes, and replace the content of the call with it's own -/// frames. -/// The Caller must first call `set_frames`, passing the frames to be injected, then trigger a call -/// to xFrames from the libsql connection (see dummy write in `injector`), and can then collect the -/// result on the injection with `take_result` -pub enum InjectorHook {} - -unsafe impl WalHook for InjectorHook { - type Context = InjectorHookCtx; - - fn on_frames( - wal: &mut Wal, - _page_size: c_int, - _page_headers: *mut PgHdr, - _size_after: u32, - _is_commit: c_int, - sync_flags: c_int, - orig: XWalFrameFn, - ) -> c_int { - let wal_ptr = wal as *mut _; - let ctx = Self::wal_extract_ctx(wal); - let ret = ctx.inject_pages(sync_flags, orig, wal_ptr); - if let Err(e) = ret { - tracing::error!("fatal replication error: {e}"); - return LIBSQL_INJECT_FATAL; - } - - ctx.buffer.lock().clear(); - - if !ctx.is_txn { - LIBSQL_INJECT_OK - } else { - LIBSQL_INJECT_OK_TXN - } - } - - fn name() -> &'static CStr { - CStr::from_bytes_with_nul(b"frame_injector_hook\0").unwrap() - } -} diff --git a/sqld/src/replication/replica/injector/mod.rs b/sqld/src/replication/replica/injector/mod.rs deleted file mode 100644 index 0360214c..00000000 --- a/sqld/src/replication/replica/injector/mod.rs +++ /dev/null @@ -1,251 +0,0 @@ -use std::collections::VecDeque; -use std::path::Path; -use std::sync::Arc; - -use parking_lot::Mutex; -use rusqlite::OpenFlags; -use sqld_libsql_bindings::wal_hook::TRANSPARENT_METHODS; - -use crate::error::Error; -use crate::replication::frame::Frame; -use crate::{replication::FrameNo, DEFAULT_AUTO_CHECKPOINT}; - -use hook::{ - InjectorHookCtx, INJECTOR_METHODS, LIBSQL_INJECT_FATAL, LIBSQL_INJECT_OK, LIBSQL_INJECT_OK_TXN, -}; - -use self::hook::InjectorHook; - -mod headers; -mod hook; - -#[derive(Debug)] -pub enum InjectError {} - -pub type FrameBuffer = Arc>>; - -pub struct Injector { - /// The injector is in a transaction state - is_txn: bool, - /// Buffer for holding current transaction frames - buffer: FrameBuffer, - /// Maximum capacity of the 
frame buffer - capacity: usize, - /// Injector connection - // connection must be dropped before the hook context - connection: Arc>>, -} - -/// Methods from this trait are called before and after performing a frame injection. -/// This trait trait is used to record the last committed frame_no to the log. -/// The implementer can persist the pre and post commit frame no, and compare them in the event of -/// a crash; if the pre and post commit frame_no don't match, then the log may be corrupted. -impl Injector { - pub fn new(path: &Path, buffer_capacity: usize) -> crate::Result { - let buffer = FrameBuffer::default(); - let ctx = InjectorHookCtx::new(buffer.clone()); - std::fs::create_dir_all(path)?; - - { - // create the replication table if it doesn't exist. We need to do that without hooks. - let connection = sqld_libsql_bindings::Connection::open( - path, - OpenFlags::SQLITE_OPEN_READ_WRITE - | OpenFlags::SQLITE_OPEN_CREATE - | OpenFlags::SQLITE_OPEN_URI - | OpenFlags::SQLITE_OPEN_NO_MUTEX, - &TRANSPARENT_METHODS, - // safety: hook is dropped after connection - (), - u32::MAX, - )?; - - connection.execute("CREATE TABLE IF NOT EXISTS libsql_temp_injection (x)", ())?; - } - - let connection = sqld_libsql_bindings::Connection::open( - path, - OpenFlags::SQLITE_OPEN_READ_WRITE - | OpenFlags::SQLITE_OPEN_CREATE - | OpenFlags::SQLITE_OPEN_URI - | OpenFlags::SQLITE_OPEN_NO_MUTEX, - &INJECTOR_METHODS, - // safety: hook is dropped after connection - ctx, - DEFAULT_AUTO_CHECKPOINT, - )?; - - connection.execute("CREATE TABLE IF NOT EXISTS libsql_temp_injection (x)", ())?; - - Ok(Self { - is_txn: false, - buffer, - capacity: buffer_capacity, - connection: Arc::new(Mutex::new(connection)), - }) - } - - /// Inject on frame into the log. If this was a commit frame, returns Ok(Some(FrameNo)). - pub(crate) fn inject_frame(&mut self, frame: Frame) -> crate::Result> { - let frame_close_txn = frame.header().size_after != 0; - self.buffer.lock().push_back(frame); - if frame_close_txn || self.buffer.lock().len() >= self.capacity { - if !self.is_txn { - self.begin_txn()?; - } - return self.flush(); - } - - Ok(None) - } - - /// Flush the buffer to libsql WAL. - /// Trigger a dummy write, and flush the cache to trigger a call to xFrame. The buffer's frame - /// are then injected into the wal. - fn flush(&mut self) -> crate::Result> { - let lock = self.buffer.lock(); - // the frames in the buffer are either monotonically increasing (log) or decreasing - // (snapshot). Either way, we want to find the biggest frameno we're about to commit, and - // that is either the front or the back of the buffer - let last_frame_no = match lock.back().zip(lock.front()) { - Some((b, f)) => f.header().frame_no.max(b.header().frame_no), - None => { - tracing::trace!("nothing to inject"); - return Ok(None); - } - }; - - drop(lock); - - let connection = self.connection.lock(); - // use prepare cached to avoid parsing the same statement over and over again. - let mut stmt = - connection.prepare_cached("INSERT INTO libsql_temp_injection VALUES (42)")?; - - // We execute the statement, and then force a call to xframe if necesacary. If the execute - // succeeds, then xframe wasn't called, in this case, we call cache_flush, and then process - // the error. - // It is unexpected that execute flushes, but it is possible, so we handle that case. 
- match stmt.execute(()).and_then(|_| connection.cache_flush()) { - Ok(_) => panic!("replication hook was not called"), - Err(e) => { - if let Some(e) = e.sqlite_error() { - if e.extended_code == LIBSQL_INJECT_OK { - // refresh schema - connection.pragma_update(None, "writable_schema", "reset")?; - if let Err(e) = connection.execute("COMMIT", ()) { - if !matches!(e.sqlite_error(), Some(rusqlite::ffi::Error{ extended_code, .. }) if *extended_code == 201) - { - tracing::error!("injector failed to commit: {e}"); - return Err(Error::FatalReplicationError); - } - } - self.is_txn = false; - assert!(self.buffer.lock().is_empty()); - return Ok(Some(last_frame_no)); - } else if e.extended_code == LIBSQL_INJECT_OK_TXN { - self.is_txn = true; - assert!(self.buffer.lock().is_empty()); - return Ok(None); - } else if e.extended_code == LIBSQL_INJECT_FATAL { - return Err(Error::FatalReplicationError); - } - } - - Err(Error::FatalReplicationError) - } - } - } - - fn begin_txn(&mut self) -> crate::Result<()> { - let conn = self.connection.lock(); - conn.execute("BEGIN IMMEDIATE", ())?; - Ok(()) - } - - pub fn clear_buffer(&mut self) { - self.buffer.lock().clear(); - } -} - -#[cfg(test)] -mod test { - use crate::replication::primary::logger::LogFile; - - use super::Injector; - - #[test] - fn test_simple_inject_frames() { - let log_file = std::fs::File::open("assets/test/simple_wallog").unwrap(); - let log = LogFile::new(log_file, 10000, None).unwrap(); - let temp = tempfile::tempdir().unwrap(); - - let mut injector = Injector::new(temp.path(), 10).unwrap(); - for frame in log.frames_iter().unwrap() { - let frame = frame.unwrap(); - injector.inject_frame(frame).unwrap(); - } - - let conn = rusqlite::Connection::open(temp.path().join("data")).unwrap(); - - conn.query_row("SELECT COUNT(*) FROM test", (), |row| { - assert_eq!(row.get::<_, usize>(0).unwrap(), 5); - Ok(()) - }) - .unwrap(); - } - - #[test] - fn test_inject_frames_split_txn() { - let log_file = std::fs::File::open("assets/test/simple_wallog").unwrap(); - let log = LogFile::new(log_file, 10000, None).unwrap(); - let temp = tempfile::tempdir().unwrap(); - - // inject one frame at a time - let mut injector = Injector::new(temp.path(), 1).unwrap(); - for frame in log.frames_iter().unwrap() { - let frame = frame.unwrap(); - injector.inject_frame(frame).unwrap(); - } - - let conn = rusqlite::Connection::open(temp.path().join("data")).unwrap(); - - conn.query_row("SELECT COUNT(*) FROM test", (), |row| { - assert_eq!(row.get::<_, usize>(0).unwrap(), 5); - Ok(()) - }) - .unwrap(); - } - - #[test] - fn test_inject_partial_txn_isolated() { - let log_file = std::fs::File::open("assets/test/simple_wallog").unwrap(); - let log = LogFile::new(log_file, 10000, None).unwrap(); - let temp = tempfile::tempdir().unwrap(); - - // inject one frame at a time - let mut injector = Injector::new(temp.path(), 10).unwrap(); - let mut iter = log.frames_iter().unwrap(); - - assert!(injector - .inject_frame(iter.next().unwrap().unwrap()) - .unwrap() - .is_none()); - let conn = rusqlite::Connection::open(temp.path().join("data")).unwrap(); - assert!(conn - .query_row("SELECT COUNT(*) FROM test", (), |_| Ok(())) - .is_err()); - - while injector - .inject_frame(iter.next().unwrap().unwrap()) - .unwrap() - .is_none() - {} - - // reset schema - conn.pragma_update(None, "writable_schema", "reset") - .unwrap(); - conn.query_row("SELECT COUNT(*) FROM test", (), |_| Ok(())) - .unwrap(); - } -} diff --git a/sqld/src/replication/replica/meta.rs b/sqld/src/replication/replica/meta.rs index 
71dfb1bc..b38534e1 100644 --- a/sqld/src/replication/replica/meta.rs +++ b/sqld/src/replication/replica/meta.rs @@ -1,13 +1,12 @@ -use std::io::{ErrorKind, SeekFrom}; +use std::fs::{File, OpenOptions}; +use std::io::ErrorKind; use std::mem::size_of; +use std::os::unix::prelude::FileExt; use std::path::Path; use std::str::FromStr; use anyhow::Context; -use bytemuck::{bytes_of, try_pod_read_unaligned, Pod, Zeroable}; -use tokio::fs::File; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; -use tokio::pin; +use bytemuck::{try_pod_read_unaligned, Pod, Zeroable}; use uuid::Uuid; use crate::{replication::FrameNo, rpc::replication_log::rpc::HelloResponse}; @@ -16,118 +15,91 @@ use super::error::ReplicationError; #[repr(C)] #[derive(Debug, Pod, Zeroable, Clone, Copy)] -pub struct WalIndexMetaData { - /// id of the replicated log - log_id: u128, - /// committed frame index - pub committed_frame_no: FrameNo, - _padding: u64, -} - -impl WalIndexMetaData { - async fn read(file: impl AsyncRead) -> crate::Result> { - pin!(file); - let mut buf = [0; size_of::()]; - let meta = match file.read_exact(&mut buf).await { - Ok(_) => { - let meta: Self = try_pod_read_unaligned(&buf) - .map_err(|_| anyhow::anyhow!("invalid index meta file"))?; - Some(meta) - } - Err(e) if e.kind() == ErrorKind::UnexpectedEof => None, - Err(e) => Err(e)?, - }; - - Ok(meta) - } -} - pub struct WalIndexMeta { - file: File, - data: Option, + /// This is the anticipated next frame_no to request + pub pre_commit_frame_no: FrameNo, + /// After we have written the frames back to the wal, we set this value to the same value as + /// pre_commit_index + /// On startup we check this value against the pre-commit value to check for consistency + pub post_commit_frame_no: FrameNo, + /// Generation Uuid + /// This number is generated on each primary restart. This let's us know that the primary, and + /// we need to make sure that we are not ahead of the primary. + generation_id: u128, + /// Uuid of the database this instance is a replica of + database_id: u128, } impl WalIndexMeta { - pub async fn open(db_path: &Path) -> crate::Result { + pub fn open(db_path: &Path) -> crate::Result { let path = db_path.join("client_wal_index"); + std::fs::create_dir_all(db_path)?; - tokio::fs::create_dir_all(db_path).await?; - - let mut file = tokio::fs::OpenOptions::new() + Ok(OpenOptions::new() .create(true) .read(true) .write(true) - .open(&path) - .await?; + .open(path)?) + } - let data = WalIndexMetaData::read(&mut file).await?; + pub fn read_from_path(db_path: &Path) -> anyhow::Result> { + let file = Self::open(db_path)?; + Ok(Self::read(&file)?) + } - Ok(Self { file, data }) + fn read(file: &File) -> crate::Result> { + let mut buf = [0; size_of::()]; + let meta = match file.read_exact_at(&mut buf, 0) { + Ok(()) => { + file.read_exact_at(&mut buf, 0)?; + let meta: Self = try_pod_read_unaligned(&buf) + .map_err(|_| anyhow::anyhow!("invalid index meta file"))?; + Some(meta) + } + Err(e) if e.kind() == ErrorKind::UnexpectedEof => None, + Err(e) => Err(e)?, + }; + + Ok(meta) } /// attempts to merge two meta files. - pub fn merge_hello(&mut self, hello: HelloResponse) -> Result<(), ReplicationError> { - let hello_log_id = Uuid::from_str(&hello.log_id) + pub fn merge_from_hello(mut self, hello: HelloResponse) -> Result { + let hello_db_id = Uuid::from_str(&hello.database_id) .context("invalid database id from primary")? .as_u128(); + let hello_gen_id = Uuid::from_str(&hello.generation_id) + .context("invalid generation id from primary")? 
+ .as_u128(); - match self.data { - Some(meta) => { - if meta.log_id != hello_log_id { - Err(ReplicationError::LogIncompatible) - } else { - Ok(()) - } - } - None => { - self.data = Some(WalIndexMetaData { - log_id: hello_log_id, - committed_frame_no: FrameNo::MAX, - _padding: 0, - }); - Ok(()) - } + if hello_db_id != self.database_id { + return Err(ReplicationError::DbIncompatible); } - } - pub async fn flush(&mut self) -> crate::Result<()> { - if let Some(data) = self.data { - // FIXME: we can save a syscall by calling read_exact_at, but let's use tokio API for now - self.file.seek(SeekFrom::Start(0)).await?; - let s = self.file.write(bytes_of(&data)).await?; - // WalIndexMeta is smaller than a page size, and aligned at the beginning of the file, if - // should always be written in a single call - assert_eq!(s, size_of::()); - self.file.flush().await?; + if self.generation_id == hello_gen_id { + Ok(self) + } else if self.pre_commit_frame_no <= hello.generation_start_index { + // Ok: generation changed, but we aren't ahead of primary + self.generation_id = hello_gen_id; + Ok(self) + } else { + Err(ReplicationError::Lagging) } - - Ok(()) } - /// Apply the last commit frame no to the meta file. - /// This function must be called after each injection, because it's idempotent to re-apply the - /// last transaction, but not idempotent if we lose track of more than one. - pub async fn set_commit_frame_no(&mut self, commit_fno: FrameNo) -> crate::Result<()> { - { - let data = self - .data - .as_mut() - .expect("call set_commit_frame_no before initializing meta"); - data.committed_frame_no = commit_fno; - } - - self.flush().await?; - - Ok(()) - } + pub fn new_from_hello(hello: HelloResponse) -> anyhow::Result { + let database_id = Uuid::from_str(&hello.database_id) + .context("invalid database id from primary")? + .as_u128(); + let generation_id = Uuid::from_str(&hello.generation_id) + .context("invalid generation id from primary")? 
+ .as_u128(); - pub(crate) fn current_frame_no(&self) -> Option { - self.data.and_then(|d| { - if d.committed_frame_no == FrameNo::MAX { - None - } else { - Some(d.committed_frame_no) - } + Ok(Self { + pre_commit_frame_no: FrameNo::MAX, + post_commit_frame_no: FrameNo::MAX, + generation_id, + database_id, }) } } diff --git a/sqld/src/replication/replica/mod.rs b/sqld/src/replication/replica/mod.rs index 3fc946b7..a6e7e63c 100644 --- a/sqld/src/replication/replica/mod.rs +++ b/sqld/src/replication/replica/mod.rs @@ -1,6 +1,8 @@ pub mod error; +mod hook; mod injector; mod meta; mod replicator; +mod snapshot; pub use replicator::Replicator; diff --git a/sqld/src/replication/replica/replicator.rs b/sqld/src/replication/replica/replicator.rs index b741718f..4a16c5d3 100644 --- a/sqld/src/replication/replica/replicator.rs +++ b/sqld/src/replication/replica/replicator.rs @@ -1,10 +1,11 @@ +use std::os::unix::prelude::FileExt; use std::path::PathBuf; use std::sync::Arc; +use bytemuck::bytes_of; use futures::StreamExt; -use parking_lot::Mutex; -use tokio::sync::watch; -use tokio::task::spawn_blocking; +use tokio::sync::{mpsc, oneshot, watch, Mutex}; +use tokio::task::JoinSet; use tokio::time::Duration; use tonic::metadata::BinaryMetadataValue; use tonic::transport::Channel; @@ -13,14 +14,17 @@ use tonic::{Code, Request}; use crate::namespace::{NamespaceName, ResetCb, ResetOp}; use crate::replication::frame::Frame; use crate::replication::replica::error::ReplicationError; +use crate::replication::replica::snapshot::TempSnapshot; use crate::replication::FrameNo; use crate::rpc::replication_log::rpc::{ replication_log_client::ReplicationLogClient, HelloRequest, LogOffset, }; use crate::rpc::replication_log::NEED_SNAPSHOT_ERROR_MSG; use crate::rpc::{NAMESPACE_DOESNT_EXIST, NAMESPACE_METADATA_KEY}; +use crate::BLOCKING_RT; -use super::injector::Injector; +use super::hook::{Frames, InjectorHookCtx}; +use super::injector::FrameInjector; use super::meta::WalIndexMeta; const HANDSHAKE_MAX_RETRIES: usize = 100; @@ -31,42 +35,97 @@ type Client = ReplicationLogClient; /// transaction boundaries. pub struct Replicator { client: Client, + db_path: PathBuf, namespace: NamespaceName, - meta: WalIndexMeta, - injector: Arc>, - pub current_frame_no_notifier: watch::Sender>, + meta: Arc>>, + pub current_frame_no_notifier: watch::Receiver>, + frames_sender: mpsc::Sender, + /// hard reset channel: send the namespace there, to reset it reset: ResetCb, } -const INJECTOR_BUFFER_CAPACITY: usize = 10; - impl Replicator { pub async fn new( db_path: PathBuf, channel: Channel, uri: tonic::transport::Uri, namespace: NamespaceName, + join_set: &mut JoinSet>, reset: ResetCb, ) -> anyhow::Result { - let (current_frame_no_notifier, _) = watch::channel(None); - let injector = { - let db_path = db_path.clone(); - spawn_blocking(move || Injector::new(&db_path, INJECTOR_BUFFER_CAPACITY)).await?? 
- }; let client = Client::with_origin(channel, uri); - let meta = WalIndexMeta::open(&db_path).await?; + let (applied_frame_notifier, current_frame_no_notifier) = watch::channel(None); + let (frames_sender, receiver) = tokio::sync::mpsc::channel(1); let mut this = Self { namespace, client, + db_path: db_path.clone(), current_frame_no_notifier, - meta, - injector: Arc::new(Mutex::new(injector)), + meta: Arc::new(Mutex::new(None)), + frames_sender, reset, }; this.try_perform_handshake().await?; + let meta_file = Arc::new(WalIndexMeta::open(&db_path)?); + let meta = this.meta.clone(); + + let pre_commit = { + let meta = meta.clone(); + let meta_file = meta_file.clone(); + move |fno| { + let mut lock = meta.blocking_lock(); + let meta = lock + .as_mut() + .expect("commit called before meta inialization"); + meta.pre_commit_frame_no = fno; + meta_file.write_all_at(bytes_of(meta), 0)?; + + Ok(()) + } + }; + + let post_commit = { + let meta = meta.clone(); + let meta_file = meta_file; + let notifier = applied_frame_notifier; + move |fno| { + let mut lock = meta.blocking_lock(); + let meta = lock + .as_mut() + .expect("commit called before meta inialization"); + assert_eq!(meta.pre_commit_frame_no, fno); + meta.post_commit_frame_no = fno; + meta_file.write_all_at(bytes_of(meta), 0)?; + let _ = notifier.send(Some(fno)); + + Ok(()) + } + }; + + let (snd, rcv) = oneshot::channel(); + let handle = BLOCKING_RT.spawn_blocking({ + move || -> anyhow::Result<()> { + let ctx = InjectorHookCtx::new(receiver, pre_commit, post_commit); + let mut injector = FrameInjector::new(&db_path, ctx)?; + let _ = snd.send(()); + + while injector.step()? {} + + Ok(()) + } + }); + + join_set.spawn(async move { + handle.await??; + Ok(()) + }); + + // injector is ready: + rcv.await?; + Ok(this) } @@ -84,13 +143,10 @@ impl Replicator { loop { self.try_perform_handshake().await?; - loop { - if let Err(e) = self.replicate().await { - // Replication encountered an error. We log the error, and then shut down the - // injector and propagate a potential panic from there. - tracing::warn!("replication error: {e}"); - break; - } + if let Err(e) = self.replicate().await { + // Replication encountered an error. We log the error, and then shut down the + // injector and propagate a potential panic from there. + tracing::warn!("replication error: {e}"); } tokio::time::sleep(Duration::from_secs(1)).await; } @@ -98,13 +154,20 @@ impl Replicator { async fn handle_replication_error(&self, error: ReplicationError) -> crate::error::Error { match error { - ReplicationError::LogIncompatible => { - tracing::error!("Primary's replication log incompatible with ours: repairing."); - (self.reset)(ResetOp::Reset(self.namespace.clone())); - error.into() + ReplicationError::Lagging => { + tracing::error!("Replica ahead of primary: hard-reseting replica"); } - _ => error.into(), + ReplicationError::DbIncompatible => { + tracing::error!( + "Primary is attempting to replicate a different database, overwriting replica." 
+ ); + } + _ => return error.into(), } + + (self.reset)(ResetOp::Reset(self.namespace.clone())); + + error.into() } async fn try_perform_handshake(&mut self) -> crate::Result<()> { @@ -116,12 +179,22 @@ impl Replicator { Ok(resp) => { let hello = resp.into_inner(); - if let Err(e) = self.meta.merge_hello(hello) { - return Err(self.handle_replication_error(e).await); - } - - self.current_frame_no_notifier - .send_replace(self.meta.current_frame_no()); + let mut lock = self.meta.lock().await; + let meta = match *lock { + Some(meta) => match meta.merge_from_hello(hello) { + Ok(meta) => meta, + Err(e) => return Err(self.handle_replication_error(e).await), + }, + None => match WalIndexMeta::read_from_path(&self.db_path)? { + Some(meta) => match meta.merge_from_hello(hello) { + Ok(meta) => meta, + Err(e) => return Err(self.handle_replication_error(e).await), + }, + None => WalIndexMeta::new_from_hello(hello)?, + }, + }; + + *lock = Some(meta); return Ok(()); } @@ -149,7 +222,9 @@ impl Replicator { } async fn replicate(&mut self) -> anyhow::Result<()> { + const MAX_REPLICA_REPLICATION_BUFFER_LEN: usize = 10_000_000 / 4096; // ~10MB let offset = LogOffset { + // if current == FrameNo::Max then it means that we're starting fresh next_offset: self.next_offset(), }; @@ -157,11 +232,20 @@ impl Replicator { let mut stream = self.client.log_entries(req).await?.into_inner(); + let mut buffer = Vec::new(); loop { match stream.next().await { Some(Ok(frame)) => { - let frame = Frame::try_from(&*frame.data)?; - self.inject_frame(frame).await?; + let frame = Frame::try_from_bytes(frame.data)?; + buffer.push(frame.clone()); + if frame.header().size_after != 0 + || buffer.len() > MAX_REPLICA_REPLICATION_BUFFER_LEN + { + let _ = self + .frames_sender + .send(Frames::Vec(std::mem::take(&mut buffer))) + .await; + } } Some(Err(err)) if err.code() == tonic::Code::FailedPrecondition @@ -170,6 +254,7 @@ impl Replicator { tracing::debug!("loading snapshot"); // remove any outstanding frames in the buffer that are not part of a // transaction: they are now part of the snapshot. + buffer.clear(); self.load_snapshot().await?; } Some(Err(e)) => return Err(e.into()), @@ -179,48 +264,19 @@ impl Replicator { } async fn load_snapshot(&mut self) -> anyhow::Result<()> { - self.injector.lock().clear_buffer(); let next_offset = self.next_offset(); let req = self.make_request(LogOffset { next_offset }); - // FIXME: check for unavailable snapshot and try again, or make primary wait for snapshot - // to become available let frames = self.client.snapshot(req).await?.into_inner(); - let mut stream = frames.map(|data| match data { - Ok(frame) => Frame::try_from(&*frame.data), + let stream = frames.map(|data| match data { + Ok(frame) => Frame::try_from_bytes(frame.data), Err(e) => anyhow::bail!(e), }); + let snap = TempSnapshot::from_stream(&self.db_path, stream).await?; - while let Some(frame) = stream.next().await { - let frame = frame?; - self.inject_frame(frame).await?; - } - - Ok(()) - } - - async fn inject_frame(&mut self, frame: Frame) -> anyhow::Result<()> { - let injector = self.injector.clone(); - match spawn_blocking(move || injector.lock().inject_frame(frame)).await? 
{ - Ok(Some(commit_fno)) => { - self.meta.set_commit_frame_no(commit_fno).await?; - self.current_frame_no_notifier - .send_replace(Some(commit_fno)); - } - Ok(None) => (), - Err(e @ crate::Error::FatalReplicationError) => { - // we conservatively nuke the replica and start replicating from scractch - tracing::error!( - "fatal error replicating `{}` from primary, resetting namespace...", - self.namespace - ); - (self.reset)(ResetOp::Destroy(self.namespace.clone())); - Err(e)? - } - Err(e) => Err(e)?, - } + let _ = self.frames_sender.send(Frames::Snapshot(snap)).await; Ok(()) } @@ -230,6 +286,6 @@ impl Replicator { } fn current_frame_no(&mut self) -> Option { - self.meta.current_frame_no() + *self.current_frame_no_notifier.borrow_and_update() } } diff --git a/sqld/src/replication/replica/snapshot.rs b/sqld/src/replication/replica/snapshot.rs new file mode 100644 index 00000000..523f55e7 --- /dev/null +++ b/sqld/src/replication/replica/snapshot.rs @@ -0,0 +1,50 @@ +use std::path::{Path, PathBuf}; + +use futures::{Stream, StreamExt}; +use tempfile::NamedTempFile; +use tokio::io::{AsyncWriteExt, BufWriter}; + +use crate::replication::frame::{Frame, FrameBorrowed}; + +#[derive(Debug)] +pub struct TempSnapshot { + path: PathBuf, + map: memmap::Mmap, +} + +impl TempSnapshot { + pub async fn from_stream( + db_path: &Path, + mut s: impl Stream> + Unpin, + ) -> anyhow::Result { + let temp_dir = db_path.join("temp"); + tokio::fs::create_dir_all(&temp_dir).await?; + let file = NamedTempFile::new_in(temp_dir)?; + let tokio_file = tokio::fs::File::from_std(file.as_file().try_clone()?); + + let mut tokio_file = BufWriter::new(tokio_file); + while let Some(frame) = s.next().await { + let frame = frame?; + tokio_file.write_all(frame.as_slice()).await?; + } + + tokio_file.flush().await?; + + let (file, path) = file.keep()?; + + let map = unsafe { memmap::Mmap::map(&file)? }; + + Ok(Self { path, map }) + } + + pub fn iter(&self) -> impl Iterator { + self.map.chunks(Frame::SIZE).map(FrameBorrowed::from_bytes) + } +} + +impl Drop for TempSnapshot { + fn drop(&mut self) { + let path = std::mem::take(&mut self.path); + let _ = std::fs::remove_file(path); + } +} diff --git a/sqld/src/replication/snapshot.rs b/sqld/src/replication/snapshot.rs index a9e2fd35..adc42a67 100644 --- a/sqld/src/replication/snapshot.rs +++ b/sqld/src/replication/snapshot.rs @@ -21,7 +21,7 @@ use uuid::Uuid; use crate::namespace::NamespaceName; -use super::frame::FrameMut; +use super::frame::Frame; use super::primary::logger::LogFile; use super::FrameNo; @@ -35,7 +35,7 @@ const MAX_SNAPSHOT_NUMBER: usize = 32; #[repr(C)] pub struct SnapshotFileHeader { /// id of the database - pub log_id: u128, + pub db_id: u128, /// first frame in the snapshot pub start_frame_no: u64, /// end frame in the snapshot @@ -134,7 +134,7 @@ impl SnapshotFile { } /// Iterator on the frames contained in the snapshot file, in reverse frame_no order. 
- pub fn frames_iter_mut(&self) -> impl Iterator> + '_ { + pub fn frames_iter(&self) -> impl Iterator> + '_ { let mut current_offset = 0; std::iter::from_fn(move || { if current_offset >= self.header.frame_count { @@ -145,7 +145,7 @@ impl SnapshotFile { current_offset += 1; let mut buf = BytesMut::zeroed(LogFile::FRAME_SIZE); match self.file.read_exact_at(&mut buf, read_offset as _) { - Ok(_) => match FrameMut::try_from(&*buf) { + Ok(_) => match Frame::try_from_bytes(buf.freeze()) { Ok(frame) => Some(Ok(frame)), Err(e) => Some(Err(e)), }, @@ -154,14 +154,12 @@ impl SnapshotFile { }) } - /// Like `frames_iter`, but stops as soon as a frame with frame_no <= `frame_no` is reached. - /// The frames are returned in monotonically strictly decreasing frame_no. In other words, the - /// most recent frames come first. + /// Like `frames_iter`, but stops as soon as a frame with frame_no <= `frame_no` is reached pub fn frames_iter_from( &self, frame_no: u64, - ) -> impl Iterator> + '_ { - let mut iter = self.frames_iter_mut(); + ) -> impl Iterator> + '_ { + let mut iter = self.frames_iter(); std::iter::from_fn(move || match iter.next() { Some(Ok(frame)) => { if frame.header().frame_no < frame_no { @@ -173,10 +171,6 @@ impl SnapshotFile { other => other, }) } - - pub fn header(&self) -> &SnapshotFileHeader { - &self.header - } } #[derive(Clone, Debug)] @@ -189,19 +183,18 @@ pub type NamespacedSnapshotCallback = Arc anyhow::Result<()> + Send + Sync>; impl LogCompactor { - pub fn new(db_path: &Path, log_id: Uuid, callback: SnapshotCallback) -> anyhow::Result { + pub fn new(db_path: &Path, db_id: u128, callback: SnapshotCallback) -> anyhow::Result { // we create a 0 sized channel, in order to create backpressure when we can't // keep up with snapshop creation: if there isn't any ongoind comptaction task processing, // the compact does not block, and the log is compacted in the background. Otherwise, the // block until there is a free slot to perform compaction. 
let (sender, receiver) = bounded::<(LogFile, PathBuf, u32)>(0); - let mut merger = SnapshotMerger::new(db_path, log_id)?; + let mut merger = SnapshotMerger::new(db_path, db_id)?; let db_path = db_path.to_path_buf(); let snapshot_dir_path = snapshot_dir_path(&db_path); - // FIXME: use tokio task for compaction let _handle = std::thread::spawn(move || { while let Ok((file, log_path, size_after)) = receiver.recv() { - match perform_compaction(&db_path, file, log_id) { + match perform_compaction(&db_path, file, db_id) { Ok((snapshot_name, snapshot_frame_count)) => { tracing::info!("snapshot `{snapshot_name}` successfully created"); @@ -259,12 +252,12 @@ struct SnapshotMerger { } impl SnapshotMerger { - fn new(db_path: &Path, log_id: Uuid) -> anyhow::Result { + fn new(db_path: &Path, db_id: u128) -> anyhow::Result { let (sender, receiver) = mpsc::channel(); let db_path = db_path.to_path_buf(); let handle = - std::thread::spawn(move || Self::run_snapshot_merger_loop(receiver, &db_path, log_id)); + std::thread::spawn(move || Self::run_snapshot_merger_loop(receiver, &db_path, db_id)); Ok(Self { sender, @@ -281,13 +274,13 @@ impl SnapshotMerger { fn run_snapshot_merger_loop( receiver: mpsc::Receiver<(String, u64, u32)>, db_path: &Path, - log_id: Uuid, + db_id: u128, ) -> anyhow::Result<()> { let mut snapshots = Self::init_snapshot_info_list(db_path)?; while let Ok((name, size, db_page_count)) = receiver.recv() { snapshots.push((name, size)); if Self::should_compact(&snapshots, db_page_count) { - let compacted_snapshot_info = Self::merge_snapshots(&snapshots, db_path, log_id)?; + let compacted_snapshot_info = Self::merge_snapshots(&snapshots, db_path, db_id)?; snapshots.clear(); snapshots.push(compacted_snapshot_info); } @@ -330,18 +323,13 @@ impl SnapshotMerger { fn merge_snapshots( snapshots: &[(String, u64)], db_path: &Path, - log_id: Uuid, + db_id: u128, ) -> anyhow::Result<(String, u64)> { - let mut builder = SnapshotBuilder::new(db_path, log_id)?; + let mut builder = SnapshotBuilder::new(db_path, db_id)?; let snapshot_dir_path = snapshot_dir_path(db_path); - let mut size_after = None; for (name, _) in snapshots.iter().rev() { let snapshot = SnapshotFile::open(&snapshot_dir_path.join(name))?; - // The size after the merged snapshot is the size after the first snapshot to be merged - if size_after.is_none() { - size_after.replace(snapshot.header.size_after); - } - let iter = snapshot.frames_iter_mut(); + let iter = snapshot.frames_iter(); builder.append_frames(iter)?; } @@ -350,7 +338,6 @@ impl SnapshotMerger { builder.header.start_frame_no = start_frame_no; builder.header.end_frame_no = end_frame_no; - builder.header.size_after = size_after.unwrap(); let compacted_snapshot_infos = builder.finish()?; @@ -399,7 +386,7 @@ fn snapshot_dir_path(db_path: &Path) -> PathBuf { } impl SnapshotBuilder { - fn new(db_path: &Path, log_id: Uuid) -> anyhow::Result { + fn new(db_path: &Path, db_id: u128) -> anyhow::Result { let snapshot_dir_path = snapshot_dir_path(db_path); std::fs::create_dir_all(&snapshot_dir_path)?; let mut target = BufWriter::new(NamedTempFile::new_in(&snapshot_dir_path)?); @@ -409,7 +396,7 @@ impl SnapshotBuilder { Ok(Self { seen_pages: HashSet::new(), header: SnapshotFileHeader { - log_id: log_id.as_u128(), + db_id, start_frame_no: u64::MAX, end_frame_no: u64::MIN, frame_count: 0, @@ -425,7 +412,7 @@ impl SnapshotBuilder { /// append frames to the snapshot. Frames must be in decreasing frame_no order. 
fn append_frames( &mut self, - frames: impl Iterator>, + frames: impl Iterator>, ) -> anyhow::Result<()> { // We iterate on the frames starting from the end of the log and working our way backward. We // make sure that only the most recent version of each file is present in the resulting @@ -434,7 +421,7 @@ impl SnapshotBuilder { // The snapshot file contains the most recent version of each page, in descending frame // number order. That last part is important for when we read it later on. for frame in frames { - let mut frame = frame?; + let frame = frame?; assert!(frame.header().frame_no < self.last_seen_frame_no); self.last_seen_frame_no = frame.header().frame_no; if frame.header().frame_no < self.header.start_frame_no { @@ -446,11 +433,6 @@ impl SnapshotBuilder { self.header.size_after = frame.header().size_after; } - // set all frames as non-commit frame in a snapshot, and let the client decide when to - // commit. This is ok because the client will stream frames backward until caught up, - // and then commit. - frame.header_mut().size_after = 0; - if !self.seen_pages.contains(&frame.header().page_no) { self.seen_pages.insert(frame.header().page_no); self.snapshot_file.write_all(frame.as_slice())?; @@ -468,7 +450,7 @@ impl SnapshotBuilder { file.as_file().write_all_at(bytes_of(&self.header), 0)?; let snapshot_name = format!( "{}-{}-{}.snap", - Uuid::from_u128(self.header.log_id), + Uuid::from_u128(self.header.db_id), self.header.start_frame_no, self.header.end_frame_no, ); @@ -482,10 +464,10 @@ impl SnapshotBuilder { fn perform_compaction( db_path: &Path, file_to_compact: LogFile, - log_id: Uuid, + db_id: u128, ) -> anyhow::Result<(String, u64)> { - let mut builder = SnapshotBuilder::new(db_path, log_id)?; - builder.append_frames(file_to_compact.rev_frames_iter_mut()?)?; + let mut builder = SnapshotBuilder::new(db_path, db_id)?; + builder.append_frames(file_to_compact.rev_frames_iter()?)?; builder.finish() } @@ -498,7 +480,6 @@ mod test { use bytes::Bytes; use tempfile::tempdir; - use crate::replication::frame::Frame; use crate::replication::primary::logger::WalPage; use crate::replication::snapshot::SnapshotFile; @@ -508,8 +489,8 @@ mod test { fn compact_file_create_snapshot() { let temp = tempfile::NamedTempFile::new().unwrap(); let mut log_file = LogFile::new(temp.as_file().try_clone().unwrap(), 0, None).unwrap(); - let log_id = Uuid::new_v4(); - log_file.header.log_id = log_id.as_u128(); + let db_id = Uuid::new_v4(); + log_file.header.db_id = db_id.as_u128(); log_file.write_header().unwrap(); // add 50 pages, each one in two versions @@ -528,7 +509,8 @@ mod test { log_file.commit().unwrap(); let dump_dir = tempdir().unwrap(); - let compactor = LogCompactor::new(dump_dir.path(), log_id, Box::new(|_| Ok(()))).unwrap(); + let compactor = + LogCompactor::new(dump_dir.path(), db_id.as_u128(), Box::new(|_| Ok(()))).unwrap(); compactor .compact(log_file, temp.path().to_path_buf(), 25) .unwrap(); @@ -536,7 +518,7 @@ mod test { thread::sleep(Duration::from_secs(1)); let snapshot_path = - snapshot_dir_path(dump_dir.path()).join(format!("{}-{}-{}.snap", log_id, 0, 49)); + snapshot_dir_path(dump_dir.path()).join(format!("{}-{}-{}.snap", db_id, 0, 49)); let snapshot = read(&snapshot_path).unwrap(); let header: SnapshotFileHeader = pod_read_unaligned(&snapshot[..std::mem::size_of::()]); @@ -544,14 +526,14 @@ mod test { assert_eq!(header.start_frame_no, 0); assert_eq!(header.end_frame_no, 49); assert_eq!(header.frame_count, 25); - assert_eq!(header.log_id, log_id.as_u128()); + 
assert_eq!(header.db_id, db_id.as_u128()); assert_eq!(header.size_after, 25); let mut seen_frames = HashSet::new(); let mut seen_page_no = HashSet::new(); let data = &snapshot[std::mem::size_of::()..]; data.chunks(LogFile::FRAME_SIZE).for_each(|f| { - let frame = Frame::try_from(f).unwrap(); + let frame = Frame::try_from_bytes(Bytes::copy_from_slice(f)).unwrap(); assert!(!seen_frames.contains(&frame.header().frame_no)); assert!(!seen_page_no.contains(&frame.header().page_no)); seen_page_no.insert(frame.header().page_no); diff --git a/sqld/src/rpc/replication_log.rs b/sqld/src/rpc/replication_log.rs index 79b7c4f8..c82ede3f 100644 --- a/sqld/src/rpc/replication_log.rs +++ b/sqld/src/rpc/replication_log.rs @@ -239,7 +239,9 @@ impl ReplicationLog for ReplicationLogService { })?; let response = HelloResponse { - log_id: logger.log_id().to_string(), + database_id: logger.database_id().unwrap().to_string(), + generation_start_index: logger.generation.start_index, + generation_id: logger.generation.id.to_string(), }; Ok(tonic::Response::new(response)) @@ -266,18 +268,12 @@ impl ReplicationLog for ReplicationLogService { { Ok(Ok(Some(snapshot))) => { BLOCKING_RT.spawn_blocking(move || { - let size_after = snapshot.header().size_after; - let mut frames = snapshot.frames_iter_from(offset).peekable(); + let mut frames = snapshot.frames_iter_from(offset); loop { match frames.next() { - Some(Ok(mut frame)) => { - // this is the last frame we're sending for this snapshot, set the - // frame_no - if frames.peek().is_none() { - frame.header_mut().size_after = size_after; - } + Some(Ok(frame)) => { let _ = sender.blocking_send(Ok(Frame { - data: crate::replication::frame::Frame::from(frame).bytes(), + data: frame.bytes(), })); } Some(Err(e)) => { diff --git a/sqld/tests/namespaces/snapshots/tests__namespaces__dumps__load_namespace_from_no_txn.snap b/sqld/tests/namespaces/snapshots/tests__namespaces__dumps__load_namespace_from_no_txn.snap index 2663c515..86f9365c 100644 --- a/sqld/tests/namespaces/snapshots/tests__namespaces__dumps__load_namespace_from_no_txn.snap +++ b/sqld/tests/namespaces/snapshots/tests__namespaces__dumps__load_namespace_from_no_txn.snap @@ -3,5 +3,5 @@ source: sqld/tests/namespaces/dumps.rs expression: resp.json_value().await.unwrap() --- { - "error": "a dump should execute within a transaction." + "error": "A dump should execute within a transaction." }
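
Note on the behaviour this revert restores (not part of the diff above): `Replicator::replicate()` goes back to buffering incoming frames and only handing them off to the injector at a commit boundary, i.e. when a frame arrives whose `size_after` is non-zero, or once the buffer exceeds roughly 10 MB worth of 4 KiB frames (`MAX_REPLICA_REPLICATION_BUFFER_LEN`). The standalone Rust sketch below mirrors only that buffering rule; `FrameStub`, `should_flush`, and the `main` driver are simplified stand-ins invented for illustration and do not exist in the codebase.

// Minimal sketch of the commit-boundary buffering used by the restored replicate() loop.
// Frames accumulate until a commit frame (size_after != 0) or until the buffer is "full".
const MAX_REPLICA_REPLICATION_BUFFER_LEN: usize = 10_000_000 / 4096; // ~10MB of 4 KiB frames

#[derive(Clone)]
struct FrameStub {
    frame_no: u64,
    /// database size in pages after this frame; non-zero marks a commit frame
    size_after: u32,
}

/// True when the buffered frames should be flushed to the injector.
fn should_flush(buffer: &[FrameStub], incoming: &FrameStub) -> bool {
    incoming.size_after != 0 || buffer.len() > MAX_REPLICA_REPLICATION_BUFFER_LEN
}

fn main() {
    let mut buffer: Vec<FrameStub> = Vec::new();
    // Two non-commit frames followed by a commit frame, as a primary would stream them.
    let incoming = [
        FrameStub { frame_no: 1, size_after: 0 },
        FrameStub { frame_no: 2, size_after: 0 },
        FrameStub { frame_no: 3, size_after: 2 },
    ];

    for frame in incoming {
        buffer.push(frame.clone());
        if should_flush(&buffer, &frame) {
            // In the real code this is frames_sender.send(Frames::Vec(..)).await,
            // which hands the batch to the blocking injector task.
            println!(
                "flushing {} frame(s) ending at frame_no {}",
                buffer.len(),
                frame.frame_no
            );
            buffer.clear();
        }
    }
}

Deferring the flush until a commit frame is what keeps a replica from exposing a half-applied transaction: the injector only ever receives batches that end on a transaction boundary (or an oversized partial batch it keeps open as a pending transaction).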