From 2cec83f26e0b9309387135ca43718af4fcd6f6b1 Mon Sep 17 00:00:00 2001 From: Artem Fomiuk <88630083+Artemka374@users.noreply.github.com> Date: Thu, 19 Sep 2024 17:40:22 +0300 Subject: [PATCH] feat(prover): Add endpoint to PJM to get queue reports (#2918) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ Add `/queue_report` endpoint, which will get the data about queue and send it. ## Why ❔ To work with new autoscaler ## Checklist - [ ] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [ ] Tests for the changes have been added / updated. - [ ] Documentation comments have been added / updated. - [ ] Code has been formatted via `zk fmt` and `zk lint`. --------- Co-authored-by: EmilLuta --- core/lib/basic_types/src/prover_dal.rs | 3 +- .../config/src/configs/prover_job_monitor.rs | 2 + core/lib/config/src/testonly.rs | 1 + core/lib/env_config/src/prover_job_monitor.rs | 3 + .../src/proto/config/prover_job_monitor.proto | 1 + .../protobuf_config/src/prover_job_monitor.rs | 4 + etc/env/base/prover_job_monitor.toml | 1 + etc/env/file_based/general.yaml | 1 + prover/Cargo.lock | 29 ++- prover/Cargo.toml | 1 + .../crates/bin/prover_job_monitor/Cargo.toml | 3 + .../src/autoscaler_queue_reporter.rs | 176 ++++++++++++++++++ .../crates/bin/prover_job_monitor/src/lib.rs | 1 + .../crates/bin/prover_job_monitor/src/main.rs | 30 ++- .../witness_generator_queue_reporter.rs | 2 +- ...dbd694e1781e013247d090a280a1f894de464.json | 38 ++++ .../lib/prover_dal/src/fri_prover_dal.rs | 47 ++++- .../src/fri_witness_generator_dal.rs | 4 +- 18 files changed, 334 insertions(+), 13 deletions(-) create mode 100644 prover/crates/bin/prover_job_monitor/src/autoscaler_queue_reporter.rs create mode 100644 prover/crates/lib/prover_dal/.sqlx/query-97adb49780c9edde6a3cfda09dadbd694e1781e013247d090a280a1f894de464.json diff --git a/core/lib/basic_types/src/prover_dal.rs b/core/lib/basic_types/src/prover_dal.rs index 7eb671448608..36f6c89135a0 100644 --- a/core/lib/basic_types/src/prover_dal.rs +++ b/core/lib/basic_types/src/prover_dal.rs @@ -2,6 +2,7 @@ use std::{net::IpAddr, ops::Add, str::FromStr}; use chrono::{DateTime, Duration, NaiveDateTime, NaiveTime, Utc}; +use serde::{Deserialize, Serialize}; use strum::{Display, EnumString}; use crate::{ @@ -27,7 +28,7 @@ pub struct ExtendedJobCountStatistics { pub successful: usize, } -#[derive(Debug, Clone, Copy, Default)] +#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)] pub struct JobCountStatistics { pub queued: usize, pub in_progress: usize, diff --git a/core/lib/config/src/configs/prover_job_monitor.rs b/core/lib/config/src/configs/prover_job_monitor.rs index c16b1db81b7a..d60a0e90c204 100644 --- a/core/lib/config/src/configs/prover_job_monitor.rs +++ b/core/lib/config/src/configs/prover_job_monitor.rs @@ -61,6 +61,8 @@ pub struct ProverJobMonitorConfig { /// The interval between runs for Witness Job Queuer. #[serde(default = "ProverJobMonitorConfig::default_witness_job_queuer_run_interval_ms")] pub witness_job_queuer_run_interval_ms: u64, + /// HTTP port of the ProverJobMonitor to send requests to. + pub http_port: u16, } impl ProverJobMonitorConfig { diff --git a/core/lib/config/src/testonly.rs b/core/lib/config/src/testonly.rs index 4a2858b9cbfc..21141ddefff6 100644 --- a/core/lib/config/src/testonly.rs +++ b/core/lib/config/src/testonly.rs @@ -1113,6 +1113,7 @@ impl Distribution for Encod prover_queue_reporter_run_interval_ms: self.sample(rng), witness_generator_queue_reporter_run_interval_ms: self.sample(rng), witness_job_queuer_run_interval_ms: self.sample(rng), + http_port: self.sample(rng), } } } diff --git a/core/lib/env_config/src/prover_job_monitor.rs b/core/lib/env_config/src/prover_job_monitor.rs index 3a8f80473eb1..884ebecacbb8 100644 --- a/core/lib/env_config/src/prover_job_monitor.rs +++ b/core/lib/env_config/src/prover_job_monitor.rs @@ -31,6 +31,7 @@ mod tests { prover_queue_reporter_run_interval_ms: 10000, witness_generator_queue_reporter_run_interval_ms: 10000, witness_job_queuer_run_interval_ms: 10000, + http_port: 3074, } } @@ -55,6 +56,7 @@ mod tests { fn from_env_with_default() { let config = r#" PROVER_JOB_MONITOR_PROMETHEUS_PORT=3317 + PROVER_JOB_MONITOR_HTTP_PORT=3074 PROVER_JOB_MONITOR_MAX_DB_CONNECTIONS=9 "#; let mut lock = MUTEX.lock(); @@ -80,6 +82,7 @@ mod tests { PROVER_JOB_MONITOR_PROVER_QUEUE_REPORTER_RUN_INTERVAL_MS=10001 PROVER_JOB_MONITOR_WITNESS_GENERATOR_QUEUE_REPORTER_RUN_INTERVAL_MS=10001 PROVER_JOB_MONITOR_WITNESS_JOB_QUEUER_RUN_INTERVAL_MS=10001 + PROVER_JOB_MONITOR_HTTP_PORT=3074 "#; let mut lock = MUTEX.lock(); lock.set_env(config); diff --git a/core/lib/protobuf_config/src/proto/config/prover_job_monitor.proto b/core/lib/protobuf_config/src/proto/config/prover_job_monitor.proto index 7b505aa3bcfb..9aabf6e34832 100644 --- a/core/lib/protobuf_config/src/proto/config/prover_job_monitor.proto +++ b/core/lib/protobuf_config/src/proto/config/prover_job_monitor.proto @@ -17,4 +17,5 @@ message ProverJobMonitor { optional uint64 prover_queue_reporter_run_interval_ms = 12; // optional; ms optional uint64 witness_generator_queue_reporter_run_interval_ms = 13; // optional; ms optional uint64 witness_job_queuer_run_interval_ms = 14; // optional; ms + optional uint32 http_port = 15; // required; u32 } diff --git a/core/lib/protobuf_config/src/prover_job_monitor.rs b/core/lib/protobuf_config/src/prover_job_monitor.rs index a1c5a7c05995..a174d0882406 100644 --- a/core/lib/protobuf_config/src/prover_job_monitor.rs +++ b/core/lib/protobuf_config/src/prover_job_monitor.rs @@ -95,6 +95,9 @@ impl ProtoRepr for proto::ProverJobMonitor { .or_else(|| Some(Self::Type::default_witness_job_queuer_run_interval_ms())), ) .context("witness_job_queuer_run_interval_ms")?, + http_port: required(&self.http_port) + .and_then(|x| Ok((*x).try_into()?)) + .context("http_port")?, }) } @@ -126,6 +129,7 @@ impl ProtoRepr for proto::ProverJobMonitor { this.witness_generator_queue_reporter_run_interval_ms, ), witness_job_queuer_run_interval_ms: Some(this.witness_job_queuer_run_interval_ms), + http_port: Some(this.http_port.into()), } } } diff --git a/etc/env/base/prover_job_monitor.toml b/etc/env/base/prover_job_monitor.toml index 40cdf76b8b10..ce206c74ffde 100644 --- a/etc/env/base/prover_job_monitor.toml +++ b/etc/env/base/prover_job_monitor.toml @@ -13,3 +13,4 @@ proof_compressor_queue_reporter_run_interval_ms = 10000 prover_queue_reporter_run_interval_ms = 10000 witness_generator_queue_reporter_run_interval_ms = 10000 witness_job_queuer_run_interval_ms = 10000 +http_port = 3074 diff --git a/etc/env/file_based/general.yaml b/etc/env/file_based/general.yaml index ca9c3fd0c796..6a36f65c97c3 100644 --- a/etc/env/file_based/general.yaml +++ b/etc/env/file_based/general.yaml @@ -287,6 +287,7 @@ prover_job_monitor: prover_queue_reporter_run_interval_ms: 10000 witness_generator_queue_reporter_run_interval_ms: 10000 witness_job_queuer_run_interval_ms: 10000 + http_port: 3074 base_token_adjuster: diff --git a/prover/Cargo.lock b/prover/Cargo.lock index 9a6c4b424232..5624403d7853 100644 --- a/prover/Cargo.lock +++ b/prover/Cargo.lock @@ -313,6 +313,8 @@ dependencies = [ "http 1.1.0", "http-body 1.0.0", "http-body-util", + "hyper 1.3.1", + "hyper-util", "itoa", "matchit", "memchr", @@ -321,10 +323,15 @@ dependencies = [ "pin-project-lite", "rustversion", "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", "sync_wrapper 1.0.1", + "tokio", "tower", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -345,6 +352,7 @@ dependencies = [ "sync_wrapper 0.1.2", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -5112,9 +5120,9 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.203" +version = "1.0.210" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7253ab4de971e72fb7be983802300c30b5a7f0c2e56fab8abfc6a214307c0094" +checksum = "c8e3592472072e6e22e0a54d5904d9febf8508f65fb8552499a1abc7d1078c3a" dependencies = [ "serde_derive", ] @@ -5131,9 +5139,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.203" +version = "1.0.210" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "500cbc0ebeb6f46627f50f3f5811ccf6bf00643be300b4c3eabc0ef55dc5b5ba" +checksum = "243902eda00fad750862fc144cea25caca5e20d615af0a81bee94ca738f1df1f" dependencies = [ "proc-macro2 1.0.85", "quote 1.0.36", @@ -5151,6 +5159,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af99884400da37c88f5e9146b7f1fd0fbcae8f6eec4e9da38b67d05486f814a6" +dependencies = [ + "itoa", + "serde", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -7928,13 +7946,16 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "axum", "clap 4.5.4", "ctrlc", + "serde", "tokio", "tracing", "vise", "zksync_config", "zksync_core_leftovers", + "zksync_db_connection", "zksync_prover_dal", "zksync_types", "zksync_utils", diff --git a/prover/Cargo.toml b/prover/Cargo.toml index 624661adc8dc..fd171b254d5a 100644 --- a/prover/Cargo.toml +++ b/prover/Cargo.toml @@ -19,6 +19,7 @@ categories = ["cryptography"] [workspace.dependencies] # Common dependencies anyhow = "1.0" +axum = "0.7.5" async-trait = "0.1" bincode = "1" chrono = "0.4.38" diff --git a/prover/crates/bin/prover_job_monitor/Cargo.toml b/prover/crates/bin/prover_job_monitor/Cargo.toml index 160d3a603e36..a4bf8765a946 100644 --- a/prover/crates/bin/prover_job_monitor/Cargo.toml +++ b/prover/crates/bin/prover_job_monitor/Cargo.toml @@ -16,6 +16,7 @@ zksync_prover_dal.workspace = true zksync_utils.workspace = true zksync_types.workspace = true zksync_config = { workspace = true, features = ["observability_ext"] } +zksync_db_connection.workspace = true vise.workspace = true @@ -25,3 +26,5 @@ clap = { workspace = true, features = ["derive"] } ctrlc = { workspace = true, features = ["termination"] } tracing.workspace = true async-trait.workspace = true +serde.workspace = true +axum.workspace = true diff --git a/prover/crates/bin/prover_job_monitor/src/autoscaler_queue_reporter.rs b/prover/crates/bin/prover_job_monitor/src/autoscaler_queue_reporter.rs new file mode 100644 index 000000000000..aff78409dbb3 --- /dev/null +++ b/prover/crates/bin/prover_job_monitor/src/autoscaler_queue_reporter.rs @@ -0,0 +1,176 @@ +use std::collections::HashMap; + +use axum::{ + http::StatusCode, + response::{IntoResponse, Response}, + routing::get, + Json, Router, +}; +use serde::{Deserialize, Serialize}; +use zksync_db_connection::error::DalError; +use zksync_prover_dal::{ConnectionPool, Prover, ProverDal}; +use zksync_types::{ + basic_fri_types::AggregationRound, protocol_version::ProtocolSemanticVersion, + prover_dal::JobCountStatistics, +}; + +#[derive(Debug, Clone)] +pub struct AutoscalerQueueReporter { + connection_pool: ConnectionPool, +} + +#[derive(Default, Debug, Serialize, Deserialize)] +pub struct QueueReport { + pub basic_witness_jobs: JobCountStatistics, + pub leaf_witness_jobs: JobCountStatistics, + pub node_witness_jobs: JobCountStatistics, + pub recursion_tip_witness_jobs: JobCountStatistics, + pub scheduler_witness_jobs: JobCountStatistics, + pub prover_jobs: JobCountStatistics, + pub proof_compressor_jobs: JobCountStatistics, +} + +#[derive(Default, Debug, Serialize, Deserialize)] +pub struct VersionedQueueReport { + pub version: ProtocolSemanticVersion, + pub report: QueueReport, +} + +impl AutoscalerQueueReporter { + pub fn new(connection_pool: ConnectionPool) -> Self { + Self { connection_pool } + } + + pub async fn get_report(&self) -> Result>, ProcessorError> { + tracing::debug!("Received request to get queue report"); + + let mut result = HashMap::::new(); + + for round in AggregationRound::ALL_ROUNDS { + self.get_witness_jobs_report(round, &mut result).await?; + } + + self.get_prover_jobs_report(&mut result).await?; + self.get_proof_compressor_jobs_report(&mut result).await?; + + Ok(Json( + result + .into_iter() + .map(|(version, report)| VersionedQueueReport { version, report }) + .collect(), + )) + } + + async fn get_witness_jobs_report( + &self, + aggregation_round: AggregationRound, + state: &mut HashMap, + ) -> anyhow::Result<()> { + let stats = self + .connection_pool + .connection() + .await? + .fri_witness_generator_dal() + .get_witness_jobs_stats(aggregation_round) + .await; + + for (protocol_version, job_stats) in stats { + let report = state.entry(protocol_version).or_default(); + + match aggregation_round { + AggregationRound::BasicCircuits => report.basic_witness_jobs = job_stats, + AggregationRound::LeafAggregation => report.leaf_witness_jobs = job_stats, + AggregationRound::NodeAggregation => report.node_witness_jobs = job_stats, + AggregationRound::RecursionTip => report.recursion_tip_witness_jobs = job_stats, + AggregationRound::Scheduler => report.scheduler_witness_jobs = job_stats, + } + } + Ok(()) + } + + async fn get_prover_jobs_report( + &self, + state: &mut HashMap, + ) -> anyhow::Result<()> { + let stats = self + .connection_pool + .connection() + .await? + .fri_prover_jobs_dal() + .get_generic_prover_jobs_stats() + .await; + + for (protocol_version, stats) in stats { + let report = state.entry(protocol_version).or_default(); + + report.prover_jobs = stats; + } + Ok(()) + } + + async fn get_proof_compressor_jobs_report( + &self, + state: &mut HashMap, + ) -> anyhow::Result<()> { + let stats = self + .connection_pool + .connection() + .await? + .fri_proof_compressor_dal() + .get_jobs_stats() + .await; + + for (protocol_version, stats) in stats { + let report = state.entry(protocol_version).or_default(); + + report.proof_compressor_jobs = stats; + } + + Ok(()) + } +} + +pub fn get_queue_reporter_router(connection_pool: ConnectionPool) -> Router { + let autoscaler_queue_reporter = AutoscalerQueueReporter::new(connection_pool); + + Router::new().route( + "/queue_report", + get(move || async move { autoscaler_queue_reporter.get_report().await }), + ) +} + +pub enum ProcessorError { + Dal(DalError), + Custom(String), +} + +impl From for ProcessorError { + fn from(err: DalError) -> Self { + ProcessorError::Dal(err) + } +} + +impl From for ProcessorError { + fn from(err: anyhow::Error) -> Self { + ProcessorError::Custom(err.to_string()) + } +} + +impl IntoResponse for ProcessorError { + fn into_response(self) -> Response { + let (status_code, message) = match self { + ProcessorError::Dal(err) => { + tracing::error!("Sqlx error: {:?}", err); + ( + StatusCode::INTERNAL_SERVER_ERROR, + "Failed getting data from database", + ) + } + ProcessorError::Custom(err) => { + tracing::error!("Custom error invoked: {:?}", &err); + (StatusCode::INTERNAL_SERVER_ERROR, "Internal error") + } + }; + (status_code, message).into_response() + } +} diff --git a/prover/crates/bin/prover_job_monitor/src/lib.rs b/prover/crates/bin/prover_job_monitor/src/lib.rs index 60d8be297cfe..0d6a0ebe104c 100644 --- a/prover/crates/bin/prover_job_monitor/src/lib.rs +++ b/prover/crates/bin/prover_job_monitor/src/lib.rs @@ -1,4 +1,5 @@ pub mod archiver; +pub mod autoscaler_queue_reporter; pub mod job_requeuer; pub(crate) mod metrics; pub mod queue_reporter; diff --git a/prover/crates/bin/prover_job_monitor/src/main.rs b/prover/crates/bin/prover_job_monitor/src/main.rs index 734a4bac38a2..9195b92882dd 100644 --- a/prover/crates/bin/prover_job_monitor/src/main.rs +++ b/prover/crates/bin/prover_job_monitor/src/main.rs @@ -1,3 +1,5 @@ +use std::{future::IntoFuture, net::SocketAddr}; + use anyhow::Context as _; use clap::Parser; use tokio::{ @@ -12,6 +14,7 @@ use zksync_core_leftovers::temp_config_store::{load_database_secrets, load_gener use zksync_prover_dal::{ConnectionPool, Prover}; use zksync_prover_job_monitor::{ archiver::{GpuProverArchiver, ProverJobsArchiver}, + autoscaler_queue_reporter::get_queue_reporter_router, job_requeuer::{ProofCompressorJobRequeuer, ProverJobRequeuer, WitnessGeneratorJobRequeuer}, queue_reporter::{ ProofCompressorQueueReporter, ProverQueueReporter, WitnessGeneratorQueueReporter, @@ -85,21 +88,42 @@ async fn main() -> anyhow::Result<()> { let mut tasks = vec![tokio::spawn(exporter_config.run(stop_receiver.clone()))]; tasks.extend(get_tasks( - connection_pool, - prover_job_monitor_config, + connection_pool.clone(), + prover_job_monitor_config.clone(), proof_compressor_config, prover_config, witness_generator_config, prover_group_config, - stop_receiver, + stop_receiver.clone(), )?); let mut tasks = ManagedTasks::new(tasks); + let bind_address = SocketAddr::from(([0, 0, 0, 0], prover_job_monitor_config.http_port)); + + tracing::info!("Starting PJM server on {bind_address}"); + + let listener = tokio::net::TcpListener::bind(bind_address) + .await + .with_context(|| format!("Failed binding PJM server to {bind_address}"))?; + + let mut receiver = stop_receiver.clone(); + let app = axum::serve(listener, get_queue_reporter_router(connection_pool)) + .with_graceful_shutdown(async move { + if receiver.changed().await.is_err() { + tracing::warn!( + "Stop signal sender for PJM server was dropped without sending a signal" + ); + } + tracing::info!("Stop signal received, PJM server is shutting down"); + }) + .into_future(); + tokio::select! { _ = tasks.wait_single() => {}, _ = stop_signal_receiver => { tracing::info!("Stop signal received, shutting down"); } + _ = app => {} } stop_sender.send(true).ok(); tasks.complete(graceful_shutdown_timeout).await; diff --git a/prover/crates/bin/prover_job_monitor/src/queue_reporter/witness_generator_queue_reporter.rs b/prover/crates/bin/prover_job_monitor/src/queue_reporter/witness_generator_queue_reporter.rs index 5f507a753649..914f2e9ca856 100644 --- a/prover/crates/bin/prover_job_monitor/src/queue_reporter/witness_generator_queue_reporter.rs +++ b/prover/crates/bin/prover_job_monitor/src/queue_reporter/witness_generator_queue_reporter.rs @@ -58,7 +58,7 @@ impl Task for WitnessGeneratorQueueReporter { .fri_witness_generator_dal() .get_witness_jobs_stats(round) .await; - for ((round, semantic_protocol_version), job_stats) in stats { + for (semantic_protocol_version, job_stats) in stats { Self::emit_metrics_for_round(round, semantic_protocol_version, &job_stats); } } diff --git a/prover/crates/lib/prover_dal/.sqlx/query-97adb49780c9edde6a3cfda09dadbd694e1781e013247d090a280a1f894de464.json b/prover/crates/lib/prover_dal/.sqlx/query-97adb49780c9edde6a3cfda09dadbd694e1781e013247d090a280a1f894de464.json new file mode 100644 index 000000000000..ce9e492a7d4a --- /dev/null +++ b/prover/crates/lib/prover_dal/.sqlx/query-97adb49780c9edde6a3cfda09dadbd694e1781e013247d090a280a1f894de464.json @@ -0,0 +1,38 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n protocol_version AS \"protocol_version!\",\n protocol_version_patch AS \"protocol_version_patch!\",\n COUNT(*) FILTER (WHERE status = 'queued') as queued,\n COUNT(*) FILTER (WHERE status = 'in_progress') as in_progress\n FROM\n prover_jobs_fri\n WHERE\n status IN ('queued', 'in_progress')\n AND protocol_version IS NOT NULL\n GROUP BY\n protocol_version,\n protocol_version_patch\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "protocol_version!", + "type_info": "Int4" + }, + { + "ordinal": 1, + "name": "protocol_version_patch!", + "type_info": "Int4" + }, + { + "ordinal": 2, + "name": "queued", + "type_info": "Int8" + }, + { + "ordinal": 3, + "name": "in_progress", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + true, + false, + null, + null + ] + }, + "hash": "97adb49780c9edde6a3cfda09dadbd694e1781e013247d090a280a1f894de464" +} diff --git a/prover/crates/lib/prover_dal/src/fri_prover_dal.rs b/prover/crates/lib/prover_dal/src/fri_prover_dal.rs index 4e68154290da..1b6c43f4c177 100644 --- a/prover/crates/lib/prover_dal/src/fri_prover_dal.rs +++ b/prover/crates/lib/prover_dal/src/fri_prover_dal.rs @@ -6,8 +6,10 @@ use zksync_basic_types::{ AggregationRound, CircuitIdRoundTuple, CircuitProverStatsEntry, ProtocolVersionedCircuitProverStats, }, - protocol_version::{ProtocolSemanticVersion, ProtocolVersionId}, - prover_dal::{FriProverJobMetadata, ProverJobFriInfo, ProverJobStatus, StuckJobs}, + protocol_version::{ProtocolSemanticVersion, ProtocolVersionId, VersionPatch}, + prover_dal::{ + FriProverJobMetadata, JobCountStatistics, ProverJobFriInfo, ProverJobStatus, StuckJobs, + }, L1BatchNumber, }; use zksync_db_connection::{ @@ -445,6 +447,47 @@ impl FriProverDal<'_, '_> { } } + pub async fn get_generic_prover_jobs_stats( + &mut self, + ) -> HashMap { + { + sqlx::query!( + r#" + SELECT + protocol_version AS "protocol_version!", + protocol_version_patch AS "protocol_version_patch!", + COUNT(*) FILTER (WHERE status = 'queued') as queued, + COUNT(*) FILTER (WHERE status = 'in_progress') as in_progress + FROM + prover_jobs_fri + WHERE + status IN ('queued', 'in_progress') + AND protocol_version IS NOT NULL + GROUP BY + protocol_version, + protocol_version_patch + "# + ) + .fetch_all(self.storage.conn()) + .await + .unwrap() + .into_iter() + .map(|row| { + let protocol_semantic_version = ProtocolSemanticVersion::new( + ProtocolVersionId::try_from(row.protocol_version as u16).unwrap(), + VersionPatch(row.protocol_version_patch as u32), + ); + let key = protocol_semantic_version; + let value = JobCountStatistics { + queued: row.queued.unwrap() as usize, + in_progress: row.in_progress.unwrap() as usize, + }; + (key, value) + }) + .collect() + } + } + pub async fn min_unproved_l1_batch_number(&mut self) -> HashMap<(u8, u8), L1BatchNumber> { { sqlx::query!( diff --git a/prover/crates/lib/prover_dal/src/fri_witness_generator_dal.rs b/prover/crates/lib/prover_dal/src/fri_witness_generator_dal.rs index 9958527a98b0..791038be0bb8 100644 --- a/prover/crates/lib/prover_dal/src/fri_witness_generator_dal.rs +++ b/prover/crates/lib/prover_dal/src/fri_witness_generator_dal.rs @@ -1378,7 +1378,7 @@ impl FriWitnessGeneratorDal<'_, '_> { pub async fn get_witness_jobs_stats( &mut self, aggregation_round: AggregationRound, - ) -> HashMap<(AggregationRound, ProtocolSemanticVersion), JobCountStatistics> { + ) -> HashMap { let table_name = Self::input_table_name_for(aggregation_round); let sql = format!( r#" @@ -1407,7 +1407,7 @@ impl FriWitnessGeneratorDal<'_, '_> { .unwrap(), VersionPatch(row.get::("protocol_version_patch") as u32), ); - let key = (aggregation_round, protocol_semantic_version); + let key = protocol_semantic_version; let value = JobCountStatistics { queued: row.get::("queued") as usize, in_progress: row.get::("in_progress") as usize,