From de8c62d6361818f578f5a5c3249a365f258fdb5f Mon Sep 17 00:00:00 2001 From: Brian Martin Date: Thu, 19 Sep 2024 08:01:24 -0700 Subject: [PATCH] expand pingserver functionality (#134) Adds gRPC, including gRPC directly implemented over HTTP2 and HTTP3, support to the pingserver and provides an alternate implementation of the ASCII pingserver using Tokio as the IO engine. --- .github/actions/pingserver/action.yml | 2 +- .github/workflows/cargo.yml | 6 +- Cargo.lock | 1065 ++++++++++++++++++-- Cargo.toml | 2 +- README.md | 6 +- config/pingserver.toml | 15 + src/server/pingserver/Cargo.toml | 52 +- src/server/pingserver/benches/benchmark.rs | 61 -- src/server/pingserver/build.rs | 4 + src/server/pingserver/proto/pingpong.proto | 12 + src/server/pingserver/src/config.rs | 195 ++++ src/server/pingserver/src/lib.rs | 67 -- src/server/pingserver/src/main.rs | 79 +- src/server/pingserver/src/tokio/admin.rs | 233 +++++ src/server/pingserver/src/tokio/ascii.rs | 96 ++ src/server/pingserver/src/tokio/grpc.rs | 41 + src/server/pingserver/src/tokio/http2.rs | 106 ++ src/server/pingserver/src/tokio/http3.rs | 130 +++ src/server/pingserver/src/tokio/metrics.rs | 198 ++++ src/server/pingserver/src/tokio/mod.rs | 87 ++ src/server/pingserver/tests/integration.rs | 176 ---- 21 files changed, 2201 insertions(+), 432 deletions(-) delete mode 100644 src/server/pingserver/benches/benchmark.rs create mode 100644 src/server/pingserver/build.rs create mode 100644 src/server/pingserver/proto/pingpong.proto create mode 100644 src/server/pingserver/src/config.rs delete mode 100644 src/server/pingserver/src/lib.rs create mode 100644 src/server/pingserver/src/tokio/admin.rs create mode 100644 src/server/pingserver/src/tokio/ascii.rs create mode 100644 src/server/pingserver/src/tokio/grpc.rs create mode 100644 src/server/pingserver/src/tokio/http2.rs create mode 100644 src/server/pingserver/src/tokio/http3.rs create mode 100644 src/server/pingserver/src/tokio/metrics.rs create mode 100644 src/server/pingserver/src/tokio/mod.rs delete mode 100644 src/server/pingserver/tests/integration.rs diff --git a/.github/actions/pingserver/action.yml b/.github/actions/pingserver/action.yml index 523734b7..39ff5eb7 100644 --- a/.github/actions/pingserver/action.yml +++ b/.github/actions/pingserver/action.yml @@ -41,6 +41,6 @@ runs: shell: bash - name: Run pingserver run: | - pelikan_pingserver_rs server.toml & + pelikan_pingserver server.toml & sleep 10 shell: bash diff --git a/.github/workflows/cargo.yml b/.github/workflows/cargo.yml index d7c800e5..3d96fd4e 100644 --- a/.github/workflows/cargo.yml +++ b/.github/workflows/cargo.yml @@ -30,7 +30,7 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - - uses: dtolnay/rust-toolchain@stable + - uses: ./.github/actions/setup-rust - uses: Swatinem/rust-cache@v2 - uses: swlynch99/cargo-sweep-action@v1 - uses: taiki-e/install-action@cargo-hack @@ -96,7 +96,7 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 - - uses: dtolnay/rust-toolchain@stable + - uses: ./.github/actions/setup-rust - name: cargo fmt shell: bash run: | @@ -146,7 +146,7 @@ jobs: shell: bash run: | mkdir install - cp target/release/pelikan_pingserver_rs install + cp target/release/pelikan_pingserver install cp target/release/pelikan_pingproxy_rs install - uses: actions/upload-artifact@v3 if: ${{ matrix.profile == 'release' && matrix.os == 'ubuntu-latest' }} diff --git a/Cargo.lock b/Cargo.lock index 1785c32e..891eaee2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -60,6 +60,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anes" version = "0.1.6" @@ -177,6 +192,12 @@ dependencies = [ "syn", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "autocfg" version = "1.2.0" @@ -190,7 +211,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bd279f60d540c599125bfe1ac89293a411f8fed25fedbfbf3dc114eb6f0dc6df" dependencies = [ "libc", - "mio", + "mio 0.8.11", ] [[package]] @@ -200,13 +221,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" dependencies = [ "async-trait", - "axum-core", + "axum-core 0.3.4", "bitflags 1.3.2", "bytes", "futures-util", - "http", - "http-body", - "hyper", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.28", "itoa", "matchit", "memchr", @@ -215,7 +236,34 @@ dependencies = [ "pin-project-lite", "rustversion", "serde", - "sync_wrapper", + "sync_wrapper 0.1.2", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf" +dependencies = [ + "async-trait", + "axum-core 0.4.3", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper 1.0.1", "tower", "tower-layer", "tower-service", @@ -230,14 +278,34 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http", - "http-body", + "http 0.2.12", + "http-body 0.4.6", "mime", "rustversion", "tower-layer", "tower-service", ] +[[package]] +name = "axum-core" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a15c63fd72d41492dc4f497196f5da1fb04fb7529e631d73630d1b491e47a2e3" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper 0.1.2", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.71" @@ -265,6 +333,12 @@ version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "bindgen" version = "0.66.1" @@ -280,7 +354,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "rustc-hash", + "rustc-hash 1.1.0", "shlex", "syn", ] @@ -322,6 +396,15 @@ dependencies = [ "constant_time_eq", ] +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + [[package]] name = "bloom" version = "0.3.2" @@ -375,11 +458,17 @@ version = "3.15.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ff69b9dd49fd426c69a0db9fc04dd934cdb6645ff000864d98f7e2af8830eaa" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" -version = "1.6.0" +version = "1.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" +checksum = "428d9aa8fbc0670b7b8d6030a7fadd0f86151cae55e4dbbece15f3780a3dfaf3" [[package]] name = "cast" @@ -397,6 +486,12 @@ dependencies = [ "libc", ] +[[package]] +name = "cesu8" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" + [[package]] name = "cexpr" version = "0.6.0" @@ -412,6 +507,20 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "js-sys", + "num-traits", + "wasm-bindgen", + "windows-targets 0.52.4", +] + [[package]] name = "chunked_transfer" version = "1.5.0" @@ -509,6 +618,16 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "memchr", +] + [[package]] name = "common" version = "0.3.2" @@ -554,6 +673,15 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" +[[package]] +name = "cpufeatures" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53fe5e26ff1b7aef8bca9c6080520cfb8d9333c7568e1829cef191a9723e5504" +dependencies = [ + "libc", +] + [[package]] name = "criterion" version = "0.5.1" @@ -639,6 +767,22 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" +[[package]] +name = "crypto-common" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +dependencies = [ + "generic-array", + "typenum", +] + +[[package]] +name = "data-encoding" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" + [[package]] name = "datatier" version = "0.1.0" @@ -659,12 +803,31 @@ dependencies = [ "powerfmt", ] +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", +] + [[package]] name = "either" version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "11157ac094ffbdde99aa67b23417ebdd801842852b500e395a45a9c0aac03e4a" +[[package]] +name = "encoding_rs" +version = "0.8.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59" +dependencies = [ + "cfg-if", +] + [[package]] name = "entrystore" version = "0.3.2" @@ -700,6 +863,12 @@ version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "658bd65b1cf4c852a3cc96f18a8ce7b5640f6b703f905c7d74532294c2a63984" +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + [[package]] name = "fnv" version = "1.0.7" @@ -748,6 +917,15 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aa9a19cbb55df58761df49b23516a86d432839add4af60fc256da840f66ed35b" +[[package]] +name = "form_urlencoded" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" +dependencies = [ + "percent-encoding", +] + [[package]] name = "fs_extra" version = "1.3.0" @@ -859,6 +1037,16 @@ dependencies = [ "slab", ] +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", +] + [[package]] name = "getrandom" version = "0.2.12" @@ -893,7 +1081,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.12", "indexmap 2.2.6", "slab", "tokio", @@ -901,6 +1089,53 @@ dependencies = [ "tracing", ] +[[package]] +name = "h2" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http 1.1.0", + "indexmap 2.2.6", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "h3" +version = "0.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e7675a0963b47a6d12fe44c279918b4ffb19baee838ac37f48d2722ad5bc6ab" +dependencies = [ + "bytes", + "fastrand", + "futures-util", + "http 1.1.0", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "h3-quinn" +version = "0.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17c799f413fceeea505236c4d8132f084ff4b55a652288d91439ee93dc24d855" +dependencies = [ + "bytes", + "futures", + "h3", + "quinn", + "tokio", + "tokio-util", +] + [[package]] name = "half" version = "2.4.0" @@ -923,6 +1158,36 @@ version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +[[package]] +name = "headers" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06683b93020a07e3dbcf5f8c0f6d40080d725bea7936fc01ad345c01b97dc270" +dependencies = [ + "base64 0.21.7", + "bytes", + "headers-core", + "http 0.2.12", + "httpdate", + "mime", + "sha1", +] + +[[package]] +name = "headers-core" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" +dependencies = [ + "http 0.2.12", +] + +[[package]] +name = "heck" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" + [[package]] name = "hermit-abi" version = "0.3.9" @@ -949,6 +1214,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.6" @@ -956,7 +1232,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", - "http", + "http 0.2.12", + "pin-project-lite", +] + +[[package]] +name = "http-body" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +dependencies = [ + "bytes", + "http 1.1.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" +dependencies = [ + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", "pin-project-lite", ] @@ -972,6 +1271,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hyper" version = "0.14.28" @@ -982,9 +1287,9 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", - "http", - "http-body", + "h2 0.3.26", + "http 0.2.12", + "http-body 0.4.6", "httparse", "httpdate", "itoa", @@ -996,18 +1301,105 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "h2 0.4.6", + "http 1.1.0", + "http-body 1.0.0", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", + "want", +] + [[package]] name = "hyper-timeout" version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" dependencies = [ - "hyper", + "hyper 0.14.28", "pin-project-lite", "tokio", "tokio-io-timeout", ] +[[package]] +name = "hyper-timeout" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793" +dependencies = [ + "hyper 1.4.1", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + +[[package]] +name = "hyper-util" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "hyper 1.4.1", + "pin-project-lite", + "socket2", + "tokio", + "tower", + "tower-service", + "tracing", +] + +[[package]] +name = "iana-time-zone" +version = "0.1.60" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7ffbb5a1b541ea2561f8c41c087286cc091e21e556a4f09a8f6cbf17b69b141" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + +[[package]] +name = "idna" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "indexmap" version = "1.9.3" @@ -1063,6 +1455,26 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" +[[package]] +name = "jni" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6df18c2e3db7e453d3c6ac5b3e9d5182664d28788126d39b91f2d1e22b017ec" +dependencies = [ + "cesu8", + "combine", + "jni-sys", + "log", + "thiserror", + "walkdir", +] + +[[package]] +name = "jni-sys" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" + [[package]] name = "jobserver" version = "0.1.28" @@ -1243,6 +1655,16 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "mime_guess" +version = "2.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -1270,6 +1692,18 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "mio" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" +dependencies = [ + "hermit-abi", + "libc", + "wasi", + "windows-sys 0.52.0", +] + [[package]] name = "momento" version = "0.32.1" @@ -1278,8 +1712,8 @@ checksum = "28ff818ee9c88dd6488b888f3f7fa16fdc63f980f564e8083c5a21ce5cfc1cb3" dependencies = [ "base64 0.21.7", "futures", - "h2", - "hyper", + "h2 0.3.26", + "hyper 0.14.28", "jsonwebtoken", "log", "momento-protos", @@ -1287,7 +1721,7 @@ dependencies = [ "serde", "serde_json", "thiserror", - "tonic", + "tonic 0.10.2", "zstd", ] @@ -1297,8 +1731,8 @@ version = "0.84.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "45a32a52eedb4cb8ab09d6a46c0bd6091fcee90896068129c0b61083a8f00abd" dependencies = [ - "prost", - "tonic", + "prost 0.12.3", + "tonic 0.10.2", ] [[package]] @@ -1330,6 +1764,30 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf78b1242a953be96e01b5f8ed8ffdfc8055c0a2b779899b3835e5d27a69dced" +[[package]] +name = "multer" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2" +dependencies = [ + "bytes", + "encoding_rs", + "futures-util", + "http 0.2.12", + "httparse", + "log", + "memchr", + "mime", + "spin 0.9.8", + "version_check", +] + +[[package]] +name = "multimap" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" + [[package]] name = "nom" version = "7.1.3" @@ -1375,16 +1833,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "num_cpus" -version = "1.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" -dependencies = [ - "hermit-abi", - "libc", -] - [[package]] name = "object" version = "0.32.2" @@ -1499,11 +1947,47 @@ dependencies = [ "foreign-types-shared 0.3.1", "libc", "metriken", - "mio", + "mio 0.8.11", "openssl", "openssl-sys", ] +[[package]] +name = "pelikan-pingserver" +version = "0.3.2" +dependencies = [ + "backtrace", + "bytes", + "chrono", + "clap", + "common", + "config", + "entrystore", + "h2 0.4.6", + "h3", + "h3-quinn", + "http 1.1.0", + "http-body-util", + "humantime", + "hyper 1.4.1", + "logger", + "metriken", + "pin-project", + "prost 0.13.2", + "protocol-ping", + "quinn", + "rustls 0.23.13", + "rustls-native-certs 0.8.0", + "serde", + "server", + "session", + "tokio", + "toml", + "tonic 0.12.2", + "tonic-build", + "warp", +] + [[package]] name = "pelikan-segcache" version = "0.3.2" @@ -1535,6 +2019,16 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "petgraph" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" +dependencies = [ + "fixedbitset", + "indexmap 2.2.6", +] + [[package]] name = "phf" version = "0.11.2" @@ -1623,22 +2117,6 @@ dependencies = [ "proxy", ] -[[package]] -name = "pingserver" -version = "0.3.2" -dependencies = [ - "backtrace", - "clap", - "common", - "config", - "criterion", - "entrystore", - "logger", - "metriken", - "protocol-ping", - "server", -] - [[package]] name = "pkg-config" version = "0.3.30" @@ -1685,6 +2163,16 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "prettyplease" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d3928fb5db768cb86f891ff014f0144589297e3c6a1aba6ed7cecfdace270c7" +dependencies = [ + "proc-macro2", + "syn", +] + [[package]] name = "proc-macro-crate" version = "3.1.0" @@ -1710,7 +2198,38 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "146c289cda302b98a28d40c8b3b90498d6e526dd24ac2ecea73e4e491685b94a" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.12.3", +] + +[[package]] +name = "prost" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b2ecbe40f08db5c006b5764a2645f7f3f141ce756412ac9e1dd6087e6d32995" +dependencies = [ + "bytes", + "prost-derive 0.13.2", +] + +[[package]] +name = "prost-build" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bb182580f71dd070f88d01ce3de9f4da5021db7115d2e1c3605a754153b77c1" +dependencies = [ + "bytes", + "heck", + "itertools 0.11.0", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost 0.13.2", + "prost-types", + "regex", + "syn", + "tempfile", ] [[package]] @@ -1726,6 +2245,28 @@ dependencies = [ "syn", ] +[[package]] +name = "prost-derive" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acf0c195eebb4af52c752bec4f52f645da98b6e92077a04110c7f349477ae5ac" +dependencies = [ + "anyhow", + "itertools 0.11.0", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "prost-types" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cee5168b05f49d4b0ca581206eb14a7b22fafd963efe729ac48eb03266e25cc2" +dependencies = [ + "prost 0.13.2", +] + [[package]] name = "protocol-admin" version = "0.3.2" @@ -1838,6 +2379,56 @@ dependencies = [ "switchboard", ] +[[package]] +name = "quinn" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c7c5fdde3cdae7203427dc4f0a68fe0ed09833edc525a03456b153b79828684" +dependencies = [ + "bytes", + "futures-io", + "pin-project-lite", + "quinn-proto", + "quinn-udp", + "rustc-hash 2.0.0", + "rustls 0.23.13", + "socket2", + "thiserror", + "tokio", + "tracing", +] + +[[package]] +name = "quinn-proto" +version = "0.11.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fadfaed2cd7f389d0161bb73eeb07b7b78f8691047a6f3e73caaeae55310a4a6" +dependencies = [ + "bytes", + "rand", + "ring 0.17.8", + "rustc-hash 2.0.0", + "rustls 0.23.13", + "rustls-platform-verifier", + "slab", + "thiserror", + "tinyvec", + "tracing", +] + +[[package]] +name = "quinn-udp" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bffec3605b73c6f1754535084a85229fa8a30f86014e6c81aeec4abb68b0285" +dependencies = [ + "libc", + "once_cell", + "socket2", + "tracing", + "windows-sys 0.52.0", +] + [[package]] name = "quote" version = "1.0.35" @@ -2021,6 +2612,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" +[[package]] +name = "rustc-hash" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "583034fd73374156e66797ed8e5b0d5690409c9226b22d87cb7f19821c05d152" + [[package]] name = "rustix" version = "0.38.32" @@ -2042,10 +2639,25 @@ checksum = "7fecbfb7b1444f477b345853b1fce097a2c6fb637b2bfb87e6bc5db0f043fae4" dependencies = [ "log", "ring 0.17.8", - "rustls-webpki", + "rustls-webpki 0.101.7", "sct", ] +[[package]] +name = "rustls" +version = "0.23.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2dabaac7466917e566adb06783a81ca48944c6898a1b08b9374106dd671f4c8" +dependencies = [ + "log", + "once_cell", + "ring 0.17.8", + "rustls-pki-types", + "rustls-webpki 0.102.8", + "subtle", + "zeroize", +] + [[package]] name = "rustls-native-certs" version = "0.6.3" @@ -2053,7 +2665,33 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" dependencies = [ "openssl-probe", - "rustls-pemfile", + "rustls-pemfile 1.0.4", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-native-certs" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a88d6d420651b496bdd98684116959239430022a115c1240e6c3993be0b15fba" +dependencies = [ + "openssl-probe", + "rustls-pemfile 2.1.3", + "rustls-pki-types", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-native-certs" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcaf18a4f2be7326cd874a5fa579fae794320a0f388d365dca7e480e55f83f8a" +dependencies = [ + "openssl-probe", + "rustls-pemfile 2.1.3", + "rustls-pki-types", "schannel", "security-framework", ] @@ -2067,6 +2705,49 @@ dependencies = [ "base64 0.21.7", ] +[[package]] +name = "rustls-pemfile" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "196fe16b00e106300d3e45ecfcb764fa292a535d7326a29a5875c579c7417425" +dependencies = [ + "base64 0.22.1", + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc0a2ce646f8655401bb81e7927b812614bd5d91dbc968696be50603510fcaf0" + +[[package]] +name = "rustls-platform-verifier" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93bda3f493b9abe5b93b3e7e3ecde0df292f2bd28c0296b90586ee0055ff5123" +dependencies = [ + "core-foundation", + "core-foundation-sys", + "jni", + "log", + "once_cell", + "rustls 0.23.13", + "rustls-native-certs 0.7.1", + "rustls-platform-verifier-android", + "rustls-webpki 0.102.8", + "security-framework", + "security-framework-sys", + "webpki-roots 0.26.3", + "winapi", +] + +[[package]] +name = "rustls-platform-verifier-android" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f" + [[package]] name = "rustls-webpki" version = "0.101.7" @@ -2077,6 +2758,17 @@ dependencies = [ "untrusted 0.9.0", ] +[[package]] +name = "rustls-webpki" +version = "0.102.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9" +dependencies = [ + "ring 0.17.8", + "rustls-pki-types", + "untrusted 0.9.0", +] + [[package]] name = "rustversion" version = "1.0.14" @@ -2107,6 +2799,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.2.0" @@ -2133,6 +2831,7 @@ dependencies = [ "core-foundation", "core-foundation-sys", "libc", + "num-bigint", "security-framework-sys", ] @@ -2202,6 +2901,18 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "server" version = "0.3.2" @@ -2236,6 +2947,17 @@ dependencies = [ "protocol-common", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "shlex" version = "1.3.0" @@ -2332,6 +3054,12 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" +[[package]] +name = "subtle" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" + [[package]] name = "switchboard" version = "0.2.1" @@ -2361,6 +3089,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "sync_wrapper" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" + [[package]] name = "tap" version = "1.0.1" @@ -2466,23 +3200,37 @@ dependencies = [ "serde_json", ] +[[package]] +name = "tinyvec" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c55115c6fbe2d2bef26eb09ad74bde02d8255476fc0c7b515ef09fbb35742d82" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + [[package]] name = "tokio" -version = "1.37.0" +version = "1.40.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" +checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998" dependencies = [ "backtrace", "bytes", "libc", - "mio", - "num_cpus", + "mio 1.0.2", "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2", "tokio-macros", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -2497,9 +3245,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.2.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" +checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", @@ -2512,7 +3260,7 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls", + "rustls 0.21.11", "tokio", ] @@ -2527,6 +3275,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.10" @@ -2594,20 +3354,20 @@ checksum = "d560933a0de61cf715926b9cac824d4c883c2c43142f787595e48280c40a1d0e" dependencies = [ "async-stream", "async-trait", - "axum", + "axum 0.6.20", "base64 0.21.7", "bytes", - "h2", - "http", - "http-body", - "hyper", - "hyper-timeout", + "h2 0.3.26", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.28", + "hyper-timeout 0.4.1", "percent-encoding", "pin-project", - "prost", - "rustls", - "rustls-native-certs", - "rustls-pemfile", + "prost 0.12.3", + "rustls 0.21.11", + "rustls-native-certs 0.6.3", + "rustls-pemfile 1.0.4", "tokio", "tokio-rustls", "tokio-stream", @@ -2615,7 +3375,50 @@ dependencies = [ "tower-layer", "tower-service", "tracing", - "webpki-roots", + "webpki-roots 0.25.4", +] + +[[package]] +name = "tonic" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6f6ba989e4b2c58ae83d862d3a3e27690b6e3ae630d0deb59f3697f32aa88ad" +dependencies = [ + "async-stream", + "async-trait", + "axum 0.7.5", + "base64 0.22.1", + "bytes", + "h2 0.4.6", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.4.1", + "hyper-timeout 0.5.1", + "hyper-util", + "percent-encoding", + "pin-project", + "prost 0.13.2", + "socket2", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic-build" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe4ee8877250136bd7e3d2331632810a4df4ea5e004656990d8d66d2f5ee8a67" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "quote", + "syn", ] [[package]] @@ -2656,6 +3459,7 @@ version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -2687,6 +3491,25 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.1.0", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "twox-hash" version = "1.6.3" @@ -2697,12 +3520,42 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "typenum" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" + +[[package]] +name = "unicase" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7d2d4dafb69621809a81864c9c1b864479e1235c0dd4e199924b9742439ed89" +dependencies = [ + "version_check", +] + +[[package]] +name = "unicode-bidi" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08f95100a766bf4f8f28f90d77e0a5461bbdb219042e7679bebe79004fed8d75" + [[package]] name = "unicode-ident" version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" +[[package]] +name = "unicode-normalization" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a56d1686db2308d901306f92a263857ef59ea39678a5458e7cb17f01415101f5" +dependencies = [ + "tinyvec", +] + [[package]] name = "untrusted" version = "0.7.1" @@ -2715,12 +3568,29 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" +[[package]] +name = "url" +version = "2.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22784dbdf76fdde8af1aeda5622b546b422b6fc585325248a2bf9f5e41e94d6c" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", +] + [[package]] name = "urlencoding" version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8parse" version = "0.2.1" @@ -2758,6 +3628,35 @@ dependencies = [ "try-lock", ] +[[package]] +name = "warp" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4378d202ff965b011c64817db11d5829506d3404edeadb61f190d111da3f231c" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "headers", + "http 0.2.12", + "hyper 0.14.28", + "log", + "mime", + "mime_guess", + "multer", + "percent-encoding", + "pin-project", + "scoped-tls", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-tungstenite", + "tokio-util", + "tower-service", + "tracing", +] + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -2834,6 +3733,15 @@ version = "0.25.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" +[[package]] +name = "webpki-roots" +version = "0.26.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd7c23921eeb1713a4e851530e9b9756e4fb0e89978582942612524cf09f01cd" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "winapi" version = "0.3.9" @@ -2865,6 +3773,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets 0.52.4", +] + [[package]] name = "windows-sys" version = "0.48.0" @@ -3044,6 +3961,12 @@ dependencies = [ "syn", ] +[[package]] +name = "zeroize" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" + [[package]] name = "zstd" version = "0.12.4" diff --git a/Cargo.toml b/Cargo.toml index 80ce882f..0dbe361d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,7 +46,7 @@ blake3 = "1.5.0" boring = "3.1.0" boring-sys = "3.1.0" bstr = "1.7.0" -bytes = "1.5.0" +bytes = "1.7.2" clap = "4.4.6" clocksource = "0.8.1" crossbeam-channel = "0.5.8" diff --git a/README.md b/README.md index a43a8bdb..141af401 100644 --- a/README.md +++ b/README.md @@ -51,8 +51,10 @@ Pelikan contains the following products: storage, a TTL-centric design offering extremely high memory efficiency and excellent core scalability. See our [NSDI'21 paper] for design and evaluation details. -- `pelikan_pingserver_rs`: an over-engineered, production-ready ping server - useful as a tutorial and for measuring baseline RPC performance +- `pelikan_pingserver`: an over-engineered, production-ready ping server which + is useful as a tutorial and for measuring baseline RPC performance. It + supports multiple protocols and application transports to allow comparing the + performance of different protocols, transports, and implementations. - [`momento_proxy`][momento_proxy-url]: a proxy which allows existing applications to use Momento instead of a Memcache-compatible cache backend. diff --git a/config/pingserver.toml b/config/pingserver.toml index adca0a6e..04a2a28d 100644 --- a/config/pingserver.toml +++ b/config/pingserver.toml @@ -1,5 +1,20 @@ daemonize = false +[general] +# choose between 'mio' and 'tokio' +engine = "mio" +# choose between: +# * 'ascii' - a text based protocol with optional TLS +# * 'grpc' - a simple gRPC implementation using `tonic` +# * 'http2' - a gRPC implementation that is manually implemented on HTTP2 +# * 'http3' - a gRPC implementation that is manually implemented on HTTP3 (QUIC) +# +# Please note: 'mio' engine currently only supports 'ascii' protocol +protocol = "ascii" + +[metrics] +interval = "1s" + [admin] # interfaces listening on host = "0.0.0.0" diff --git a/src/server/pingserver/Cargo.toml b/src/server/pingserver/Cargo.toml index 0ce74156..cf9cef33 100644 --- a/src/server/pingserver/Cargo.toml +++ b/src/server/pingserver/Cargo.toml @@ -1,7 +1,8 @@ [package] -name = "pingserver" -description = "a simple ascii ping/pong server" +name = "pelikan-pingserver" +description = "a ping/pong server with multiple protocol support" authors = ["Brian Martin "] +build = "build.rs" version = { workspace = true } edition = { workspace = true } @@ -9,36 +10,45 @@ homepage = { workspace = true } repository = { workspace = true } license = { workspace = true } -[lib] -name = "pelikan_pingserver_rs" -path = "src/lib.rs" -doc = true - [[bin]] -name = "pelikan_pingserver_rs" +name = "pelikan_pingserver" path = "src/main.rs" doc = false -[[test]] -name = "integration" -path = "tests/integration.rs" -harness = false - -[[bench]] -name = "benchmark" -path = "benches/benchmark.rs" -harness = false - [dependencies] backtrace = { workspace = true } +bytes = { workspace = true } +chrono = "0.4.38" clap = { workspace = true } common = { path = "../../common" } config = { path = "../../config" } entrystore = { path = "../../entrystore" } +humantime = "2.1.0" +h2 = "0.4.6" +h3 = "0.0.6" +h3-quinn = "0.0.7" +http = "1.1.0" +http-body-util = "0.1.2" +hyper = { version = "1.4.1", features = ["http1", "http2", "server"] } logger = { path = "../../logger" } metriken = { workspace = true } +pin-project = "1.1.5" +prost = "0.13.2" protocol-ping = { path = "../../protocol/ping", features = ["server"] } -server = { path = "../../core/server", features = ["boringssl"] } +quinn = "0.11.5" +rustls = { version = "0.23.13", default-features = false, features = [ + "logging", + "ring", + "std", +] } +rustls-native-certs = "0.8.0" +serde = { workspace = true, features = ["derive"] } +server = { path = "../../core/server" } +session = { path = "../../session" } +tokio = { version = "1.40.0", features = ["macros", "rt-multi-thread"] } +toml = { workspace = true } +tonic = { version = "0.12.2" } +warp = "0.3.7" -[dev-dependencies] -criterion = "0.5.1" +[build-dependencies] +tonic-build = "0.12.2" diff --git a/src/server/pingserver/benches/benchmark.rs b/src/server/pingserver/benches/benchmark.rs deleted file mode 100644 index eaa8573a..00000000 --- a/src/server/pingserver/benches/benchmark.rs +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright 2021 Twitter, Inc. -// Licensed under the Apache License, Version 2.0 -// http://www.apache.org/licenses/LICENSE-2.0 - -//! This is a very basic benchmark which tests the performance of the server. -//! It's only using one connection and a very primitive blocking client, so -//! these results do not reflect the true performance of the server when under -//! load. It can be used to get a rough idea of how changes may impact -//! performance. -//! -//! For formal performance testing, it is recommended to use -//! [rpc-perf](https://github.com/twitter/rpc-perf). - -use config::PingserverConfig; -use criterion::{criterion_group, criterion_main, Criterion, Throughput}; -use pelikan_pingserver_rs::Pingserver; - -use std::io::{Read, Write}; -use std::net::TcpStream; -use std::time::Duration; - -fn ping_benchmark(c: &mut Criterion) { - let config = PingserverConfig::default(); - - // launch the server - let server = Pingserver::new(config).expect("failed to launch pingserver"); - - // wait for server to startup. duration is chosen to be longer than we'd - // expect startup to take in a slow ci environment. - std::thread::sleep(Duration::from_secs(10)); - - // connect and initialize an empty buffer - let mut stream = TcpStream::connect("127.0.0.1:12321").expect("failed to connect"); - let mut buffer = vec![0; 1024 * 1024]; - - // define a benchmarking group - let mut group = c.benchmark_group("request"); - group.throughput(Throughput::Elements(1)); - - let msg = b"ping\r\n"; - let bench_name = "ping"; - - group.bench_function(bench_name, |b| { - b.iter(|| { - assert!(stream.write_all(msg).is_ok()); - if let Ok(bytes) = stream.read(&mut buffer) { - if &buffer[0..bytes] != b"PONG\r\n" { - panic!("invalid response"); - } - } else { - panic!("read error"); - } - }) - }); - - // shutdown the server - server.shutdown(); -} - -criterion_group!(benches, ping_benchmark); -criterion_main!(benches); diff --git a/src/server/pingserver/build.rs b/src/server/pingserver/build.rs new file mode 100644 index 00000000..d0cd8d7d --- /dev/null +++ b/src/server/pingserver/build.rs @@ -0,0 +1,4 @@ +fn main() -> Result<(), Box> { + tonic_build::compile_protos("proto/pingpong.proto")?; + Ok(()) +} diff --git a/src/server/pingserver/proto/pingpong.proto b/src/server/pingserver/proto/pingpong.proto new file mode 100644 index 00000000..8e6f0d88 --- /dev/null +++ b/src/server/pingserver/proto/pingpong.proto @@ -0,0 +1,12 @@ +syntax = "proto3"; +package pingpong; + +service Ping { + rpc Ping (PingRequest) returns (PongResponse); +} + +message PingRequest { +} + +message PongResponse { +} diff --git a/src/server/pingserver/src/config.rs b/src/server/pingserver/src/config.rs new file mode 100644 index 00000000..d192be9c --- /dev/null +++ b/src/server/pingserver/src/config.rs @@ -0,0 +1,195 @@ +use config::*; + +use serde::{Deserialize, Serialize}; + +use std::io::Read; +use std::net::SocketAddr; +use std::time::Duration; + +#[derive(Serialize, Deserialize, Debug, Default)] +pub struct Config { + pub general: General, + pub metrics: Metrics, + + // application modules + #[serde(default)] + pub admin: Admin, + #[serde(default)] + pub server: Server, + #[serde(default)] + pub worker: Worker, + #[serde(default)] + pub time: Time, + #[serde(default)] + pub tls: Tls, + + // ccommon + #[serde(default)] + pub buf: Buf, + #[serde(default)] + pub debug: Debug, + #[serde(default)] + pub klog: Klog, + #[serde(default)] + pub sockio: Sockio, + #[serde(default)] + pub tcp: Tcp, +} + +#[derive(Serialize, Deserialize, Debug, Default)] +pub struct General { + pub engine: Engine, + pub protocol: Protocol, +} + +#[derive(Serialize, Deserialize, Debug, Default)] +pub struct Metrics { + #[serde(default = "interval")] + pub interval: String, +} + +impl Metrics { + pub fn interval(&self) -> Duration { + self.interval.parse::().unwrap().into() + } +} + +fn interval() -> String { + "1s".into() +} + +#[derive(Serialize, Deserialize, Debug, Default, PartialEq)] +#[serde(rename_all = "snake_case")] +pub enum Engine { + #[default] + Mio, + Tokio, +} + +#[derive(Serialize, Deserialize, Debug, Default, PartialEq)] +#[serde(rename_all = "snake_case")] +pub enum Protocol { + #[default] + Ascii, + Grpc, + Http2, + Http3, +} + +impl Config { + pub fn load(file: &str) -> Result { + let mut file = std::fs::File::open(file)?; + let mut content = String::new(); + file.read_to_string(&mut content)?; + + let config: Config = match toml::from_str(&content) { + Ok(t) => t, + Err(e) => { + error!("{}", e); + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Error parsing config", + )); + } + }; + + if config.general.protocol == Protocol::Grpc && config.general.engine == Engine::Mio { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "GRPC support requires using the Tokio engine", + )); + } + + match config.metrics.interval.parse::() { + Ok(interval) => { + if Into::::into(interval) < Duration::from_millis(10) { + eprintln!("metrics interval cannot be less than 10ms"); + std::process::exit(1); + } + } + Err(e) => { + eprintln!("metrics interval is not valid: {e}"); + std::process::exit(1); + } + } + + Ok(config) + } + + pub fn listen(&self) -> SocketAddr { + self.server + .socket_addr() + .map_err(|e| { + error!("{}", e); + std::io::Error::new(std::io::ErrorKind::Other, "Bad listen address") + }) + .map_err(|_| { + std::process::exit(1); + }) + .unwrap() + } +} + +impl AdminConfig for Config { + fn admin(&self) -> &Admin { + &self.admin + } +} + +impl BufConfig for Config { + fn buf(&self) -> &Buf { + &self.buf + } +} + +impl DebugConfig for Config { + fn debug(&self) -> &Debug { + &self.debug + } +} + +impl KlogConfig for Config { + fn klog(&self) -> &Klog { + &self.klog + } +} + +impl ServerConfig for Config { + fn server(&self) -> &Server { + &self.server + } +} + +impl SockioConfig for Config { + fn sockio(&self) -> &Sockio { + &self.sockio + } +} + +impl TcpConfig for Config { + fn tcp(&self) -> &Tcp { + &self.tcp + } +} + +impl TimeConfig for Config { + fn time(&self) -> &Time { + &self.time + } +} + +impl TlsConfig for Config { + fn tls(&self) -> &Tls { + &self.tls + } +} + +impl WorkerConfig for Config { + fn worker(&self) -> &Worker { + &self.worker + } + + fn worker_mut(&mut self) -> &mut Worker { + &mut self.worker + } +} diff --git a/src/server/pingserver/src/lib.rs b/src/server/pingserver/src/lib.rs deleted file mode 100644 index ac9c51bc..00000000 --- a/src/server/pingserver/src/lib.rs +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright 2021 Twitter, Inc. -// Licensed under the Apache License, Version 2.0 -// http://www.apache.org/licenses/LICENSE-2.0 - -//! Pingserver is an implementation of a basic backend service which responds to -//! each `PING` with a `PONG`. This allows for testing of the core components -//! without the overheads associated with managing any state. -//! -//! This library is primarily used for automated testing. Users should prefer to -//! run the Pingserver binary provided by this crate. - -use config::*; -use entrystore::Noop; -use logger::*; -use protocol_ping::{Request, RequestParser, Response}; -use server::{Process, ProcessBuilder}; - -type Parser = RequestParser; -type Storage = Noop; - -/// This structure represents a running `Pingserver` process. -pub struct Pingserver { - process: Process, -} - -impl Pingserver { - /// Creates a new `Pingserver` process from the given `PingserverConfig`. - pub fn new(config: PingserverConfig) -> Result { - // initialize logging - let log_drain = configure_logging(&config); - - // initialize metrics - common::metrics::init(); - - // initialize storage - let storage = Storage::new(); - - // initialize parser - let parser = Parser::new(); - - // initialize process - let process_builder = ProcessBuilder::::new( - &config, log_drain, parser, storage, - )? - .version(env!("CARGO_PKG_VERSION")); - - // spawn threads - let process = process_builder.spawn(); - - Ok(Self { process }) - } - - /// Wait for all threads to complete. Blocks until the process has fully - /// terminated. Under normal conditions, this will block indefinitely. - pub fn wait(self) { - self.process.wait() - } - - /// Triggers a shutdown of the process and blocks until the process has - /// fully terminated. This is more likely to be used for running integration - /// tests or other automated testing. - pub fn shutdown(self) { - self.process.shutdown() - } -} - -common::metrics::test_no_duplicates!(); diff --git a/src/server/pingserver/src/main.rs b/src/server/pingserver/src/main.rs index 4b861965..95e3ab33 100644 --- a/src/server/pingserver/src/main.rs +++ b/src/server/pingserver/src/main.rs @@ -1,26 +1,28 @@ -// Copyright 2021 Twitter, Inc. -// Licensed under the Apache License, Version 2.0 -// http://www.apache.org/licenses/LICENSE-2.0 - -//! Pingserver is an implementation of a basic backend service which responds to -//! each `PING` with a `PONG`. This allows for testing of the core components -//! without the overheads associated with managing any state. -//! -//! Running this binary is the primary way of using Pingserver. - #[macro_use] extern crate logger; +use config::{Config, Engine}; + +use entrystore::Noop; +use logger::{configure_logging, Drain}; +use protocol_ping::{Request, RequestParser, Response}; +use server::{ProcessBuilder, PERCENTILES}; + use backtrace::Backtrace; use clap::{Arg, Command}; -use config::PingserverConfig; use metriken::*; -use pelikan_pingserver_rs::Pingserver; -use server::PERCENTILES; -/// The entry point into the running Pingserver instance. This function parses -/// parses the command line options, loads the configuration, and launches the -/// core threads. +use std::sync::atomic::{AtomicBool, Ordering}; +use std::time::Duration; + +type Parser = RequestParser; +type Storage = Noop; + +mod config; +mod tokio; + +static RUNNING: AtomicBool = AtomicBool::new(true); + fn main() { // custom panic hook to terminate whole process after unwinding std::panic::set_hook(Box::new(|s| { @@ -41,6 +43,12 @@ fn main() { configurability, and other valuable traits in a typical production \ environment.", ) + .arg( + Arg::new("CONFIG") + .help("Server configuration file") + .action(clap::ArgAction::Set) + .index(1), + ) .arg( Arg::new("stats") .short('s') @@ -48,12 +56,6 @@ fn main() { .help("List all metrics in stats") .action(clap::ArgAction::SetTrue), ) - .arg( - Arg::new("CONFIG") - .help("Server configuration file") - .action(clap::ArgAction::Set) - .index(1), - ) .get_matches(); if matches.get_flag("stats") { @@ -95,7 +97,7 @@ fn main() { // load config from file let config = if let Some(file) = matches.get_one::("CONFIG") { debug!("loading config: {}", file); - match PingserverConfig::load(file) { + match Config::load(file) { Ok(c) => c, Err(error) => { eprintln!("error loading config file: {file}\n{error}"); @@ -106,12 +108,31 @@ fn main() { Default::default() }; - // launch - match Pingserver::new(config) { - Ok(s) => s.wait(), - Err(e) => { - eprintln!("error launching pingserver: {e}"); - std::process::exit(1); + // initialize logging + let log = configure_logging(&config); + + // initialize metrics + common::metrics::init(); + + // launch the server + match config.general.engine { + Engine::Mio => { + // initialize storage + let storage = Storage::new(); + + // initialize parser + let parser = Parser::new(); + + // initialize process + let process_builder = ProcessBuilder::::new( + &config, log, parser, storage, + ) + .expect("failed to initialize process"); + + // spawn threads + let process = process_builder.spawn(); + process.wait(); } + Engine::Tokio => tokio::spawn(config, log), } } diff --git a/src/server/pingserver/src/tokio/admin.rs b/src/server/pingserver/src/tokio/admin.rs new file mode 100644 index 00000000..985cdbf3 --- /dev/null +++ b/src/server/pingserver/src/tokio/admin.rs @@ -0,0 +1,233 @@ +use crate::tokio::METRICS_SNAPSHOT; +use crate::*; + +use ::config::AdminConfig; + +use metriken::Value; + +use std::net::ToSocketAddrs; +use std::sync::Arc; + +/// The HTTP admin server. +pub async fn http(config: Arc) { + let admin = filters::admin(); + + let addr = format!("{}:{}", config.admin().host(), config.admin().port()); + + let addr = addr + .to_socket_addrs() + .expect("bad listen address") + .next() + .expect("couldn't determine listen address"); + + warp::serve(admin).run(addr).await; +} + +mod filters { + use super::*; + use warp::Filter; + + /// The combined set of admin endpoint filters + pub fn admin() -> impl Filter + Clone { + prometheus_stats().or(human_stats()).or(json_stats()) + } + + /// Serves Prometheus / OpenMetrics text format metrics. + /// + /// GET /metrics + pub fn prometheus_stats( + ) -> impl Filter + Clone { + warp::path!("metrics") + .and(warp::get()) + .and_then(handlers::prometheus_stats) + } + + /// Serves a human readable metrics output. + /// + /// GET /vars + pub fn human_stats( + ) -> impl Filter + Clone { + warp::path!("vars") + .and(warp::get()) + .and_then(handlers::human_stats) + } + + /// Serves JSON metrics output that is compatible with Twitter Server / + /// Finagle metrics endpoints. Multiple paths are provided for enhanced + /// compatibility with metrics collectors. + /// + /// GET /metrics.json + /// GET /vars.json + /// GET /admin/metrics.json + pub fn json_stats( + ) -> impl Filter + Clone { + warp::path!("metrics.json") + .and(warp::get()) + .and_then(handlers::json_stats) + .or(warp::path!("vars.json") + .and(warp::get()) + .and_then(handlers::json_stats)) + .or(warp::path!("admin" / "metrics.json") + .and(warp::get()) + .and_then(handlers::json_stats)) + } +} + +pub mod handlers { + + use super::*; + use core::convert::Infallible; + use std::time::UNIX_EPOCH; + + /// Serves Prometheus / OpenMetrics text format metrics. All metrics have + /// type information, some have descriptions as well. Percentiles read from + /// heatmaps are exposed with a `percentile` label where the value + /// corresponds to the percentile in the range of 0.0 - 100.0. + /// + /// See: https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md + /// + /// ```text + /// # TYPE some_counter counter + /// # HELP some_counter An unsigned 64bit monotonic counter. + /// counter 0 + /// # TYPE some_gauge gauge + /// # HELP some_gauge A signed 64bit gauge. + /// some_gauge 0 + /// # TYPE some_distribution{percentile="50.0"} gauge + /// some_distribution{percentile="50.0"} 0 + /// ``` + pub async fn prometheus_stats() -> Result { + let mut data = Vec::new(); + + let metrics_snapshot = METRICS_SNAPSHOT.read().await; + + let timestamp = metrics_snapshot + .current + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis(); + + for metric in &metriken::metrics() { + if metric.name().starts_with("log_") { + continue; + } + + let name = metric.name(); + + match metric.value() { + Some(Value::Counter(value)) => { + if let Some(description) = metric.description() { + data.push(format!( + "# TYPE {name} counter\n# HELP {name} {description}\n{name} {value}" + )); + } else { + data.push(format!("# TYPE {name} counter\n{name} {value}")); + } + } + Some(Value::Gauge(value)) => { + if let Some(description) = metric.description() { + data.push(format!( + "# TYPE {name} gauge\n# HELP {name} {description}\n{name} {value}" + )); + } else { + data.push(format!("# TYPE {name} gauge\n{name} {value}")); + } + } + Some(Value::Other(_)) => { + let percentiles = metrics_snapshot.percentiles(metric.name()); + + for (_label, percentile, value) in percentiles { + if let Some(description) = metric.description() { + data.push(format!( + "# TYPE {name} gauge\n# HELP {name} {description}\n{name}{{percentile=\"{:02}\"}} {value} {timestamp}", + percentile, + )); + } else { + data.push(format!( + "# TYPE {name} gauge\n{name}{{percentile=\"{:02}\"}} {value} {timestamp}", + percentile, + )); + } + } + } + _ => continue, + } + } + + data.sort(); + let mut content = data.join("\n"); + content += "\n"; + let parts: Vec<&str> = content.split('/').collect(); + Ok(parts.join("_")) + } + + /// Serves JSON formatted metrics following the conventions of Finagle / + /// TwitterServer. Percentiles read from heatmaps will have a percentile + /// label appended to the metric name in the form `/p999` which would be the + /// 99.9th percentile. + /// + /// ```text + /// {"get/ok": 0,"client/request/p999": 0, ... } + /// ``` + pub async fn json_stats() -> Result { + let data = human_formatted_stats().await; + + let mut content = "{".to_string(); + content += &data.join(","); + content += "}"; + + Ok(content) + } + + /// Serves human readable stats. One metric per line with a `LF` as the + /// newline character (Unix-style). Percentiles will have percentile labels + /// appened with a `/` as a separator. + /// + /// ``` + /// get/ok: 0 + /// client/request/latency/p50: 0, + /// ``` + pub async fn human_stats() -> Result { + let data = human_formatted_stats().await; + + let mut content = data.join("\n"); + content += "\n"; + Ok(content) + } +} + +// human formatted stats that can be exposed as human stats or converted to json +pub async fn human_formatted_stats() -> Vec { + let mut data = Vec::new(); + + let metrics_snapshot = METRICS_SNAPSHOT.read().await; + + for metric in &metriken::metrics() { + if metric.name().starts_with("log_") { + continue; + } + + let name = metric.name(); + + match metric.value() { + Some(Value::Counter(value)) => { + data.push(format!("\"{name}\": {value}")); + } + Some(Value::Gauge(value)) => { + data.push(format!("\"{name}\": {value}")); + } + Some(Value::Other(_)) => { + let percentiles = metrics_snapshot.percentiles(metric.name()); + + for (label, _percentile, value) in percentiles { + data.push(format!("\"{name}/{label}\": {value}",)); + } + } + _ => continue, + } + } + + data.sort(); + + data +} diff --git a/src/server/pingserver/src/tokio/ascii.rs b/src/server/pingserver/src/tokio/ascii.rs new file mode 100644 index 00000000..89f56a84 --- /dev/null +++ b/src/server/pingserver/src/tokio/ascii.rs @@ -0,0 +1,96 @@ +use crate::Config; + +use ::config::BufConfig; +use protocol_ping::{Compose, Parse, Request, Response}; +use session::{Buf, BufMut, Buffer}; + +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpListener; + +use std::borrow::{Borrow, BorrowMut}; +use std::io::ErrorKind; +use std::sync::Arc; + +pub async fn run(config: Arc) { + let listener = std::net::TcpListener::bind(config.listen()).unwrap(); + let listener = TcpListener::from_std(listener).unwrap(); + + loop { + if let Ok((mut socket, _)) = listener.accept().await { + if socket.set_nodelay(true).is_err() { + continue; + } + + let buf_size = config.buf().size(); + + tokio::spawn(async move { + // initialize parser and the read and write bufs + let parser = protocol_ping::RequestParser::new(); + let mut read_buffer = Buffer::new(buf_size); + let mut write_buffer = Buffer::new(buf_size); + + loop { + // read from the socket + match socket.read(read_buffer.borrow_mut()).await { + Ok(0) => { + // socket was closed, return to close + return; + } + Ok(n) => { + // bytes received, advance read buffer + // to make them available for parsing + unsafe { + read_buffer.advance_mut(n); + } + } + Err(_) => { + // some other error occurred, return to + // close + return; + } + }; + + // parse the read buffer + let request = match parser.parse(read_buffer.borrow()) { + Ok(request) => { + // got a complete request, consume the + // bytes for the request by advancing + // the read buffer + let consumed = request.consumed(); + read_buffer.advance(consumed); + + request + } + Err(e) => match e.kind() { + ErrorKind::WouldBlock => { + // incomplete request, loop to read + // again + continue; + } + _ => { + // some parse error, return to close + return; + } + }, + }; + + // compose a response into the write buffer + match request.into_inner() { + Request::Ping => { + Response::Pong.compose(&mut write_buffer); + } + } + + // flush the write buffer, return to close on + // error + if socket.write_all(write_buffer.borrow()).await.is_err() { + return; + } + + // clear the write buffer + write_buffer.clear(); + } + }); + } + } +} diff --git a/src/server/pingserver/src/tokio/grpc.rs b/src/server/pingserver/src/tokio/grpc.rs new file mode 100644 index 00000000..cad2319a --- /dev/null +++ b/src/server/pingserver/src/tokio/grpc.rs @@ -0,0 +1,41 @@ +use crate::{Config, RUNNING}; + +use std::sync::atomic::Ordering; +use std::sync::Arc; + +use tonic::transport::Server as TonicServer; +use tonic::{Request as TonicRequest, Response as TonicResponse, Status as TonicStatus}; + +pub mod pingpong { + tonic::include_proto!("pingpong"); +} + +use pingpong::ping_server::{Ping, PingServer}; +use pingpong::{PingRequest, PongResponse}; + +#[derive(Debug, Default)] +pub struct Server {} + +#[tonic::async_trait] +impl Ping for Server { + async fn ping( + &self, + _request: TonicRequest, + ) -> Result, TonicStatus> { + Ok(TonicResponse::new(PongResponse {})) + } +} + +pub async fn run(config: Arc) { + tokio::spawn(async move { + if let Err(e) = TonicServer::builder() + .add_service(PingServer::new(Server::default())) + .serve(config.listen()) + .await + { + error!("{e}"); + }; + + RUNNING.store(false, Ordering::Relaxed); + }); +} diff --git a/src/server/pingserver/src/tokio/http2.rs b/src/server/pingserver/src/tokio/http2.rs new file mode 100644 index 00000000..285e6bbf --- /dev/null +++ b/src/server/pingserver/src/tokio/http2.rs @@ -0,0 +1,106 @@ +use crate::Config; + +use session::REQUEST_LATENCY; + +use bytes::BytesMut; +use chrono::Utc; +use http::{HeaderMap, HeaderValue, Version}; +use tokio::net::TcpListener; + +use std::sync::Arc; +use std::time::Instant; + +pub async fn run(config: Arc) { + let listener = TcpListener::bind(config.listen()).await.unwrap(); + + loop { + if let Ok((stream, _)) = listener.accept().await { + let _ = stream.set_nodelay(true).is_err(); + + tokio::task::spawn(async move { + match ::h2::server::handshake(stream).await { + Ok(mut conn) => { + loop { + match conn.accept().await { + Some(Ok((request, mut sender))) => { + let start = Instant::now(); + + tokio::spawn(async move { + let (_parts, mut body) = request.into_parts(); + + let mut content = BytesMut::new(); + + // receive all request body content + while let Some(data) = body.data().await { + if data.is_err() { + // TODO(bmartin): increment error stats + return; + } + + let data = data.unwrap(); + + content.extend_from_slice(&data); + let _ = + body.flow_control().release_capacity(data.len()); + } + + // we don't need the trailers, but read them here + if body.trailers().await.is_err() { + // TODO(bmartin): increment error stats + return; + } + + let mut date = + HeaderValue::from_str(&Utc::now().to_rfc2822()) + .unwrap(); + date.set_sensitive(true); + + // build our response + let response = http::response::Builder::new() + .status(200) + .version(Version::HTTP_2) + .header("content-type", "application/grpc") + .header("date", date) + .body(()) + .unwrap(); + + let content = BytesMut::zeroed(5); + + let mut trailers = HeaderMap::new(); + trailers.append("grpc-status", 0.into()); + + // send the response + if let Ok(mut stream) = + sender.send_response(response, false) + { + if stream.send_data(content.into(), false).is_ok() + && stream.send_trailers(trailers).is_ok() + { + let stop = Instant::now(); + let latency = stop.duration_since(start).as_nanos(); + + let _ = REQUEST_LATENCY.increment(latency as _); + } + } + + // TODO(bmartin): increment error stats + }); + } + Some(Err(e)) => { + eprintln!("error: {e}"); + break; + } + None => { + continue; + } + } + } + } + Err(e) => { + eprintln!("error during handshake: {e}"); + } + } + }); + } + } +} diff --git a/src/server/pingserver/src/tokio/http3.rs b/src/server/pingserver/src/tokio/http3.rs new file mode 100644 index 00000000..1f95ba77 --- /dev/null +++ b/src/server/pingserver/src/tokio/http3.rs @@ -0,0 +1,130 @@ +use crate::Config; + +use common::ssl::TlsConfig as TlsConfigTrait; +use config::TlsConfig; +use session::REQUEST_LATENCY; + +use bytes::{Buf, BytesMut}; +use chrono::Utc; +use http::{HeaderMap, HeaderValue, Version}; +use quinn::crypto::rustls::QuicServerConfig; +use rustls::pki_types::{CertificateDer, PrivateKeyDer}; + +use std::sync::Arc; +use std::time::Instant; + +pub async fn run(config: Arc) { + let cert_file = config + .tls() + .certificate() + .expect("no certificate configured"); + let cert_content = std::fs::read(cert_file).expect("failed to read cert"); + let cert = CertificateDer::from(cert_content); + + let key_file = config + .tls() + .private_key() + .expect("no private key configured"); + let key_content = std::fs::read(key_file).expect("failed to read private key"); + let key = PrivateKeyDer::try_from(key_content).expect("failed to load private key"); + + let mut tls_config = rustls::ServerConfig::builder() + .with_no_client_auth() + .with_single_cert(vec![cert], key) + .expect("error configuring tls"); + + tls_config.max_early_data_size = u32::MAX; + tls_config.alpn_protocols = vec!["h3".into()]; + + let quic_config = QuicServerConfig::try_from(tls_config).expect("failed to configure quic"); + + let server_config = quinn::ServerConfig::with_crypto(Arc::new(quic_config)); + let endpoint = quinn::Endpoint::server(server_config, config.listen()) + .expect("failed to start quic endpoint"); + + loop { + if let Some(incoming_conn) = endpoint.accept().await { + tokio::spawn(async move { + if let Ok(conn) = incoming_conn.await { + if let Ok(mut conn) = + h3::server::Connection::new(h3_quinn::Connection::new(conn)).await + { + loop { + match conn.accept().await { + Ok(Some((request, mut stream))) => { + let start = Instant::now(); + + tokio::spawn(async move { + let (_parts, _body) = request.into_parts(); + + let mut content = BytesMut::new(); + + while let Ok(data) = stream.recv_data().await { + if let Some(mut data) = data { + while data.has_remaining() { + let chunk: &[u8] = data.chunk(); + content.extend_from_slice(chunk); + data.advance(chunk.len()); + } + } else { + break; + } + } + + if let Ok(_trailers) = stream.recv_trailers().await { + let date = + HeaderValue::from_str(&Utc::now().to_rfc2822()) + .unwrap(); + + let response = http::response::Builder::new() + .status(200) + .version(Version::HTTP_3) + .header("content-type", "application/grpc") + .header("date", date) + .body(()) + .unwrap(); + + let content = BytesMut::zeroed(5); + + let mut trailers = HeaderMap::new(); + trailers.append("grpc-status", 0.into()); + + if stream.send_response(response).await.is_err() { + return; + } + + if stream.send_data(content).await.is_err() { + return; + } + + if stream.send_trailers(trailers).await.is_err() { + return; + } + + let stop = Instant::now(); + let latency = stop.duration_since(start).as_nanos(); + + let _ = REQUEST_LATENCY.increment(latency as _); + } + }); + } + Ok(None) => { + // break if no Request is accepted + break; + } + Err(err) => { + match err.get_error_level() { + // break on connection errors + h3::error::ErrorLevel::ConnectionError => break, + // continue on stream errors + h3::error::ErrorLevel::StreamError => continue, + } + } + } + } + } + } + }); + } + } +} diff --git a/src/server/pingserver/src/tokio/metrics.rs b/src/server/pingserver/src/tokio/metrics.rs new file mode 100644 index 00000000..cf8e951d --- /dev/null +++ b/src/server/pingserver/src/tokio/metrics.rs @@ -0,0 +1,198 @@ +use metriken::{histogram, AtomicHistogram, RwLockHistogram, Value}; + +use std::collections::HashMap; +use std::time::SystemTime; + +pub static PERCENTILES: &[(&str, f64)] = &[ + ("p25", 25.0), + ("p50", 50.0), + ("p75", 75.0), + ("p90", 90.0), + ("p99", 99.0), + ("p999", 99.9), + ("p9999", 99.99), +]; + +pub struct MetricsSnapshot { + pub current: SystemTime, + pub previous: SystemTime, + pub counters: CountersSnapshot, + pub histograms: HistogramsSnapshot, +} + +impl Default for MetricsSnapshot { + fn default() -> Self { + Self::new() + } +} + +impl MetricsSnapshot { + pub fn new() -> Self { + let now = SystemTime::now(); + + Self { + current: now, + previous: now, + counters: Default::default(), + histograms: Default::default(), + } + } + + pub fn update(&mut self) { + self.previous = self.current; + self.current = SystemTime::now(); + + self.counters.update(); + self.histograms.update(); + } + + pub fn percentiles(&self, name: &str) -> Vec<(String, f64, u64)> { + self.histograms.percentiles(name) + } +} + +pub struct HistogramsSnapshot { + pub previous: HashMap, + pub deltas: HashMap, +} + +impl Default for HistogramsSnapshot { + fn default() -> Self { + Self::new() + } +} + +impl HistogramsSnapshot { + pub fn new() -> Self { + let mut current = HashMap::new(); + + for metric in &metriken::metrics() { + match metric.value() { + Some(Value::Other(other)) => { + let histogram = if let Some(histogram) = other.downcast_ref::() + { + histogram.load() + } else if let Some(histogram) = other.downcast_ref::() { + histogram.load() + } else { + None + }; + + if let Some(histogram) = histogram { + current.insert(metric.name().to_string(), histogram); + } + } + _ => continue, + } + } + + let deltas = current.clone(); + + Self { + previous: current, + deltas, + } + } + + pub fn update(&mut self) { + for metric in &metriken::metrics() { + match metric.value() { + Some(Value::Other(other)) => { + let histogram = if let Some(histogram) = other.downcast_ref::() + { + histogram.load() + } else if let Some(histogram) = other.downcast_ref::() { + histogram.load() + } else { + None + }; + + if let Some(histogram) = histogram { + let name = metric.name().to_string(); + + if let Some(previous) = self.previous.get(&name) { + self.deltas + .insert(name.clone(), histogram.wrapping_sub(previous).unwrap()); + } + + self.previous.insert(name, histogram); + } + } + _ => continue, + } + } + } + + pub fn percentiles(&self, metric: &str) -> Vec<(String, f64, u64)> { + let mut result = Vec::new(); + + let percentiles: Vec = PERCENTILES + .iter() + .map(|(_, percentile)| *percentile) + .collect(); + + if let Some(snapshot) = self.deltas.get(metric) { + if let Ok(Some(percentiles)) = snapshot.percentiles(&percentiles) { + for ((label, _), (percentile, bucket)) in PERCENTILES.iter().zip(percentiles.iter()) + { + result.push((label.to_string(), *percentile, bucket.end())); + } + } + } + + result + } +} + +#[derive(Clone)] +pub struct CountersSnapshot { + pub current: HashMap, + pub previous: HashMap, +} + +impl Default for CountersSnapshot { + fn default() -> Self { + Self::new() + } +} + +impl CountersSnapshot { + pub fn new() -> Self { + let mut current = HashMap::new(); + let previous = HashMap::new(); + + for metric in metriken::metrics().iter() { + let any = if let Some(any) = metric.as_any() { + any + } else { + continue; + }; + + let metric = metric.name().to_string(); + + if let Some(_counter) = any.downcast_ref::() { + current.insert(metric.clone(), 0); + } + } + Self { current, previous } + } + + pub fn update(&mut self) { + for metric in metriken::metrics().iter() { + let any = if let Some(any) = metric.as_any() { + any + } else { + continue; + }; + + if let Some(counter) = any.downcast_ref::() { + if let Some(old_value) = self + .current + .insert(metric.name().to_string(), counter.value()) + { + self.previous.insert(metric.name().to_string(), old_value); + } + } + } + } +} diff --git a/src/server/pingserver/src/tokio/mod.rs b/src/server/pingserver/src/tokio/mod.rs new file mode 100644 index 00000000..f933804e --- /dev/null +++ b/src/server/pingserver/src/tokio/mod.rs @@ -0,0 +1,87 @@ +use crate::config::{Config, Protocol}; +use crate::*; + +use ::tokio::runtime::Builder; +use ::tokio::sync::RwLock; +use ::tokio::time::sleep; +use metriken::Lazy; + +use std::sync::Arc; + +mod admin; +mod ascii; +mod grpc; +mod http2; +mod http3; +mod metrics; + +static METRICS_SNAPSHOT: Lazy>> = + Lazy::new(|| Arc::new(RwLock::new(Default::default()))); + +pub fn spawn(config: Config, mut log: Box) { + let config = Arc::new(config); + + // initialize async runtime for control plane + let control_runtime = Builder::new_multi_thread() + .enable_all() + .worker_threads(1) + .build() + .expect("failed to initialize tokio runtime"); + + // spawn logging thread + control_runtime.spawn(async move { + while RUNNING.load(Ordering::Relaxed) { + sleep(Duration::from_millis(1)).await; + let _ = log.flush(); + } + let _ = log.flush(); + }); + + // spawn thread to maintain histogram snapshots + { + let interval = config.metrics.interval(); + control_runtime.spawn(async move { + while RUNNING.load(Ordering::Relaxed) { + // acquire a lock and update the snapshots + { + let mut snapshots = METRICS_SNAPSHOT.write().await; + snapshots.update(); + } + + // delay until next update + sleep(interval).await; + } + }); + } + + // spawn the admin thread + control_runtime.spawn(admin::http(config.clone())); + + // initialize async runtime for the data plane + let data_runtime = Builder::new_multi_thread() + .enable_all() + .worker_threads(config.worker.threads()) + .build() + .expect("failed to initialize tokio runtime"); + + match config.general.protocol { + Protocol::Ascii => { + data_runtime.block_on(async move { ascii::run(config).await }); + } + Protocol::Grpc => { + data_runtime.spawn(async move { grpc::run(config).await }); + } + Protocol::Http2 => { + data_runtime.spawn(async move { http2::run(config).await }); + } + Protocol::Http3 => { + data_runtime.spawn(async move { http3::run(config).await }); + } + } + + while RUNNING.load(Ordering::Relaxed) { + std::thread::sleep(Duration::from_millis(250)); + } + data_runtime.shutdown_timeout(std::time::Duration::from_millis(100)); + control_runtime.shutdown_timeout(std::time::Duration::from_millis(100)); +} diff --git a/src/server/pingserver/tests/integration.rs b/src/server/pingserver/tests/integration.rs deleted file mode 100644 index f4f58c94..00000000 --- a/src/server/pingserver/tests/integration.rs +++ /dev/null @@ -1,176 +0,0 @@ -// Copyright 2021 Twitter, Inc. -// Licensed under the Apache License, Version 2.0 -// http://www.apache.org/licenses/LICENSE-2.0 - -//! A basic integration test suite to run against the Pingserver. - -#[macro_use] -extern crate logger; - -use config::PingserverConfig; -use pelikan_pingserver_rs::Pingserver; - -use std::io::{Read, Write}; -use std::net::TcpStream; -use std::time::Duration; - -fn main() { - debug!("launching server"); - let server = Pingserver::new(PingserverConfig::default()).expect("failed to launch"); - - // wait for server to startup. duration is chosen to be longer than we'd - // expect startup to take in a slow ci environment. - std::thread::sleep(Duration::from_secs(10)); - - debug!("beginning tests"); - println!(); - - test("ping", &[("PING\r\n", Some("PONG\r\n"))]); - - admin_test( - "version", - &[( - "version\r\n", - Some(&format!("VERSION {}\r\n", env!("CARGO_PKG_VERSION"))), - )], - ); - - // shutdown server and join - info!("shutdown..."); - server.shutdown(); - info!("passed!"); -} - -// opens a new connection, operating on request + response pairs from the -// provided data. -fn test(name: &str, data: &[(&str, Option<&str>)]) { - info!("testing: {}", name); - debug!("connecting to server"); - let mut stream = TcpStream::connect("127.0.0.1:12321").expect("failed to connect"); - stream - .set_read_timeout(Some(Duration::from_millis(250))) - .expect("failed to set read timeout"); - stream - .set_write_timeout(Some(Duration::from_millis(250))) - .expect("failed to set write timeout"); - - debug!("sending request"); - for (request, response) in data { - match stream.write(request.as_bytes()) { - Ok(bytes) => { - if bytes == request.len() { - debug!("full request sent"); - } else { - error!("incomplete write"); - panic!("status: failed\n"); - } - } - Err(_) => { - error!("error sending request"); - panic!("status: failed\n"); - } - } - - std::thread::sleep(Duration::from_millis(10)); - let mut buf = vec![0; 4096]; - - if let Some(response) = response { - match stream.read(&mut buf) { - Err(e) => { - panic!("error reading response: {e}"); - } - Ok(_) => { - if response.as_bytes() != &buf[0..response.len()] { - error!("expected: {:?}", response.as_bytes()); - error!("received: {:?}", &buf[0..response.len()]); - panic!("status: failed\n"); - } else { - debug!("correct response"); - } - } - } - assert_eq!(response.as_bytes(), &buf[0..response.len()]); - } else if let Err(e) = stream.read(&mut buf) { - if e.kind() == std::io::ErrorKind::WouldBlock { - debug!("got no response"); - } else { - error!("error reading response"); - panic!("status: failed\n"); - } - } else { - error!("expected no response"); - panic!("status: failed\n"); - } - - if data.len() > 1 { - std::thread::sleep(Duration::from_millis(10)); - } - } - info!("status: passed\n"); -} - -// opens a new connection to the admin port, sends a request, and checks the response. -fn admin_test(name: &str, data: &[(&str, Option<&str>)]) { - info!("testing: {}", name); - debug!("connecting to server"); - let mut stream = TcpStream::connect("127.0.0.1:9999").expect("failed to connect"); - stream - .set_read_timeout(Some(Duration::from_millis(250))) - .expect("failed to set read timeout"); - stream - .set_write_timeout(Some(Duration::from_millis(250))) - .expect("failed to set write timeout"); - - debug!("sending request"); - for (request, response) in data { - match stream.write(request.as_bytes()) { - Ok(bytes) => { - if bytes == request.len() { - debug!("full request sent"); - } else { - error!("incomplete write"); - panic!("status: failed\n"); - } - } - Err(_) => { - error!("error sending request"); - panic!("status: failed\n"); - } - } - - std::thread::sleep(Duration::from_millis(10)); - let mut buf = vec![0; 4096]; - - if let Some(response) = response { - if stream.read(&mut buf).is_err() { - std::thread::sleep(Duration::from_millis(500)); - panic!("error reading response"); - } else if response.as_bytes() != &buf[0..response.len()] { - error!("expected: {:?}", response.as_bytes()); - error!("received: {:?}", &buf[0..response.len()]); - std::thread::sleep(Duration::from_millis(500)); - panic!("status: failed\n"); - } else { - debug!("correct response"); - } - assert_eq!(response.as_bytes(), &buf[0..response.len()]); - } else if let Err(e) = stream.read(&mut buf) { - if e.kind() == std::io::ErrorKind::WouldBlock { - debug!("got no response"); - } else { - error!("error reading response"); - std::thread::sleep(Duration::from_millis(500)); - panic!("status: failed\n"); - } - } else { - error!("expected no response"); - std::thread::sleep(Duration::from_millis(500)); - panic!("status: failed\n"); - } - - if data.len() > 1 { - std::thread::sleep(Duration::from_millis(10)); - } - } - info!("status: passed\n"); -}