From f8d3b75d08db07d41f4be2caa9c90766eada59de Mon Sep 17 00:00:00 2001 From: Brian Martin Date: Thu, 19 Aug 2021 13:41:45 -0700 Subject: [PATCH] logger: initial commit of klog functionality for segcache-rs Adds a new logger which has command logger specific functionality. Adds command logging to pelikan_segcache_rs. Changes to the memcache wire protocol to support providing the necessary context for command logging. Fixes `cas` operation in seg storage implementation. --- Cargo.lock | 282 ++++++++-------- Cargo.toml | 1 + config/segcache.toml | 4 + src/rust/config/src/debug.rs | 2 +- src/rust/config/src/klog.rs | 92 ++++++ src/rust/config/src/lib.rs | 2 + src/rust/config/src/segcache.rs | 7 + src/rust/core/server/Cargo.toml | 1 + src/rust/core/server/src/process/builder.rs | 4 +- src/rust/core/server/src/threads/admin.rs | 9 +- src/rust/core/server/src/threads/listener.rs | 4 +- .../core/server/src/threads/worker/multi.rs | 2 +- .../core/server/src/threads/worker/single.rs | 10 +- src/rust/entrystore/src/seg/memcache.rs | 28 +- src/rust/logger/Cargo.toml | 12 + src/rust/logger/src/lib.rs | 310 ++++++++++++++++++ src/rust/protocol/Cargo.toml | 2 + src/rust/protocol/src/lib.rs | 8 +- src/rust/protocol/src/memcache/entry/mod.rs | 10 +- src/rust/protocol/src/memcache/storage/mod.rs | 8 +- src/rust/protocol/src/memcache/wire/mod.rs | 75 ++--- .../src/memcache/wire/request/command.rs | 17 + .../protocol/src/memcache/wire/request/mod.rs | 46 +++ .../src/memcache/wire/request/parse.rs | 5 +- .../src/memcache/wire/response/mod.rs | 243 ++++++++++++-- .../protocol/src/ping/wire/response/mod.rs | 3 +- src/rust/server/segcache/Cargo.toml | 2 + src/rust/server/segcache/src/lib.rs | 11 + src/rust/server/segcache/src/main.rs | 10 +- src/rust/storage/seg/src/hashtable/mod.rs | 6 +- 30 files changed, 960 insertions(+), 256 deletions(-) create mode 100644 src/rust/config/src/klog.rs create mode 100644 src/rust/logger/Cargo.toml create mode 100644 src/rust/logger/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 0eccb4c32..44544b8c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4,9 +4,9 @@ version = 3 [[package]] name = "addr2line" -version = "0.15.2" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7a2e47a1fbe209ee101dd6d61285226744c6c8d3c21c8dc878ba6cb9f467f3a" +checksum = "3e61f2b7f93d2c7d2b08263acaa4a363b3e276806c68af6134c44f523bf1aacd" dependencies = [ "gimli", ] @@ -76,9 +76,9 @@ checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" [[package]] name = "backtrace" -version = "0.3.60" +version = "0.3.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7815ea54e4d821e791162e078acbebfd6d8c8939cd559c9335dceb1c8ca7282" +checksum = "e7a905d892734eea339e896738c14b9afce22b5318f64b951e70bf3844419b01" dependencies = [ "addr2line", "cc", @@ -120,9 +120,9 @@ dependencies = [ [[package]] name = "bitflags" -version = "1.2.1" +version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "boring" @@ -179,18 +179,18 @@ checksum = "631ae5198c9be5e753e5cc215e1bd73c2b466a3565173db433f52bb9d3e66dba" [[package]] name = "cast" -version = "0.2.6" +version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57cdfa5d50aad6cb4d44dcab6101a7f79925bd59d82ca42f38a9856a28865374" +checksum = "4c24dab4283a142afa2fdca129b80ad2c6284e073930f964c3a1293c225ee39a" dependencies = [ "rustc_version", ] [[package]] name = "cc" -version = "1.0.68" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a72c244c1ff497a746a7e1fb3d14bd08420ecda70c8f25c7112f2781652d787" +checksum = "e70cc2f62c6ce1868963827bd677764c62d07c3d9a3e1fb1177ee1a9ab199eb2" [[package]] name = "cexpr" @@ -275,16 +275,16 @@ dependencies = [ [[package]] name = "criterion" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab327ed7354547cc2ef43cbe20ef68b988e70b4b593cbd66a2a61733123a3d23" +checksum = "1604dafd25fba2fe2d5895a9da139f8dc9b319a5fe5354ca137cbbce4e178d10" dependencies = [ "atty", "cast", "clap", "criterion-plot", "csv", - "itertools 0.10.1", + "itertools", "lazy_static", "num-traits", "oorandom", @@ -301,12 +301,26 @@ dependencies = [ [[package]] name = "criterion-plot" -version = "0.4.3" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e022feadec601fba1649cfa83586381a4ad31c6bf3a9ab7d408118b05dd9889d" +checksum = "d00996de9f2f7559f7f4dc286073197f83e92256a59ed395f9aac01fe717da57" dependencies = [ "cast", - "itertools 0.9.0", + "itertools", +] + +[[package]] +name = "crossbeam" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ae5588f6b3c3cb05239e90bd110f257254aecd01e4635400391aeae07497845" +dependencies = [ + "cfg-if", + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", ] [[package]] @@ -321,9 +335,9 @@ dependencies = [ [[package]] name = "crossbeam-deque" -version = "0.8.0" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94af6efb46fef72616855b036a624cf27ba656ffc9be1b9a3c931cfc7749a9a9" +checksum = "6455c0ca19f0d2fbf751b908d5c55c1f5cbc65e03c4225427254b46890bdde1e" dependencies = [ "cfg-if", "crossbeam-epoch", @@ -343,6 +357,16 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b10ddc024425c88c2ad148c1b0fd53f4c6d38db9697c9f1588381212fa657c9" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.5" @@ -393,9 +417,9 @@ dependencies = [ [[package]] name = "env_logger" -version = "0.8.3" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17392a012ea30ef05a610aa97dfb49496e71c9f676b27879922ea5bdf60d9d3f" +checksum = "a19187fea3ac7e84da7dacf48de0c45d63c6a76f9490dae389aead16c243fce3" dependencies = [ "atty", "humantime", @@ -425,6 +449,23 @@ dependencies = [ "libc", ] +[[package]] +name = "flexi_logger" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ba2265890613939b533fa11c3728651531419ac549ccf527896201581f23991" +dependencies = [ + "atty", + "chrono", + "crossbeam", + "glob", + "lazy_static", + "log", + "regex", + "thiserror", + "yansi", +] + [[package]] name = "foreign-types" version = "0.3.2" @@ -459,9 +500,9 @@ dependencies = [ [[package]] name = "gimli" -version = "0.24.0" +version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e4075386626662786ddb0ec9081e7c7eeb1ba31951f447ca780ef9f5d568189" +checksum = "f0a01e0497841a3b2db4f8afa483cce65f7e96a3498bd6c541734792aeac8fe7" [[package]] name = "glob" @@ -486,9 +527,9 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.1.18" +version = "0.1.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "322f4de77956e22ed0e5032c359a0f1273f1f7f0d79bfa3b8ffbc730d7fbcc5c" +checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" dependencies = [ "libc", ] @@ -499,15 +540,6 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" -[[package]] -name = "itertools" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "284f18f85651fe11e8a991b2adb42cb078325c996ed026d994719efcfca1d54b" -dependencies = [ - "either", -] - [[package]] name = "itertools" version = "0.10.1" @@ -525,9 +557,9 @@ checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736" [[package]] name = "js-sys" -version = "0.3.51" +version = "0.3.53" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83bdfbace3a0e81a4253f73b49e960b053e396a11012cbd49b9b74d6a2b67062" +checksum = "e4bf49d50e2961077d9c99f4b7997d770a1114f087c3c2e0069b36c13fc2979d" dependencies = [ "wasm-bindgen", ] @@ -556,9 +588,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.96" +version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5600b4e6efc5421841a2138a6b082e07fe12f9aaa12783d50e5d13325b26b4fc" +checksum = "a7f823d141fe0a24df1e23b4af4e3c7ba9e5966ec514ea068c93024aa7deb765" [[package]] name = "libloading" @@ -579,6 +611,16 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "logger" +version = "0.1.0" +dependencies = [ + "chrono", + "config", + "crossbeam", + "log", +] + [[package]] name = "mach" version = "0.3.2" @@ -590,9 +632,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.4.0" +version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b16bd47d9e329435e309c58469fe0791c2d0d1ba96ec0954152a5ae2b04387dc" +checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" [[package]] name = "memmap2" @@ -633,9 +675,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.7.11" +version = "0.7.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf80d3e903b34e0bd7282b218398aec54e082c840d9baf8339e0080a0c542956" +checksum = "8c2bdb6314ec10835cd3293dd268473a835c02b7b352e788be788b3c6ca6bb16" dependencies = [ "libc", "log", @@ -703,18 +745,18 @@ dependencies = [ [[package]] name = "object" -version = "0.25.2" +version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8bc1d42047cf336f0f939c99e97183cf31551bf0f2865a2ec9c8d91fd4ffb5e" +checksum = "ee2766204889d09937d00bfbb7fec56bb2a199e2ade963cab19185d8a6104c7c" dependencies = [ "memchr", ] [[package]] name = "once_cell" -version = "1.7.2" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af8b08b04175473088b46763e51ee54da5f9a164bc162f615b91bc179dbf15a3" +checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56" [[package]] name = "oorandom" @@ -743,15 +785,6 @@ dependencies = [ "webpki", ] -[[package]] -name = "pest" -version = "2.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10f4872ae94d7b90ae48754df22fd42ad52ce740b8f370b03da4835417403e53" -dependencies = [ - "ucd-trie", -] - [[package]] name = "pingserver" version = "0.1.0" @@ -783,15 +816,15 @@ dependencies = [ [[package]] name = "plotters-backend" -version = "0.3.0" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b07fffcddc1cb3a1de753caa4e4df03b79922ba43cf882acc1bdd7e8df9f4590" +checksum = "d88417318da0eaf0fdcdb51a0ee6c3bed624333bff8f946733049380be67ac1c" [[package]] name = "plotters-svg" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b38a02e23bd9604b842a812063aec4ef702b57989c37b655254bb61c471ad211" +checksum = "521fa9638fa597e1dc53e9412a4f9cefb01187ee1f7413076f9e6749e2885ba9" dependencies = [ "plotters-backend", ] @@ -804,9 +837,9 @@ checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857" [[package]] name = "proc-macro2" -version = "1.0.27" +version = "1.0.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0d8caf72986c1a598726adc988bb5984792ef84f5ee5aa50209145ee8077038" +checksum = "5c7ed8b8c7b886ea3ed7dde405212185f423ab44682667c8c6dd14aa1d9f6612" dependencies = [ "unicode-xid", ] @@ -818,9 +851,11 @@ dependencies = [ "common", "config", "criterion", + "logger", "metrics", "rustcommon-fastmetrics", "rustcommon-time", + "session", ] [[package]] @@ -843,9 +878,9 @@ dependencies = [ [[package]] name = "rand" -version = "0.8.3" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ef9e7e66b4468674bfcb0c81af8b7fa0bb154fa9f28eb840da5c447baeb8d7e" +checksum = "2e7573632e6454cf6b99d7aac4ccca54be06da05aca2ef7423d22d27d4d4bcd8" dependencies = [ "libc", "rand_chacha", @@ -855,9 +890,9 @@ dependencies = [ [[package]] name = "rand_chacha" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e12735cf05c9e10bf21534da50a147b924d555dc7a547c42e6bb2d5b6017ae0d" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", "rand_core", @@ -865,18 +900,18 @@ dependencies = [ [[package]] name = "rand_core" -version = "0.6.2" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34cf66eb183df1c5876e2dcf6b13d57340741e8dc255b48e40a26de954d06ae7" +checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7" dependencies = [ "getrandom", ] [[package]] name = "rand_hc" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3190ef7066a446f2e7f42e239d161e905420ccab01eb967c9eb27d21b2322a73" +checksum = "d51e9f596de227fda2ea6c84607f5558e196eeaf43c986b724ba4fb8fdf497e7" dependencies = [ "rand_core", ] @@ -964,9 +999,9 @@ dependencies = [ [[package]] name = "rustc-demangle" -version = "0.1.19" +version = "0.1.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "410f7acf3cb3a44527c5d9546bad4bf4e6c460915d5f9f2fc524498bfe8f70ce" +checksum = "dead70b0b5e03e9c814bcb6b01e03e68f7c57a80aa48c72ec92152ab3e818d49" [[package]] name = "rustc-hash" @@ -976,9 +1011,9 @@ checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" [[package]] name = "rustc_version" -version = "0.3.3" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0dfe2087c51c460008730de8b57e6a320782fbfb312e1f4d520e6c6fae155ee" +checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" dependencies = [ "semver", ] @@ -986,7 +1021,7 @@ dependencies = [ [[package]] name = "rustcommon-atomics" version = "1.0.0" -source = "git+https://github.com/twitter/rustcommon#4e48dcf23accda109775fd88f3de207d504d751b" +source = "git+https://github.com/twitter/rustcommon#88e07783af36f011b3847baf4f0bef0724c4eaad" dependencies = [ "serde", ] @@ -994,7 +1029,7 @@ dependencies = [ [[package]] name = "rustcommon-buffer" version = "0.2.0" -source = "git+https://github.com/twitter/rustcommon#4e48dcf23accda109775fd88f3de207d504d751b" +source = "git+https://github.com/twitter/rustcommon#88e07783af36f011b3847baf4f0bef0724c4eaad" dependencies = [ "bytes", ] @@ -1002,7 +1037,7 @@ dependencies = [ [[package]] name = "rustcommon-fastmetrics" version = "0.0.3" -source = "git+https://github.com/twitter/rustcommon#4e48dcf23accda109775fd88f3de207d504d751b" +source = "git+https://github.com/twitter/rustcommon#88e07783af36f011b3847baf4f0bef0724c4eaad" dependencies = [ "thiserror", ] @@ -1010,7 +1045,7 @@ dependencies = [ [[package]] name = "rustcommon-histogram" version = "1.0.0" -source = "git+https://github.com/twitter/rustcommon#4e48dcf23accda109775fd88f3de207d504d751b" +source = "git+https://github.com/twitter/rustcommon#88e07783af36f011b3847baf4f0bef0724c4eaad" dependencies = [ "rustcommon-atomics", "thiserror", @@ -1019,7 +1054,7 @@ dependencies = [ [[package]] name = "rustcommon-logger" version = "1.0.0" -source = "git+https://github.com/twitter/rustcommon#4e48dcf23accda109775fd88f3de207d504d751b" +source = "git+https://github.com/twitter/rustcommon#88e07783af36f011b3847baf4f0bef0724c4eaad" dependencies = [ "log", "time", @@ -1028,7 +1063,7 @@ dependencies = [ [[package]] name = "rustcommon-time" version = "0.0.6" -source = "git+https://github.com/twitter/rustcommon#4e48dcf23accda109775fd88f3de207d504d751b" +source = "git+https://github.com/twitter/rustcommon#88e07783af36f011b3847baf4f0bef0724c4eaad" dependencies = [ "lazy_static", "libc", @@ -1104,6 +1139,8 @@ dependencies = [ "config", "criterion", "entrystore", + "flexi_logger", + "logger", "metrics", "protocol", "rustcommon-fastmetrics", @@ -1114,36 +1151,24 @@ dependencies = [ [[package]] name = "semver" -version = "0.11.0" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f301af10236f6df4160f7c3f04eec6dbc70ace82d23326abad5edee88801c6b6" -dependencies = [ - "semver-parser", -] - -[[package]] -name = "semver-parser" -version = "0.10.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00b0bef5b7f9e0df16536d3961cfb6e84331c065b4066afb39768d0e319411f7" -dependencies = [ - "pest", -] +checksum = "568a8e6258aa33c13358f81fd834adb854c6f7c9468520910a9b1e8fac068012" [[package]] name = "serde" -version = "1.0.126" +version = "1.0.127" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec7505abeacaec74ae4778d9d9328fe5a5d04253220a85c4ee022239fc996d03" +checksum = "f03b9878abf6d14e6779d3f24f07b2cfa90352cfec4acc5aab8f1ac7f146fae8" dependencies = [ "serde_derive", ] [[package]] name = "serde_cbor" -version = "0.11.1" +version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e18acfa2f90e8b735b2836ab8d538de304cbb6729a7360729ea5a895d15a622" +checksum = "2bef2ebfde456fb76bbcf9f59315333decc4fda0b2b44b420243c11e0f5ec1f5" dependencies = [ "half", "serde", @@ -1151,9 +1176,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.126" +version = "1.0.127" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "963a7dbc9895aeac7ac90e74f34a5d5261828f79df35cbed41e10189d3804d43" +checksum = "a024926d3432516606328597e0f224a51355a493b49fdd67e9209187cbe55ecc" dependencies = [ "proc-macro2", "quote", @@ -1162,9 +1187,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.64" +version = "1.0.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "799e97dc9fdae36a5c8b8f2cae9ce2ee9fdce2058c57a93e6099d919fd982f79" +checksum = "336b10da19a12ad094b59d870ebde26a45402e5b470add4b5fd03c5048a32127" dependencies = [ "itoa", "ryu", @@ -1186,6 +1211,7 @@ dependencies = [ "crossbeam-channel", "entrystore", "libc", + "logger", "metrics", "mio", "protocol", @@ -1236,9 +1262,9 @@ checksum = "7fdf1b9db47230893d76faad238fd6097fd6d6a9245cd7a4d90dbd639536bbd2" [[package]] name = "slab" -version = "0.4.3" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f173ac3d1a7e3b28003f40de0b5ce7fe2710f9b9dc3fc38664cebee46b3b6527" +checksum = "c307a32c1c5c437f38c7fd45d753050587732ba8628319fbdf12a7e289ccc590" [[package]] name = "spin" @@ -1272,9 +1298,9 @@ dependencies = [ [[package]] name = "syn" -version = "1.0.72" +version = "1.0.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1e8cdbefb79a9a5a65e0db8b47b723ee907b7c7f8496c76a1770b5c310bab82" +checksum = "1873d832550d4588c3dbc20f01361ab00bfe741048f71e3fecf145a7cc18b29c" dependencies = [ "proc-macro2", "quote", @@ -1313,18 +1339,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.25" +version = "1.0.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa6f76457f59514c7eeb4e59d891395fab0b2fd1d40723ae737d64153392e9c6" +checksum = "93119e4feac1cbe6c798c34d3a53ea0026b0b1de6a120deef895137c0529bfe2" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.25" +version = "1.0.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a36768c0fbf1bb15eca10defa29526bda730a2376c2ab4393ccfa16fb1a318d" +checksum = "060d69a0afe7796bf42e9e2ff91f5ee691fb15c53d38b4b62a9a53eb23164745" dependencies = [ "proc-macro2", "quote", @@ -1361,17 +1387,11 @@ dependencies = [ "serde", ] -[[package]] -name = "ucd-trie" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56dee185309b50d1f11bfedef0fe6d036842e3fb77413abef29f8f8d1c5d4c1c" - [[package]] name = "unicode-segmentation" -version = "1.7.1" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb0d2e7be6ae3a5fa87eed5fb451aff96f2573d2694942e40543ae0bbe19c796" +checksum = "8895849a949e7845e06bd6dc1aa51731a103c42707010a5b591c0038fb73385b" [[package]] name = "unicode-width" @@ -1422,9 +1442,9 @@ checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" [[package]] name = "wasm-bindgen" -version = "0.2.74" +version = "0.2.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d54ee1d4ed486f78874278e63e4069fc1ab9f6a18ca492076ffb90c5eb2997fd" +checksum = "8ce9b1b516211d33767048e5d47fa2a381ed8b76fc48d2ce4aa39877f9f183e0" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -1432,9 +1452,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.74" +version = "0.2.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b33f6a0694ccfea53d94db8b2ed1c3a8a4c86dd936b13b9f0a15ec4a451b900" +checksum = "cfe8dc78e2326ba5f845f4b5bf548401604fa20b1dd1d365fb73b6c1d6364041" dependencies = [ "bumpalo", "lazy_static", @@ -1447,9 +1467,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.74" +version = "0.2.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "088169ca61430fe1e58b8096c24975251700e7b1f6fd91cc9d59b04fb9b18bd4" +checksum = "44468aa53335841d9d6b6c023eaab07c0cd4bddbcfdee3e2bb1e8d2cb8069fef" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -1457,9 +1477,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.74" +version = "0.2.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be2241542ff3d9f241f5e2cb6dd09b37efe786df8851c54957683a49f0987a97" +checksum = "0195807922713af1e67dc66132c7328206ed9766af3858164fb583eedc25fbad" dependencies = [ "proc-macro2", "quote", @@ -1470,15 +1490,15 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.74" +version = "0.2.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7cff876b8f18eed75a66cf49b65e7f967cb354a7aa16003fb55dbfd25b44b4f" +checksum = "acdb075a845574a1fa5f09fd77e43f7747599301ea3417a9fbffdeedfc1f4a29" [[package]] name = "web-sys" -version = "0.3.51" +version = "0.3.53" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e828417b379f3df7111d3a2a9e5753706cae29c41f7c4029ee9fd77f3e09e582" +checksum = "224b2f6b67919060055ef1a67807367c2066ed520c3862cc013d26cf893a783c" dependencies = [ "js-sys", "wasm-bindgen", @@ -1545,3 +1565,9 @@ name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "yansi" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fc79f4a1e39857fc00c3f662cbf2651c771f00e9c15fe2abc341806bd46bd71" diff --git a/Cargo.toml b/Cargo.toml index c59754dbb..082527db1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "src/rust/config", "src/rust/core/server", "src/rust/entrystore", + "src/rust/logger", "src/rust/metrics", "src/rust/protocol", "src/rust/queues", diff --git a/config/segcache.toml b/config/segcache.toml index 9b3e64ee4..e9b9748db 100644 --- a/config/segcache.toml +++ b/config/segcache.toml @@ -49,6 +49,10 @@ time_type = "Memcache" # choose from: error, warn, info, debug, trace log_level = "info" +[klog] +klog_file = "segcache.cmd" +klog_sample = 100 + [sockio] [tcp] diff --git a/src/rust/config/src/debug.rs b/src/rust/config/src/debug.rs index 0343e4ee1..1a7296f51 100644 --- a/src/rust/config/src/debug.rs +++ b/src/rust/config/src/debug.rs @@ -24,7 +24,7 @@ fn log_nbuf() -> usize { } // struct definitions -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct DebugConfig { #[serde(with = "LevelDef")] #[serde(default = "log_level")] diff --git a/src/rust/config/src/klog.rs b/src/rust/config/src/klog.rs new file mode 100644 index 000000000..b94b468f1 --- /dev/null +++ b/src/rust/config/src/klog.rs @@ -0,0 +1,92 @@ +// Copyright 2020 Twitter, Inc. +// Licensed under the Apache License, Version 2.0 +// http://www.apache.org/licenses/LICENSE-2.0 + +use serde::{Deserialize, Serialize}; + +//////////////////////////////////////////////////////////////////////////////// +// constants to define default values +//////////////////////////////////////////////////////////////////////////////// + +// log to the file path +const KLOG_FILE: Option = None; + +// flush interval in milliseconds +const KLOG_INTERVAL: usize = 100; + +// buffer size in bytes +const KLOG_NBUF: usize = 0; + +// log 1 in every N commands +const KLOG_SAMPLE: usize = 100; + +//////////////////////////////////////////////////////////////////////////////// +// helper functions +//////////////////////////////////////////////////////////////////////////////// + +fn klog_file() -> Option { + KLOG_FILE +} + +fn klog_interval() -> usize { + KLOG_INTERVAL +} + +fn klog_nbuf() -> usize { + KLOG_NBUF +} + +fn klog_sample() -> usize { + KLOG_SAMPLE +} + + +//////////////////////////////////////////////////////////////////////////////// +// struct definitions +//////////////////////////////////////////////////////////////////////////////// + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct KlogConfig { + #[serde(default = "klog_file")] + klog_file: Option, + #[serde(default = "klog_interval")] + klog_interval: usize, + #[serde(default = "klog_nbuf")] + klog_nbuf: usize, + #[serde(default = "klog_sample")] + klog_sample: usize, +} + +//////////////////////////////////////////////////////////////////////////////// +// implementation +//////////////////////////////////////////////////////////////////////////////// + +impl KlogConfig { + pub fn file(&self) -> Option { + self.klog_file.clone() + } + + pub fn interval(&self) -> usize { + self.klog_interval + } + + pub fn nbuf(&self) -> usize { + self.klog_nbuf + } + + pub fn sample(&self) -> usize { + self.klog_sample + } +} + +// trait implementations +impl Default for KlogConfig { + fn default() -> Self { + Self { + klog_file: klog_file(), + klog_interval: klog_interval(), + klog_nbuf: klog_nbuf(), + klog_sample: klog_sample(), + } + } +} diff --git a/src/rust/config/src/lib.rs b/src/rust/config/src/lib.rs index 9c407d81d..d27238c87 100644 --- a/src/rust/config/src/lib.rs +++ b/src/rust/config/src/lib.rs @@ -10,6 +10,7 @@ mod array; mod buf; mod dbuf; mod debug; +mod klog; mod pingserver; pub mod seg; mod segcache; @@ -26,6 +27,7 @@ pub use array::ArrayConfig; pub use buf::BufConfig; pub use dbuf::DbufConfig; pub use debug::DebugConfig; +pub use klog::KlogConfig; pub use pingserver::PingserverConfig; pub use seg::SegConfig; pub use segcache::SegcacheConfig; diff --git a/src/rust/config/src/segcache.rs b/src/rust/config/src/segcache.rs index 49a1ee92b..ba31e97b2 100644 --- a/src/rust/config/src/segcache.rs +++ b/src/rust/config/src/segcache.rs @@ -57,6 +57,8 @@ pub struct SegcacheConfig { #[serde(default)] debug: DebugConfig, #[serde(default)] + klog: KlogConfig, + #[serde(default)] sockio: SockioConfig, #[serde(default)] tcp: TcpConfig, @@ -120,6 +122,10 @@ impl SegcacheConfig { &self.debug } + pub fn klog(&self) -> &KlogConfig { + &self.klog + } + pub fn sockio(&self) -> &SockioConfig { &self.sockio } @@ -153,6 +159,7 @@ impl Default for SegcacheConfig { buf: Default::default(), debug: Default::default(), + klog: Default::default(), sockio: Default::default(), tcp: Default::default(), tls: Default::default(), diff --git a/src/rust/core/server/Cargo.toml b/src/rust/core/server/Cargo.toml index 1535a9780..c020612a7 100644 --- a/src/rust/core/server/Cargo.toml +++ b/src/rust/core/server/Cargo.toml @@ -20,6 +20,7 @@ common = { path = "../../common" } config = { path = "../../config" } crossbeam-channel = "0.5.0" libc = "0.2.83" +logger = { path = "../../logger" } metrics = { path = "../../metrics" } mio = { version = "0.7.7", features = ["os-poll", "tcp"] } protocol = { path = "../../protocol" } diff --git a/src/rust/core/server/src/process/builder.rs b/src/rust/core/server/src/process/builder.rs index 285ba3247..6d8180685 100644 --- a/src/rust/core/server/src/process/builder.rs +++ b/src/rust/core/server/src/process/builder.rs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0 // http://www.apache.org/licenses/LICENSE-2.0 +use logger::PelikanLogReceiver; use super::*; use crate::threads::*; use crate::THREAD_PREFIX; @@ -46,13 +47,14 @@ where storage: Storage, max_buffer_size: usize, parser: Parser, + logger: PelikanLogReceiver, ) -> Self { // initialize admin let ssl_context = common::ssl::ssl_context(tls_config).unwrap_or_else(|e| { error!("failed to initialize TLS: {}", e); std::process::exit(1); }); - let admin = Admin::new(admin_config, ssl_context).unwrap_or_else(|e| { + let admin = Admin::new(admin_config, ssl_context, logger).unwrap_or_else(|e| { error!("failed to initialize admin: {}", e); std::process::exit(1); }); diff --git a/src/rust/core/server/src/threads/admin.rs b/src/rust/core/server/src/threads/admin.rs index 9ce6aa9d0..0e53bf229 100644 --- a/src/rust/core/server/src/threads/admin.rs +++ b/src/rust/core/server/src/threads/admin.rs @@ -5,6 +5,7 @@ //! The admin thread, which handles admin requests to return stats, get version //! info, etc. +use logger::PelikanLogReceiver; use super::EventLoop; use crate::poll::{Poll, LISTENER_TOKEN, WAKER_TOKEN}; use boring::ssl::{HandshakeError, MidHandshakeSslStream, Ssl, SslContext, SslStream}; @@ -37,11 +38,12 @@ pub struct Admin { ssl_context: Option, signal_queue: QueuePairs<(), Signal>, parser: AdminRequestParser, + log_receiver: PelikanLogReceiver, } impl Admin { /// Creates a new `Admin` event loop. - pub fn new(config: &AdminConfig, ssl_context: Option) -> Result { + pub fn new(config: &AdminConfig, ssl_context: Option, log_receiver: PelikanLogReceiver) -> Result { let addr = config.socket_addr().map_err(|e| { error!("{}", e); std::io::Error::new(std::io::ErrorKind::Other, "Bad listen address") @@ -68,6 +70,7 @@ impl Admin { ssl_context, signal_queue, parser: AdminRequestParser::new(), + log_receiver, }) } @@ -116,7 +119,7 @@ impl Admin { Ok((stream, _)) => { // handle TLS if it is configured if let Some(ssl_context) = &self.ssl_context { - match Ssl::new(&ssl_context).map(|v| v.accept(stream)) { + match Ssl::new(ssl_context).map(|v| v.accept(stream)) { // handle case where we have a fully-negotiated // TLS stream on accept() Ok(Ok(tls_stream)) => { @@ -264,6 +267,8 @@ impl Admin { } self.get_rusage(); + + self.log_receiver.flush(); } } diff --git a/src/rust/core/server/src/threads/listener.rs b/src/rust/core/server/src/threads/listener.rs index 643cbf10d..17d3385a0 100644 --- a/src/rust/core/server/src/threads/listener.rs +++ b/src/rust/core/server/src/threads/listener.rs @@ -80,7 +80,7 @@ impl Listener { Ok((stream, _)) => { // handle TLS if it is configured if let Some(ssl_context) = &self.ssl_context { - match Ssl::new(&ssl_context).map(|v| v.accept(stream)) { + match Ssl::new(ssl_context).map(|v| v.accept(stream)) { // handle case where we have a fully-negotiated // TLS stream on accept() Ok(Ok(tls_stream)) => { @@ -219,7 +219,7 @@ impl Listener { } } _ => { - self.handle_session_event(&event); + self.handle_session_event(event); } } } diff --git a/src/rust/core/server/src/threads/worker/multi.rs b/src/rust/core/server/src/threads/worker/multi.rs index 65e45c306..2363301a8 100644 --- a/src/rust/core/server/src/threads/worker/multi.rs +++ b/src/rust/core/server/src/threads/worker/multi.rs @@ -121,7 +121,7 @@ where } } Token(_) => { - self.handle_event(&event); + self.handle_event(event); } } } diff --git a/src/rust/core/server/src/threads/worker/single.rs b/src/rust/core/server/src/threads/worker/single.rs index 160fd7900..b15c03c0b 100644 --- a/src/rust/core/server/src/threads/worker/single.rs +++ b/src/rust/core/server/src/threads/worker/single.rs @@ -7,6 +7,7 @@ //! the request using the backing storage, and then composes a response onto the //! session buffer. +use rustcommon_time::CoarseInstant; use super::EventLoop; use crate::poll::{Poll, WAKER_TOKEN}; use common::signal::Signal; @@ -76,11 +77,16 @@ where /// Run the `Worker` in a loop, handling new session events pub fn run(&mut self) { let mut events = Events::with_capacity(self.nevent); + let mut last_expire = CoarseInstant::recent(); loop { increment_counter!(&Stat::WorkerEventLoop); - self.storage.expire(); + let now = CoarseInstant::recent(); + if now != last_expire { + self.storage.expire(); + last_expire = now; + } // get events with timeout if self.poll.poll(&mut events, self.timeout).is_err() { @@ -108,7 +114,7 @@ where } } _ => { - self.handle_event(&event); + self.handle_event(event); } } } diff --git a/src/rust/entrystore/src/seg/memcache.rs b/src/rust/entrystore/src/seg/memcache.rs index 4a894b5b5..a2fe48750 100644 --- a/src/rust/entrystore/src/seg/memcache.rs +++ b/src/rust/entrystore/src/seg/memcache.rs @@ -8,25 +8,33 @@ use protocol::memcache::{MemcacheEntry, MemcacheStorage, MemcacheStorageError}; impl MemcacheStorage for Seg { fn get(&mut self, keys: &[Box<[u8]>]) -> Box<[MemcacheEntry]> { - let mut items = Vec::new(); + let mut items = Vec::with_capacity(keys.len()); for key in keys { if let Some(item) = self.data.get(key) { let o = item.optional().unwrap_or(&[0, 0, 0, 0]); let flags = u32::from_be_bytes([o[0], o[1], o[2], o[3]]); items.push(MemcacheEntry { key: item.key().to_vec().into_boxed_slice(), - value: item.value().to_vec().into_boxed_slice(), + value: Some(item.value().to_vec().into_boxed_slice()), flags, cas: Some(item.cas().into()), ttl: None, }); + } else { + items.push(MemcacheEntry { + key: key.to_vec().into_boxed_slice(), + value: None, + flags: 0, + cas: None, + ttl: None, + }); } } items.into_boxed_slice() } - fn set(&mut self, entry: MemcacheEntry) -> Result<(), MemcacheStorageError> { + fn set(&mut self, entry: &MemcacheEntry) -> Result<(), MemcacheStorageError> { let ttl = if entry.ttl() == Some(0) { return Err(MemcacheStorageError::NotStored); } else { @@ -35,7 +43,7 @@ impl MemcacheStorage for Seg { match self.data.insert( entry.key(), - entry.value(), + entry.value().unwrap_or(b""), Some(&entry.flags().to_be_bytes()), ttl, ) { @@ -44,7 +52,7 @@ impl MemcacheStorage for Seg { } } - fn add(&mut self, entry: MemcacheEntry) -> Result<(), MemcacheStorageError> { + fn add(&mut self, entry: &MemcacheEntry) -> Result<(), MemcacheStorageError> { let ttl = if entry.ttl() == Some(0) { return Err(MemcacheStorageError::NotStored); } else { @@ -56,7 +64,7 @@ impl MemcacheStorage for Seg { .data .insert( entry.key(), - entry.value(), + entry.value().unwrap_or(b""), Some(&entry.flags().to_be_bytes()), ttl, ) @@ -68,7 +76,7 @@ impl MemcacheStorage for Seg { } } - fn replace(&mut self, entry: MemcacheEntry) -> Result<(), MemcacheStorageError> { + fn replace(&mut self, entry: &MemcacheEntry) -> Result<(), MemcacheStorageError> { let ttl = if entry.ttl() == Some(0) { return Err(MemcacheStorageError::NotStored); } else { @@ -80,7 +88,7 @@ impl MemcacheStorage for Seg { .data .insert( entry.key(), - entry.value(), + entry.value().unwrap_or(b""), Some(&entry.flags().to_be_bytes()), ttl, ) @@ -100,7 +108,7 @@ impl MemcacheStorage for Seg { } } - fn cas(&mut self, entry: MemcacheEntry) -> Result<(), MemcacheStorageError> { + fn cas(&mut self, entry: &MemcacheEntry) -> Result<(), MemcacheStorageError> { let ttl = if entry.ttl() == Some(0) { return Err(MemcacheStorageError::NotStored); } else { @@ -109,7 +117,7 @@ impl MemcacheStorage for Seg { match self.data.cas( entry.key(), - entry.value(), + entry.value().unwrap_or(b""), Some(&entry.flags().to_be_bytes()), ttl, entry.cas().unwrap_or(0) as u32, diff --git a/src/rust/logger/Cargo.toml b/src/rust/logger/Cargo.toml new file mode 100644 index 000000000..3a9749896 --- /dev/null +++ b/src/rust/logger/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "logger" +version = "0.1.0" +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +chrono = "0.4.19" +config = { path = "../config" } +crossbeam = "*" +log = { version = "0.4.8", features = ["std", "release_max_level_debug"] } \ No newline at end of file diff --git a/src/rust/logger/src/lib.rs b/src/rust/logger/src/lib.rs new file mode 100644 index 000000000..548c99a7e --- /dev/null +++ b/src/rust/logger/src/lib.rs @@ -0,0 +1,310 @@ +// Copyright 2021 Twitter, Inc. +// Licensed under the Apache License, Version 2.0 +// http://www.apache.org/licenses/LICENSE-2.0 + +use core::sync::atomic::{AtomicUsize, Ordering}; + +use std::io::{BufWriter, Write}; +use std::path::PathBuf; +use std::sync::Arc; + +use config::{DebugConfig, KlogConfig}; +use crossbeam::queue::ArrayQueue; + +pub use log::*; + +#[macro_export] +macro_rules! klog { + ($($arg:tt)*) => ( + error!(target: "klog", $($arg)*); + ) +} + +pub struct NopSender {} + +pub struct LogSender { + level: LevelFilter, + sender: Arc>>, + buf_pool: Arc>>, + buf_size: usize, + format: FormatFunction, +} + +pub struct SamplingLogSender { + sender: LogSender, + current: AtomicUsize, + sample: usize, +} + +pub type FormatFunction = fn( + write: &mut dyn std::io::Write, + // now: DateTime, + record: &Record, +) -> Result<(), std::io::Error>; + +pub fn default_format( + w: &mut dyn std::io::Write, + record: &Record, +) -> Result<(), std::io::Error> { + writeln!( + w, + "{} [{}] {}", + record.level(), + record.module_path().unwrap_or(""), + record.args() + ) +} + +pub struct LogReceiver { + receiver: Arc>>, + buf_pool: Arc>>, + buf_size: usize, + writer: BufWriter, +} + +impl LogReceiver { + pub fn flush(&mut self) { + for _ in 0..self.receiver.len() { + if let Some(mut msg) = self.receiver.pop() { + let _ = self.writer.write(&msg); + if msg.capacity() <= self.buf_size { + msg.clear(); + let _ = self.buf_pool.push(msg); + } + } else { + break; + } + } + let _ = self.writer.flush(); + } +} + +pub struct LogBuilder { + buf_size: usize, + buf_pool: usize, + level: LevelFilter, + format: FormatFunction, + path: Option, +} + +impl Default for LogBuilder { + fn default() -> Self { + Self { + buf_size: 1024, + buf_pool: 1024 * 2, + level: LevelFilter::Info, + format: default_format, + path: None, + } + } +} + +impl LogBuilder { + pub fn buf_size(mut self, size: usize) -> Self { + self.buf_size = size; + self + } + + pub fn buf_pool(mut self, count: usize) -> Self { + self.buf_pool = count; + self + } + + pub fn level(mut self, level: LevelFilter) -> Self { + self.level = level; + self + } + + pub fn format(mut self, format: FormatFunction) -> Self { + self.format = format; + self + } + + pub fn build(self) -> (LogSender, LogReceiver) { + let log_queue = Arc::new(ArrayQueue::new(1024*1024)); + let buf_queue = Arc::new(ArrayQueue::new(self.buf_pool)); + + let sender = LogSender { + level: self.level, + format: self.format, + sender: log_queue.clone(), + buf_pool: buf_queue.clone(), + buf_size: self.buf_size, + }; + + let receiver = LogReceiver { + receiver: log_queue, + buf_pool: buf_queue, + buf_size: self.buf_size, + writer: BufWriter::with_capacity(1024 * 1024 * 2, std::io::stdout()), + }; + + (sender, receiver) + } +} + +impl Log for NopSender { + fn enabled(&self, _: &log::Metadata<'_>) -> bool { false } + fn log(&self, _: &log::Record<'_>) { } + fn flush(&self) { } +} + +impl Log for LogSender { + fn enabled(&self, metadata: &log::Metadata<'_>) -> bool { + metadata.level() <= self.level + } + + fn log(&self, record: &log::Record<'_>) { + if !self.enabled(record.metadata()) { + return; + } + + let mut buffer = self.buf_pool.pop() + .unwrap_or_else(|| Vec::with_capacity(self.buf_size)); + + if (self.format)(&mut buffer, record).is_ok() { + let _ = self.sender.push(buffer); + } + } + + fn flush(&self) { } +} + +impl Log for SamplingLogSender { + fn enabled(&self, metadata: &log::Metadata<'_>) -> bool { + self.sender.enabled(metadata) + } + + fn log(&self, record: &log::Record<'_>) { + if !self.enabled(record.metadata()) { + return; + } + if self.current.fetch_add(1, Ordering::Relaxed) == self.sample { + self.current.fetch_sub(self.sample, Ordering::Relaxed); + self.sender.log(record) + } + } + + fn flush(&self) { } +} + +#[derive(Clone)] +pub struct PelikanLogSender { + debug: Arc, + command: Arc, + level: Level, +} + +impl PelikanLogSender { + pub fn start(self) { + let level = self.level; + log::set_boxed_logger(Box::new(self)) + .map(|()| log::set_max_level(level.to_level_filter())) + .expect("failed to start logger"); + } +} + +pub struct PelikanLogReceiver { + debug: LogReceiver, + command: Option, +} + +impl PelikanLogReceiver { + pub fn flush(&mut self) { + self.debug.flush(); + if let Some(command) = &mut self.command { + command.flush() + }; + } +} + +#[derive(Default)] +pub struct PelikanLogBuilder { + debug: DebugConfig, + command: KlogConfig, +} + +impl PelikanLogBuilder { + pub fn debug(mut self, config: DebugConfig) -> Self { + self.debug = config; + self + } + + pub fn command(mut self, config: KlogConfig) -> Self { + self.command = config; + self + } + + pub fn build(self) -> (PelikanLogSender, PelikanLogReceiver) { + let (debug_send, debug_recv) = LogBuilder::default() + .buf_size(2048) + .buf_pool(8192) + .level(filter_for_level(self.debug.log_level())) + .format(default_format) + .build(); + + let (klog_send, klog_recv) = if let Some(_file) = self.command.file() { + let (s,r) = LogBuilder::default() + .buf_size(2048) + .buf_pool(8192) + .format(default_format) + .build(); + let s: Box = if self.command.sample() > 1 { + Box::new(SamplingLogSender { + sample: self.command.sample(), + current: AtomicUsize::new(self.command.sample()), + sender: s, + }) + } else { + Box::new(s) + }; + (s, Some(r)) + } else { + (Box::new(NopSender { }) as Box, None) + }; + + let sender = PelikanLogSender { + debug: Arc::new(Box::new(debug_send)), + command: Arc::new(klog_send), + level: self.debug.log_level(), + }; + + let receiver = PelikanLogReceiver { + debug: debug_recv, + command: klog_recv, + }; + + (sender, receiver) + } + +} + +fn filter_for_level(level: Level) -> LevelFilter { + match level { + Level::Trace => LevelFilter::Trace, + Level::Debug => LevelFilter::Debug, + Level::Info => LevelFilter::Info, + Level::Warn => LevelFilter::Warn, + Level::Error => LevelFilter::Error, + } +} + +impl Log for PelikanLogSender { + fn enabled(&self, metadata: &log::Metadata<'_>) -> bool { + if metadata.target() == "klog" { + self.command.enabled(metadata) + } else { + self.debug.enabled(metadata) + } + } + + fn log(&self, record: &log::Record<'_>) { + if record.metadata().target() == "klog" { + self.command.log(record) + } else { + self.debug.log(record) + } + } + + fn flush(&self) { } +} diff --git a/src/rust/protocol/Cargo.toml b/src/rust/protocol/Cargo.toml index 1c12822de..f41fb3f97 100644 --- a/src/rust/protocol/Cargo.toml +++ b/src/rust/protocol/Cargo.toml @@ -16,9 +16,11 @@ harness = false [dependencies] common = { path = "../../rust/common" } config = { path = "../../rust/config" } +logger = { path = "../../rust/logger" } metrics = { path = "../../rust/metrics" } rustcommon-fastmetrics = { git = "https://github.com/twitter/rustcommon" } rustcommon-time = { git = "https://github.com/twitter/rustcommon" } +session = { path = "../../rust/session" } [dev-dependencies] criterion = "0.3.4" diff --git a/src/rust/protocol/src/lib.rs b/src/rust/protocol/src/lib.rs index f527b34d6..f91711b61 100644 --- a/src/rust/protocol/src/lib.rs +++ b/src/rust/protocol/src/lib.rs @@ -2,10 +2,14 @@ // Licensed under the Apache License, Version 2.0 // http://www.apache.org/licenses/LICENSE-2.0 +#[macro_use] +extern crate logger; + #[macro_use] extern crate rustcommon_fastmetrics; -use std::io::Write; +use session::Session; + pub mod admin; pub mod memcache; @@ -14,7 +18,7 @@ pub mod ping; pub const CRLF: &str = "\r\n"; pub trait Compose { - fn compose(self, dst: &mut Buffer); + fn compose(self, dst: &mut Session); } pub trait Execute { diff --git a/src/rust/protocol/src/memcache/entry/mod.rs b/src/rust/protocol/src/memcache/entry/mod.rs index 19df05976..487a529c4 100644 --- a/src/rust/protocol/src/memcache/entry/mod.rs +++ b/src/rust/protocol/src/memcache/entry/mod.rs @@ -5,7 +5,7 @@ #[derive(Debug)] pub struct MemcacheEntry { pub key: Box<[u8]>, - pub value: Box<[u8]>, + pub value: Option>, pub ttl: Option, pub flags: u32, pub cas: Option, @@ -16,8 +16,12 @@ impl MemcacheEntry { &self.key } - pub fn value(&self) -> &[u8] { - &self.value + pub fn value(&self) -> Option<&[u8]> { + if self.value.is_some() { + Some(self.value.as_ref().unwrap().as_ref()) + } else { + None + } } /// The TTL in seconds. `Some(0)` indicates immediate expiration. `None` diff --git a/src/rust/protocol/src/memcache/storage/mod.rs b/src/rust/protocol/src/memcache/storage/mod.rs index 04c56e5bd..fae29c893 100644 --- a/src/rust/protocol/src/memcache/storage/mod.rs +++ b/src/rust/protocol/src/memcache/storage/mod.rs @@ -17,18 +17,18 @@ pub trait MemcacheStorage { fn get(&mut self, keys: &[Box<[u8]>]) -> Box<[MemcacheEntry]>; /// Store an entry and return a response - fn set(&mut self, entry: MemcacheEntry) -> Result<(), MemcacheStorageError>; + fn set(&mut self, entry: &MemcacheEntry) -> Result<(), MemcacheStorageError>; /// Stores an item if the key is not currently in the cache - fn add(&mut self, entry: MemcacheEntry) -> Result<(), MemcacheStorageError>; + fn add(&mut self, entry: &MemcacheEntry) -> Result<(), MemcacheStorageError>; /// Stores an item only if the key is already in the cache - fn replace(&mut self, entry: MemcacheEntry) -> Result<(), MemcacheStorageError>; + fn replace(&mut self, entry: &MemcacheEntry) -> Result<(), MemcacheStorageError>; /// Remove the item with the specified key fn delete(&mut self, key: &[u8]) -> Result<(), MemcacheStorageError>; /// Compare and store on the CAS value, replacing the stored item if the CAS /// value matches the provided value. - fn cas(&mut self, entry: MemcacheEntry) -> Result<(), MemcacheStorageError>; + fn cas(&mut self, entry: &MemcacheEntry) -> Result<(), MemcacheStorageError>; } diff --git a/src/rust/protocol/src/memcache/wire/mod.rs b/src/rust/protocol/src/memcache/wire/mod.rs index 0a765a4e4..0acbdd15c 100644 --- a/src/rust/protocol/src/memcache/wire/mod.rs +++ b/src/rust/protocol/src/memcache/wire/mod.rs @@ -18,42 +18,23 @@ where T: MemcacheStorage, { fn execute(&mut self, request: MemcacheRequest) -> Option { - let response = match request { - MemcacheRequest::Get { keys } => { - increment_counter!(&Stat::Get); - - let entries = self.get(&keys); - - increment_counter_by!(&Stat::GetKey, keys.len() as u64); - increment_counter_by!(&Stat::GetKeyHit, entries.len() as u64); - increment_counter_by!(&Stat::GetKeyMiss, keys.len() as u64 - entries.len() as u64); - - MemcacheResponse::Values { - entries, + let result = match request { + MemcacheRequest::Get { ref keys } => { + MemcacheResult::Values { + entries: self.get(keys), cas: false, } } - MemcacheRequest::Gets { keys } => { - increment_counter!(&Stat::Gets); - - let entries = self.get(&keys); - - increment_counter_by!(&Stat::GetsKey, keys.len() as u64); - increment_counter_by!(&Stat::GetsKeyHit, entries.len() as u64); - increment_counter_by!(&Stat::GetsKeyMiss, keys.len() as u64 - entries.len() as u64); - - MemcacheResponse::Values { entries, cas: true } + MemcacheRequest::Gets { ref keys } => { + MemcacheResult::Values { entries: self.get(keys), cas: true } } - MemcacheRequest::Set { entry, noreply } => { - increment_counter!(&Stat::Set); + MemcacheRequest::Set { ref entry, noreply } => { let response = match self.set(entry) { Ok(_) => { - increment_counter!(&Stat::SetStored); - MemcacheResponse::Stored + MemcacheResult::Stored } Err(MemcacheStorageError::NotStored) => { - increment_counter!(&Stat::SetNotstored); - MemcacheResponse::NotStored + MemcacheResult::NotStored } _ => { unreachable!() @@ -64,16 +45,13 @@ where } response } - MemcacheRequest::Add { entry, noreply } => { - increment_counter!(&Stat::Add); + MemcacheRequest::Add { ref entry, noreply } => { let response = match self.add(entry) { Ok(_) => { - increment_counter!(&Stat::AddStored); - MemcacheResponse::Stored + MemcacheResult::Stored } Err(MemcacheStorageError::NotStored) => { - increment_counter!(&Stat::AddNotstored); - MemcacheResponse::NotStored + MemcacheResult::NotStored } _ => { unreachable!() @@ -84,11 +62,10 @@ where } response } - MemcacheRequest::Replace { entry, noreply } => { - increment_counter!(&Stat::Replace); + MemcacheRequest::Replace { ref entry, noreply } => { let response = match self.replace(entry) { - Ok(_) => MemcacheResponse::Stored, - Err(MemcacheStorageError::NotStored) => MemcacheResponse::NotStored, + Ok(_) => MemcacheResult::Stored, + Err(MemcacheStorageError::NotStored) => MemcacheResult::NotStored, _ => { unreachable!() } @@ -98,11 +75,10 @@ where } response } - MemcacheRequest::Delete { key, noreply } => { - increment_counter!(&Stat::Delete); - let response = match self.delete(&key) { - Ok(_) => MemcacheResponse::Deleted, - Err(MemcacheStorageError::NotFound) => MemcacheResponse::NotFound, + MemcacheRequest::Delete { ref key, noreply } => { + let response = match self.delete(key) { + Ok(_) => MemcacheResult::Deleted, + Err(MemcacheStorageError::NotFound) => MemcacheResult::NotFound, _ => { unreachable!() } @@ -112,13 +88,12 @@ where } response } - MemcacheRequest::Cas { entry, noreply } => { - increment_counter!(&Stat::Cas); + MemcacheRequest::Cas { ref entry, noreply } => { let response = match self.cas(entry) { - Ok(_) => MemcacheResponse::Stored, - Err(MemcacheStorageError::NotFound) => MemcacheResponse::NotFound, - Err(MemcacheStorageError::Exists) => MemcacheResponse::Exists, - Err(MemcacheStorageError::NotStored) => MemcacheResponse::NotStored, + Ok(_) => MemcacheResult::Stored, + Err(MemcacheStorageError::NotFound) => MemcacheResult::NotFound, + Err(MemcacheStorageError::Exists) => MemcacheResult::Exists, + Err(MemcacheStorageError::NotStored) => MemcacheResult::NotStored, }; if noreply { return None; @@ -127,6 +102,6 @@ where } }; - Some(response) + Some(MemcacheResponse { request, result }) } } diff --git a/src/rust/protocol/src/memcache/wire/request/command.rs b/src/rust/protocol/src/memcache/wire/request/command.rs index 891d85646..a148ee7c3 100644 --- a/src/rust/protocol/src/memcache/wire/request/command.rs +++ b/src/rust/protocol/src/memcache/wire/request/command.rs @@ -6,6 +6,7 @@ use crate::ParseError; use core::convert::TryFrom; /// Memcache protocol commands +#[derive(PartialEq)] pub enum MemcacheCommand { Get, Gets, @@ -37,3 +38,19 @@ impl TryFrom<&[u8]> for MemcacheCommand { Ok(cmd) } } + +impl std::fmt::Display for MemcacheCommand { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> { + let name = match self { + Self::Get => "get", + Self::Gets => "gets", + Self::Set => "set", + Self::Add => "add", + Self::Replace => "replace", + Self::Cas => "cas", + Self::Delete => "delete", + Self::Quit => "quit", + }; + write!(f, "{}", name) + } +} diff --git a/src/rust/protocol/src/memcache/wire/request/mod.rs b/src/rust/protocol/src/memcache/wire/request/mod.rs index b3c213a9b..f68be482f 100644 --- a/src/rust/protocol/src/memcache/wire/request/mod.rs +++ b/src/rust/protocol/src/memcache/wire/request/mod.rs @@ -28,3 +28,49 @@ pub enum MemcacheRequest { Delete { key: Key, noreply: bool }, Cas { entry: MemcacheEntry, noreply: bool }, } + +impl MemcacheRequest { + pub fn noreply(&self) -> bool { + match self { + Self::Set { noreply, .. } => *noreply, + Self::Add { noreply, .. } => *noreply, + Self::Replace { noreply, .. } => *noreply, + Self::Delete { noreply, .. } => *noreply, + Self::Cas { noreply, .. } => *noreply, + _ => false, + } + } + + pub fn key(&self) -> Result<&[u8], ()> { + match self { + Self::Set { entry, .. } => Ok(entry.key()), + Self::Add { entry, .. } => Ok(entry.key()), + Self::Replace { entry, .. } => Ok(entry.key()), + Self::Delete { key, .. } => Ok(key.as_ref()), + Self::Cas { entry, .. } => Ok(entry.key()), + _ => Err(()), + } + } + + pub fn entry(&self) -> Option<&MemcacheEntry> { + match self { + Self::Set { entry, .. } => Some(entry), + Self::Add { entry, .. } => Some(entry), + Self::Replace { entry, .. } => Some(entry), + Self::Cas { entry, .. } => Some(entry), + _ => None, + } + } + + pub fn command(&self) -> MemcacheCommand { + match self { + Self::Get { .. } => MemcacheCommand::Get, + Self::Gets { .. } => MemcacheCommand::Gets, + Self::Set { .. } => MemcacheCommand::Set, + Self::Add { .. } => MemcacheCommand::Add, + Self::Replace { .. } => MemcacheCommand::Replace, + Self::Delete { .. } => MemcacheCommand::Delete, + Self::Cas { .. } => MemcacheCommand::Cas, + } + } +} diff --git a/src/rust/protocol/src/memcache/wire/request/parse.rs b/src/rust/protocol/src/memcache/wire/request/parse.rs index e23da05e9..591178fd7 100644 --- a/src/rust/protocol/src/memcache/wire/request/parse.rs +++ b/src/rust/protocol/src/memcache/wire/request/parse.rs @@ -145,6 +145,7 @@ fn parse_command(buffer: &[u8]) -> Result { #[allow(clippy::unnecessary_wraps)] fn parse_get(buffer: &[u8]) -> Result, ParseError> { + increment_counter!(&Stat::Get); let mut parse_state = ParseState::new(buffer); // this was already checked for when determining the command @@ -329,9 +330,7 @@ fn parse_set( } let key = buffer[(cmd_end + 1)..key_end].to_vec().into_boxed_slice(); - let value = buffer[value_start..value_end] - .to_vec() - .into_boxed_slice(); + let value = Some(buffer[value_start..value_end].to_vec().into_boxed_slice()); let entry = MemcacheEntry { key, diff --git a/src/rust/protocol/src/memcache/wire/response/mod.rs b/src/rust/protocol/src/memcache/wire/response/mod.rs index 9427f36d0..e75521e89 100644 --- a/src/rust/protocol/src/memcache/wire/response/mod.rs +++ b/src/rust/protocol/src/memcache/wire/response/mod.rs @@ -2,12 +2,22 @@ // Licensed under the Apache License, Version 2.0 // http://www.apache.org/licenses/LICENSE-2.0 +use std::borrow::Cow; +use session::Session; +use crate::memcache::wire::MemcacheCommand; use crate::memcache::MemcacheEntry; +use crate::memcache::MemcacheRequest; use crate::Compose; use crate::CRLF; use std::io::Write; +use metrics::Stat; -pub enum MemcacheResponse { +pub struct MemcacheResponse { + pub(crate) request: MemcacheRequest, + pub(crate) result: MemcacheResult, +} + +pub enum MemcacheResult { Deleted, Exists, Values { @@ -19,54 +29,221 @@ pub enum MemcacheResponse { Stored, } +impl MemcacheResult { + fn len(&self) -> usize { + self.as_bytes().len() + } + + fn as_bytes(&self) -> &[u8] { + match self { + Self::Deleted => b"DELETED\r\n", + Self::Exists => b"EXISTS\r\n", + Self::Values { .. } => b"VALUE ", + Self::NotFound => b"NOT_FOUND\r\n", + Self::NotStored => b"NOT_STORED\r\n", + Self::Stored => b"STORED\r\n", + } + } + + /// Returns the numeric code representing the result of the request. This is + /// used in the command log and maintains backwards compatibility by + /// mirroring the codes from: + /// https://github.com/twitter/pelikan/blob/master/src/protocol/data/memcache/response.h + fn code(&self) -> usize { + match self { + // UNKNOWN + // OK + // END + // STAT + // VALUE + Self::Stored => 5, + Self::Exists => 6, + Self::Deleted => 7, + Self::NotFound => 8, + Self::NotStored => 9, + // CLIENT_ERROR + // SERVER_ERROR + _ => usize::MAX, + } + } +} + // TODO(bmartin): consider a different trait bound here when reworking buffers. // We ignore the unused result warnings here because we know we're using a // buffer with infallible writes (growable buffer). This is *not* guaranteed by // the current trait bound. #[allow(unused_must_use)] impl Compose for MemcacheResponse { - fn compose(self, dst: &mut Buffer) { - match self { - Self::Deleted => { - dst.write_all(b"DELETED\r\n"); + fn compose(self, dst: &mut Session) { + match self.request { + MemcacheRequest::Get { .. } => increment_counter!(&Stat::Get), + MemcacheRequest::Gets { .. } => increment_counter!(&Stat::Gets), + MemcacheRequest::Set { .. } => { + match self.result { + MemcacheResult::Stored => increment_counter!(&Stat::SetStored), + MemcacheResult::NotStored => increment_counter!(&Stat::SetNotstored), + _ => unreachable!(), + } + increment_counter!(&Stat::Set); + } + MemcacheRequest::Add { .. } => { + match self.result { + MemcacheResult::Stored => increment_counter!(&Stat::AddStored), + MemcacheResult::NotStored => increment_counter!(&Stat::AddNotstored), + _ => unreachable!(), + } + increment_counter!(&Stat::Add); } - Self::Exists => { - dst.write_all(b"EXISTS\r\n"); + MemcacheRequest::Replace { .. } => { + match self.result { + MemcacheResult::Stored => increment_counter!(&Stat::ReplaceStored), + MemcacheResult::NotStored => increment_counter!(&Stat::ReplaceNotstored), + _ => unreachable!(), + } + increment_counter!(&Stat::Replace); + } + MemcacheRequest::Delete { .. } => { + match self.result { + MemcacheResult::NotFound => increment_counter!(&Stat::DeleteNotfound), + MemcacheResult::Deleted => increment_counter!(&Stat::DeleteDeleted), + _ => unreachable!(), + } + increment_counter!(&Stat::Delete); + } + MemcacheRequest::Cas { .. } => { + match self.result { + MemcacheResult::Exists => increment_counter!(&Stat::CasExists), + MemcacheResult::NotFound => increment_counter!(&Stat::CasNotfound), + MemcacheResult::NotStored => increment_counter!(&Stat::CasEx), + MemcacheResult::Stored => increment_counter!(&Stat::CasStored), + _ => unreachable!(), + } + increment_counter!(&Stat::Cas); } - Self::Values { entries, cas } => { - for entry in entries.iter() { - dst.write_all(b"VALUE "); + } + if let MemcacheResult::Values { ref entries, cas } = self.result { + let mut hits = 0; + let total = entries.len(); + + for entry in entries.iter() { + let response_len = if let Some(value) = entry.value() { + hits += 1; + let start_len = dst.write_pending(); + dst.write_all(self.result.as_bytes()); dst.write_all(&*entry.key); + if cas { - dst.write_all( - &format!( - " {} {} {}", - entry.flags, - entry.value.len(), - entry.cas.unwrap_or(0) - ) - .into_bytes(), - ); + write!(dst, + " {} {} {}", + entry.flags, + value.len(), + entry.cas.unwrap_or(0) + ) } else { - dst.write_all( - &format!(" {} {}", entry.flags, entry.value.len()).into_bytes(), - ); - } + write!(dst, " {} {}", entry.flags, value.len()) + }; + dst.write_all(CRLF.as_bytes()); - dst.write_all(&*entry.value); + dst.write_all(value); dst.write_all(CRLF.as_bytes()); - } - dst.write_all(b"END\r\n"); - } - Self::NotFound => { - dst.write_all(b"NOT_FOUND\r\n"); + + // return the number of bytes in the reply + dst.write_pending() - start_len + } else { + 0 + }; + klog_get(&self.request.command(), entry.key(), response_len); } - Self::NotStored => { - dst.write_all(b"NOT_STORED\r\n"); + if self.request.command() == MemcacheCommand::Get { + increment_counter_by!(&Stat::GetKey, total as u64); + increment_counter_by!(&Stat::GetKeyHit, hits as u64); + increment_counter_by!(&Stat::GetKeyMiss, (total - hits) as u64); + } else { + increment_counter_by!(&Stat::GetsKey, total as u64); + increment_counter_by!(&Stat::GetsKeyHit, hits as u64); + increment_counter_by!(&Stat::GetsKeyMiss, (total - hits) as u64); } - Self::Stored => { - dst.write_all(b"STORED\r\n"); + + dst.write_all(b"END\r\n"); + } else { + let response_len = if self.request.noreply() { + 0 + } else { + dst.write_all(self.result.as_bytes()); + self.result.len() + }; + + match self.request.command() { + MemcacheCommand::Delete => klog_delete(&self, response_len), + MemcacheCommand::Cas => klog_cas(&self, response_len), + _ => klog_store(&self, response_len), } } } } + +//////////////////////////////////////////////////////////////////////////////// +// Helper Functions +//////////////////////////////////////////////////////////////////////////////// + +/// Transform +fn string_key(key: Result<&[u8], ()>) -> Cow<'_, str> { + String::from_utf8_lossy(key.unwrap_or_else(|_| b"")) +} + +/// Logs a CAS command +fn klog_cas(response: &MemcacheResponse, response_len: usize) { + if let Some(entry) = response.request.entry() { + klog!( + "\"{} {} {} {} {} {}\" {} {}", + response.request.command(), + string_key(response.request.key()), + entry.flags(), + entry.ttl.unwrap_or(0), + entry.value().map(|v| v.len()).unwrap_or(0), + entry.cas().unwrap_or(0) as u32, + response.result.code(), + response_len + ); + } else { + debug!("{} request missing entry", response.request.command()); + } +} + +/// Logs a DELETE command +fn klog_delete(response: &MemcacheResponse, response_len: usize) { + klog!( + "\"{} {}\" {} {}", + response.request.command(), + string_key(response.request.key()), + response.result.code(), + response_len + ); +} + +/// Logs GET or GETS +fn klog_get(command: &MemcacheCommand, key: &[u8], response_len: usize) { + if response_len == 0 { + klog!("\"{} {}\" 0 {}", command, string_key(Ok(key)), response_len); + } else { + klog!("\"{} {}\" 4 {}", command, string_key(Ok(key)), response_len); + } +} + +/// Logs SET, ADD, or REPLACE +fn klog_store(response: &MemcacheResponse, response_len: usize) { + if let Some(entry) = response.request.entry() { + klog!( + "\"{} {} {} {} {}\" {} {}", + response.request.command(), + string_key(response.request.key()), + entry.flags(), + entry.ttl.unwrap_or(0), + entry.value().map(|v| v.len()).unwrap_or(0), + response.result.code(), + response_len + ); + } else { + debug!("{} request missing entry", response.request.command()); + } +} diff --git a/src/rust/protocol/src/ping/wire/response/mod.rs b/src/rust/protocol/src/ping/wire/response/mod.rs index 10710dfb6..df6748d5a 100644 --- a/src/rust/protocol/src/ping/wire/response/mod.rs +++ b/src/rust/protocol/src/ping/wire/response/mod.rs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0 // http://www.apache.org/licenses/LICENSE-2.0 +use session::Session; use crate::Compose; use std::io::Write; @@ -15,7 +16,7 @@ pub enum PingResponse { // the current trait bound. #[allow(unused_must_use)] impl Compose for PingResponse { - fn compose(self, dst: &mut Buffer) { + fn compose(self, dst: &mut Session) { match self { Self::Pong => { dst.write_all(b"PONG\r\n"); diff --git a/src/rust/server/segcache/Cargo.toml b/src/rust/server/segcache/Cargo.toml index 38ceace77..dc120f388 100644 --- a/src/rust/server/segcache/Cargo.toml +++ b/src/rust/server/segcache/Cargo.toml @@ -40,6 +40,8 @@ debug = ["entrystore/debug"] backtrace = "0.3.56" config = { path = "../../config" } entrystore = { path = "../../entrystore" } +flexi_logger = { version = "0.18.0", features = ["async"] } +logger = { path = "../../logger" } metrics = { path = "../../metrics" } protocol = { path = "../../protocol" } rustcommon-logger = { git = "https://github.com/twitter/rustcommon" } diff --git a/src/rust/server/segcache/src/lib.rs b/src/rust/server/segcache/src/lib.rs index ed656e20d..283d41899 100644 --- a/src/rust/server/segcache/src/lib.rs +++ b/src/rust/server/segcache/src/lib.rs @@ -6,6 +6,7 @@ //! a subset of the Memcache protocol. Segment based storage allows us to //! perform efficient eager expiration of items. +use logger::PelikanLogBuilder; use config::SegcacheConfig; use entrystore::Seg; use protocol::memcache::{MemcacheRequest, MemcacheRequestParser, MemcacheResponse}; @@ -17,6 +18,7 @@ type Response = MemcacheResponse; type Storage = Seg; /// This structure represents a running `Segcache` process. +#[allow(dead_code)] pub struct Segcache { process: Process, } @@ -24,6 +26,14 @@ pub struct Segcache { impl Segcache { /// Creates a new `Segcache` process from the given `SegcacheConfig`. pub fn new(config: SegcacheConfig) -> Self { + // initialize logging + let (logger, receiver) = PelikanLogBuilder::default() + .debug(config.debug().clone()) + .command(config.klog().clone()) + .build(); + + logger.start(); + // initialize metrics metrics::init(); @@ -50,6 +60,7 @@ impl Segcache { storage, max_buffer_size, parser, + receiver, ); // spawn threads diff --git a/src/rust/server/segcache/src/main.rs b/src/rust/server/segcache/src/main.rs index f22066fbc..872cddbfc 100644 --- a/src/rust/server/segcache/src/main.rs +++ b/src/rust/server/segcache/src/main.rs @@ -3,12 +3,11 @@ // http://www.apache.org/licenses/LICENSE-2.0 #[macro_use] -extern crate rustcommon_logger; +extern crate logger; use backtrace::Backtrace; use config::SegcacheConfig; use pelikan_segcache_rs::Segcache; -use rustcommon_logger::Logger; fn main() { // custom panic hook to terminate whole process after unwinding @@ -32,13 +31,6 @@ fn main() { Default::default() }; - // initialize logging - Logger::new() - .label(env!("CARGO_CRATE_NAME")) - .level(config.debug().log_level()) - .init() - .expect("Failed to initialize logger"); - // launch segcache Segcache::new(config).wait() } diff --git a/src/rust/storage/seg/src/hashtable/mod.rs b/src/rust/storage/seg/src/hashtable/mod.rs index 618c84b18..67c9269e9 100644 --- a/src/rust/storage/seg/src/hashtable/mod.rs +++ b/src/rust/storage/seg/src/hashtable/mod.rs @@ -511,10 +511,6 @@ impl HashTable { trace!("hash: {} mask: {} bucket: {}", hash, self.mask, bucket_id); - if cas != get_cas(bucket.data[0]) { - return Err(SegError::Exists); - } - loop { let n_item_slot = if chain_idx == chain_len { N_BUCKET_SLOT @@ -550,6 +546,8 @@ impl HashTable { // TODO(bmartin): what is expected on overflow of the cas bits? self.data[(hash & self.mask) as usize].data[0] += 1; return Ok(()); + } else { + return Err(SegError::Exists); } } }