From 0fecba80c83ff62be33509ef0ff1e5739179f317 Mon Sep 17 00:00:00 2001 From: Rohan Krishnaswamy <47869999+rkrishn7@users.noreply.github.com> Date: Fri, 15 Mar 2024 14:34:11 -0700 Subject: [PATCH] refactor: swap tokio-tungstenite for fastwebsockets (#97) --- Cargo.lock | 657 ++++++++++++++++++++++++--- src/kiwi/Cargo.toml | 8 +- src/kiwi/src/ws.rs | 346 ++++++++------ src/kiwi/tests/common/healthcheck.rs | 28 ++ src/kiwi/tests/common/mod.rs | 1 + src/kiwi/tests/common/ws.rs | 87 +++- src/kiwi/tests/kafka.rs | 37 +- 7 files changed, 932 insertions(+), 232 deletions(-) create mode 100644 src/kiwi/tests/common/healthcheck.rs diff --git a/Cargo.lock b/Cargo.lock index c3f673a..dd927da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -140,6 +140,61 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "axum" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1236b4b292f6c4d6dc34604bb5120d85c3fe1d1aa596bd5cc52ca054d13e7b9e" +dependencies = [ + "async-trait", + "axum-core", + "bytes", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.2.0", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-core" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a15c63fd72d41492dc4f497196f5da1fb04fb7529e631d73630d1b491e47a2e3" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -367,6 +422,22 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" + [[package]] name = "cpp_demangle" version = "0.3.5" @@ -552,12 +623,6 @@ dependencies = [ "typenum", ] -[[package]] -name = "data-encoding" -version = "2.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308" - [[package]] name = "debugid" version = "0.8.0" @@ -662,6 +727,29 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" +[[package]] +name = "fastwebsockets" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5badf44f2ba24c260a59c17b44151134e6248903363bf838ab5196cebfa75be8" +dependencies = [ + "async-trait", + "axum-core", + "base64 0.21.5", + "bytes", + "http 1.0.0", + "http-body-util", + "hyper 1.2.0", + "hyper-util", + "pin-project", + "rand", + "sha1", + "simdutf8", + "thiserror", + "tokio", + "utf-8", +] + [[package]] name = "fd-lock" version = "4.0.0" @@ -691,6 +779,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.0" @@ -863,6 +966,25 @@ dependencies = [ "stable_deref_trait", ] +[[package]] +name = "h2" +version = "0.3.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb2c4422095b67ee78da96fbb51a4cc413b3b25883c7717ff7ca1ab31022c9c9" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 0.2.12", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "h2" version = "0.4.2" @@ -874,7 +996,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 1.0.0", "indexmap", "slab", "tokio", @@ -924,6 +1046,17 @@ dependencies = [ "digest", ] +[[package]] +name = "http" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http" version = "1.0.0" @@ -935,6 +1068,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-body" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" +dependencies = [ + "bytes", + "http 0.2.12", + "pin-project-lite", +] + [[package]] name = "http-body" version = "1.0.0" @@ -942,19 +1086,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" dependencies = [ "bytes", - "http", + "http 1.0.0", ] [[package]] name = "http-body-util" -version = "0.1.0" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41cb79eb393015dadd30fc252023adb0b2400a0caee0fa2a077e6e21a551e840" +checksum = "0475f8b2ac86659c21b64320d5d653f9efe42acd2a4e560073ec61a155a34f1d" dependencies = [ "bytes", - "futures-util", - "http", - "http-body", + "futures-core", + "http 1.0.0", + "http-body 1.0.0", "pin-project-lite", ] @@ -972,24 +1116,78 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "hyper" -version = "1.1.0" +version = "0.14.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb5aa53871fc917b1a9ed87b683a5d86db645e23acb32c2e0785a353e522fb75" +checksum = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80" dependencies = [ "bytes", "futures-channel", + "futures-core", "futures-util", - "h2", - "http", - "http-body", + "h2 0.3.24", + "http 0.2.12", + "http-body 0.4.6", "httparse", "httpdate", "itoa", "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", + "want", +] + +[[package]] +name = "hyper" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "186548d73ac615b32a73aafe38fb4f56c0d340e110e5a200bcadbaf2e199263a" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "h2 0.4.2", + "http 1.0.0", + "http-body 1.0.0", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "smallvec", "tokio", "want", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper 0.14.28", + "native-tls", + "tokio", + "tokio-native-tls", +] + +[[package]] +name = "hyper-util" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca38ef113da30126bbff9cd1705f9273e15d45498615d138b0c20279ac7a76aa" +dependencies = [ + "bytes", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", + "hyper 1.2.0", + "pin-project-lite", + "socket2", + "tokio", +] + [[package]] name = "id-arena" version = "2.2.1" @@ -1103,6 +1301,15 @@ dependencies = [ "libc", ] +[[package]] +name = "js-sys" +version = "0.3.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29c15563dc2726973df627357ce0c9ddddbea194836909d655df6a75d2cf296d" +dependencies = [ + "wasm-bindgen", +] + [[package]] name = "jwt" version = "0.16.0" @@ -1126,11 +1333,17 @@ dependencies = [ "arc-swap", "async-stream", "async-trait", + "axum", "base64 0.21.5", + "bytes", "clap", + "fastwebsockets", "futures", "futures-util", - "http", + "http 1.0.0", + "http-body-util", + "hyper 1.2.0", + "hyper-util", "jwt", "maplit", "nanoid", @@ -1138,6 +1351,7 @@ dependencies = [ "notify", "once_cell", "rdkafka", + "reqwest", "ringbuf", "serde", "serde_json", @@ -1146,7 +1360,6 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", - "tokio-tungstenite", "tracing", "tracing-subscriber", "wasmtime", @@ -1170,7 +1383,7 @@ name = "kiwi-sdk" version = "0.1.1-alpha.7" dependencies = [ "anyhow", - "http", + "http 1.0.0", "kiwi-macro", "wit-bindgen", ] @@ -1273,6 +1486,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d" +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "maybe-owned" version = "0.3.4" @@ -1303,6 +1522,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "miniz_oxide" version = "0.7.1" @@ -1333,6 +1558,24 @@ dependencies = [ "rand", ] +[[package]] +name = "native-tls" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" +dependencies = [ + "lazy_static", + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "nix" version = "0.28.0" @@ -1423,6 +1666,50 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "openssl" +version = "0.10.64" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95a0481286a310808298130d22dd1fef0fa571e05a8f44ec801801e84b216b1f" +dependencies = [ + "bitflags 2.4.1", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + +[[package]] +name = "openssl-probe" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" + +[[package]] +name = "openssl-sys" +version = "0.9.101" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dda2b0f344e78efc2facf7d195d098df0dd72151b26ab98da807afc26c198dff" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "overload" version = "0.1.1" @@ -1464,6 +1751,26 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" +[[package]] +name = "pin-project" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "pin-project-lite" version = "0.2.13" @@ -1640,6 +1947,46 @@ dependencies = [ "smallvec", ] +[[package]] +name = "reqwest" +version = "0.11.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78bf93c4af7a8bb7d879d51cebe797356ff10ae8516ace542b5182d9dcac10b2" +dependencies = [ + "base64 0.21.5", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2 0.3.24", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.28", + "hyper-tls", + "ipnet", + "js-sys", + "log", + "mime", + "native-tls", + "once_cell", + "percent-encoding", + "pin-project-lite", + "rustls-pemfile", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "system-configuration", + "tokio", + "tokio-native-tls", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg", +] + [[package]] name = "ring" version = "0.17.7" @@ -1702,6 +2049,15 @@ dependencies = [ "sct", ] +[[package]] +name = "rustls-pemfile" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" +dependencies = [ + "base64 0.21.5", +] + [[package]] name = "rustls-webpki" version = "0.101.7" @@ -1712,6 +2068,12 @@ dependencies = [ "untrusted", ] +[[package]] +name = "rustversion" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4" + [[package]] name = "ryu" version = "1.0.15" @@ -1727,6 +2089,15 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "schannel" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbc91545643bcf3a0bbb6569265615222618bdf33ce4ffbbd13c4bbd4c093534" +dependencies = [ + "windows-sys 0.52.0", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -1743,6 +2114,29 @@ dependencies = [ "untrusted", ] +[[package]] +name = "security-framework" +version = "2.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05b64fb303737d99b81884b2c63433e9ae28abebe5eb5045dcdd175dc2ecf4de" +dependencies = [ + "bitflags 1.3.2", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e932934257d3b408ed8f30db49d85ea163bfe74961f017f405b025af298f0c7a" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "semver" version = "1.0.20" @@ -1780,6 +2174,28 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af99884400da37c88f5e9146b7f1fd0fbcae8f6eec4e9da38b67d05486f814a6" +dependencies = [ + "itoa", + "serde", +] + +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "serde_yaml" version = "0.9.31" @@ -1842,6 +2258,12 @@ dependencies = [ "libc", ] +[[package]] +name = "simdutf8" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a" + [[package]] name = "slab" version = "0.4.9" @@ -1859,9 +2281,9 @@ checksum = "826167069c09b99d56f31e9ae5c99049e932a98c9dc2dac47645b08dbbf76ba7" [[package]] name = "smallvec" -version = "1.11.1" +version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "942b4a808e05215192e39f4ab80813e599068285906cc91aa64f923db842bd5a" +checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7" [[package]] name = "socket2" @@ -1934,6 +2356,33 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + +[[package]] +name = "system-configuration" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" +dependencies = [ + "bitflags 1.3.2", + "core-foundation", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "system-interface" version = "0.26.0" @@ -2043,6 +2492,16 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.24.1" @@ -2065,18 +2524,6 @@ dependencies = [ "tokio-util", ] -[[package]] -name = "tokio-tungstenite" -version = "0.21.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38" -dependencies = [ - "futures-util", - "log", - "tokio", - "tungstenite", -] - [[package]] name = "tokio-util" version = "0.7.9" @@ -2117,12 +2564,41 @@ dependencies = [ "winnow", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + +[[package]] +name = "tower-service" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" + [[package]] name = "tracing" version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -2180,25 +2656,6 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" -[[package]] -name = "tungstenite" -version = "0.21.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" -dependencies = [ - "byteorder", - "bytes", - "data-encoding", - "http", - "httparse", - "log", - "rand", - "sha1", - "thiserror", - "url", - "utf-8", -] - [[package]] name = "typenum" version = "1.17.0" @@ -2354,6 +2811,72 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "wasm-bindgen" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4be2531df63900aeb2bca0daaaddec08491ee64ceecbee5076636a3b026795a8" +dependencies = [ + "cfg-if", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "614d787b966d3989fa7bb98a654e369c762374fd3213d212cfc0251257e747da" +dependencies = [ + "bumpalo", + "log", + "once_cell", + "proc-macro2", + "quote", + "syn 2.0.48", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76bc14366121efc8dbb487ab05bcc9d346b3b5ec0eaa76e46594cabbe51762c0" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1f8823de937b71b9460c0c34e25f3da88250760bec0ebac694b49997550d726" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" + [[package]] name = "wasm-encoder" version = "0.41.0" @@ -2732,10 +3255,10 @@ dependencies = [ "async-trait", "bytes", "futures", - "http", - "http-body", + "http 1.0.0", + "http-body 1.0.0", "http-body-util", - "hyper", + "hyper 1.2.0", "rustls", "tokio", "tokio-rustls", @@ -2811,6 +3334,16 @@ dependencies = [ "wast 70.0.2", ] +[[package]] +name = "web-sys" +version = "0.3.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77afa9a11836342370f4817622a2f0f418b134426d91a82dfb48f532d2ec13ef" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki-roots" version = "0.25.4" @@ -3047,6 +3580,16 @@ dependencies = [ "memchr", ] +[[package]] +name = "winreg" +version = "0.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" +dependencies = [ + "cfg-if", + "windows-sys 0.48.0", +] + [[package]] name = "winx" version = "0.36.2" diff --git a/src/kiwi/Cargo.toml b/src/kiwi/Cargo.toml index 811f51b..b99a439 100644 --- a/src/kiwi/Cargo.toml +++ b/src/kiwi/Cargo.toml @@ -24,7 +24,6 @@ serde_yaml = "0.9" thiserror = "1.0.57" tokio = { version = "1", features = ["full"] } tokio-stream = { version = "0.1.14", features = ["sync"] } -tokio-tungstenite = "0.21.0" tracing = "0.1.40" tracing-subscriber = "0.3.18" wasmtime = { version = "18.0.2", features = ["component-model", "async"] } @@ -38,7 +37,14 @@ futures-util = "0.3.30" http = "1.0.0" notify = "6.1.1" arc-swap = "1.7.0" +fastwebsockets = { version = "0.7.0", features = ["with_axum", "upgrade"] } +axum = "0.7.4" +http-body-util = "0.1.1" [dev-dependencies] tempfile = "3" nix = { version = "0.28.0", features = ["signal"] } +reqwest = "0.11.26" +hyper = "1.2.0" +hyper-util = "0.1.3" +bytes = "1.5.0" diff --git a/src/kiwi/src/ws.rs b/src/kiwi/src/ws.rs index c2fe2bf..f473605 100644 --- a/src/kiwi/src/ws.rs +++ b/src/kiwi/src/ws.rs @@ -1,15 +1,16 @@ -use std::borrow::Cow; use std::collections::BTreeMap; use std::sync::Mutex; use std::{net::SocketAddr, sync::Arc}; use arc_swap::ArcSwapOption; -use futures::{SinkExt, StreamExt}; -use tokio::io::{AsyncRead, AsyncWrite}; +use axum::body::Body; +use axum::extract::{ConnectInfo, Request, State}; +use axum::{response::IntoResponse, routing::get, Router}; +use fastwebsockets::{upgrade, CloseCode, FragmentCollector, Frame, Payload, WebSocketError}; +use http::{Response, StatusCode}; +use http_body_util::Empty; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpListener; -use tokio_tungstenite::tungstenite::handshake::server::{ErrorResponse, Request, Response}; -use tokio_tungstenite::tungstenite::http::StatusCode; -use tokio_tungstenite::WebSocketStream; use crate::connection::ConnectionManager; use crate::hook::authenticate::types::Authenticate; @@ -17,16 +18,22 @@ use crate::hook::authenticate::types::Outcome; use crate::hook::intercept::types::{AuthCtx, ConnectionCtx, WebSocketConnectionCtx}; use crate::hook::intercept::types::Intercept; -use crate::protocol::{Command, Message, ProtocolError as KiwiProtocolError}; +use crate::protocol::{Command, Message, ProtocolError}; use crate::source::{Source, SourceId}; -use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode; -use tokio_tungstenite::tungstenite::protocol::{CloseFrame, Message as ProtocolMessage}; +type Sources = Arc>>>; + +struct AppState { + sources: Sources, + intercept: Arc>, + authenticate: Arc>, + subscriber_config: crate::config::Subscriber, +} /// Starts a WebSocket server with the specified configuration pub async fn serve( listen_addr: &SocketAddr, - sources: Arc>>>, + sources: Sources, intercept: Arc>, authenticate: Arc>, subscriber_config: crate::config::Subscriber, @@ -35,117 +42,146 @@ where I: Intercept + Send + Sync + 'static, A: Authenticate + Send + Sync + Unpin + 'static, { + let app = make_app(sources, intercept, authenticate, subscriber_config); + // TODO: Support TLS let listener = TcpListener::bind(listen_addr).await?; - tracing::info!("Server started. Listening on: {}", listen_addr); - - while let Ok((stream, addr)) = listener.accept().await { - let sources = Arc::clone(&sources); - let intercept = intercept.clone(); - let authenticate = authenticate.clone(); - let subscriber_config = subscriber_config.clone(); - tokio::spawn(async move { - match perform_handshake(stream, authenticate).await { - Ok((mut ws_stream, auth_ctx)) => { - tracing::info!(ip = ?addr, "New WebSocket connection"); - - let connection_ctx = ConnectionCtx::WebSocket(WebSocketConnectionCtx { addr }); - - let _ = drive_stream( - &mut ws_stream, - sources, - auth_ctx, - connection_ctx, - intercept, - subscriber_config, - ) - .await; - } - Err(e) => { - tracing::error!( - "Error during websocket handshake occurred for client {}: {}", - addr, - e - ); - } - } - tracing::info!("{} disconnected", addr); - }); - } + tracing::info!("Server listening on: {}", listen_addr); + + axum::serve( + listener, + app.into_make_service_with_connect_info::(), + ) + .await?; Ok(()) } -async fn perform_handshake( - stream: S, +fn make_app( + sources: Sources, + intercept: Arc>, authenticate: Arc>, -) -> anyhow::Result<(WebSocketStream, Option)> + subscriber_config: crate::config::Subscriber, +) -> Router where - S: AsyncRead + AsyncWrite + Unpin, - A: Authenticate + Unpin + Send + Sync + 'static, + I: Intercept + Send + Sync + 'static, + A: Authenticate + Send + Sync + Unpin + 'static, { - let mut auth_ctx = None; - - let handle = tokio::runtime::Handle::current(); - - let ws_stream = tokio_tungstenite::accept_hdr_async_with_config( - stream, - |req: &Request, res: Response| { - let request = req.clone(); - - if let Some(hook) = authenticate.load().as_ref() { - // The callback implemented by tokio-tungstenite is not async, which makes - // it difficult to invoke async code from within it. Ultimately, we need - // to block the current thread to run the async code. While it's not ideal, - // tokio provides `tokio::task::block_in_place` to handle this. - // - // The tokio-tungstenite issue is tracked [here](https://github.com/snapview/tokio-tungstenite/issues/159) - let outcome = tokio::task::block_in_place(move || { - handle.block_on(async { hook.authenticate(request).await }) - }); - - match outcome { - Ok(Outcome::Authenticate) => (), - Ok(Outcome::WithContext(ctx)) => { - auth_ctx = Some(AuthCtx::from_bytes(ctx)); - } - outcome => { - if outcome.is_err() { - tracing::error!( - "Failed to run authentication hook. Error {:?}", - outcome.unwrap_err() - ); - } + let state = AppState { + sources, + intercept, + authenticate, + subscriber_config, + }; - let mut res = ErrorResponse::new(Some("Unauthorized".to_string())); - *res.status_mut() = StatusCode::UNAUTHORIZED; - return Err(res); - } + Router::new() + .route("/", get(ws_handler)) + .route("/health", get(healthcheck)) + .with_state(Arc::new(state)) +} + +#[tracing::instrument(skip_all)] +async fn load_auth_ctx( + authenticate: Arc>, + request: Request, +) -> Result, ()> +where + A: Authenticate + Send + Sync + Unpin + 'static, +{ + if let Some(hook) = authenticate.load().as_ref() { + let outcome = hook.authenticate(request.map(|_| ())).await; + + match outcome { + Ok(Outcome::Authenticate) => Ok(None), + Ok(Outcome::WithContext(ctx)) => Ok(Some(AuthCtx::from_bytes(ctx))), + outcome => { + if outcome.is_err() { + tracing::error!( + "Failure occurred while running authentication hook: {:?}", + outcome.unwrap_err() + ); } + + return Err(()); } + } + } else { + Ok(None) + } +} - Ok(res) - }, - None, - ) - .await?; +async fn ws_handler( + ConnectInfo(addr): ConnectInfo, + State(state): State>>, + mut request: Request, +) -> impl IntoResponse +where + I: Intercept + Send + Sync + 'static, + A: Authenticate + Send + Sync + Unpin + 'static, +{ + let (response, fut) = upgrade::upgrade(&mut request).expect("failed to build upgrade response"); - Ok((ws_stream, auth_ctx)) + let authenticate = Arc::clone(&state.authenticate); + + let auth_ctx = if let Ok(auth_ctx) = load_auth_ctx(authenticate, request).await { + auth_ctx + } else { + return Response::builder() + .status(StatusCode::UNAUTHORIZED) + .body(Empty::new()) + .unwrap(); + }; + + let intercept = Arc::clone(&state.intercept); + let sources = Arc::clone(&state.sources); + let subscriber_config = state.subscriber_config.clone(); + let connection_ctx = ConnectionCtx::WebSocket(WebSocketConnectionCtx { addr }); + + tokio::spawn(async move { + if let Err(e) = handle_client( + fut, + sources, + intercept, + subscriber_config, + connection_ctx.clone(), + auth_ctx, + ) + .await + { + tracing::error!( + addr = ?addr, + "Error occurred while serving WebSocket client: {}", + e + ); + } + + tracing::debug!(connection = ?connection_ctx, "WebSocket connection terminated"); + }); + + response } -async fn drive_stream( - stream: &mut WebSocketStream, - sources: Arc>>>, - auth_ctx: Option, - connection_ctx: ConnectionCtx, +async fn healthcheck() -> impl IntoResponse { + "OK" +} + +async fn handle_client( + fut: upgrade::UpgradeFut, + sources: Sources, intercept: Arc>, subscriber_config: crate::config::Subscriber, + connection_ctx: ConnectionCtx, + auth_ctx: Option, ) -> anyhow::Result<()> where - S: AsyncRead + AsyncWrite + Unpin, I: Intercept + Send + Sync + 'static, { + let ws = fut.await?; + let mut ws = fastwebsockets::FragmentCollector::new(ws); + + tracing::debug!(connection = ?connection_ctx, "WebSocket connection established"); + let (msg_tx, mut msg_rx) = tokio::sync::mpsc::unbounded_channel::(); let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::(); @@ -167,55 +203,40 @@ where }); loop { - // It's important that we bias the select block towards the WebSocket stream - // because message pushes are likely to occur much more frequently than - // commands, and we want to ensure the polling of each future remains fair. tokio::select! { biased; - Some(protocol_msg) = stream.next() => { - match protocol_msg { - Ok(msg) => { - let cmd = match msg { - ProtocolMessage::Text(text) => { - match serde_json::from_str::(&text) { - Ok(cmd) => Ok(cmd), - Err(_) => Err(KiwiProtocolError::CommandDeserialization(text)), + Some(cmd) = recv_cmd(&mut ws) => { + match cmd { + Ok(cmd) => { + if cmd_tx.send(cmd).is_err() { + // If the send failed, the channel is closed thus we should + // terminate the connection + break; + } + } + Err(e) => { + let (close_code, reason) = match e { + RecvError::WebSocket(e) => { + match e { + WebSocketError::ConnectionClosed => break, + e => return Err(e.into()), } - }, - ProtocolMessage::Binary(_) => Err(KiwiProtocolError::UnsupportedCommandForm), - ProtocolMessage::Close(_) => { - break; - }, - // Handling of Ping/Close messages are delegated to tungstenite - _ => continue, - }; - - match cmd { - Ok(cmd) => { - if cmd_tx.send(cmd).is_err() { - // If the send failed, the channel is closed - // thus we should terminate the connection - break; + } + RecvError::Protocol(e) => { + match e { + ProtocolError::CommandDeserialization(_) => { + (CloseCode::Policy, e.to_string()) + } + ProtocolError::UnsupportedCommandForm => { + (CloseCode::Unsupported, e.to_string()) + } } }, - Err(e) => { - let close_frame = CloseFrame { - code: CloseCode::Unsupported, - reason: Cow::from(e.to_string()), - }; - - if let Err(e) = stream.close(Some(close_frame)).await { - tracing::error!("Failed to gracefully close the WebSocket connection with error {}", e); - } + }; - break; - }, - } - }, - Err(tokio_tungstenite::tungstenite::Error::ConnectionClosed) => break, - Err(e) => { - tracing::error!("Encountered unexpected error while reading from WS stream: {}", e); + let frame = Frame::close(close_code.into(), reason.as_bytes()); + ws.write_frame(frame).await?; break; } } @@ -225,10 +246,10 @@ where Some(msg) => { let txt = serde_json::to_string(&msg).expect("failed to serialize message"); - // TODO: Batch here and flush on an interval? - if let Err(e) = stream.send(ProtocolMessage::Text(txt)).await { - tracing::error!("Encountered unexpected error while feeding data to WS stream: {}", e); - } + let frame = Frame::text(Payload::from(txt.as_bytes())); + + ws.write_frame(frame).await?; + } None => { // The sole sender (our ingest actor) has hung up for some reason so we want to @@ -236,9 +257,44 @@ where break; }, } - }, + } } } Ok(()) } + +enum RecvError { + WebSocket(WebSocketError), + Protocol(ProtocolError), +} + +async fn recv_cmd(ws: &mut FragmentCollector) -> Option> +where + S: AsyncReadExt + AsyncWriteExt + Unpin, +{ + let frame = match ws.read_frame().await { + Ok(frame) => frame, + Err(e) => { + return Some(Err(RecvError::WebSocket(e))); + } + }; + + match frame.opcode { + fastwebsockets::OpCode::Text => { + Some( + serde_json::from_slice::(&frame.payload).map_err(|_| { + RecvError::Protocol(ProtocolError::CommandDeserialization( + // SAFETY: We know the payload is valid UTF-8 because `read_frame` + // guarantees that text frames payloads are valid UTF-8 + unsafe { std::str::from_utf8_unchecked(&frame.payload) }.to_string(), + )) + }), + ) + } + fastwebsockets::OpCode::Binary => Some(Err(RecvError::Protocol( + ProtocolError::UnsupportedCommandForm, + ))), + _ => None, + } +} diff --git a/src/kiwi/tests/common/healthcheck.rs b/src/kiwi/tests/common/healthcheck.rs new file mode 100644 index 0000000..40b0914 --- /dev/null +++ b/src/kiwi/tests/common/healthcheck.rs @@ -0,0 +1,28 @@ +use std::time::Duration; + +use anyhow::anyhow; + +pub struct Healthcheck<'a> { + pub interval: Duration, + pub attempts: u32, + pub url: &'a str, +} + +impl<'a> Healthcheck<'a> { + pub async fn run(&self) -> anyhow::Result<()> { + for _ in 0..self.attempts { + if let Ok(response) = reqwest::get(self.url).await { + if response.status().is_success() { + return Ok(()); + } + } + + tokio::time::sleep(self.interval).await; + } + + Err(anyhow!( + "Healthcheck failed after {} attempts", + self.attempts + )) + } +} diff --git a/src/kiwi/tests/common/mod.rs b/src/kiwi/tests/common/mod.rs index 87fd162..84e1160 100644 --- a/src/kiwi/tests/common/mod.rs +++ b/src/kiwi/tests/common/mod.rs @@ -1,3 +1,4 @@ +pub mod healthcheck; pub mod kafka; pub mod kiwi; pub mod ws; diff --git a/src/kiwi/tests/common/ws.rs b/src/kiwi/tests/common/ws.rs index 2ec9656..41a5eb7 100644 --- a/src/kiwi/tests/common/ws.rs +++ b/src/kiwi/tests/common/ws.rs @@ -1,27 +1,70 @@ -use futures::{SinkExt, StreamExt}; -use http::Response; +use fastwebsockets::FragmentCollector; +use fastwebsockets::Frame; +use fastwebsockets::OpCode; +use fastwebsockets::Payload; +use futures::Future; +use http_body_util::Empty; use tokio::net::TcpStream; -use tokio_tungstenite::{ - connect_async, - tungstenite::{client::IntoClientRequest, Message}, - MaybeTlsStream, WebSocketStream, -}; + +use bytes::Bytes; +use hyper::body::Incoming; +use hyper::header::CONNECTION; +use hyper::header::UPGRADE; +use hyper::upgrade::Upgraded; +use hyper::{Request, Response, Uri}; +use hyper_util::rt::TokioIo; pub struct Client { - stream: WebSocketStream>, + ws: FragmentCollector>, +} + +struct SpawnExecutor; + +impl hyper::rt::Executor for SpawnExecutor +where + Fut: Future + Send + 'static, + Fut::Output: Send + 'static, +{ + fn execute(&self, fut: Fut) { + tokio::task::spawn(fut); + } } impl Client { - pub async fn connect( - url: R, - ) -> anyhow::Result<(Self, Response>>)> { - let (stream, response) = connect_async(url).await?; + pub async fn connect(uri: &str) -> anyhow::Result<(Self, Response)> { + let uri: Uri = uri.try_into()?; + let stream = TcpStream::connect( + format!("{}:{}", uri.host().unwrap(), uri.port_u16().unwrap()).as_str(), + ) + .await?; + + let req = Request::builder() + .method("GET") + .uri(&uri) + .header("Host", uri.host().unwrap()) + .header(UPGRADE, "websocket") + .header(CONNECTION, "upgrade") + .header( + "Sec-WebSocket-Key", + fastwebsockets::handshake::generate_key(), + ) + .header("Sec-WebSocket-Version", "13") + .body(Empty::::new())?; - Ok((Self { stream }, response)) + let (ws, res) = fastwebsockets::handshake::client(&SpawnExecutor, req, stream).await?; + + Ok(( + Self { + ws: FragmentCollector::new(ws), + }, + res, + )) } pub async fn send_text(&mut self, text: &str) -> anyhow::Result<()> { - self.stream.send(Message::Text(text.to_string())).await?; + self.ws + .write_frame(Frame::text(Payload::Borrowed(text.as_bytes()))) + .await?; Ok(()) } @@ -33,16 +76,18 @@ impl Client { Ok(()) } - pub async fn recv(&mut self) -> Option> { - self.stream.next().await + pub async fn recv_text_frame(&mut self) -> anyhow::Result> { + let frame = self.ws.read_frame().await?; + + match frame.opcode { + OpCode::Text => Ok(frame), + _ => Err(anyhow::anyhow!("Expected text frame")), + } } pub async fn recv_json(&mut self) -> anyhow::Result { - let message = self - .recv() - .await - .ok_or_else(|| anyhow::anyhow!("Connection closed"))??; - let text = message.to_text()?; + let text_frame = self.recv_text_frame().await?; + let text = std::str::from_utf8(text_frame.payload.as_ref())?; let value = serde_json::from_str(&text)?; Ok(value) diff --git a/src/kiwi/tests/kafka.rs b/src/kiwi/tests/kafka.rs index 848b242..1171060 100644 --- a/src/kiwi/tests/kafka.rs +++ b/src/kiwi/tests/kafka.rs @@ -9,6 +9,7 @@ use common::ws::Client as WsClient; use kiwi::protocol::{Command, CommandResponse, Message, Notice, SubscriptionMode}; use once_cell::sync::Lazy; +use crate::common::healthcheck::Healthcheck; use crate::common::kafka::Producer; static BOOTSTRAP_SERVERS: Lazy = Lazy::new(|| { @@ -39,8 +40,13 @@ async fn test_receives_messages_kafka_source() -> anyhow::Result<()> { )?; let _kiwi = Process::new_with_args(&["--config", config.path_str()])?; - // TODO: Replace with a proper health check - tokio::time::sleep(Duration::from_secs(2)).await; + Healthcheck { + interval: Duration::from_millis(200), + attempts: 10, + url: "http://127.0.0.1:8000/health", + } + .run() + .await?; let (mut ws_client, _) = WsClient::connect("ws://127.0.0.1:8000").await?; @@ -129,8 +135,13 @@ async fn test_closes_subscription_on_partition_added() -> anyhow::Result<()> { )?; let _kiwi = Process::new_with_args(&["--config", config.path_str()])?; - // TODO: Replace with a proper health check - tokio::time::sleep(Duration::from_secs(2)).await; + Healthcheck { + interval: Duration::from_millis(200), + attempts: 10, + url: "http://127.0.0.1:8000/health", + } + .run() + .await?; let (mut ws_client, _) = WsClient::connect("ws://127.0.0.1:8000").await?; @@ -193,8 +204,13 @@ async fn test_named_kafka_source() -> anyhow::Result<()> { )?; let _kiwi = Process::new_with_args(&["--config", config.path_str()])?; - // TODO: Replace with a proper health check - tokio::time::sleep(Duration::from_secs(2)).await; + Healthcheck { + interval: Duration::from_millis(200), + attempts: 10, + url: "http://127.0.0.1:8000/health", + } + .run() + .await?; let (mut ws_client, _) = WsClient::connect("ws://127.0.0.1:8000").await?; @@ -257,8 +273,13 @@ async fn test_dynamic_config_source_removal() -> anyhow::Result<()> { )?; let _kiwi = Process::new_with_args(&["--config", config.path_str()])?; - // TODO: Replace with a proper health check - tokio::time::sleep(Duration::from_secs(2)).await; + Healthcheck { + interval: Duration::from_millis(200), + attempts: 10, + url: "http://127.0.0.1:8000/health", + } + .run() + .await?; let (mut ws_client, _) = WsClient::connect("ws://127.0.0.1:8000").await?;