From a2824be0cd324c0f0ca29a08094d92a05e15bd6c Mon Sep 17 00:00:00 2001 From: pgherveou Date: Thu, 19 Dec 2024 21:15:43 +0100 Subject: [PATCH] Add indexer for eth tx hash --- .cargo/config.toml | 1 + Cargo.lock | 475 +++++++++++++-- prdoc/pr_6836.prdoc | 17 + substrate/frame/revive/rpc/.gitignore | 3 + ...20958b338f9fef0d2575056f74f8c67bc3966.json | 20 + ...77e451e9e4324ee5bd235fa130e7c00892c06.json | 12 + ...810cde33043000207872088fa8903fa3c0e9d.json | 26 + substrate/frame/revive/rpc/Cargo.toml | 10 +- .../rpc/examples/westend_local_network.toml | 8 +- ...241205165418_create_transaction_hashes.sql | 15 + .../revive/rpc/src/block_info_provider.rs | 208 +++++++ substrate/frame/revive/rpc/src/cli.rs | 61 +- substrate/frame/revive/rpc/src/client.rs | 544 +++++++----------- substrate/frame/revive/rpc/src/eth-indexer.rs | 85 +++ substrate/frame/revive/rpc/src/lib.rs | 6 + .../frame/revive/rpc/src/receipt_provider.rs | 238 ++++++++ .../revive/rpc/src/receipt_provider/cache.rs | 148 +++++ .../revive/rpc/src/receipt_provider/db.rs | 159 +++++ 18 files changed, 1663 insertions(+), 373 deletions(-) create mode 100644 prdoc/pr_6836.prdoc create mode 100644 substrate/frame/revive/rpc/.gitignore create mode 100644 substrate/frame/revive/rpc/.sqlx/query-3e34e8e4bdeba2c5f2de915301d20958b338f9fef0d2575056f74f8c67bc3966.json create mode 100644 substrate/frame/revive/rpc/.sqlx/query-8a99a538df1ca7c194b8c573c9f77e451e9e4324ee5bd235fa130e7c00892c06.json create mode 100644 substrate/frame/revive/rpc/.sqlx/query-920b0500651656cf52d1394d9c1810cde33043000207872088fa8903fa3c0e9d.json create mode 100644 substrate/frame/revive/rpc/migrations/20241205165418_create_transaction_hashes.sql create mode 100644 substrate/frame/revive/rpc/src/block_info_provider.rs create mode 100644 substrate/frame/revive/rpc/src/eth-indexer.rs create mode 100644 substrate/frame/revive/rpc/src/receipt_provider.rs create mode 100644 substrate/frame/revive/rpc/src/receipt_provider/cache.rs create mode 100644 substrate/frame/revive/rpc/src/receipt_provider/db.rs diff --git a/.cargo/config.toml b/.cargo/config.toml index 68a0d7b552dc..8573f582e258 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -9,6 +9,7 @@ rustdocflags = [ CC_x86_64_unknown_linux_musl = { value = ".cargo/musl-gcc", force = true, relative = true } CXX_x86_64_unknown_linux_musl = { value = ".cargo/musl-g++", force = true, relative = true } CARGO_WORKSPACE_ROOT_DIR = { value = "", relative = true } +SQLX_OFFLINE = "true" [net] retry = 5 diff --git a/Cargo.lock b/Cargo.lock index 726c8f5a1885..2bae43979562 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1396,7 +1396,7 @@ dependencies = [ "futures-lite 2.3.0", "parking", "polling 3.4.0", - "rustix 0.38.21", + "rustix 0.38.42", "slab", "tracing", "windows-sys 0.52.0", @@ -1478,7 +1478,7 @@ dependencies = [ "cfg-if", "event-listener 5.3.1", "futures-lite 2.3.0", - "rustix 0.38.21", + "rustix 0.38.42", "tracing", ] @@ -1494,7 +1494,7 @@ dependencies = [ "cfg-if", "futures-core", "futures-io", - "rustix 0.38.21", + "rustix 0.38.42", "signal-hook-registry", "slab", "windows-sys 0.52.0", @@ -1592,6 +1592,15 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "atoi" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" +dependencies = [ + "num-traits", +] + [[package]] name = "atomic-take" version = "1.1.0" @@ -1880,6 +1889,9 @@ name = "bitflags" version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" +dependencies = [ + "serde", +] [[package]] name = "bitvec" @@ -4391,6 +4403,21 @@ dependencies = [ "wasmtime-types", ] +[[package]] +name = "crc" +version = "3.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69e6e4d7b33a94f0991c26729976b10ebde1d34c3ee82408fb536164fa10d636" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" + [[package]] name = "crc32fast" version = "1.3.2" @@ -5926,6 +5953,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fffa369a668c8af7dbf8b5e56c9f744fbd399949ed171606040001947de40b1c" dependencies = [ "const-oid", + "pem-rfc7468", "zeroize", ] @@ -6207,6 +6235,12 @@ dependencies = [ "litrs", ] +[[package]] +name = "dotenvy" +version = "0.15.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" + [[package]] name = "downcast" version = "0.11.0" @@ -6332,6 +6366,9 @@ name = "either" version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" +dependencies = [ + "serde", +] [[package]] name = "elliptic-curve" @@ -6540,23 +6577,23 @@ dependencies = [ [[package]] name = "errno" -version = "0.3.2" +version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b30f669a7961ef1631673d2766cc92f52d64f7ef354d4fe0ddfd30ed52f0f4f" +checksum = "33d852cb9b869c2a9b3df2f71a3074817f01e1844f839a144f5fcef059a4eb5d" dependencies = [ - "errno-dragonfly", "libc", - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] -name = "errno-dragonfly" -version = "0.1.2" +name = "etcetera" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf" +checksum = "136d1b5283a1ab77bd9257427ffd09d8667ced0570b6f938942bc7568ed5b943" dependencies = [ - "cc", - "libc", + "cfg-if", + "home", + "windows-sys 0.48.0", ] [[package]] @@ -6753,9 +6790,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.1.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" [[package]] name = "fastrlp" @@ -6970,6 +7007,17 @@ dependencies = [ "num-traits", ] +[[package]] +name = "flume" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" +dependencies = [ + "futures-core", + "futures-sink", + "spin 0.9.8", +] + [[package]] name = "fnv" version = "1.0.7" @@ -7819,7 +7867,7 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "29f9df8a11882c4e3335eb2d18a0137c505d9ca927470b0cac9c6f0ae07d28f7" dependencies = [ - "rustix 0.38.21", + "rustix 0.38.42", "windows-sys 0.48.0", ] @@ -7888,6 +7936,17 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "futures-intrusive" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d930c203dd0b6ff06e0201a4a2fe9149b43c684fd4420555b26d21b1a02956f" +dependencies = [ + "futures-core", + "lock_api", + "parking_lot 0.12.3", +] + [[package]] name = "futures-io" version = "0.3.31" @@ -7915,7 +7974,7 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52527eb5074e35e9339c6b4e8d12600c7128b68fb25dcb9fa9dec18f7c25f3a5" dependencies = [ - "fastrand 2.1.0", + "fastrand 2.3.0", "futures-core", "futures-io", "parking", @@ -8351,6 +8410,15 @@ dependencies = [ "hashbrown 0.14.5", ] +[[package]] +name = "hashlink" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" +dependencies = [ + "hashbrown 0.14.5", +] + [[package]] name = "heck" version = "0.3.3" @@ -9082,7 +9150,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" dependencies = [ "hermit-abi 0.3.9", - "rustix 0.38.21", + "rustix 0.38.42", "windows-sys 0.48.0", ] @@ -9683,6 +9751,9 @@ name = "lazy_static" version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" +dependencies = [ + "spin 0.9.8", +] [[package]] name = "lazycell" @@ -9698,9 +9769,9 @@ checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" [[package]] name = "libc" -version = "0.2.158" +version = "0.2.169" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439" +checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a" [[package]] name = "libflate" @@ -10342,6 +10413,17 @@ dependencies = [ "libsecp256k1-core", ] +[[package]] +name = "libsqlite3-sys" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "libz-sys" version = "1.1.12" @@ -10401,9 +10483,9 @@ checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" [[package]] name = "linux-raw-sys" -version = "0.4.10" +version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da2479e8c062e40bf0066ffa0bc823de0a9368974af99c9f6df941d2c231e03f" +checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" [[package]] name = "lioness" @@ -10685,6 +10767,16 @@ dependencies = [ "rawpointer", ] +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest 0.10.7", +] + [[package]] name = "memchr" version = "2.7.4" @@ -10860,7 +10952,7 @@ dependencies = [ "c2-chacha", "curve25519-dalek 4.1.3", "either", - "hashlink", + "hashlink 0.8.4", "lioness", "log", "parking_lot 0.12.3", @@ -11542,6 +11634,23 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-bigint-dig" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc84195820f291c7697304f3cbdadd1cb7199c0efc917ff5eafd71225c136151" +dependencies = [ + "byteorder", + "lazy_static", + "libm", + "num-integer", + "num-iter", + "num-traits", + "rand", + "smallvec", + "zeroize", +] + [[package]] name = "num-complex" version = "0.4.4" @@ -14921,6 +15030,7 @@ dependencies = [ "sp-core 28.0.0", "sp-crypto-hashing 0.1.0", "sp-weights 27.0.0", + "sqlx", "static_init", "substrate-cli-test-utils", "substrate-prometheus-endpoint", @@ -16633,6 +16743,15 @@ dependencies = [ "serde", ] +[[package]] +name = "pem-rfc7468" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88b39c9bfcfc231068454382784bb460aae594343fb030d46e9f50a645418412" +dependencies = [ + "base64ct", +] + [[package]] name = "penpal-emulated-chain" version = "0.0.0" @@ -17007,6 +17126,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkcs1" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8ffb9f10fa047879315e6625af03c164b16962a5368d724ed16323b68ace47f" +dependencies = [ + "der", + "pkcs8", + "spki", +] + [[package]] name = "pkcs8" version = "0.10.2" @@ -20181,7 +20311,7 @@ dependencies = [ "cfg-if", "concurrent-queue", "pin-project-lite", - "rustix 0.38.21", + "rustix 0.38.42", "tracing", "windows-sys 0.52.0", ] @@ -20489,7 +20619,7 @@ dependencies = [ "hex", "lazy_static", "procfs-core", - "rustix 0.38.21", + "rustix 0.38.42", ] [[package]] @@ -21022,11 +21152,11 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.4.1" +version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" +checksum = "03a862b389f93e68874fbf580b9de08dd02facb9a788ebadaf4a3fd33cf58834" dependencies = [ - "bitflags 1.3.2", + "bitflags 2.6.0", ] [[package]] @@ -21684,6 +21814,26 @@ dependencies = [ "winapi", ] +[[package]] +name = "rsa" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af6c4b23d99685a1408194da11270ef8e9809aff951cc70ec9b17350b087e474" +dependencies = [ + "const-oid", + "digest 0.10.7", + "num-bigint-dig", + "num-integer", + "num-traits", + "pkcs1", + "pkcs8", + "rand_core 0.6.4", + "signature", + "spki", + "subtle 2.5.0", + "zeroize", +] + [[package]] name = "rstest" version = "0.18.2" @@ -21858,15 +22008,15 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.21" +version = "0.38.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b426b0506e5d50a7d8dafcf2e81471400deb602392c7dd110815afb4eaf02a3" +checksum = "f93dc38ecbab2eb790ff964bb77fa94faf256fd3e73285fd7ba0903b76bedb85" dependencies = [ "bitflags 2.6.0", "errno", "libc", - "linux-raw-sys 0.4.10", - "windows-sys 0.48.0", + "linux-raw-sys 0.4.14", + "windows-sys 0.59.0", ] [[package]] @@ -24591,6 +24741,9 @@ name = "smallvec" version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" +dependencies = [ + "serde", +] [[package]] name = "smol" @@ -27962,6 +28115,9 @@ name = "spin" version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] [[package]] name = "spinners" @@ -27984,6 +28140,210 @@ dependencies = [ "der", ] +[[package]] +name = "sqlformat" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7bba3a93db0cc4f7bdece8bb09e77e2e785c20bfebf79eb8340ed80708048790" +dependencies = [ + "nom", + "unicode_categories", +] + +[[package]] +name = "sqlx" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93334716a037193fac19df402f8571269c84a00852f6a7066b5d2616dcd64d3e" +dependencies = [ + "sqlx-core", + "sqlx-macros", + "sqlx-mysql", + "sqlx-postgres", + "sqlx-sqlite", +] + +[[package]] +name = "sqlx-core" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4d8060b456358185f7d50c55d9b5066ad956956fddec42ee2e8567134a8936e" +dependencies = [ + "atoi", + "byteorder", + "bytes", + "crc", + "crossbeam-queue", + "either", + "event-listener 5.3.1", + "futures-channel", + "futures-core", + "futures-intrusive", + "futures-io", + "futures-util", + "hashbrown 0.14.5", + "hashlink 0.9.1", + "hex", + "indexmap 2.7.0", + "log", + "memchr", + "once_cell", + "paste", + "percent-encoding", + "serde", + "serde_json", + "sha2 0.10.8", + "smallvec", + "sqlformat", + "thiserror", + "tokio", + "tokio-stream", + "tracing", + "url", +] + +[[package]] +name = "sqlx-macros" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cac0692bcc9de3b073e8d747391827297e075c7710ff6276d9f7a1f3d58c6657" +dependencies = [ + "proc-macro2 1.0.86", + "quote 1.0.37", + "sqlx-core", + "sqlx-macros-core", + "syn 2.0.87", +] + +[[package]] +name = "sqlx-macros-core" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1804e8a7c7865599c9c79be146dc8a9fd8cc86935fa641d3ea58e5f0688abaa5" +dependencies = [ + "dotenvy", + "either", + "heck 0.5.0", + "hex", + "once_cell", + "proc-macro2 1.0.86", + "quote 1.0.37", + "serde", + "serde_json", + "sha2 0.10.8", + "sqlx-core", + "sqlx-mysql", + "sqlx-postgres", + "sqlx-sqlite", + "syn 2.0.87", + "tempfile", + "tokio", + "url", +] + +[[package]] +name = "sqlx-mysql" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64bb4714269afa44aef2755150a0fc19d756fb580a67db8885608cf02f47d06a" +dependencies = [ + "atoi", + "base64 0.22.1", + "bitflags 2.6.0", + "byteorder", + "bytes", + "crc", + "digest 0.10.7", + "dotenvy", + "either", + "futures-channel", + "futures-core", + "futures-io", + "futures-util", + "generic-array 0.14.7", + "hex", + "hkdf", + "hmac 0.12.1", + "itoa", + "log", + "md-5", + "memchr", + "once_cell", + "percent-encoding", + "rand", + "rsa", + "serde", + "sha1", + "sha2 0.10.8", + "smallvec", + "sqlx-core", + "stringprep", + "thiserror", + "tracing", + "whoami", +] + +[[package]] +name = "sqlx-postgres" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fa91a732d854c5d7726349bb4bb879bb9478993ceb764247660aee25f67c2f8" +dependencies = [ + "atoi", + "base64 0.22.1", + "bitflags 2.6.0", + "byteorder", + "crc", + "dotenvy", + "etcetera", + "futures-channel", + "futures-core", + "futures-io", + "futures-util", + "hex", + "hkdf", + "hmac 0.12.1", + "home", + "itoa", + "log", + "md-5", + "memchr", + "once_cell", + "rand", + "serde", + "serde_json", + "sha2 0.10.8", + "smallvec", + "sqlx-core", + "stringprep", + "thiserror", + "tracing", + "whoami", +] + +[[package]] +name = "sqlx-sqlite" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5b2cf34a45953bfd3daaf3db0f7a7878ab9b7a6b91b422d24a7a9e4c857b680" +dependencies = [ + "atoi", + "flume", + "futures-channel", + "futures-core", + "futures-executor", + "futures-intrusive", + "futures-util", + "libsqlite3-sys", + "log", + "percent-encoding", + "serde", + "serde_urlencoded", + "sqlx-core", + "tracing", + "url", +] + [[package]] name = "ss58-registry" version = "1.43.0" @@ -28311,6 +28671,17 @@ dependencies = [ "serde", ] +[[package]] +name = "stringprep" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b4df3d392d81bd458a8a621b8bffbd2302a12ffe288a9d931670948749463b1" +dependencies = [ + "unicode-bidi", + "unicode-normalization", + "unicode-properties", +] + [[package]] name = "strsim" version = "0.8.0" @@ -29289,15 +29660,15 @@ checksum = "9d0e916b1148c8e263850e1ebcbd046f333e0683c724876bb0da63ea4373dc8a" [[package]] name = "tempfile" -version = "3.8.1" +version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ef1adac450ad7f4b3c28589471ade84f25f731a7a0fe30d71dfa9f60fd808e5" +checksum = "28cce251fcbc87fac86a866eeb0d6c2d536fc16d06f184bb61aeae11aa4cee0c" dependencies = [ "cfg-if", - "fastrand 2.1.0", - "redox_syscall 0.4.1", - "rustix 0.38.21", - "windows-sys 0.48.0", + "fastrand 2.3.0", + "once_cell", + "rustix 0.38.42", + "windows-sys 0.59.0", ] [[package]] @@ -29326,7 +29697,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "21bebf2b7c9e0a515f6e0f8c51dc0f8e4696391e6f1ff30379559f8365fb0df7" dependencies = [ - "rustix 0.38.21", + "rustix 0.38.42", "windows-sys 0.48.0", ] @@ -30291,6 +30662,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-properties" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e70f2a8b45122e719eb623c01822704c4e0907e7e426a05927e1a1cfff5b75d0" + [[package]] name = "unicode-segmentation" version = "1.11.0" @@ -30315,6 +30692,12 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" +[[package]] +name = "unicode_categories" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" + [[package]] name = "universal-hash" version = "0.5.1" @@ -30558,6 +30941,12 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasite" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" + [[package]] name = "wasm-bindgen" version = "0.2.95" @@ -31297,6 +31686,16 @@ dependencies = [ "westend-emulated-chain", ] +[[package]] +name = "whoami" +version = "1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "372d5b87f58ec45c384ba03563b03544dc5fadc3983e434b286913f5b4a9bb6d" +dependencies = [ + "redox_syscall 0.5.8", + "wasite", +] + [[package]] name = "wide" version = "0.7.11" diff --git a/prdoc/pr_6836.prdoc b/prdoc/pr_6836.prdoc new file mode 100644 index 000000000000..1de081bbaa40 --- /dev/null +++ b/prdoc/pr_6836.prdoc @@ -0,0 +1,17 @@ +title: '[pallet-revive-eth-rpc] persist eth transaction hash' +doc: +- audience: Runtime Dev + description: |- + Add an option to persist EVM transaction hash to a SQL db. + This make it possible to run a full archive ETH RPC node (assuming the substrate node is also a full archive node) + + Some queries such as eth_getTransactionByHash, eth_getBlockTransactionCountByHash, and other need to work with a transaction hash index, which is not available in Substrate and need to be stored by the eth-rpc proxy. + + The refactoring break down the Client into a `BlockInfoProvider` and `ReceiptProvider` + - BlockInfoProvider does not need any persistence data, as we can fetch all block info from the source substrate chain + - ReceiptProvider comes in two flavor, + - An in memory cache implementation - This is the one we had so far. + - A DB implementation - This one persist rows with the block_hash, the transaction_index and the transaction_hash, so that we can later fetch the block and extrinsic for that receipt and reconstruct the ReceiptInfo object. +crates: +- name: pallet-revive-eth-rpc + bump: minor diff --git a/substrate/frame/revive/rpc/.gitignore b/substrate/frame/revive/rpc/.gitignore new file mode 100644 index 000000000000..c7434965b841 --- /dev/null +++ b/substrate/frame/revive/rpc/.gitignore @@ -0,0 +1,3 @@ +*.db +*.db-wal +*.db-shm diff --git a/substrate/frame/revive/rpc/.sqlx/query-3e34e8e4bdeba2c5f2de915301d20958b338f9fef0d2575056f74f8c67bc3966.json b/substrate/frame/revive/rpc/.sqlx/query-3e34e8e4bdeba2c5f2de915301d20958b338f9fef0d2575056f74f8c67bc3966.json new file mode 100644 index 000000000000..fc1858a521ad --- /dev/null +++ b/substrate/frame/revive/rpc/.sqlx/query-3e34e8e4bdeba2c5f2de915301d20958b338f9fef0d2575056f74f8c67bc3966.json @@ -0,0 +1,20 @@ +{ + "db_name": "SQLite", + "query": "\n SELECT COUNT(*) as count\n FROM transaction_hashes\n WHERE block_hash = ?\n ", + "describe": { + "columns": [ + { + "name": "count", + "ordinal": 0, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false + ] + }, + "hash": "3e34e8e4bdeba2c5f2de915301d20958b338f9fef0d2575056f74f8c67bc3966" +} diff --git a/substrate/frame/revive/rpc/.sqlx/query-8a99a538df1ca7c194b8c573c9f77e451e9e4324ee5bd235fa130e7c00892c06.json b/substrate/frame/revive/rpc/.sqlx/query-8a99a538df1ca7c194b8c573c9f77e451e9e4324ee5bd235fa130e7c00892c06.json new file mode 100644 index 000000000000..ac67147f464a --- /dev/null +++ b/substrate/frame/revive/rpc/.sqlx/query-8a99a538df1ca7c194b8c573c9f77e451e9e4324ee5bd235fa130e7c00892c06.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n\t\t\t\tINSERT INTO transaction_hashes (transaction_hash, block_hash, transaction_index)\n\t\t\t\tVALUES (?, ?, ?)\n\n\t\t\t\tON CONFLICT(transaction_hash) DO UPDATE SET\n\t\t\t\tblock_hash = EXCLUDED.block_hash,\n\t\t\t\ttransaction_index = EXCLUDED.transaction_index\n\t\t\t\t", + "describe": { + "columns": [], + "parameters": { + "Right": 3 + }, + "nullable": [] + }, + "hash": "8a99a538df1ca7c194b8c573c9f77e451e9e4324ee5bd235fa130e7c00892c06" +} diff --git a/substrate/frame/revive/rpc/.sqlx/query-920b0500651656cf52d1394d9c1810cde33043000207872088fa8903fa3c0e9d.json b/substrate/frame/revive/rpc/.sqlx/query-920b0500651656cf52d1394d9c1810cde33043000207872088fa8903fa3c0e9d.json new file mode 100644 index 000000000000..dcd8bc42f266 --- /dev/null +++ b/substrate/frame/revive/rpc/.sqlx/query-920b0500651656cf52d1394d9c1810cde33043000207872088fa8903fa3c0e9d.json @@ -0,0 +1,26 @@ +{ + "db_name": "SQLite", + "query": "\n\t\t\tSELECT block_hash, transaction_index\n\t\t\tFROM transaction_hashes\n\t\t\tWHERE transaction_hash = ?\n\t\t\t", + "describe": { + "columns": [ + { + "name": "block_hash", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "transaction_index", + "ordinal": 1, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false + ] + }, + "hash": "920b0500651656cf52d1394d9c1810cde33043000207872088fa8903fa3c0e9d" +} diff --git a/substrate/frame/revive/rpc/Cargo.toml b/substrate/frame/revive/rpc/Cargo.toml index 31b8f505dedc..70922f92cd54 100644 --- a/substrate/frame/revive/rpc/Cargo.toml +++ b/substrate/frame/revive/rpc/Cargo.toml @@ -7,11 +7,16 @@ license = "Apache-2.0" homepage.workspace = true repository.workspace = true description = "An Ethereum JSON-RPC server for pallet-revive." +default-run = "eth-rpc" [[bin]] name = "eth-rpc" path = "src/main.rs" +[[bin]] +name = "eth-indexer" +path = "src/eth-indexer.rs" + [[example]] name = "deploy" path = "examples/rust/deploy.rs" @@ -44,7 +49,9 @@ futures = { workspace = true, features = ["thread-pool"] } jsonrpsee = { workspace = true, features = ["full"] } thiserror = { workspace = true } sp-crypto-hashing = { workspace = true } -subxt = { workspace = true, default-features = true, features = ["reconnecting-rpc-client"] } +subxt = { workspace = true, default-features = true, features = [ + "reconnecting-rpc-client", +] } tokio = { workspace = true, features = ["full"] } codec = { workspace = true, features = ["derive"] } log = { workspace = true } @@ -62,6 +69,7 @@ subxt-signer = { workspace = true, optional = true, features = [ ] } hex = { workspace = true } ethabi = { version = "18.0.0" } +sqlx = { version = "0.8.2", features = ["macros", "runtime-tokio", "sqlite"] } [features] example = ["rlp", "subxt-signer"] diff --git a/substrate/frame/revive/rpc/examples/westend_local_network.toml b/substrate/frame/revive/rpc/examples/westend_local_network.toml index 28295db76133..76561be814ec 100644 --- a/substrate/frame/revive/rpc/examples/westend_local_network.toml +++ b/substrate/frame/revive/rpc/examples/westend_local_network.toml @@ -29,13 +29,9 @@ name = "asset-hub-westend-collator1" rpc_port = 9011 ws_port = 9944 command = "{{POLKADOT_PARACHAIN_BINARY}}" -args = [ - "-lparachain=debug,runtime::revive=debug", -] +args = ["-lparachain=debug,runtime::revive=debug"] [[parachains.collators]] name = "asset-hub-westend-collator2" command = "{{POLKADOT_PARACHAIN_BINARY}}" -args = [ - "-lparachain=debug,runtime::revive=debug", -] +args = ["-lparachain=debug,runtime::revive=debug"] diff --git a/substrate/frame/revive/rpc/migrations/20241205165418_create_transaction_hashes.sql b/substrate/frame/revive/rpc/migrations/20241205165418_create_transaction_hashes.sql new file mode 100644 index 000000000000..43405bea9d04 --- /dev/null +++ b/substrate/frame/revive/rpc/migrations/20241205165418_create_transaction_hashes.sql @@ -0,0 +1,15 @@ +-- Create DB: +-- DATABASE_URL="..." cargo sqlx database create +-- +-- Run migration: +-- DATABASE_URL="..." cargo sqlx migrate run +-- +-- Update compile time artifacts: +-- DATABASE_URL="..." cargo sqlx prepare +CREATE TABLE transaction_hashes ( + transaction_hash CHAR(64) NOT NULL PRIMARY KEY, + transaction_index INTEGER NOT NULL, + block_hash CHAR(64) NOT NULL +); + +CREATE INDEX idx_block_hash ON transaction_hashes (block_hash); diff --git a/substrate/frame/revive/rpc/src/block_info_provider.rs b/substrate/frame/revive/rpc/src/block_info_provider.rs new file mode 100644 index 000000000000..e1bb51e9fc04 --- /dev/null +++ b/substrate/frame/revive/rpc/src/block_info_provider.rs @@ -0,0 +1,208 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: Apache-2.0 + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::{ + client::{SubstrateBlock, SubstrateBlockNumber}, + subxt_client::SrcChainConfig, + ClientError, LOG_TARGET, +}; +use sp_core::H256; +use std::{ + collections::{HashMap, VecDeque}, + sync::Arc, +}; +use subxt::{backend::legacy::LegacyRpcMethods, OnlineClient}; +use tokio::sync::RwLock; + +/// Provides information about blocks. +#[derive(Clone)] +pub struct BlockInfoProvider { + /// The shared in memory cache. + cache: Arc>>, + + /// The rpc client, used to fetch blocks not in the cache. + rpc: LegacyRpcMethods, + + /// The api client, used to fetch blocks not in the cache. + api: OnlineClient, +} + +impl BlockInfoProvider { + /// Create a new `BlockInfoProvider` with the given cache size, rpc client and api client. + pub fn new( + cache_size: usize, + api: OnlineClient, + rpc: LegacyRpcMethods, + ) -> Self { + Self { api, rpc, cache: Arc::new(RwLock::new(BlockCache::new(cache_size))) } + } + + /// Get a read access on the shared cache. + async fn cache(&self) -> tokio::sync::RwLockReadGuard<'_, BlockCache> { + self.cache.read().await + } + + /// Cache new block and return the pruned block hash. + pub async fn cache_block(&self, block: SubstrateBlock) -> Option { + let mut cache = self.cache.write().await; + cache.insert(block) + } + + /// Return the latest ingested block. + pub async fn latest_block(&self) -> Option> { + let cache = self.cache().await; + cache.buffer.back().cloned() + } + + /// Get block by block_number. + pub async fn block_by_number( + &self, + block_number: SubstrateBlockNumber, + ) -> Result>, ClientError> { + let cache = self.cache().await; + if let Some(block) = cache.blocks_by_number.get(&block_number).cloned() { + return Ok(Some(block)); + } + + let Some(hash) = self.rpc.chain_get_block_hash(Some(block_number.into())).await? else { + return Ok(None); + }; + + self.block_by_hash(&hash).await + } + + /// Get block by block hash. + pub async fn block_by_hash( + &self, + hash: &H256, + ) -> Result>, ClientError> { + let cache = self.cache().await; + if let Some(block) = cache.blocks_by_hash.get(hash).cloned() { + return Ok(Some(block)); + } + + match self.api.blocks().at(*hash).await { + Ok(block) => Ok(Some(Arc::new(block))), + Err(subxt::Error::Block(subxt::error::BlockError::NotFound(_))) => Ok(None), + Err(err) => Err(err.into()), + } + } +} + +/// The cache maintains a buffer of the last N blocks, +struct BlockCache { + /// The maximum buffer's size. + max_cache_size: usize, + + /// A double-ended queue of the last N blocks. + /// The most recent block is at the back of the queue, and the oldest block is at the front. + buffer: VecDeque>, + + /// A map of blocks by block number. + blocks_by_number: HashMap>, + + /// A map of blocks by block hash. + blocks_by_hash: HashMap>, +} + +/// Provides information about a block, +/// This is an abstratction on top of [`SubstrateBlock`] used to test the [`BlockCache`]. +trait BlockInfo { + /// Returns the block hash. + fn hash(&self) -> H256; + /// Returns the block number. + fn number(&self) -> SubstrateBlockNumber; +} + +impl BlockInfo for SubstrateBlock { + fn hash(&self) -> H256 { + SubstrateBlock::hash(self) + } + fn number(&self) -> u32 { + SubstrateBlock::number(self) + } +} + +impl BlockCache { + /// Create a new cache with the given maximum buffer size. + pub fn new(max_cache_size: usize) -> Self { + Self { + max_cache_size, + buffer: Default::default(), + blocks_by_number: Default::default(), + blocks_by_hash: Default::default(), + } + } + + /// Insert an entry into the cache, and prune the oldest entry if the cache is full. + pub fn insert(&mut self, block: B) -> Option { + let mut pruned_block_hash = None; + if self.buffer.len() >= self.max_cache_size { + if let Some(block) = self.buffer.pop_front() { + log::trace!(target: LOG_TARGET, "Pruning block: {}", block.number()); + let hash = block.hash(); + self.blocks_by_hash.remove(&hash); + self.blocks_by_number.remove(&block.number()); + pruned_block_hash = Some(hash); + } + } + + let block = Arc::new(block); + self.buffer.push_back(block.clone()); + self.blocks_by_number.insert(block.number(), block.clone()); + self.blocks_by_hash.insert(block.hash(), block); + pruned_block_hash + } +} + +#[cfg(test)] +mod test { + use super::*; + + struct MockBlock { + block_number: SubstrateBlockNumber, + block_hash: H256, + } + + impl BlockInfo for MockBlock { + fn hash(&self) -> H256 { + self.block_hash + } + + fn number(&self) -> u32 { + self.block_number + } + } + + #[test] + fn cache_insert_works() { + let mut cache = BlockCache::::new(2); + + let pruned = cache.insert(MockBlock { block_number: 1, block_hash: H256::from([1; 32]) }); + assert_eq!(pruned, None); + + let pruned = cache.insert(MockBlock { block_number: 2, block_hash: H256::from([2; 32]) }); + assert_eq!(pruned, None); + + let pruned = cache.insert(MockBlock { block_number: 3, block_hash: H256::from([3; 32]) }); + assert_eq!(pruned, Some(H256::from([1; 32]))); + + assert_eq!(cache.buffer.len(), 2); + assert_eq!(cache.blocks_by_number.len(), 2); + assert_eq!(cache.blocks_by_hash.len(), 2); + } +} diff --git a/substrate/frame/revive/rpc/src/cli.rs b/substrate/frame/revive/rpc/src/cli.rs index c0f81fcafd77..cda55b55f9ca 100644 --- a/substrate/frame/revive/rpc/src/cli.rs +++ b/substrate/frame/revive/rpc/src/cli.rs @@ -16,8 +16,9 @@ // limitations under the License. //! The Ethereum JSON-RPC server. use crate::{ - client::Client, EthRpcServer, EthRpcServerImpl, SystemHealthRpcServer, - SystemHealthRpcServerImpl, + client::{connect, Client}, + BlockInfoProvider, CacheReceiptProvider, DBReceiptProvider, EthRpcServer, EthRpcServerImpl, + ReceiptProvider, SystemHealthRpcServer, SystemHealthRpcServerImpl, }; use clap::Parser; use futures::{pin_mut, FutureExt}; @@ -27,6 +28,7 @@ use sc_service::{ config::{PrometheusConfig, RpcConfiguration}, start_rpc_servers, TaskManager, }; +use std::sync::Arc; // Default port if --prometheus-port is not specified const DEFAULT_PROMETHEUS_PORT: u16 = 9616; @@ -42,6 +44,21 @@ pub struct CliCommand { #[clap(long, default_value = "ws://127.0.0.1:9944")] pub node_rpc_url: String, + /// The maximum number of blocks to cache in memory. + #[clap(long, default_value = "256")] + pub cache_size: usize, + + /// The database used to store Ethereum transaction hashes. + /// This is only useful if the node needs to act as an archive node and respond to Ethereum RPC + /// querires for transactions that are not in the in memory cache. + #[clap(long)] + pub database_url: Option, + + /// If true, we will only read from the database and not write to it. + /// Only useful if `--database-url` is specified. + #[clap(long, default_value = "true")] + pub database_read_only: bool, + #[allow(missing_docs)] #[clap(flatten)] pub shared_params: SharedParams, @@ -78,7 +95,16 @@ fn init_logger(params: &SharedParams) -> anyhow::Result<()> { /// Start the JSON-RPC server using the given command line arguments. pub fn run(cmd: CliCommand) -> anyhow::Result<()> { - let CliCommand { rpc_params, prometheus_params, node_rpc_url, shared_params, .. } = cmd; + let CliCommand { + rpc_params, + prometheus_params, + node_rpc_url, + cache_size, + database_url, + database_read_only, + shared_params, + .. + } = cmd; #[cfg(not(test))] init_logger(&shared_params)?; @@ -110,19 +136,41 @@ pub fn run(cmd: CliCommand) -> anyhow::Result<()> { let tokio_runtime = sc_cli::build_runtime()?; let tokio_handle = tokio_runtime.handle(); - let signals = tokio_runtime.block_on(async { Signals::capture() })?; let mut task_manager = TaskManager::new(tokio_handle.clone(), prometheus_registry)?; let essential_spawn_handle = task_manager.spawn_essential_handle(); let gen_rpc_module = || { let signals = tokio_runtime.block_on(async { Signals::capture() })?; - let fut = Client::from_url(&node_rpc_url, &essential_spawn_handle).fuse(); + let fut = async { + let (api, rpc_client, rpc) = connect(&node_rpc_url).await?; + let block_provider = BlockInfoProvider::new(cache_size, api.clone(), rpc.clone()); + let receipt_provider: Arc = + if let Some(database_url) = database_url.as_ref() { + Arc::new(( + CacheReceiptProvider::default(), + DBReceiptProvider::new( + database_url, + database_read_only, + block_provider.clone(), + ) + .await?, + )) + } else { + Arc::new(CacheReceiptProvider::default()) + }; + + let client = + Client::new(api, rpc_client, rpc, block_provider, receipt_provider).await?; + client.subscribe_and_cache_blocks(&essential_spawn_handle); + Ok::<_, crate::ClientError>(client) + } + .fuse(); pin_mut!(fut); match tokio_handle.block_on(signals.try_until_signal(fut)) { Ok(Ok(client)) => rpc_module(is_dev, client), Ok(Err(err)) => { - log::error!("Error connecting to the node at {node_rpc_url}: {err}"); + log::error!("Error initializing: {err:?}"); Err(sc_service::Error::Application(err.into())) }, Err(_) => Err(sc_service::Error::Application("Client connection interrupted".into())), @@ -142,6 +190,7 @@ pub fn run(cmd: CliCommand) -> anyhow::Result<()> { start_rpc_servers(&rpc_config, prometheus_registry, tokio_handle, gen_rpc_module, None)?; task_manager.keep_alive(rpc_server_handle); + let signals = tokio_runtime.block_on(async { Signals::capture() })?; tokio_runtime.block_on(signals.run_until_signal(task_manager.future().fuse()))?; Ok(()) } diff --git a/substrate/frame/revive/rpc/src/client.rs b/substrate/frame/revive/rpc/src/client.rs index 901c15e9756b..344186fefaf7 100644 --- a/substrate/frame/revive/rpc/src/client.rs +++ b/substrate/frame/revive/rpc/src/client.rs @@ -17,30 +17,23 @@ //! The client connects to the source substrate chain //! and is used by the rpc server to query and send transactions to the substrate chain. use crate::{ + extract_receipts_from_block, runtime::GAS_PRICE, subxt_client::{ - revive::{calls::types::EthTransact, events::ContractEmitted}, - runtime_types::pallet_revive::storage::ContractInfo, + revive::calls::types::EthTransact, runtime_types::pallet_revive::storage::ContractInfo, }, - LOG_TARGET, + BlockInfoProvider, ReceiptProvider, LOG_TARGET, }; -use futures::{stream, StreamExt}; use jsonrpsee::types::{error::CALL_EXECUTION_FAILED_CODE, ErrorObjectOwned}; use pallet_revive::{ - create1, evm::{ - Block, BlockNumberOrTag, BlockNumberOrTagOrHash, Bytes256, GenericTransaction, Log, - ReceiptInfo, SyncingProgress, SyncingStatus, TransactionSigned, H160, H256, U256, + Block, BlockNumberOrTag, BlockNumberOrTagOrHash, Bytes256, GenericTransaction, ReceiptInfo, + SyncingProgress, SyncingStatus, TransactionSigned, H160, H256, U256, }, EthTransactError, EthTransactInfo, }; -use sp_core::keccak_256; use sp_weights::Weight; -use std::{ - collections::{HashMap, VecDeque}, - sync::Arc, - time::Duration, -}; +use std::{ops::ControlFlow, sync::Arc, time::Duration}; use subxt::{ backend::{ legacy::{rpc_methods::SystemHealth, LegacyRpcMethods}, @@ -54,11 +47,10 @@ use subxt::{ storage::Storage, Config, OnlineClient, }; -use subxt_client::transaction_payment::events::TransactionFeePaid; use thiserror::Error; -use tokio::sync::{watch::Sender, RwLock}; +use tokio::{sync::RwLock, try_join}; -use crate::subxt_client::{self, system::events::ExtrinsicSuccess, SrcChainConfig}; +use crate::subxt_client::{self, SrcChainConfig}; /// The substrate block type. pub type SubstrateBlock = subxt::blocks::Block>; @@ -75,29 +67,6 @@ pub type Shared = Arc>; /// The runtime balance type. pub type Balance = u128; -/// The cache maintains a buffer of the last N blocks, -#[derive(Default)] -struct BlockCache { - /// A double-ended queue of the last N blocks. - /// The most recent block is at the back of the queue, and the oldest block is at the front. - buffer: VecDeque>, - - /// A map of blocks by block number. - blocks_by_number: HashMap>, - - /// A map of blocks by block hash. - blocks_by_hash: HashMap>, - - /// A map of receipts by hash. - receipts_by_hash: HashMap, - - /// A map of Signed transaction by hash. - signed_tx_by_hash: HashMap, - - /// A map of receipt hashes by block hash. - tx_hashes_by_block_and_index: HashMap>, -} - /// Unwrap the original `jsonrpsee::core::client::Error::Call` error. fn unwrap_call_err(err: &subxt::error::RpcError) -> Option { use subxt::backend::rpc::reconnecting_rpc_client; @@ -167,6 +136,9 @@ pub enum ClientError { /// A [`RpcError`] wrapper error. #[error(transparent)] RpcError(#[from] RpcError), + /// A [`sqlx::Error`] wrapper error. + #[error(transparent)] + SqlxError(#[from] sqlx::Error), /// A [`codec::Error`] wrapper error. #[error(transparent)] CodecError(#[from] codec::Error), @@ -179,9 +151,18 @@ pub enum ClientError { /// The block hash was not found. #[error("hash not found")] BlockNotFound, + + #[error("No Ethereum extrinsic found")] + EthExtrinsicNotFound, /// The transaction fee could not be found #[error("transactionFeePaid event not found")] TxFeeNotFound, + /// Failed to decode a raw payload into a signed transaction. + #[error("Failed to decode a raw payload into a signed transaction")] + TxDecodingFailed, + /// Failed to recover eth address. + #[error("failed to recover eth address")] + RecoverEthAddressFailed, /// The cache is empty. #[error("cache is empty")] CacheEmpty, @@ -214,163 +195,18 @@ impl From for ErrorObjectOwned { } } -/// The number of recent blocks maintained by the cache. -/// For each block in the cache, we also store the EVM transaction receipts. -pub const CACHE_SIZE: usize = 256; - -impl BlockCache { - fn latest_block(&self) -> Option<&Arc> { - self.buffer.back() - } - - /// Insert an entry into the cache, and prune the oldest entry if the cache is full. - fn insert(&mut self, block: SubstrateBlock) { - if self.buffer.len() >= N { - if let Some(block) = self.buffer.pop_front() { - log::trace!(target: LOG_TARGET, "Pruning block: {}", block.number()); - let hash = block.hash(); - self.blocks_by_hash.remove(&hash); - self.blocks_by_number.remove(&block.number()); - if let Some(entries) = self.tx_hashes_by_block_and_index.remove(&hash) { - for hash in entries.values() { - self.receipts_by_hash.remove(hash); - } - } - } - } - - let block = Arc::new(block); - self.buffer.push_back(block.clone()); - self.blocks_by_number.insert(block.number(), block.clone()); - self.blocks_by_hash.insert(block.hash(), block); - } -} - /// A client connect to a node and maintains a cache of the last `CACHE_SIZE` blocks. #[derive(Clone)] pub struct Client { - /// The inner state of the client. - inner: Arc, - /// A watch channel to signal cache updates. - pub updates: tokio::sync::watch::Receiver<()>, -} - -/// The inner state of the client. -struct ClientInner { api: OnlineClient, rpc_client: ReconnectingRpcClient, rpc: LegacyRpcMethods, - cache: Shared>, + receipt_provider: Arc, + block_provider: BlockInfoProvider, chain_id: u64, max_block_weight: Weight, } -impl ClientInner { - /// Create a new client instance connecting to the substrate node at the given URL. - async fn from_url(url: &str) -> Result { - let rpc_client = ReconnectingRpcClient::builder() - .retry_policy(ExponentialBackoff::from_millis(100).max_delay(Duration::from_secs(10))) - .build(url.to_string()) - .await?; - - let api = OnlineClient::::from_rpc_client(rpc_client.clone()).await?; - let cache = Arc::new(RwLock::new(BlockCache::::default())); - - let rpc = LegacyRpcMethods::::new(RpcClient::new(rpc_client.clone())); - - let (chain_id, max_block_weight) = - tokio::try_join!(chain_id(&api), max_block_weight(&api))?; - - Ok(Self { api, rpc_client, rpc, cache, chain_id, max_block_weight }) - } - - /// Get the receipt infos from the extrinsics in a block. - async fn receipt_infos( - &self, - block: &SubstrateBlock, - ) -> Result, ClientError> { - // Get extrinsics from the block - let extrinsics = block.extrinsics().await?; - - // Filter extrinsics from pallet_revive - let extrinsics = extrinsics.iter().flat_map(|ext| { - let call = ext.as_extrinsic::().ok()??; - let transaction_hash = H256(keccak_256(&call.payload)); - let signed_tx = TransactionSigned::decode(&call.payload).ok()?; - let from = signed_tx.recover_eth_address().ok()?; - let tx_info = GenericTransaction::from_signed(signed_tx.clone(), Some(from)); - let contract_address = if tx_info.to.is_none() { - Some(create1(&from, tx_info.nonce.unwrap_or_default().try_into().ok()?)) - } else { - None - }; - - Some((from, signed_tx, tx_info, transaction_hash, contract_address, ext)) - }); - - // Map each extrinsic to a receipt - stream::iter(extrinsics) - .map(|(from, signed_tx, tx_info, transaction_hash, contract_address, ext)| async move { - let events = ext.events().await?; - let tx_fees = - events.find_first::()?.ok_or(ClientError::TxFeeNotFound)?; - - let gas_price = tx_info.gas_price.unwrap_or_default(); - let gas_used = (tx_fees.tip.saturating_add(tx_fees.actual_fee)) - .checked_div(gas_price.as_u128()) - .unwrap_or_default(); - - let success = events.has::()?; - let transaction_index = ext.index(); - let block_hash = block.hash(); - let block_number = block.number().into(); - - // get logs from ContractEmitted event - let logs = events.iter() - .filter_map(|event_details| { - let event_details = event_details.ok()?; - let event = event_details.as_event::().ok()??; - - Some(Log { - address: event.contract, - topics: event.topics, - data: Some(event.data.into()), - block_number: Some(block_number), - transaction_hash, - transaction_index: Some(transaction_index.into()), - block_hash: Some(block_hash), - log_index: Some(event_details.index().into()), - ..Default::default() - }) - }).collect(); - - - log::debug!(target: LOG_TARGET, "Adding receipt for tx hash: {transaction_hash:?} - block: {block_number:?}"); - let receipt = ReceiptInfo::new( - block_hash, - block_number, - contract_address, - from, - logs, - tx_info.to, - gas_price, - gas_used.into(), - success, - transaction_hash, - transaction_index.into(), - tx_info.r#type.unwrap_or_default() - ); - - Ok::<_, ClientError>((receipt.transaction_hash, (signed_tx, receipt))) - }) - .buffer_unordered(10) - .collect::>>() - .await - .into_iter() - .collect::, _>>() - } -} - /// Fetch the chain ID from the substrate chain. async fn chain_id(api: &OnlineClient) -> Result { let query = subxt_client::constants().revive().chain_id(); @@ -395,23 +231,181 @@ async fn extract_block_timestamp(block: &SubstrateBlock) -> Option { Some(ext.value.now / 1000) } +/// Connect to a node at the given URL, and return the underlying API, RPC client, and legacy RPC +/// clients. +pub async fn connect( + node_rpc_url: &str, +) -> Result< + (OnlineClient, ReconnectingRpcClient, LegacyRpcMethods), + ClientError, +> { + log::info!(target: LOG_TARGET, "Connecting to node at: {node_rpc_url} ..."); + let rpc_client = ReconnectingRpcClient::builder() + .retry_policy(ExponentialBackoff::from_millis(100).max_delay(Duration::from_secs(10))) + .build(node_rpc_url.to_string()) + .await?; + log::info!(target: LOG_TARGET, "Connected to node at: {node_rpc_url}"); + + let api = OnlineClient::::from_rpc_client(rpc_client.clone()).await?; + let rpc = LegacyRpcMethods::::new(RpcClient::new(rpc_client.clone())); + Ok((api, rpc_client, rpc)) +} + impl Client { /// Create a new client instance. - /// The client will subscribe to new blocks and maintain a cache of [`CACHE_SIZE`] blocks. - pub async fn from_url( - url: &str, - spawn_handle: &sc_service::SpawnEssentialTaskHandle, + pub async fn new( + api: OnlineClient, + rpc_client: ReconnectingRpcClient, + rpc: LegacyRpcMethods, + block_provider: BlockInfoProvider, + receipt_provider: Arc, ) -> Result { - log::info!(target: LOG_TARGET, "Connecting to node at: {url} ..."); - let inner: Arc = Arc::new(ClientInner::from_url(url).await?); - log::info!(target: LOG_TARGET, "Connected to node at: {url}"); + let (chain_id, max_block_weight) = + tokio::try_join!(chain_id(&api), max_block_weight(&api))?; + + Ok(Self { + api, + rpc_client, + rpc, + receipt_provider, + block_provider, + chain_id, + max_block_weight, + }) + } + + /// Subscribe to past blocks executing the callback for each block. + /// The subscription continues iterating past blocks until the closure returns + /// `ControlFlow::Break`. Blocks are iterated starting from the latest block and moving + /// backward. + #[allow(dead_code)] + async fn subscribe_past_blocks(&self, callback: F) -> Result<(), ClientError> + where + F: Fn(SubstrateBlock) -> Fut + Send + Sync, + Fut: std::future::Future, ClientError>> + Send, + { + log::info!(target: LOG_TARGET, "Subscribing to past blocks"); + let mut block = self.api.blocks().at_latest().await.inspect_err(|err| { + log::error!(target: LOG_TARGET, "Failed to fetch latest block: {err:?}"); + })?; + + loop { + let block_number = block.number(); + log::debug!(target: LOG_TARGET, "Processing block {block_number}"); + + let parent_hash = block.header().parent_hash; + let control_flow = callback(block).await.inspect_err(|err| { + log::error!(target: LOG_TARGET, "Failed to process block {block_number}: {err:?}"); + })?; + + match control_flow { + ControlFlow::Continue(_) => { + if block_number == 0 { + log::info!(target: LOG_TARGET, "All past blocks processed"); + return Ok(()); + } + block = self.api.blocks().at(parent_hash).await.inspect_err(|err| { + log::error!(target: LOG_TARGET, "Failed to fetch block at {parent_hash:?}: {err:?}"); + })?; + }, + ControlFlow::Break(_) => { + log::info!(target: LOG_TARGET, "Stopping past block subscription at {block_number}"); + return Ok(()); + }, + } + } + } + + /// Subscribe to new best blocks, and execute the async closure with + /// the extracted block and ethereum transactions + async fn subscribe_new_blocks(&self, callback: F) -> Result<(), ClientError> + where + F: Fn(SubstrateBlock) -> Fut + Send + Sync, + Fut: std::future::Future> + Send, + { + log::info!(target: LOG_TARGET, "Subscribing to new blocks"); + let mut block_stream = match self.api.blocks().subscribe_best().await { + Ok(s) => s, + Err(err) => { + log::error!(target: LOG_TARGET, "Failed to subscribe to blocks: {err:?}"); + return Err(err.into()); + }, + }; - let (tx, mut updates) = tokio::sync::watch::channel(()); + while let Some(block) = block_stream.next().await { + let block = match block { + Ok(block) => block, + Err(err) => { + if err.is_disconnected_will_reconnect() { + log::warn!( + target: LOG_TARGET, + "The RPC connection was lost and we may have missed a few blocks" + ); + continue; + } - spawn_handle.spawn("subscribe-blocks", None, Self::subscribe_blocks(inner.clone(), tx)); + log::error!(target: LOG_TARGET, "Failed to fetch block: {err:?}"); + return Err(err.into()); + }, + }; - updates.changed().await.expect("tx is not dropped"); - Ok(Self { inner, updates }) + log::debug!(target: LOG_TARGET, "Pushing block: {}", block.number()); + callback(block).await?; + } + + log::info!(target: LOG_TARGET, "Block subscription ended"); + Ok(()) + } + + /// Start the block subscription, and populate the block cache. + pub fn subscribe_and_cache_blocks(&self, spawn_handle: &sc_service::SpawnEssentialTaskHandle) { + let client = self.clone(); + spawn_handle.spawn("subscribe-blocks", None, async move { + let res = client + .subscribe_new_blocks(|block| async { + let receipts = extract_receipts_from_block(&block).await?; + + client.receipt_provider.insert(&block.hash(), &receipts).await; + if let Some(pruned) = client.block_provider.cache_block(block).await { + client.receipt_provider.remove(&pruned).await; + } + + Ok(()) + }) + .await; + + if let Err(err) = res { + log::error!(target: LOG_TARGET, "Block subscription error: {err:?}"); + } + }); + } + + /// Start the block subscription, and populate the block cache. + pub async fn subscribe_and_cache_receipts( + &self, + oldest_block: Option, + ) -> Result<(), ClientError> { + let new_blocks_fut = self.subscribe_new_blocks(|block| async move { + let receipts = extract_receipts_from_block(&block).await.inspect_err(|err| { + log::error!(target: LOG_TARGET, "Failed to extract receipts from block: {err:?}"); + })?; + self.receipt_provider.insert(&block.hash(), &receipts).await; + Ok(()) + }); + + let Some(oldest_block) = oldest_block else { return new_blocks_fut.await }; + + let old_blocks_fut = self.subscribe_past_blocks(|block| async move { + let receipts = extract_receipts_from_block(&block).await?; + self.receipt_provider.insert(&block.hash(), &receipts).await; + if block.number() == oldest_block { + Ok(ControlFlow::Break(())) + } else { + Ok(ControlFlow::Continue(())) + } + }); + + try_join!(new_blocks_fut, old_blocks_fut).map(|_| ()) } /// Expose the storage API. @@ -425,14 +419,14 @@ impl Client { (*block_number).try_into().map_err(|_| ClientError::ConversionFailed)?; let hash = self.get_block_hash(n).await?.ok_or(ClientError::BlockNotFound)?; - Ok(self.inner.api.storage().at(hash)) + Ok(self.api.storage().at(hash)) }, - BlockNumberOrTagOrHash::H256(hash) => Ok(self.inner.api.storage().at(*hash)), + BlockNumberOrTagOrHash::H256(hash) => Ok(self.api.storage().at(*hash)), BlockNumberOrTagOrHash::BlockTag(_) => { if let Some(block) = self.latest_block().await { - return Ok(self.inner.api.storage().at(block.hash())); + return Ok(self.api.storage().at(block.hash())); } - let storage = self.inner.api.storage().at_latest().await?; + let storage = self.api.storage().at_latest().await?; Ok(storage) }, } @@ -452,90 +446,24 @@ impl Client { (*block_number).try_into().map_err(|_| ClientError::ConversionFailed)?; let hash = self.get_block_hash(n).await?.ok_or(ClientError::BlockNotFound)?; - Ok(self.inner.api.runtime_api().at(hash)) + Ok(self.api.runtime_api().at(hash)) }, - BlockNumberOrTagOrHash::H256(hash) => Ok(self.inner.api.runtime_api().at(*hash)), + BlockNumberOrTagOrHash::H256(hash) => Ok(self.api.runtime_api().at(*hash)), BlockNumberOrTagOrHash::BlockTag(_) => { if let Some(block) = self.latest_block().await { - return Ok(self.inner.api.runtime_api().at(block.hash())); + return Ok(self.api.runtime_api().at(block.hash())); } - let api = self.inner.api.runtime_api().at_latest().await?; + let api = self.api.runtime_api().at_latest().await?; Ok(api) }, } } - /// Subscribe to new blocks and update the cache. - async fn subscribe_blocks(inner: Arc, tx: Sender<()>) { - log::info!(target: LOG_TARGET, "Subscribing to new blocks"); - let mut block_stream = match inner.as_ref().api.blocks().subscribe_best().await { - Ok(s) => s, - Err(err) => { - log::error!(target: LOG_TARGET, "Failed to subscribe to blocks: {err:?}"); - return; - }, - }; - - while let Some(block) = block_stream.next().await { - let block = match block { - Ok(block) => block, - Err(err) => { - if err.is_disconnected_will_reconnect() { - log::warn!( - target: LOG_TARGET, - "The RPC connection was lost and we may have missed a few blocks" - ); - continue; - } - - log::error!(target: LOG_TARGET, "Failed to fetch block: {err:?}"); - return; - }, - }; - - log::trace!(target: LOG_TARGET, "Pushing block: {}", block.number()); - let mut cache = inner.cache.write().await; - - let receipts = inner - .receipt_infos(&block) - .await - .inspect_err(|err| { - log::error!(target: LOG_TARGET, "Failed to get receipts: {err:?}"); - }) - .unwrap_or_default(); - - if !receipts.is_empty() { - let values = receipts - .iter() - .map(|(hash, (_, receipt))| (receipt.transaction_index, *hash)) - .collect::>(); - - cache.tx_hashes_by_block_and_index.insert(block.hash(), values); - - cache - .receipts_by_hash - .extend(receipts.iter().map(|(hash, (_, receipt))| (*hash, receipt.clone()))); - - cache.signed_tx_by_hash.extend( - receipts.iter().map(|(hash, (signed_tx, _))| (*hash, signed_tx.clone())), - ) - } - - cache.insert(block); - tx.send_replace(()); - } - - log::info!(target: LOG_TARGET, "Block subscription ended"); - } -} - -impl Client { /// Get the most recent block stored in the cache. pub async fn latest_block(&self) -> Option> { - let cache = self.inner.cache.read().await; - let block = cache.latest_block()?; - Some(block.clone()) + let block = self.block_provider.latest_block().await?; + Some(block) } /// Expose the transaction API. @@ -543,23 +471,22 @@ impl Client { &self, call: subxt::tx::DefaultPayload, ) -> Result { - let ext = self.inner.api.tx().create_unsigned(&call).map_err(ClientError::from)?; + let ext = self.api.tx().create_unsigned(&call).map_err(ClientError::from)?; let hash = ext.submit().await?; Ok(hash) } /// Get an EVM transaction receipt by hash. pub async fn receipt(&self, tx_hash: &H256) -> Option { - let cache = self.inner.cache.read().await; - cache.receipts_by_hash.get(tx_hash).cloned() + self.receipt_provider.receipt_by_hash(tx_hash).await } /// Get the syncing status of the chain. pub async fn syncing(&self) -> Result { - let health = self.inner.rpc.system_health().await?; + let health = self.rpc.system_health().await?; let status = if health.is_syncing { - let client = RpcClient::new(self.inner.rpc_client.clone()); + let client = RpcClient::new(self.rpc_client.clone()); let sync_state: sc_rpc::system::SyncState = client.request("system_syncState", Default::default()).await?; @@ -582,27 +509,23 @@ impl Client { block_hash: &H256, transaction_index: &U256, ) -> Option { - let cache = self.inner.cache.read().await; - let receipt_hash = - cache.tx_hashes_by_block_and_index.get(block_hash)?.get(transaction_index)?; - let receipt = cache.receipts_by_hash.get(receipt_hash)?; - Some(receipt.clone()) + self.receipt_provider + .receipt_by_block_hash_and_index(block_hash, transaction_index) + .await } pub async fn signed_tx_by_hash(&self, tx_hash: &H256) -> Option { - let cache = self.inner.cache.read().await; - cache.signed_tx_by_hash.get(tx_hash).cloned() + self.receipt_provider.signed_tx_by_hash(tx_hash).await } /// Get receipts count per block. pub async fn receipts_count_per_block(&self, block_hash: &SubstrateBlockHash) -> Option { - let cache = self.inner.cache.read().await; - cache.tx_hashes_by_block_and_index.get(block_hash).map(|v| v.len()) + self.receipt_provider.receipts_count_per_block(block_hash).await } /// Get the system health. pub async fn system_health(&self) -> Result { - let health = self.inner.rpc.system_health().await?; + let health = self.rpc.system_health().await?; Ok(health) } @@ -697,8 +620,8 @@ impl Client { /// Get the block number of the latest block. pub async fn block_number(&self) -> Result { - let cache = self.inner.cache.read().await; - let latest_block = cache.buffer.back().ok_or(ClientError::CacheEmpty)?; + let latest_block = + self.block_provider.latest_block().await.ok_or(ClientError::CacheEmpty)?; Ok(latest_block.number()) } @@ -707,13 +630,8 @@ impl Client { &self, block_number: SubstrateBlockNumber, ) -> Result, ClientError> { - let cache = self.inner.cache.read().await; - if let Some(block) = cache.blocks_by_number.get(&block_number) { - return Ok(Some(block.hash())); - } - - let hash = self.inner.rpc.chain_get_block_hash(Some(block_number.into())).await?; - Ok(hash) + let maybe_block = self.block_provider.block_by_number(block_number).await?; + Ok(maybe_block.map(|block| block.hash())) } /// Get a block for the specified hash or number. @@ -727,8 +645,8 @@ impl Client { self.block_by_number(n).await }, BlockNumberOrTag::BlockTag(_) => { - let cache = self.inner.cache.read().await; - Ok(cache.buffer.back().cloned()) + let block = self.block_provider.latest_block().await; + Ok(block) }, } } @@ -738,16 +656,7 @@ impl Client { &self, hash: &SubstrateBlockHash, ) -> Result>, ClientError> { - let cache = self.inner.cache.read().await; - if let Some(block) = cache.blocks_by_hash.get(hash) { - return Ok(Some(block.clone())); - } - - match self.inner.api.blocks().at(*hash).await { - Ok(block) => Ok(Some(Arc::new(block))), - Err(subxt::Error::Block(subxt::error::BlockError::NotFound(_))) => Ok(None), - Err(err) => Err(err.into()), - } + self.block_provider.block_by_hash(hash).await } /// Get a block by number @@ -755,21 +664,12 @@ impl Client { &self, block_number: SubstrateBlockNumber, ) -> Result>, ClientError> { - let cache = self.inner.cache.read().await; - if let Some(block) = cache.blocks_by_number.get(&block_number) { - return Ok(Some(block.clone())); - } - - let Some(hash) = self.get_block_hash(block_number).await? else { - return Ok(None); - }; - - self.block_by_hash(&hash).await + self.block_provider.block_by_number(block_number).await } /// Get the EVM block for the given hash. pub async fn evm_block(&self, block: Arc) -> Result { - let runtime_api = self.inner.api.runtime_api().at(block.hash()); + let runtime_api = self.api.runtime_api().at(block.hash()); let max_fee = Self::weight_to_fee(&runtime_api, self.max_block_weight()).await?; let gas_limit = U256::from(max_fee / GAS_PRICE as u128); @@ -811,11 +711,11 @@ impl Client { /// Get the chain ID. pub fn chain_id(&self) -> u64 { - self.inner.chain_id + self.chain_id } /// Get the Max Block Weight. pub fn max_block_weight(&self) -> Weight { - self.inner.max_block_weight + self.max_block_weight } } diff --git a/substrate/frame/revive/rpc/src/eth-indexer.rs b/substrate/frame/revive/rpc/src/eth-indexer.rs new file mode 100644 index 000000000000..1eab2b2f768c --- /dev/null +++ b/substrate/frame/revive/rpc/src/eth-indexer.rs @@ -0,0 +1,85 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: Apache-2.0 + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//! The Ethereum JSON-RPC server. +use clap::Parser; +use pallet_revive_eth_rpc::{ + client::{connect, Client, SubstrateBlockNumber}, + BlockInfoProvider, DBReceiptProvider, ReceiptProvider, +}; +use sc_cli::SharedParams; +use std::sync::Arc; + +// Parsed command instructions from the command line +#[derive(Parser, Debug)] +#[clap(author, about, version)] +pub struct CliCommand { + /// The node url to connect to + #[clap(long, default_value = "ws://127.0.0.1:9944")] + pub node_rpc_url: String, + + /// If specified, indexing will walk back and index block until this block number. + #[clap(long)] + pub oldest_block: Option, + + /// The database used to store Ethereum transaction hashes. + #[clap(long)] + pub database_url: String, + + #[allow(missing_docs)] + #[clap(flatten)] + pub shared_params: SharedParams, +} + +/// Initialize the logger +#[cfg(not(test))] +fn init_logger(params: &SharedParams) -> anyhow::Result<()> { + let mut logger = sc_cli::LoggerBuilder::new(params.log_filters().join(",")); + logger + .with_log_reloading(params.enable_log_reloading) + .with_detailed_output(params.detailed_log_output); + + if let Some(tracing_targets) = ¶ms.tracing_targets { + let tracing_receiver = params.tracing_receiver.into(); + logger.with_profiling(tracing_receiver, tracing_targets); + } + + if params.disable_log_color { + logger.with_colors(false); + } + + logger.init()?; + Ok(()) +} + +#[tokio::main] +pub async fn main() -> anyhow::Result<()> { + let CliCommand { node_rpc_url, database_url, shared_params, oldest_block, .. } = + CliCommand::parse(); + + #[cfg(not(test))] + init_logger(&shared_params)?; + + let (api, rpc_client, rpc) = connect(&node_rpc_url).await?; + let block_provider = BlockInfoProvider::new(0, api.clone(), rpc.clone()); + let receipt_provider: Arc = + Arc::new(DBReceiptProvider::new(&database_url, false, block_provider.clone()).await?); + + let client = Client::new(api, rpc_client, rpc, block_provider, receipt_provider).await?; + client.subscribe_and_cache_receipts(oldest_block).await?; + + Ok(()) +} diff --git a/substrate/frame/revive/rpc/src/lib.rs b/substrate/frame/revive/rpc/src/lib.rs index ccd8bb043e90..f4f59b518c39 100644 --- a/substrate/frame/revive/rpc/src/lib.rs +++ b/substrate/frame/revive/rpc/src/lib.rs @@ -35,6 +35,12 @@ pub mod subxt_client; #[cfg(test)] mod tests; +mod block_info_provider; +pub use block_info_provider::*; + +mod receipt_provider; +pub use receipt_provider::*; + mod rpc_health; pub use rpc_health::*; diff --git a/substrate/frame/revive/rpc/src/receipt_provider.rs b/substrate/frame/revive/rpc/src/receipt_provider.rs new file mode 100644 index 000000000000..12f90bc341a0 --- /dev/null +++ b/substrate/frame/revive/rpc/src/receipt_provider.rs @@ -0,0 +1,238 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: Apache-2.0 + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::{ + client::SubstrateBlock, + subxt_client::{ + revive::{calls::types::EthTransact, events::ContractEmitted}, + system::events::ExtrinsicSuccess, + transaction_payment::events::TransactionFeePaid, + SrcChainConfig, + }, + ClientError, LOG_TARGET, +}; +use futures::{stream, StreamExt}; +use jsonrpsee::core::async_trait; +use pallet_revive::{ + create1, + evm::{GenericTransaction, Log, ReceiptInfo, TransactionSigned, H256, U256}, +}; +use sp_core::keccak_256; +use tokio::join; + +mod cache; +pub use cache::CacheReceiptProvider; + +mod db; +pub use db::DBReceiptProvider; + +/// Provide means to store and retrieve receipts. +#[async_trait] +pub trait ReceiptProvider: Send + Sync { + /// Insert receipts into the provider. + async fn insert(&self, block_hash: &H256, receipts: &[(TransactionSigned, ReceiptInfo)]); + + /// Remove receipts with the given block hash. + async fn remove(&self, block_hash: &H256); + + /// Get the receipt for the given block hash and transaction index. + async fn receipt_by_block_hash_and_index( + &self, + block_hash: &H256, + transaction_index: &U256, + ) -> Option; + + /// Get the number of receipts per block. + async fn receipts_count_per_block(&self, block_hash: &H256) -> Option; + + /// Get the receipt for the given transaction hash. + async fn receipt_by_hash(&self, transaction_hash: &H256) -> Option; + + /// Get the signed transaction for the given transaction hash. + async fn signed_tx_by_hash(&self, transaction_hash: &H256) -> Option; +} + +#[async_trait] +impl ReceiptProvider for (Main, Fallback) { + async fn insert(&self, block_hash: &H256, receipts: &[(TransactionSigned, ReceiptInfo)]) { + join!(self.0.insert(block_hash, receipts), self.1.insert(block_hash, receipts)); + } + + async fn remove(&self, block_hash: &H256) { + join!(self.0.remove(block_hash), self.1.remove(block_hash)); + } + + async fn receipt_by_block_hash_and_index( + &self, + block_hash: &H256, + transaction_index: &U256, + ) -> Option { + if let Some(receipt) = + self.0.receipt_by_block_hash_and_index(block_hash, transaction_index).await + { + return Some(receipt); + } + + self.1.receipt_by_block_hash_and_index(block_hash, transaction_index).await + } + + async fn receipts_count_per_block(&self, block_hash: &H256) -> Option { + if let Some(count) = self.0.receipts_count_per_block(block_hash).await { + return Some(count); + } + self.1.receipts_count_per_block(block_hash).await + } + + async fn receipt_by_hash(&self, hash: &H256) -> Option { + if let Some(receipt) = self.0.receipt_by_hash(hash).await { + return Some(receipt); + } + self.1.receipt_by_hash(hash).await + } + + async fn signed_tx_by_hash(&self, hash: &H256) -> Option { + if let Some(tx) = self.0.signed_tx_by_hash(hash).await { + return Some(tx); + } + self.1.signed_tx_by_hash(hash).await + } +} + +/// Extract a [`TransactionSigned`] and a [`ReceiptInfo`] and from an extrinsic. +pub async fn extract_receipt_from_extrinsic( + block: &SubstrateBlock, + ext: subxt::blocks::ExtrinsicDetails>, + call: EthTransact, +) -> Result<(TransactionSigned, ReceiptInfo), ClientError> { + let events = ext.events().await?; + let tx_fees = events + .find_first::()? + .ok_or(ClientError::TxFeeNotFound) + .inspect_err( + |_| log::debug!(target: LOG_TARGET, "TransactionFeePaid not found in events for {}", block.number()), + )?; + let transaction_hash = H256(keccak_256(&call.payload)); + + let signed_tx = + TransactionSigned::decode(&call.payload).map_err(|_| ClientError::TxDecodingFailed)?; + let from = signed_tx + .recover_eth_address() + .map_err(|_| ClientError::RecoverEthAddressFailed)?; + + let tx_info = GenericTransaction::from_signed(signed_tx.clone(), Some(from)); + let gas_price = tx_info.gas_price.unwrap_or_default(); + let gas_used = (tx_fees.tip.saturating_add(tx_fees.actual_fee)) + .checked_div(gas_price.as_u128()) + .unwrap_or_default(); + + let success = events.has::()?; + let transaction_index = ext.index(); + let block_number = U256::from(block.number()); + let block_hash = block.hash(); + + // get logs from ContractEmitted event + let logs = events + .iter() + .filter_map(|event_details| { + let event_details = event_details.ok()?; + let event = event_details.as_event::().ok()??; + + Some(Log { + address: event.contract, + topics: event.topics, + data: Some(event.data.into()), + block_number: Some(block_number), + transaction_hash, + transaction_index: Some(transaction_index.into()), + block_hash: Some(block_hash), + log_index: Some(event_details.index().into()), + ..Default::default() + }) + }) + .collect(); + + let contract_address = if tx_info.to.is_none() { + Some(create1( + &from, + tx_info + .nonce + .unwrap_or_default() + .try_into() + .map_err(|_| ClientError::ConversionFailed)?, + )) + } else { + None + }; + + log::debug!(target: LOG_TARGET, "Adding receipt for tx hash: {transaction_hash:?} - block: + {block_number:?}"); + let receipt = ReceiptInfo::new( + block_hash, + block_number, + contract_address, + from, + logs, + tx_info.to, + gas_price, + gas_used.into(), + success, + transaction_hash, + transaction_index.into(), + tx_info.r#type.unwrap_or_default(), + ); + Ok((signed_tx, receipt)) +} + +/// Extract receipts from block. +pub async fn extract_receipts_from_block( + block: &SubstrateBlock, +) -> Result, ClientError> { + // Filter extrinsics from pallet_revive + let extrinsics = block.extrinsics().await.inspect_err(|err| { + log::debug!(target: LOG_TARGET, "Error fetching for #{:?} extrinsics: {err:?}", block.number()); + })?; + + let extrinsics = extrinsics.iter().flat_map(|ext| { + let call = ext.as_extrinsic::().ok()??; + Some((ext, call)) + }); + + stream::iter(extrinsics) + .map(|(ext, call)| async move { extract_receipt_from_extrinsic(block, ext, call).await }) + .buffer_unordered(10) + .collect::>>() + .await + .into_iter() + .collect::, _>>() +} + +/// Extract receipt from transaction +pub async fn extract_receipts_from_transaction( + block: &SubstrateBlock, + transaction_index: usize, +) -> Result<(TransactionSigned, ReceiptInfo), ClientError> { + let extrinsics = block.extrinsics().await?; + let ext = extrinsics + .iter() + .nth(transaction_index) + .ok_or(ClientError::EthExtrinsicNotFound)?; + + let call = ext + .as_extrinsic::()? + .ok_or_else(|| ClientError::EthExtrinsicNotFound)?; + extract_receipt_from_extrinsic(block, ext, call).await +} diff --git a/substrate/frame/revive/rpc/src/receipt_provider/cache.rs b/substrate/frame/revive/rpc/src/receipt_provider/cache.rs new file mode 100644 index 000000000000..39124929ec07 --- /dev/null +++ b/substrate/frame/revive/rpc/src/receipt_provider/cache.rs @@ -0,0 +1,148 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: Apache-2.0 + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +use super::ReceiptProvider; +use jsonrpsee::core::async_trait; +use pallet_revive::evm::{ReceiptInfo, TransactionSigned, H256, U256}; +use std::{collections::HashMap, sync::Arc}; +use tokio::sync::RwLock; + +/// A `[ReceiptProvider]` that caches receipts in memory. +#[derive(Clone, Default)] +pub struct CacheReceiptProvider { + cache: Arc>, +} + +impl CacheReceiptProvider { + /// Get a read access on the shared cache. + async fn cache(&self) -> tokio::sync::RwLockReadGuard<'_, ReceiptCache> { + self.cache.read().await + } +} + +#[async_trait] +impl ReceiptProvider for CacheReceiptProvider { + async fn insert(&self, block_hash: &H256, receipts: &[(TransactionSigned, ReceiptInfo)]) { + let mut cache = self.cache.write().await; + cache.insert(block_hash, receipts); + } + + async fn remove(&self, block_hash: &H256) { + let mut cache = self.cache.write().await; + cache.remove(block_hash); + } + + async fn receipt_by_block_hash_and_index( + &self, + block_hash: &H256, + transaction_index: &U256, + ) -> Option { + let cache = self.cache().await; + let receipt_hash = cache + .transaction_hashes_by_block_and_index + .get(block_hash)? + .get(transaction_index)?; + let receipt = cache.receipts_by_hash.get(receipt_hash)?; + Some(receipt.clone()) + } + + async fn receipts_count_per_block(&self, block_hash: &H256) -> Option { + let cache = self.cache().await; + cache.transaction_hashes_by_block_and_index.get(block_hash).map(|v| v.len()) + } + + async fn receipt_by_hash(&self, hash: &H256) -> Option { + let cache = self.cache().await; + cache.receipts_by_hash.get(hash).cloned() + } + + async fn signed_tx_by_hash(&self, hash: &H256) -> Option { + let cache = self.cache().await; + cache.signed_tx_by_hash.get(hash).cloned() + } +} + +#[derive(Default)] +struct ReceiptCache { + /// A map of receipts by transaction hash. + receipts_by_hash: HashMap, + + /// A map of Signed transaction by transaction hash. + signed_tx_by_hash: HashMap, + + /// A map of receipt hashes by block hash. + transaction_hashes_by_block_and_index: HashMap>, +} + +impl ReceiptCache { + /// Insert new receipts into the cache. + pub fn insert(&mut self, block_hash: &H256, receipts: &[(TransactionSigned, ReceiptInfo)]) { + if !receipts.is_empty() { + let values = receipts + .iter() + .map(|(_, receipt)| (receipt.transaction_index, receipt.transaction_hash)) + .collect::>(); + + self.transaction_hashes_by_block_and_index.insert(*block_hash, values); + + self.receipts_by_hash.extend( + receipts.iter().map(|(_, receipt)| (receipt.transaction_hash, receipt.clone())), + ); + + self.signed_tx_by_hash.extend( + receipts + .iter() + .map(|(signed_tx, receipt)| (receipt.transaction_hash, signed_tx.clone())), + ) + } + } + + /// Remove entry from the cache. + pub fn remove(&mut self, hash: &H256) { + if let Some(entries) = self.transaction_hashes_by_block_and_index.remove(hash) { + for hash in entries.values() { + self.receipts_by_hash.remove(hash); + self.signed_tx_by_hash.remove(hash); + } + } + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn cache_insert_and_remove_works() { + let mut cache = ReceiptCache::default(); + + for i in 1u8..=3 { + let hash = H256::from([i; 32]); + cache.insert( + &hash, + &[( + TransactionSigned::default(), + ReceiptInfo { transaction_hash: hash, ..Default::default() }, + )], + ); + } + + cache.remove(&H256::from([1u8; 32])); + assert_eq!(cache.transaction_hashes_by_block_and_index.len(), 2); + assert_eq!(cache.receipts_by_hash.len(), 2); + assert_eq!(cache.signed_tx_by_hash.len(), 2); + } +} diff --git a/substrate/frame/revive/rpc/src/receipt_provider/db.rs b/substrate/frame/revive/rpc/src/receipt_provider/db.rs new file mode 100644 index 000000000000..4657bb20818b --- /dev/null +++ b/substrate/frame/revive/rpc/src/receipt_provider/db.rs @@ -0,0 +1,159 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: Apache-2.0 + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +use super::*; +use crate::BlockInfoProvider; +use jsonrpsee::core::async_trait; +use pallet_revive::evm::{ReceiptInfo, TransactionSigned}; +use sp_core::{H256, U256}; +use sqlx::SqlitePool; + +/// A `[ReceiptProvider]` that stores receipts in a SQLite database. +#[derive(Clone)] +pub struct DBReceiptProvider { + /// The SQLite database pool. + pool: SqlitePool, + /// The block provider used to fetch blocks, and reconstruct receipts. + block_provider: BlockInfoProvider, + /// weather or not we should write to the DB. + read_only: bool, +} + +impl DBReceiptProvider { + /// Create a new `DBReceiptProvider` with the given database URL and block provider. + pub async fn new( + database_url: &str, + read_only: bool, + block_provider: BlockInfoProvider, + ) -> Result { + let pool = SqlitePool::connect(database_url).await?; + Ok(Self { pool, block_provider, read_only }) + } +} + +#[async_trait] +impl ReceiptProvider for DBReceiptProvider { + async fn insert(&self, block_hash: &H256, receipts: &[(TransactionSigned, ReceiptInfo)]) { + if self.read_only { + return + } + + let block_hash_str = hex::encode(block_hash); + for (_, receipt) in receipts { + let transaction_hash = hex::encode(receipt.transaction_hash); + let transaction_index = receipt.transaction_index.as_u32() as i32; + let result = sqlx::query!( + r#" + INSERT INTO transaction_hashes (transaction_hash, block_hash, transaction_index) + VALUES (?, ?, ?) + + ON CONFLICT(transaction_hash) DO UPDATE SET + block_hash = EXCLUDED.block_hash, + transaction_index = EXCLUDED.transaction_index + "#, + transaction_hash, + block_hash_str, + transaction_index + ) + .execute(&self.pool) + .await; + + if let Err(err) = result { + log::error!( + "Error inserting transaction for block hash {block_hash:?}: {:?}", + err + ); + } + } + } + + async fn remove(&self, _block_hash: &H256) {} + + async fn receipts_count_per_block(&self, block_hash: &H256) -> Option { + let block_hash = hex::encode(block_hash); + let row = sqlx::query!( + r#" + SELECT COUNT(*) as count + FROM transaction_hashes + WHERE block_hash = ? + "#, + block_hash + ) + .fetch_one(&self.pool) + .await + .ok()?; + + Some(row.count as usize) + } + + async fn receipt_by_block_hash_and_index( + &self, + block_hash: &H256, + transaction_index: &U256, + ) -> Option { + let block = self.block_provider.block_by_hash(block_hash).await.ok()??; + let transaction_index: usize = transaction_index.as_usize(); // TODO: check for overflow + let (_, receipt) = + extract_receipts_from_transaction(&block, transaction_index).await.ok()?; + Some(receipt) + } + + async fn receipt_by_hash(&self, transaction_hash: &H256) -> Option { + let transaction_hash = hex::encode(transaction_hash); + let result = sqlx::query!( + r#" + SELECT block_hash, transaction_index + FROM transaction_hashes + WHERE transaction_hash = ? + "#, + transaction_hash + ) + .fetch_optional(&self.pool) + .await + .ok()??; + + let block_hash = result.block_hash.parse::().ok()?; + let transaction_index = result.transaction_index.try_into().ok()?; + + let block = self.block_provider.block_by_hash(&block_hash).await.ok()??; + let (_, receipt) = + extract_receipts_from_transaction(&block, transaction_index).await.ok()?; + Some(receipt) + } + + async fn signed_tx_by_hash(&self, transaction_hash: &H256) -> Option { + let transaction_hash = hex::encode(transaction_hash); + let result = sqlx::query!( + r#" + SELECT block_hash, transaction_index + FROM transaction_hashes + WHERE transaction_hash = ? + "#, + transaction_hash + ) + .fetch_optional(&self.pool) + .await + .ok()??; + + let block_hash = result.block_hash.parse::().ok()?; + let transaction_index = result.transaction_index.try_into().ok()?; + + let block = self.block_provider.block_by_hash(&block_hash).await.ok()??; + let (signed_tx, _) = + extract_receipts_from_transaction(&block, transaction_index).await.ok()?; + Some(signed_tx) + } +}