From b806ba05e3e5d39a9486539ec3014761b3ae8bc7 Mon Sep 17 00:00:00 2001 From: 0xterminator Date: Fri, 15 Nov 2024 15:35:20 +0200 Subject: [PATCH] feat(repo): Added grpc server to publisher --- Cargo.lock | 315 +++++++++++++++++- crates/fuel-streams-publisher/Cargo.toml | 10 + crates/fuel-streams-publisher/build.rs | 25 ++ crates/fuel-streams-publisher/config.toml | 29 ++ .../proto/fuel_streamer.proto | 18 + .../fuel-streams-publisher/src/grpc/config.rs | 185 ++++++++++ .../src/grpc/fuel_streamer.bin | Bin 0 -> 670 bytes .../src/grpc/fuel_streamer.rs | 132 ++++++++ crates/fuel-streams-publisher/src/grpc/mod.rs | 2 + .../src/grpc/shutdown.rs | 37 ++ crates/fuel-streams-publisher/src/lib.rs | 1 + 11 files changed, 740 insertions(+), 14 deletions(-) create mode 100644 crates/fuel-streams-publisher/build.rs create mode 100644 crates/fuel-streams-publisher/config.toml create mode 100644 crates/fuel-streams-publisher/proto/fuel_streamer.proto create mode 100644 crates/fuel-streams-publisher/src/grpc/config.rs create mode 100644 crates/fuel-streams-publisher/src/grpc/fuel_streamer.bin create mode 100644 crates/fuel-streams-publisher/src/grpc/fuel_streamer.rs create mode 100644 crates/fuel-streams-publisher/src/grpc/mod.rs create mode 100644 crates/fuel-streams-publisher/src/grpc/shutdown.rs diff --git a/Cargo.lock b/Cargo.lock index d06a0603..6ae5c994 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -64,7 +64,7 @@ dependencies = [ "encoding_rs", "flate2", "futures-core", - "h2", + "h2 0.3.26", "http 0.2.12", "httparse", "httpdate", @@ -802,6 +802,12 @@ dependencies = [ "critical-section", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "attohttpc" version = "0.24.1" @@ -848,7 +854,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acee9fd5073ab6b045a275b3e709c163dd36c90685219cb21804a147b58dba43" dependencies = [ "async-trait", - "axum-core", + "axum-core 0.2.9", "bitflags 1.3.2", "bytes", "futures-util", @@ -856,7 +862,7 @@ dependencies = [ "http-body 0.4.6", "hyper 0.14.31", "itoa", - "matchit", + "matchit 0.5.0", "memchr", "mime", "percent-encoding", @@ -866,12 +872,39 @@ dependencies = [ "serde_urlencoded", "sync_wrapper 0.1.2", "tokio", - "tower", + "tower 0.4.13", "tower-http 0.3.5", "tower-layer", "tower-service", ] +[[package]] +name = "axum" +version = "0.7.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "504e3947307ac8326a5437504c517c4b56716c9d98fac0028c2acc7ca47d70ae" +dependencies = [ + "async-trait", + "axum-core 0.4.5", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "itoa", + "matchit 0.7.3", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper 1.0.1", + "tower 0.5.1", + "tower-layer", + "tower-service", +] + [[package]] name = "axum-core" version = "0.2.9" @@ -888,6 +921,26 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper 1.0.1", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -1624,6 +1677,18 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "confy" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45b1f4c00870f07dc34adcac82bb6a72cc5aabca8536ba1797e01df51d2ce9a0" +dependencies = [ + "directories", + "serde", + "thiserror", + "toml", +] + [[package]] name = "console" version = "0.15.8" @@ -2275,6 +2340,15 @@ dependencies = [ "subtle", ] +[[package]] +name = "directories" +version = "5.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a49173b84e034382284f27f1af4dcbbd231ffa358c0fe316541a7337f376a35" +dependencies = [ + "dirs-sys 0.4.1", +] + [[package]] name = "directories-next" version = "2.0.0" @@ -2291,7 +2365,7 @@ version = "4.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059" dependencies = [ - "dirs-sys", + "dirs-sys 0.3.7", ] [[package]] @@ -2315,6 +2389,18 @@ dependencies = [ "winapi", ] +[[package]] +name = "dirs-sys" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c" +dependencies = [ + "libc", + "option-ext", + "redox_users", + "windows-sys 0.48.0", +] + [[package]] name = "dirs-sys-next" version = "0.1.2" @@ -2977,7 +3063,7 @@ dependencies = [ "async-graphql", "async-graphql-value", "async-trait", - "axum", + "axum 0.5.17", "clap 4.5.20", "derive_more 0.99.18", "enum-iterator", @@ -3019,7 +3105,7 @@ dependencies = [ "tokio-rayon", "tokio-stream", "tokio-util", - "tower", + "tower 0.4.13", "tower-http 0.4.4", "tracing", "uuid", @@ -3636,6 +3722,7 @@ dependencies = [ "async-trait", "chrono", "clap 4.5.20", + "confy", "derive_more 1.0.0", "displaydoc", "dotenvy", @@ -3653,6 +3740,8 @@ dependencies = [ "openssl", "parking_lot", "prometheus", + "prost 0.13.3", + "prost-build", "rand", "rayon", "rust_decimal", @@ -3663,6 +3752,11 @@ dependencies = [ "sysinfo", "thiserror", "tokio", + "toml", + "tonic", + "tonic-build", + "tonic-health", + "tonic-reflection", "tracing", "tracing-actix-web", "url", @@ -4120,6 +4214,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "h2" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http 1.1.0", + "indexmap 2.6.0", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "half" version = "2.4.1" @@ -4469,7 +4582,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "httparse", @@ -4492,9 +4605,11 @@ dependencies = [ "bytes", "futures-channel", "futures-util", + "h2 0.4.6", "http 1.1.0", "http-body 1.0.1", "httparse", + "httpdate", "itoa", "pin-project-lite", "smallvec", @@ -4534,6 +4649,19 @@ dependencies = [ "webpki-roots 0.26.6", ] +[[package]] +name = "hyper-timeout" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" +dependencies = [ + "hyper 1.5.0", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "hyper-tls" version = "0.6.0" @@ -4552,9 +4680,9 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.9" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41296eb09f183ac68eec06e03cdbea2e759633d4067b2f6552fc2e009bcad08b" +checksum = "df2dcfbe0677734ab2f3ffa7fa7bfd4706bfdc1ef393f2ee30184aed67e631b4" dependencies = [ "bytes", "futures-channel", @@ -5770,6 +5898,12 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb" +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "matrixmultiply" version = "0.3.9" @@ -5965,6 +6099,12 @@ dependencies = [ "unsigned-varint 0.7.2", ] +[[package]] +name = "multimap" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" + [[package]] name = "multistream-select" version = "0.13.0" @@ -6506,6 +6646,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "option-ext" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" + [[package]] name = "os_str_bytes" version = "6.6.1" @@ -7145,7 +7291,38 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.11.9", +] + +[[package]] +name = "prost" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b0487d90e047de87f984913713b85c601c05609aad5b0df4b4573fbf69aa13f" +dependencies = [ + "bytes", + "prost-derive 0.13.3", +] + +[[package]] +name = "prost-build" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15" +dependencies = [ + "bytes", + "heck 0.5.0", + "itertools 0.13.0", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost 0.13.3", + "prost-types", + "regex", + "syn 2.0.82", + "tempfile", ] [[package]] @@ -7161,6 +7338,28 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "prost-derive" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5" +dependencies = [ + "anyhow", + "itertools 0.13.0", + "proc-macro2", + "quote", + "syn 2.0.82", +] + +[[package]] +name = "prost-types" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4759aa0d3a6232fb8dbdb97b61de2c20047c68aca932c7ed76da9d788508d670" +dependencies = [ + "prost 0.13.3", +] + [[package]] name = "protobuf" version = "2.28.0" @@ -7223,7 +7422,7 @@ dependencies = [ "libflate", "log", "names", - "prost", + "prost 0.11.9", "reqwest 0.11.27", "thiserror", "url", @@ -7575,7 +7774,7 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "hyper 0.14.31", @@ -9467,6 +9666,77 @@ dependencies = [ "winnow", ] +[[package]] +name = "tonic" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" +dependencies = [ + "async-stream", + "async-trait", + "axum 0.7.7", + "base64 0.22.1", + "bytes", + "flate2", + "h2 0.4.6", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.5.0", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "prost 0.13.3", + "socket2", + "tokio", + "tokio-stream", + "tower 0.4.13", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic-build" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9557ce109ea773b399c9b9e5dca39294110b74f1f342cb347a80d1fce8c26a11" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "prost-types", + "quote", + "syn 2.0.82", +] + +[[package]] +name = "tonic-health" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1eaf34ddb812120f5c601162d5429933c9b527d901ab0e7f930d3147e33a09b2" +dependencies = [ + "async-stream", + "prost 0.13.3", + "tokio", + "tokio-stream", + "tonic", +] + +[[package]] +name = "tonic-reflection" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "878d81f52e7fcfd80026b7fdb6a9b578b3c3653ba987f87f0dce4b64043cba27" +dependencies = [ + "prost 0.13.3", + "prost-types", + "tokio", + "tokio-stream", + "tonic", +] + [[package]] name = "tower" version = "0.4.13" @@ -9475,8 +9745,11 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", + "indexmap 1.9.3", "pin-project", "pin-project-lite", + "rand", + "slab", "tokio", "tokio-util", "tower-layer", @@ -9484,6 +9757,20 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2873938d487c3cfb9aed7546dc9f2711d867c9f90c46b889989a2cb84eba6b4f" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper 0.1.2", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-http" version = "0.3.5" @@ -9498,7 +9785,7 @@ dependencies = [ "http-body 0.4.6", "http-range-header", "pin-project-lite", - "tower", + "tower 0.4.13", "tower-layer", "tower-service", ] diff --git a/crates/fuel-streams-publisher/Cargo.toml b/crates/fuel-streams-publisher/Cargo.toml index 63520665..ee8c3d35 100644 --- a/crates/fuel-streams-publisher/Cargo.toml +++ b/crates/fuel-streams-publisher/Cargo.toml @@ -20,6 +20,7 @@ async-nats = { workspace = true } async-trait = { workspace = true } chrono = { workspace = true } clap = { workspace = true } +confy = "0.6" derive_more = { version = "1.0", features = ["full"] } displaydoc = { workspace = true } dotenvy = { workspace = true } @@ -36,6 +37,7 @@ futures = { workspace = true } num_cpus = "1.16" parking_lot = { version = "0.12", features = ["serde"] } prometheus = { version = "0.13", features = ["process"] } +prost = "0.13.3" rand = { workspace = true } rayon = "1.10.0" rust_decimal = { version = "1.13" } @@ -46,12 +48,20 @@ sha2 = { workspace = true } sysinfo = { version = "0.29" } thiserror = "1.0" tokio = { workspace = true } +toml = "0.8.4" +tonic = { version = "0.12.3", features = ["gzip"] } +tonic-health = { version = "0.12.3" } +tonic-reflection = { version = "0.12.3" } tracing = { workspace = true } tracing-actix-web = { workspace = true } url = "2.5.2" [dev-dependencies] +[build-dependencies] +prost-build = "0.13.3" +tonic-build = "0.12.3" + [features] default = [] test-helpers = [] diff --git a/crates/fuel-streams-publisher/build.rs b/crates/fuel-streams-publisher/build.rs new file mode 100644 index 00000000..64f92947 --- /dev/null +++ b/crates/fuel-streams-publisher/build.rs @@ -0,0 +1,25 @@ +fn main() { + let protos = &["proto/fuel_streamer.proto"]; + + // server + let prost_config_server = prost_build::Config::default(); + tonic_build::configure() + .build_server(true) + .build_client(false) + .build_transport(true) + .file_descriptor_set_path("src/grpc/fuel_streamer.bin") + .out_dir("src/grpc") + .compile_protos_with_config(prost_config_server, protos, &[] as &[&str]) + .expect("failed to compile server protos"); + + // client + let prost_config_client = prost_build::Config::default(); + tonic_build::configure() + .build_client(true) + .build_server(false) + .build_transport(true) + .file_descriptor_set_path("src/grpc/fuel_streamer.bin") + .out_dir("src/grpc") + .compile_protos_with_config(prost_config_client, protos, &[] as &[&str]) + .expect("failed to compile client protos"); +} diff --git a/crates/fuel-streams-publisher/config.toml b/crates/fuel-streams-publisher/config.toml new file mode 100644 index 00000000..ba6855a0 --- /dev/null +++ b/crates/fuel-streams-publisher/config.toml @@ -0,0 +1,29 @@ +[grpc] +# whether to enable gRPC reflection(introspection) +enable-reflection = true +# which compression encodings does the server accept for requests +accept-compressed = "Gzip" +# which compression encodings might the server use for responses +send-compressed = "Gzip" +# limits the maximum size of a decoded message. Defaults to 50MB +max-decoding-message-size = 52428800 +# limits the maximum size of an encoded message. Defaults to 50MB +max-encoding-message-size = 52428800 +# limits the maximum size of streaming channel +max-channel-size = 128 +# set a timeout on for all request handlers in seconds. Defaults to 60s +timeout = 180 +# sets the maximum frame size to use for HTTP2(must be within 16,384 and 16,777,215). Defaults to 16MB +max-frame-size = 16777215 +# set the concurrency limit applied to on requests inbound per connection. Defaults to 32 +concurrency-limit-per-connection = 32 +# sets the SETTINGS_MAX_CONCURRENT_STREAMS spec option for HTTP2 connections +max-concurrent-streams = 1024 +# set the value of `TCP_NODELAY` option for accepted connections. Enabled by default +tcp-nodelay = true +# max number of future periods considered during requests +draw-lookahead-period-count = 10 +# set whether HTTP2 Ping frames are enabled on accepted connections. Default is no HTTP2 keepalive (`None`) +http2-keepalive-interval = 180 +# sets a timeout for receiving an acknowledgement of the keepalive ping. Default is 20 seconds +http2-keepalive-timeout = 60 diff --git a/crates/fuel-streams-publisher/proto/fuel_streamer.proto b/crates/fuel-streams-publisher/proto/fuel_streamer.proto new file mode 100644 index 00000000..1e45e787 --- /dev/null +++ b/crates/fuel-streams-publisher/proto/fuel_streamer.proto @@ -0,0 +1,18 @@ + +syntax = "proto3"; + +package fuel_streamer; + +service BtcServer { + rpc StreamBlocks(StreamBlocksRequestFilter) returns (StreamBlocksResponse) {} +} + +message StreamBlocksRequestFilter { + uint64 from = 1; + uint64 to = 2; +} + +message StreamBlocksResponse { + string type = 1; + bytes data = 2; +} diff --git a/crates/fuel-streams-publisher/src/grpc/config.rs b/crates/fuel-streams-publisher/src/grpc/config.rs new file mode 100644 index 00000000..2ef3f0a6 --- /dev/null +++ b/crates/fuel-streams-publisher/src/grpc/config.rs @@ -0,0 +1,185 @@ +use std::{ + path::{Path, PathBuf}, + str::FromStr, + time::Duration, +}; + +use clap::Parser; +use confy::ConfyError; +use displaydoc::Display as DisplayDoc; +use serde::{Deserialize, Deserializer, Serialize}; +use thiserror::Error; +use tokio::{fs::File, io::AsyncReadExt}; + +#[derive(Debug, DisplayDoc, Error)] +pub enum Error { + /// Open config file: {0} + OpenConfig(std::io::Error), + /// Failed to parse config: {0} + ParseConfig(toml::de::Error), + /// Failed to parse config as utf-8: {0} + ParseUtf8(std::string::FromUtf8Error), + /// Failed to read config file: {0} + ReadConfig(std::io::Error), + /// Failed to read config metadata: {0} + ReadMeta(std::io::Error), + /// Failed to read env config: {0} + Confy(ConfyError), + /// Missing config element: {0} + MissingConfigElement(&'static str), +} + +#[derive(Debug, Default, Deserialize, Clone)] +#[serde(deny_unknown_fields, rename_all = "kebab-case")] +pub struct GrpcConfig { + /// whether to enable gRPC reflection + pub enable_reflection: bool, + /// which compression encodings does the server accept for requests + #[serde(skip_serializing_if = "Option::is_none")] + pub accept_compressed: Option, + /// which compression encodings might the server use for responses + #[serde(skip_serializing_if = "Option::is_none")] + pub send_compressed: Option, + /// limits the maximum size of a decoded message. Defaults to 4MB + pub max_decoding_message_size: usize, + /// limits the maximum size of an encoded message. Defaults to 4MB + pub max_encoding_message_size: usize, + /// limits the maximum size of streaming channel + #[allow(dead_code)] + pub max_channel_size: usize, + /// set a timeout on for all request handlers + #[serde(deserialize_with = "deserialize_duration_from_usize")] + pub timeout: Duration, + /// sets the SETTINGS_INITIAL_WINDOW_SIZE spec option for HTTP2 stream-level flow control. + /// Default is 65,535 + #[serde(skip_serializing_if = "Option::is_none")] + pub initial_stream_window_size: Option, + /// set whether TCP keepalive messages are enabled on accepted connections + #[serde(skip_serializing_if = "Option::is_none")] + pub tcp_keepalive: Option, + /// sets the max connection-level flow control for HTTP2. Default is 65,535 + #[serde(skip_serializing_if = "Option::is_none")] + pub initial_connection_window_size: Option, + /// sets the maximum frame size to use for HTTP2. If not set, will default from underlying + /// transport + #[serde(skip_serializing_if = "Option::is_none")] + pub max_frame_size: Option, + /// set the concurrency limit applied to on requests inbound per connection. Defaults to 32 + pub concurrency_limit_per_connection: usize, + /// sets the SETTINGS_MAX_CONCURRENT_STREAMS spec option for HTTP2 connections. Default is no + /// limit (`None`) + #[serde(skip_serializing_if = "Option::is_none")] + pub max_concurrent_streams: Option, + /// set whether HTTP2 Ping frames are enabled on accepted connections. Default is no HTTP2 + /// keepalive (`None`) + #[serde( + skip_serializing_if = "Option::is_none", + deserialize_with = "deserialize_duration_option" + )] + pub http2_keepalive_interval: Option, + /// sets a timeout for receiving an acknowledgement of the keepalive ping. Default is 20 + /// seconds + #[serde( + skip_serializing_if = "Option::is_none", + deserialize_with = "deserialize_duration_option" + )] + pub http2_keepalive_timeout: Option, + /// sets whether to use an adaptive flow control. Defaults to false + #[serde(skip_serializing_if = "Option::is_none")] + pub http2_adaptive_window: Option, + /// set the value of `TCP_NODELAY` option for accepted connections. Enabled by default + pub tcp_nodelay: bool, + /// when looking for next draw we want to look at max `draw_lookahead_period_count`\ + #[allow(dead_code)] + pub draw_lookahead_period_count: u64, +} + +fn deserialize_duration_from_usize<'de, D>( + deserializer: D, +) -> Result +where + D: Deserializer<'de>, +{ + let seconds = u64::deserialize(deserializer)?; + Ok(Duration::from_secs(seconds)) +} + +fn deserialize_duration_option<'de, D>( + deserializer: D, +) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + let seconds: Option = Option::deserialize(deserializer)?; + if seconds.is_none() { + return Ok(None); + } + Ok(seconds.map(Duration::from_secs)) +} + +#[derive(Clone, Debug, Deserialize)] +#[serde(deny_unknown_fields, rename_all = "kebab-case")] +pub struct TomlConfig { + pub grpc: GrpcConfig, +} + +impl TomlConfig { + pub async fn new(path: impl AsRef + Send) -> Result { + read_to_string(path).await?.parse() + } +} + +impl FromStr for TomlConfig { + type Err = Error; + + fn from_str(s: &str) -> Result { + toml::from_str(s).map_err(Error::ParseConfig) + } +} + +async fn read_to_string( + path: impl AsRef + Send, +) -> Result { + let mut file = File::open(path).await.map_err(Error::OpenConfig)?; + let meta = file.metadata().await.map_err(Error::ReadMeta)?; + let mut contents = + Vec::with_capacity(usize::try_from(meta.len()).unwrap_or(0)); + file.read_to_end(&mut contents) + .await + .map_err(Error::ReadConfig)?; + String::from_utf8(contents).map_err(Error::ParseUtf8) +} + +// Cli args and config + +#[derive(Clone, Debug, Parser, Default, Deserialize, Serialize)] +pub(crate) struct CliConfig { + /// toml configuration path + #[arg(long)] + toml: PathBuf, + /// http port + #[arg(long)] + http_port: u16, +} + +#[allow(dead_code)] +#[derive(Clone, Debug)] +pub(crate) struct Config { + pub(crate) toml: PathBuf, +} + +#[allow(dead_code)] +pub(crate) fn load_config() -> Result { + // First parse from cli + let cli_config = CliConfig::parse(); + // Initialize settings from file if specified + let file_config = confy::load_path::(&cli_config.toml) + .map_err(Error::Confy)?; + tracing::info!("Loaded grpc config from file: {:?}", &cli_config.toml); + + let config = Config { + toml: file_config.toml, + }; + + Ok(config) +} diff --git a/crates/fuel-streams-publisher/src/grpc/fuel_streamer.bin b/crates/fuel-streams-publisher/src/grpc/fuel_streamer.bin new file mode 100644 index 0000000000000000000000000000000000000000..8b6891d80ab17f90c2513259e3bbd7bd8e9e6782 GIT binary patch literal 670 zcmaKqK~IA~5QTSPSz(|Brq#5@gVsw=nCQuiCOvpC@gV&HO|9F+NTtHo#NXl%_3G>j zrd+(un>X*{Z329GFy2(Ap{|r!_bxD5hC!6AWQngRuMoRX038bT|XF{AY8yAixVaj_-6(u zEDnSP7~!D{0t~, +} +/// Generated client implementations. +pub mod btc_server_client { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + #[derive(Debug, Clone)] + pub struct BtcServerClient { + inner: tonic::client::Grpc, + } + impl BtcServerClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl BtcServerClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + std::marker::Send + 'static, + ::Error: Into + std::marker::Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> BtcServerClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + std::marker::Send + std::marker::Sync, + { + BtcServerClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + pub async fn stream_blocks( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/fuel_streamer.BtcServer/StreamBlocks", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("fuel_streamer.BtcServer", "StreamBlocks")); + self.inner.unary(req, path, codec).await + } + } +} diff --git a/crates/fuel-streams-publisher/src/grpc/mod.rs b/crates/fuel-streams-publisher/src/grpc/mod.rs new file mode 100644 index 00000000..b0fc32f8 --- /dev/null +++ b/crates/fuel-streams-publisher/src/grpc/mod.rs @@ -0,0 +1,2 @@ +pub mod config; +pub mod shutdown; diff --git a/crates/fuel-streams-publisher/src/grpc/shutdown.rs b/crates/fuel-streams-publisher/src/grpc/shutdown.rs new file mode 100644 index 00000000..80546bb4 --- /dev/null +++ b/crates/fuel-streams-publisher/src/grpc/shutdown.rs @@ -0,0 +1,37 @@ +use tokio::signal::unix::{signal, SignalKind}; + +pub struct StopHandle { + pub stop_cmd_sender: tokio::sync::oneshot::Sender<()>, +} + +impl StopHandle { + /// stop the gRPC API gracefully + pub fn stop(self) { + if let Err(e) = self.stop_cmd_sender.send(()) { + tracing::error!("Grpc Api thread panicked: {:?}", e); + } else { + tracing::info!("Grpc Api finished cleanly"); + } + } +} + +pub async fn stop_signal(grpc_stop_tx: Option) { + let mut sigint = + signal(SignalKind::interrupt()).expect("sigint shutdown_listener"); + let mut sigterm = + signal(SignalKind::terminate()).expect("sigterm shutdown_listener"); + tokio::select! { + _ = sigint.recv() => { + tracing::info!("Received SIGINT ..."); + if let Some(grpc_handle) = grpc_stop_tx { + grpc_handle.stop() + } + } + _ = sigterm.recv() => { + tracing::info!("Received SIGTERM ..."); + if let Some(grpc_handle) = grpc_stop_tx { + grpc_handle.stop() + } + } + } +} diff --git a/crates/fuel-streams-publisher/src/lib.rs b/crates/fuel-streams-publisher/src/lib.rs index 5fa4a08d..4a2f97d3 100644 --- a/crates/fuel-streams-publisher/src/lib.rs +++ b/crates/fuel-streams-publisher/src/lib.rs @@ -1,4 +1,5 @@ pub mod cli; +pub mod grpc; pub mod publisher; pub mod server; pub mod telemetry;