From 3e17cbea9ce9ad438e935b990fd61f60b2a53f7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20HUBERT=20/=20PALO-IT?= Date: Fri, 17 Nov 2023 10:38:27 +0100 Subject: [PATCH 1/4] [wip] message list --- .../src/database/provider/message.rs | 94 ++++++++++++ .../src/database/provider/mod.rs | 2 + .../src/dependency_injection/builder.rs | 37 ++++- .../src/services/http_message.rs | 145 ++++++++++++++++++ mithril-aggregator/src/services/mod.rs | 2 + 5 files changed, 273 insertions(+), 7 deletions(-) create mode 100644 mithril-aggregator/src/database/provider/message.rs create mode 100644 mithril-aggregator/src/services/http_message.rs diff --git a/mithril-aggregator/src/database/provider/message.rs b/mithril-aggregator/src/database/provider/message.rs new file mode 100644 index 00000000000..36d99934589 --- /dev/null +++ b/mithril-aggregator/src/database/provider/message.rs @@ -0,0 +1,94 @@ +use std::sync::Arc; + +use mithril_common::{ + messages::{ + CertificateListItemMessage, CertificateListItemMessageMetadata, CertificateMessage, + CertificateMetadataMessagePart, + }, + StdResult, +}; +use sqlite::ConnectionWithFullMutex; + +use super::{CertificateRecord, CertificateRecordProvider}; + +pub struct CertificateMessageRepository { + connection: Arc, +} + +impl CertificateMessageRepository { + pub fn new(connection: Arc) -> Self { + Self { connection } + } +} + +impl From for CertificateMessage { + fn from(value: CertificateRecord) -> Self { + let metadata = CertificateMetadataMessagePart { + protocol_version: value.protocol_version, + protocol_parameters: value.protocol_parameters, + initiated_at: value.initiated_at, + sealed_at: value.sealed_at, + signers: value.signers, + }; + let (multi_signature, genesis_signature) = if value.parent_certificate_id.is_none() { + (String::new(), value.signature) + } else { + (value.signature, String::new()) + }; + + CertificateMessage { + hash: value.certificate_id, + previous_hash: value.parent_certificate_id.unwrap_or_else(|| String::new()), + beacon: value.beacon, + metadata, + protocol_message: value.protocol_message, + signed_message: value.message, + aggregate_verification_key: value.aggregate_verification_key, + multi_signature, + genesis_signature, + } + } +} + +impl From for CertificateListItemMessage { + fn from(value: CertificateRecord) -> Self { + let metadata = CertificateListItemMessageMetadata { + protocol_version: value.protocol_version, + protocol_parameters: value.protocol_parameters, + initiated_at: value.initiated_at, + sealed_at: value.sealed_at, + total_signers: value.signers.len(), + }; + let (multi_signature, genesis_signature) = if value.parent_certificate_id.is_none() { + (String::new(), value.signature) + } else { + (value.signature, String::new()) + }; + + CertificateListItemMessage { + hash: value.certificate_id, + previous_hash: value.parent_certificate_id.unwrap_or_else(|| String::new()), + beacon: value.beacon, + metadata, + protocol_message: value.protocol_message, + signed_message: value.message, + aggregate_verification_key: value.aggregate_verification_key, + } + } +} + +impl CertificateMessageRepository { + pub async fn get_certificate(&self, hash: &str) -> StdResult> { + let provider = CertificateRecordProvider::new(&self.connection); + let mut cursor = provider.get_by_certificate_id(hash)?; + + Ok(cursor.next().map(|v| v.into())) + } + + pub async fn get_last(&self, limit: usize) -> StdResult> { + let provider = CertificateRecordProvider::new(&self.connection); + let cursor = provider.get_all()?; + + Ok(cursor.into_iter().take(limit).map(|v| v.into()).collect()) + } +} diff --git a/mithril-aggregator/src/database/provider/mod.rs b/mithril-aggregator/src/database/provider/mod.rs index 271a37f2634..6f24163cf7e 100644 --- a/mithril-aggregator/src/database/provider/mod.rs +++ b/mithril-aggregator/src/database/provider/mod.rs @@ -1,6 +1,7 @@ //! Aggregator related database providers mod certificate; mod epoch_setting; +mod message; mod open_message; mod signed_entity; mod signer; @@ -12,6 +13,7 @@ mod test_helper; pub use certificate::*; pub use epoch_setting::*; +pub use message::*; pub use open_message::*; pub use signed_entity::*; pub use signer::*; diff --git a/mithril-aggregator/src/dependency_injection/builder.rs b/mithril-aggregator/src/dependency_injection/builder.rs index 69b7eb0f083..181215714af 100644 --- a/mithril-aggregator/src/dependency_injection/builder.rs +++ b/mithril-aggregator/src/dependency_injection/builder.rs @@ -45,16 +45,16 @@ use crate::{ }, configuration::ExecutionEnvironment, database::provider::{ - CertificateRepository, EpochSettingStore, OpenMessageRepository, SignedEntityStoreAdapter, - SignedEntityStorer, SignerRegistrationStore, SignerStore, SingleSignatureRepository, - StakePoolStore, + CertificateMessageRepository, CertificateRepository, EpochSettingStore, + OpenMessageRepository, SignedEntityStoreAdapter, SignedEntityStorer, + SignerRegistrationStore, SignerStore, SingleSignatureRepository, StakePoolStore, }, event_store::{EventMessage, EventStore, TransmitterService}, http_server::routes::router, services::{ - CertifierService, MithrilCertifierService, MithrilEpochService, MithrilSignedEntityService, - MithrilStakeDistributionService, MithrilTickerService, SignedEntityService, - StakeDistributionService, TickerService, + CertifierService, HttpMessageService, MithrilCertifierService, MithrilEpochService, + MithrilHttpMessageService, MithrilSignedEntityService, MithrilStakeDistributionService, + MithrilTickerService, SignedEntityService, StakeDistributionService, TickerService, }, tools::{CExplorerSignerRetriever, GcpFileUploader, GenesisToolsDependency, SignersImporter}, AggregatorConfig, AggregatorRunner, AggregatorRuntime, CertificatePendingStore, @@ -185,6 +185,9 @@ pub struct DependenciesBuilder { /// Signed Entity storer pub signed_entity_storer: Option>, + + /// HTTP Message service + pub http_message_service: Option>, } impl DependenciesBuilder { @@ -225,6 +228,7 @@ impl DependenciesBuilder { certifier_service: None, epoch_service: None, signed_entity_storer: None, + http_message_service: None, } } @@ -1208,7 +1212,26 @@ impl DependenciesBuilder { Ok(self.certifier_service.as_ref().cloned().unwrap()) } - /// Remove the dependencies builder from memory to release Arc. + /// build HTTP message service + pub async fn build_http_message_service(&mut self) -> Result> { + let certificate_message_repository = Arc::new(CertificateMessageRepository::new( + self.get_sqlite_connection().await?, + )); + let service = MithrilHttpMessageService::new(certificate_message_repository); + + Ok(Arc::new(service)) + } + + /// [HttpMessageService] service + pub async fn get_http_message_service(&mut self) -> Result> { + if self.http_message_service.is_none() { + self.http_message_service = Some(self.build_http_message_service().await?); + } + + Ok(self.http_message_service.as_ref().cloned().unwrap()) + } + + /// Remove the dependencies builder from memory to release Arc instances. pub async fn vanish(self) { self.drop_sqlite_connection().await; } diff --git a/mithril-aggregator/src/services/http_message.rs b/mithril-aggregator/src/services/http_message.rs new file mode 100644 index 00000000000..1a4967ec891 --- /dev/null +++ b/mithril-aggregator/src/services/http_message.rs @@ -0,0 +1,145 @@ +//! This service is responsible of provinding HTTP server with messages as fast as possible. + +use std::sync::Arc; + +use async_trait::async_trait; +use thiserror::Error; + +use mithril_common::{ + messages::{CertificateListMessage, CertificateMessage, CertificatePendingMessage}, + StdResult, +}; + +use crate::database::provider::CertificateMessageRepository; + +/// Error related to the [HttpMessageService] +#[derive(Debug, Error)] +pub enum HttpMessageServiceError { + /// There is no current PendingCertificate + #[error("There is no current pending certificate.")] + PendingCertificateDoesNotExist, +} +/// HTTP Message service trait. +#[async_trait] +pub trait HttpMessageService { + /// Return the message representation of a certificate if it exists. + async fn get_certificate( + &self, + certificate_hash: &str, + ) -> StdResult>; + + /// Return the message representation of the last N certificates + async fn get_last_certificates(&self, limit: usize) -> StdResult; + + /// Return the message representation of the current pending certificate + async fn get_pending_certificate(&self) -> StdResult; +} + +pub struct MithrilHttpMessageService { + certificate_message_repository: Arc, +} + +impl MithrilHttpMessageService { + pub fn new(certificate_message_repository: Arc) -> Self { + Self { + certificate_message_repository, + } + } +} + +#[async_trait] +impl HttpMessageService for MithrilHttpMessageService { + async fn get_certificate( + &self, + certificate_hash: &str, + ) -> StdResult> { + self.certificate_message_repository + .get_certificate(certificate_hash) + .await + } + + async fn get_last_certificates(&self, limit: usize) -> StdResult { + self.certificate_message_repository.get_last(limit).await + } + + async fn get_pending_certificate(&self) -> StdResult { + todo!() + } +} + +#[cfg(test)] +mod tests { + use mithril_common::{entities::Beacon, test_utils::MithrilFixtureBuilder}; + + use crate::{dependency_injection::DependenciesBuilder, Configuration}; + + use super::*; + + #[tokio::test] + async fn get_no_certificate() { + // setup + let configuration = Configuration::new_sample(); + let mut dep_builder = DependenciesBuilder::new(configuration); + let service = dep_builder.get_http_message_service().await.unwrap(); + + // test + let certificate_hash = "whatever"; + let certficate_message = service.get_certificate(certificate_hash).await.unwrap(); + assert!(certficate_message.is_none()); + } + + #[tokio::test] + async fn get_certificate() { + // setup + let configuration = Configuration::new_sample(); + let mut dep_builder = DependenciesBuilder::new(configuration); + let service = dep_builder.get_http_message_service().await.unwrap(); + let beacon = Beacon::new("devnet".to_string(), 3, 1); + let fixture = MithrilFixtureBuilder::default().with_signers(3).build(); + let genesis_beacon = Beacon { + epoch: beacon.epoch - 1, + ..beacon.clone() + }; + let genesis_certificate = fixture.create_genesis_certificate(&genesis_beacon); + dep_builder + .get_certificate_repository() + .await + .unwrap() + .create_certificate(genesis_certificate.clone()) + .await + .unwrap(); + + // test + let certficate_message = service + .get_certificate(&genesis_certificate.hash) + .await + .unwrap() + .expect("There should be a certificate."); + assert_eq!(genesis_certificate.hash, certficate_message.hash); + } + + #[tokio::test] + async fn get_last_certificates() { + let configuration = Configuration::new_sample(); + let mut dep_builder = DependenciesBuilder::new(configuration); + let service = dep_builder.get_http_message_service().await.unwrap(); + let beacon = Beacon::new("devnet".to_string(), 3, 1); + let fixture = MithrilFixtureBuilder::default().with_signers(3).build(); + let genesis_beacon = Beacon { + epoch: beacon.epoch - 1, + ..beacon.clone() + }; + let genesis_certificate = fixture.create_genesis_certificate(&genesis_beacon); + dep_builder + .get_certificate_repository() + .await + .unwrap() + .create_certificate(genesis_certificate.clone()) + .await + .unwrap(); + + // test + let certficate_messages = service.get_last_certificates(5).await.unwrap(); + assert_eq!(genesis_certificate.hash, certficate_messages[0].hash); + } +} diff --git a/mithril-aggregator/src/services/mod.rs b/mithril-aggregator/src/services/mod.rs index 1ad4c141c15..70f30985b05 100644 --- a/mithril-aggregator/src/services/mod.rs +++ b/mithril-aggregator/src/services/mod.rs @@ -11,12 +11,14 @@ mod certifier; mod epoch_service; +mod http_message; mod signed_entity; mod stake_distribution; mod ticker; pub use certifier::*; pub use epoch_service::*; +pub use http_message::*; pub use signed_entity::*; pub use stake_distribution::*; pub use ticker::*; From de33dc0c5cd8b9a7f68866cb397ffc3794eeaa69 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20HUBERT=20/=20PALO-IT?= Date: Fri, 17 Nov 2023 17:27:23 +0100 Subject: [PATCH 2/4] http message service --- .../src/database/provider/message.rs | 13 ++++---- .../src/dependency_injection/builder.rs | 1 + .../src/dependency_injection/containers.rs | 5 ++- .../http_server/routes/certificate_routes.rs | 32 +++++++------------ .../src/http_server/routes/middlewares.rs | 9 +++++- .../src/services/http_message.rs | 25 ++++++++------- 6 files changed, 45 insertions(+), 40 deletions(-) diff --git a/mithril-aggregator/src/database/provider/message.rs b/mithril-aggregator/src/database/provider/message.rs index 36d99934589..6f236a97aec 100644 --- a/mithril-aggregator/src/database/provider/message.rs +++ b/mithril-aggregator/src/database/provider/message.rs @@ -11,11 +11,13 @@ use sqlite::ConnectionWithFullMutex; use super::{CertificateRecord, CertificateRecordProvider}; +/// Repository that turn inner Record entities into Message entities. pub struct CertificateMessageRepository { connection: Arc, } impl CertificateMessageRepository { + /// Constructor pub fn new(connection: Arc) -> Self { Self { connection } } @@ -38,7 +40,7 @@ impl From for CertificateMessage { CertificateMessage { hash: value.certificate_id, - previous_hash: value.parent_certificate_id.unwrap_or_else(|| String::new()), + previous_hash: value.parent_certificate_id.unwrap_or_default(), beacon: value.beacon, metadata, protocol_message: value.protocol_message, @@ -59,15 +61,10 @@ impl From for CertificateListItemMessage { sealed_at: value.sealed_at, total_signers: value.signers.len(), }; - let (multi_signature, genesis_signature) = if value.parent_certificate_id.is_none() { - (String::new(), value.signature) - } else { - (value.signature, String::new()) - }; CertificateListItemMessage { hash: value.certificate_id, - previous_hash: value.parent_certificate_id.unwrap_or_else(|| String::new()), + previous_hash: value.parent_certificate_id.unwrap_or_default(), beacon: value.beacon, metadata, protocol_message: value.protocol_message, @@ -78,6 +75,7 @@ impl From for CertificateListItemMessage { } impl CertificateMessageRepository { + /// Return the certificate matching the given hash. pub async fn get_certificate(&self, hash: &str) -> StdResult> { let provider = CertificateRecordProvider::new(&self.connection); let mut cursor = provider.get_by_certificate_id(hash)?; @@ -85,6 +83,7 @@ impl CertificateMessageRepository { Ok(cursor.next().map(|v| v.into())) } + /// Return the last N certificates pub async fn get_last(&self, limit: usize) -> StdResult> { let provider = CertificateRecordProvider::new(&self.connection); let cursor = provider.get_all()?; diff --git a/mithril-aggregator/src/dependency_injection/builder.rs b/mithril-aggregator/src/dependency_injection/builder.rs index 181215714af..9ea778a3ea2 100644 --- a/mithril-aggregator/src/dependency_injection/builder.rs +++ b/mithril-aggregator/src/dependency_injection/builder.rs @@ -1027,6 +1027,7 @@ impl DependenciesBuilder { ticker_service: self.get_ticker_service().await?, signed_entity_storer: self.get_signed_entity_storer().await?, signer_getter: self.get_signer_store().await?, + http_message_service: self.get_http_message_service().await?, }; Ok(dependency_manager) diff --git a/mithril-aggregator/src/dependency_injection/containers.rs b/mithril-aggregator/src/dependency_injection/containers.rs index a677f9c193b..6d9be1210ac 100644 --- a/mithril-aggregator/src/dependency_injection/containers.rs +++ b/mithril-aggregator/src/dependency_injection/containers.rs @@ -16,7 +16,7 @@ use mithril_common::{ BeaconProvider, }; -use crate::services::EpochService; +use crate::services::{EpochService, HttpMessageService}; use crate::{ configuration::*, database::provider::{CertificateRepository, SignedEntityStorer, SignerGetter, StakePoolStore}, @@ -132,6 +132,9 @@ pub struct DependencyContainer { /// Signer getter service pub signer_getter: Arc, + + /// HTTP message service + pub http_message_service: Arc, } #[doc(hidden)] diff --git a/mithril-aggregator/src/http_server/routes/certificate_routes.rs b/mithril-aggregator/src/http_server/routes/certificate_routes.rs index 235008d8051..3665fae99af 100644 --- a/mithril-aggregator/src/http_server/routes/certificate_routes.rs +++ b/mithril-aggregator/src/http_server/routes/certificate_routes.rs @@ -29,7 +29,7 @@ fn certificate_certificates( ) -> impl Filter + Clone { warp::path!("certificates") .and(warp::get()) - .and(middlewares::with_certifier_service(dependency_manager)) + .and(middlewares::with_http_message_service(dependency_manager)) .and_then(handlers::certificate_certificates) } @@ -39,16 +39,14 @@ fn certificate_certificate_hash( ) -> impl Filter + Clone { warp::path!("certificate" / String) .and(warp::get()) - .and(middlewares::with_certifier_service(dependency_manager)) + .and(middlewares::with_http_message_service(dependency_manager)) .and_then(handlers::certificate_certificate_hash) } mod handlers { use crate::{ - http_server::routes::reply, - message_adapters::{ToCertificateListMessageAdapter, ToCertificateMessageAdapter}, - services::CertifierService, - CertificatePendingStore, ToCertificatePendingMessageAdapter, + http_server::routes::reply, services::HttpMessageService, CertificatePendingStore, + ToCertificatePendingMessageAdapter, }; use mithril_common::messages::ToMessageAdapter; @@ -80,18 +78,15 @@ mod handlers { /// List all Certificates pub async fn certificate_certificates( - certifier_service: Arc, + http_message_service: Arc, ) -> Result { debug!("⇄ HTTP SERVER: certificate_certificates",); - match certifier_service - .get_latest_certificates(LIST_MAX_ITEMS) + match http_message_service + .get_last_certificates(LIST_MAX_ITEMS) .await { - Ok(certificates) => Ok(reply::json( - &ToCertificateListMessageAdapter::adapt(certificates), - StatusCode::OK, - )), + Ok(certificates) => Ok(reply::json(&certificates, StatusCode::OK)), Err(err) => { warn!("certificate_certificates::error"; "error" => ?err); Ok(reply::internal_server_error(err)) @@ -102,21 +97,18 @@ mod handlers { /// Certificate by certificate hash pub async fn certificate_certificate_hash( certificate_hash: String, - certifier_service: Arc, + http_message_service: Arc, ) -> Result { debug!( "⇄ HTTP SERVER: certificate_certificate_hash/{}", certificate_hash ); - match certifier_service - .get_certificate_by_hash(&certificate_hash) + match http_message_service + .get_certificate(&certificate_hash) .await { - Ok(Some(certificate)) => Ok(reply::json( - &ToCertificateMessageAdapter::adapt(certificate), - StatusCode::OK, - )), + Ok(Some(certificate)) => Ok(reply::json(&certificate, StatusCode::OK)), Ok(None) => Ok(reply::empty(StatusCode::NOT_FOUND)), Err(err) => { warn!("certificate_certificate_hash::error"; "error" => ?err); diff --git a/mithril-aggregator/src/http_server/routes/middlewares.rs b/mithril-aggregator/src/http_server/routes/middlewares.rs index 4beabe92cd7..8695d7a3c10 100644 --- a/mithril-aggregator/src/http_server/routes/middlewares.rs +++ b/mithril-aggregator/src/http_server/routes/middlewares.rs @@ -2,7 +2,7 @@ use crate::{ database::provider::SignerGetter, dependency_injection::EpochServiceWrapper, event_store::{EventMessage, TransmitterService}, - services::{CertifierService, SignedEntityService, TickerService}, + services::{CertifierService, HttpMessageService, SignedEntityService, TickerService}, CertificatePendingStore, Configuration, DependencyContainer, SignerRegisterer, VerificationKeyStorer, }; @@ -95,3 +95,10 @@ pub fn with_api_version_provider( ) -> impl Filter,), Error = Infallible> + Clone { warp::any().map(move || dependency_manager.api_version_provider.clone()) } + +/// With Message service +pub fn with_http_message_service( + dependency_manager: Arc, +) -> impl Filter,), Error = Infallible> + Clone { + warp::any().map(move || dependency_manager.http_message_service.clone()) +} diff --git a/mithril-aggregator/src/services/http_message.rs b/mithril-aggregator/src/services/http_message.rs index 1a4967ec891..497b18f8b9d 100644 --- a/mithril-aggregator/src/services/http_message.rs +++ b/mithril-aggregator/src/services/http_message.rs @@ -6,7 +6,7 @@ use async_trait::async_trait; use thiserror::Error; use mithril_common::{ - messages::{CertificateListMessage, CertificateMessage, CertificatePendingMessage}, + messages::{CertificateListMessage, CertificateMessage}, StdResult, }; @@ -21,7 +21,7 @@ pub enum HttpMessageServiceError { } /// HTTP Message service trait. #[async_trait] -pub trait HttpMessageService { +pub trait HttpMessageService: Sync + Send { /// Return the message representation of a certificate if it exists. async fn get_certificate( &self, @@ -30,16 +30,15 @@ pub trait HttpMessageService { /// Return the message representation of the last N certificates async fn get_last_certificates(&self, limit: usize) -> StdResult; - - /// Return the message representation of the current pending certificate - async fn get_pending_certificate(&self) -> StdResult; } +/// Implementation of the [HttpMessageService] pub struct MithrilHttpMessageService { certificate_message_repository: Arc, } impl MithrilHttpMessageService { + /// Constructor pub fn new(certificate_message_repository: Arc) -> Self { Self { certificate_message_repository, @@ -61,10 +60,6 @@ impl HttpMessageService for MithrilHttpMessageService { async fn get_last_certificates(&self, limit: usize) -> StdResult { self.certificate_message_repository.get_last(limit).await } - - async fn get_pending_certificate(&self) -> StdResult { - todo!() - } } #[cfg(test)] @@ -73,8 +68,6 @@ mod tests { use crate::{dependency_injection::DependenciesBuilder, Configuration}; - use super::*; - #[tokio::test] async fn get_no_certificate() { // setup @@ -130,6 +123,14 @@ mod tests { ..beacon.clone() }; let genesis_certificate = fixture.create_genesis_certificate(&genesis_beacon); + dep_builder + .get_certificate_repository() + .await + .unwrap() + .create_certificate(genesis_certificate.clone()) + .await + .unwrap(); + let genesis_certificate = fixture.create_genesis_certificate(&beacon); dep_builder .get_certificate_repository() .await @@ -140,6 +141,8 @@ mod tests { // test let certficate_messages = service.get_last_certificates(5).await.unwrap(); + + assert_eq!(2, certficate_messages.len()); assert_eq!(genesis_certificate.hash, certficate_messages[0].hash); } } From 9facfeaae01168567f201ff8e65f7dbe6e57fb36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20HUBERT=20/=20PALO-IT?= Date: Tue, 21 Nov 2023 11:43:57 +0100 Subject: [PATCH 3/4] use generic certificate result --- .../src/database/provider/certificate.rs | 120 ++++++++++++++---- .../src/database/provider/message.rs | 93 -------------- .../src/database/provider/mod.rs | 2 - .../src/dependency_injection/builder.rs | 10 +- mithril-aggregator/src/services/certifier.rs | 4 +- .../src/services/http_message.rs | 14 +- .../src/tools/certificates_hash_migrator.rs | 10 +- 7 files changed, 113 insertions(+), 140 deletions(-) delete mode 100644 mithril-aggregator/src/database/provider/message.rs diff --git a/mithril-aggregator/src/database/provider/certificate.rs b/mithril-aggregator/src/database/provider/certificate.rs index 806c7e74116..535a43bbcd0 100644 --- a/mithril-aggregator/src/database/provider/certificate.rs +++ b/mithril-aggregator/src/database/provider/certificate.rs @@ -11,6 +11,10 @@ use mithril_common::{ HexEncodedAgregateVerificationKey, HexEncodedKey, ProtocolMessage, ProtocolParameters, ProtocolVersion, StakeDistributionParty, }, + messages::{ + CertificateListItemMessage, CertificateListItemMessageMetadata, CertificateMessage, + CertificateMetadataMessagePart, + }, sqlite::{ EntityCursor, HydrationError, Projection, Provider, SourceAlias, SqLiteEntity, WhereCondition, @@ -159,6 +163,57 @@ impl From for Certificate { } } +impl From for CertificateMessage { + fn from(value: CertificateRecord) -> Self { + let metadata = CertificateMetadataMessagePart { + protocol_version: value.protocol_version, + protocol_parameters: value.protocol_parameters, + initiated_at: value.initiated_at, + sealed_at: value.sealed_at, + signers: value.signers, + }; + let (multi_signature, genesis_signature) = if value.parent_certificate_id.is_none() { + (String::new(), value.signature) + } else { + (value.signature, String::new()) + }; + + CertificateMessage { + hash: value.certificate_id, + previous_hash: value.parent_certificate_id.unwrap_or_default(), + beacon: value.beacon, + metadata, + protocol_message: value.protocol_message, + signed_message: value.message, + aggregate_verification_key: value.aggregate_verification_key, + multi_signature, + genesis_signature, + } + } +} + +impl From for CertificateListItemMessage { + fn from(value: CertificateRecord) -> Self { + let metadata = CertificateListItemMessageMetadata { + protocol_version: value.protocol_version, + protocol_parameters: value.protocol_parameters, + initiated_at: value.initiated_at, + sealed_at: value.sealed_at, + total_signers: value.signers.len(), + }; + + CertificateListItemMessage { + hash: value.certificate_id, + previous_hash: value.parent_certificate_id.unwrap_or_default(), + beacon: value.beacon, + metadata, + protocol_message: value.protocol_message, + signed_message: value.message, + aggregate_verification_key: value.aggregate_verification_key, + } + } +} + impl SqLiteEntity for CertificateRecord { fn hydrate(row: sqlite::Row) -> Result where @@ -549,7 +604,10 @@ impl CertificateRepository { } /// Return the certificate corresponding to the given hash if any. - pub async fn get_certificate(&self, hash: &str) -> StdResult> { + pub async fn get_certificate(&self, hash: &str) -> StdResult> + where + T: From, + { let provider = CertificateRecordProvider::new(&self.connection); let mut cursor = provider.get_by_certificate_id(hash)?; @@ -557,7 +615,10 @@ impl CertificateRepository { } /// Return the latest certificates. - pub async fn get_latest_certificates(&self, last_n: usize) -> StdResult> { + pub async fn get_latest_certificates(&self, last_n: usize) -> StdResult> + where + T: From, + { let provider = CertificateRecordProvider::new(&self.connection); let cursor = provider.get_all()?; @@ -567,10 +628,10 @@ impl CertificateRepository { /// Return the first certificate signed per epoch as the reference /// certificate for this Epoch. This will be the parent certificate for all /// other certificates issued within this Epoch. - pub async fn get_master_certificate_for_epoch( - &self, - epoch: Epoch, - ) -> StdResult> { + pub async fn get_master_certificate_for_epoch(&self, epoch: Epoch) -> StdResult> + where + T: From, + { let provider = MasterCertificateProvider::new(&self.connection); let mut cursor = provider.find(provider.get_master_certificate_condition(epoch))?; @@ -1037,12 +1098,15 @@ protocol_message, signers, initiated_at, sealed_at) values \ } } - let repository = CertificateRepository::new(connection); - let certificate = repository.get_certificate("whatever").await.unwrap(); + let repository: CertificateRepository = CertificateRepository::new(connection); + let certificate = repository + .get_certificate::("whatever") + .await + .unwrap(); assert!(certificate.is_none()); let certificate = repository - .get_certificate(&expected_hash) + .get_certificate::(&expected_hash) .await .unwrap() .expect("The certificate exist and should be returned."); @@ -1090,9 +1154,9 @@ protocol_message, signers, initiated_at, sealed_at) values \ let certificates = vec![]; insert_certificate_records(connection.clone(), certificates).await; - let repository = CertificateRepository::new(connection); + let repository: CertificateRepository = CertificateRepository::new(connection); let certificate = repository - .get_master_certificate_for_epoch(Epoch(1)) + .get_master_certificate_for_epoch::(Epoch(1)) .await .unwrap(); @@ -1107,9 +1171,9 @@ protocol_message, signers, initiated_at, sealed_at) values \ let expected_certificate: Certificate = certificate.clone().into(); insert_certificate_records(connection.clone(), vec![certificate]).await; - let repository = CertificateRepository::new(connection); + let repository: CertificateRepository = CertificateRepository::new(connection); let certificate = repository - .get_master_certificate_for_epoch(Epoch(1)) + .get_master_certificate_for_epoch::(Epoch(1)) .await .unwrap() .expect("This should return a certificate."); @@ -1130,9 +1194,9 @@ protocol_message, signers, initiated_at, sealed_at) values \ let expected_certificate: Certificate = certificates.first().unwrap().clone().into(); insert_certificate_records(connection.clone(), certificates).await; - let repository = CertificateRepository::new(connection); + let repository: CertificateRepository = CertificateRepository::new(connection); let certificate = repository - .get_master_certificate_for_epoch(Epoch(1)) + .get_master_certificate_for_epoch::(Epoch(1)) .await .unwrap() .expect("This should return a certificate."); @@ -1153,9 +1217,9 @@ protocol_message, signers, initiated_at, sealed_at) values \ let expected_certificate: Certificate = certificates.first().unwrap().clone().into(); insert_certificate_records(connection.clone(), certificates).await; - let repository = CertificateRepository::new(connection); + let repository: CertificateRepository = CertificateRepository::new(connection); let certificate = repository - .get_master_certificate_for_epoch(Epoch(2)) + .get_master_certificate_for_epoch::(Epoch(2)) .await .unwrap() .expect("This should return a certificate."); @@ -1177,9 +1241,9 @@ protocol_message, signers, initiated_at, sealed_at) values \ let expected_certificate: Certificate = certificates.last().unwrap().clone().into(); insert_certificate_records(connection.clone(), certificates).await; - let repository = CertificateRepository::new(connection); + let repository: CertificateRepository = CertificateRepository::new(connection); let certificate = repository - .get_master_certificate_for_epoch(Epoch(2)) + .get_master_certificate_for_epoch::(Epoch(2)) .await .unwrap() .expect("This should return a certificate."); @@ -1203,7 +1267,7 @@ protocol_message, signers, initiated_at, sealed_at) values \ let expected_certificate: Certificate = certificates.get(3).unwrap().clone().into(); insert_certificate_records(connection.clone(), certificates).await; - let repository = CertificateRepository::new(connection); + let repository: CertificateRepository = CertificateRepository::new(connection); let certificate = repository .get_master_certificate_for_epoch(Epoch(2)) .await @@ -1224,9 +1288,9 @@ protocol_message, signers, initiated_at, sealed_at) values \ ]; insert_certificate_records(connection.clone(), certificates).await; - let repository = CertificateRepository::new(connection); + let repository: CertificateRepository = CertificateRepository::new(connection); let certificate = repository - .get_master_certificate_for_epoch(Epoch(3)) + .get_master_certificate_for_epoch::(Epoch(3)) .await .unwrap(); @@ -1247,7 +1311,7 @@ protocol_message, signers, initiated_at, sealed_at) values \ let expected_certificate: Certificate = certificates.last().unwrap().clone().into(); insert_certificate_records(connection.clone(), certificates).await; - let repository = CertificateRepository::new(connection); + let repository: CertificateRepository = CertificateRepository::new(connection); let certificate = repository .get_master_certificate_for_epoch(Epoch(2)) .await @@ -1273,7 +1337,7 @@ protocol_message, signers, initiated_at, sealed_at) values \ let expected_certificate: Certificate = certificates.last().unwrap().clone().into(); insert_certificate_records(connection.clone(), certificates).await; - let repository = CertificateRepository::new(connection); + let repository: CertificateRepository = CertificateRepository::new(connection); let certificate = repository .get_master_certificate_for_epoch(Epoch(2)) .await @@ -1297,7 +1361,7 @@ protocol_message, signers, initiated_at, sealed_at) values \ let expected_certificate: Certificate = certificates.last().unwrap().clone().into(); insert_certificate_records(connection.clone(), certificates).await; - let repository = CertificateRepository::new(connection); + let repository: CertificateRepository = CertificateRepository::new(connection); let certificate = repository .get_master_certificate_for_epoch(Epoch(2)) .await @@ -1322,9 +1386,9 @@ protocol_message, signers, initiated_at, sealed_at) values \ } } - let repository = CertificateRepository::new(connection); + let repository: CertificateRepository = CertificateRepository::new(connection); let certificate = repository - .get_master_certificate_for_epoch(*epoch) + .get_master_certificate_for_epoch::(*epoch) .await .unwrap() .expect("This should return a certificate."); @@ -1337,7 +1401,7 @@ protocol_message, signers, initiated_at, sealed_at) values \ let (certificates, _) = setup_certificate_chain(5, 3); let mut deps = DependenciesBuilder::new(Configuration::new_sample()); let connection = deps.get_sqlite_connection().await.unwrap(); - let repository = CertificateRepository::new(connection); + let repository: CertificateRepository = CertificateRepository::new(connection); let certificate = repository .create_certificate(certificates[4].clone()) .await diff --git a/mithril-aggregator/src/database/provider/message.rs b/mithril-aggregator/src/database/provider/message.rs deleted file mode 100644 index 6f236a97aec..00000000000 --- a/mithril-aggregator/src/database/provider/message.rs +++ /dev/null @@ -1,93 +0,0 @@ -use std::sync::Arc; - -use mithril_common::{ - messages::{ - CertificateListItemMessage, CertificateListItemMessageMetadata, CertificateMessage, - CertificateMetadataMessagePart, - }, - StdResult, -}; -use sqlite::ConnectionWithFullMutex; - -use super::{CertificateRecord, CertificateRecordProvider}; - -/// Repository that turn inner Record entities into Message entities. -pub struct CertificateMessageRepository { - connection: Arc, -} - -impl CertificateMessageRepository { - /// Constructor - pub fn new(connection: Arc) -> Self { - Self { connection } - } -} - -impl From for CertificateMessage { - fn from(value: CertificateRecord) -> Self { - let metadata = CertificateMetadataMessagePart { - protocol_version: value.protocol_version, - protocol_parameters: value.protocol_parameters, - initiated_at: value.initiated_at, - sealed_at: value.sealed_at, - signers: value.signers, - }; - let (multi_signature, genesis_signature) = if value.parent_certificate_id.is_none() { - (String::new(), value.signature) - } else { - (value.signature, String::new()) - }; - - CertificateMessage { - hash: value.certificate_id, - previous_hash: value.parent_certificate_id.unwrap_or_default(), - beacon: value.beacon, - metadata, - protocol_message: value.protocol_message, - signed_message: value.message, - aggregate_verification_key: value.aggregate_verification_key, - multi_signature, - genesis_signature, - } - } -} - -impl From for CertificateListItemMessage { - fn from(value: CertificateRecord) -> Self { - let metadata = CertificateListItemMessageMetadata { - protocol_version: value.protocol_version, - protocol_parameters: value.protocol_parameters, - initiated_at: value.initiated_at, - sealed_at: value.sealed_at, - total_signers: value.signers.len(), - }; - - CertificateListItemMessage { - hash: value.certificate_id, - previous_hash: value.parent_certificate_id.unwrap_or_default(), - beacon: value.beacon, - metadata, - protocol_message: value.protocol_message, - signed_message: value.message, - aggregate_verification_key: value.aggregate_verification_key, - } - } -} - -impl CertificateMessageRepository { - /// Return the certificate matching the given hash. - pub async fn get_certificate(&self, hash: &str) -> StdResult> { - let provider = CertificateRecordProvider::new(&self.connection); - let mut cursor = provider.get_by_certificate_id(hash)?; - - Ok(cursor.next().map(|v| v.into())) - } - - /// Return the last N certificates - pub async fn get_last(&self, limit: usize) -> StdResult> { - let provider = CertificateRecordProvider::new(&self.connection); - let cursor = provider.get_all()?; - - Ok(cursor.into_iter().take(limit).map(|v| v.into()).collect()) - } -} diff --git a/mithril-aggregator/src/database/provider/mod.rs b/mithril-aggregator/src/database/provider/mod.rs index 6f24163cf7e..271a37f2634 100644 --- a/mithril-aggregator/src/database/provider/mod.rs +++ b/mithril-aggregator/src/database/provider/mod.rs @@ -1,7 +1,6 @@ //! Aggregator related database providers mod certificate; mod epoch_setting; -mod message; mod open_message; mod signed_entity; mod signer; @@ -13,7 +12,6 @@ mod test_helper; pub use certificate::*; pub use epoch_setting::*; -pub use message::*; pub use open_message::*; pub use signed_entity::*; pub use signer::*; diff --git a/mithril-aggregator/src/dependency_injection/builder.rs b/mithril-aggregator/src/dependency_injection/builder.rs index 9ea778a3ea2..b8c5f21b702 100644 --- a/mithril-aggregator/src/dependency_injection/builder.rs +++ b/mithril-aggregator/src/dependency_injection/builder.rs @@ -45,9 +45,9 @@ use crate::{ }, configuration::ExecutionEnvironment, database::provider::{ - CertificateMessageRepository, CertificateRepository, EpochSettingStore, - OpenMessageRepository, SignedEntityStoreAdapter, SignedEntityStorer, - SignerRegistrationStore, SignerStore, SingleSignatureRepository, StakePoolStore, + CertificateRepository, EpochSettingStore, OpenMessageRepository, SignedEntityStoreAdapter, + SignedEntityStorer, SignerRegistrationStore, SignerStore, SingleSignatureRepository, + StakePoolStore, }, event_store::{EventMessage, EventStore, TransmitterService}, http_server::routes::router, @@ -1215,10 +1215,10 @@ impl DependenciesBuilder { /// build HTTP message service pub async fn build_http_message_service(&mut self) -> Result> { - let certificate_message_repository = Arc::new(CertificateMessageRepository::new( + let certificate_repository = Arc::new(CertificateRepository::new( self.get_sqlite_connection().await?, )); - let service = MithrilHttpMessageService::new(certificate_message_repository); + let service = MithrilHttpMessageService::new(certificate_repository); Ok(Arc::new(service)) } diff --git a/mithril-aggregator/src/services/certifier.rs b/mithril-aggregator/src/services/certifier.rs index 5cc4cd4d8a8..4df7d09c9e9 100644 --- a/mithril-aggregator/src/services/certifier.rs +++ b/mithril-aggregator/src/services/certifier.rs @@ -362,7 +362,7 @@ impl CertifierService for MithrilCertifierService { ); let parent_certificate_hash = self .certificate_repository - .get_master_certificate_for_epoch(open_message.epoch) + .get_master_certificate_for_epoch::(open_message.epoch) .await .with_context(|| { format!( @@ -425,7 +425,7 @@ impl CertifierService for MithrilCertifierService { async fn verify_certificate_chain(&self, epoch: Epoch) -> StdResult<()> { if let Some(certificate) = self .certificate_repository - .get_latest_certificates(1) + .get_latest_certificates::(1) .await? .first() { diff --git a/mithril-aggregator/src/services/http_message.rs b/mithril-aggregator/src/services/http_message.rs index 497b18f8b9d..806815a7eb1 100644 --- a/mithril-aggregator/src/services/http_message.rs +++ b/mithril-aggregator/src/services/http_message.rs @@ -10,7 +10,7 @@ use mithril_common::{ StdResult, }; -use crate::database::provider::CertificateMessageRepository; +use crate::database::provider::CertificateRepository; /// Error related to the [HttpMessageService] #[derive(Debug, Error)] @@ -34,14 +34,14 @@ pub trait HttpMessageService: Sync + Send { /// Implementation of the [HttpMessageService] pub struct MithrilHttpMessageService { - certificate_message_repository: Arc, + certificate_repository: Arc, } impl MithrilHttpMessageService { /// Constructor - pub fn new(certificate_message_repository: Arc) -> Self { + pub fn new(certificate_repository: Arc) -> Self { Self { - certificate_message_repository, + certificate_repository, } } } @@ -52,13 +52,15 @@ impl HttpMessageService for MithrilHttpMessageService { &self, certificate_hash: &str, ) -> StdResult> { - self.certificate_message_repository + self.certificate_repository .get_certificate(certificate_hash) .await } async fn get_last_certificates(&self, limit: usize) -> StdResult { - self.certificate_message_repository.get_last(limit).await + self.certificate_repository + .get_latest_certificates(limit) + .await } } diff --git a/mithril-aggregator/src/tools/certificates_hash_migrator.rs b/mithril-aggregator/src/tools/certificates_hash_migrator.rs index 21b6439a186..ac1027a84f0 100644 --- a/mithril-aggregator/src/tools/certificates_hash_migrator.rs +++ b/mithril-aggregator/src/tools/certificates_hash_migrator.rs @@ -46,7 +46,7 @@ impl CertificatesHashMigrator { let old_certificates = self .certificate_repository // arbitrary high value to get all existing certificates - .get_latest_certificates(usize::MAX) + .get_latest_certificates::(usize::MAX) .await?; let mut certificates_to_remove = vec![]; @@ -253,7 +253,8 @@ mod test { connection: Arc, certificates_and_signed_entity: &[(Certificate, Option)], ) -> StdResult)>> { - let certificate_repository = CertificateRepository::new(connection.clone()); + let certificate_repository: CertificateRepository = + CertificateRepository::new(connection.clone()); let signed_entity_store = SignedEntityStoreAdapter::new(connection.clone()); let mut result = vec![]; @@ -416,11 +417,12 @@ mod test { connection: Arc, ) -> StdResult)>> { let mut result = vec![]; - let certificate_repository = CertificateRepository::new(connection.clone()); + let certificate_repository: CertificateRepository = + CertificateRepository::new(connection.clone()); let signed_entity_store = SignedEntityStoreAdapter::new(connection.clone()); let certificates = certificate_repository - .get_latest_certificates(usize::MAX) + .get_latest_certificates::(usize::MAX) .await?; for certificate in certificates { From 9a2020544f9b7a07495f17ee7ed15e18fa87700a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20HUBERT=20/=20PALO-IT?= Date: Tue, 21 Nov 2023 14:57:21 +0100 Subject: [PATCH 4/4] update version --- Cargo.lock | 2 +- mithril-aggregator/Cargo.toml | 2 +- mithril-aggregator/src/services/http_message.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4b888b9350b..b1383a53315 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3169,7 +3169,7 @@ dependencies = [ [[package]] name = "mithril-aggregator" -version = "0.4.11" +version = "0.4.12" dependencies = [ "anyhow", "async-trait", diff --git a/mithril-aggregator/Cargo.toml b/mithril-aggregator/Cargo.toml index 2177c8fa524..977483268da 100644 --- a/mithril-aggregator/Cargo.toml +++ b/mithril-aggregator/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-aggregator" -version = "0.4.11" +version = "0.4.12" description = "A Mithril Aggregator server" authors = { workspace = true } edition = { workspace = true } diff --git a/mithril-aggregator/src/services/http_message.rs b/mithril-aggregator/src/services/http_message.rs index 806815a7eb1..4f1546cc935 100644 --- a/mithril-aggregator/src/services/http_message.rs +++ b/mithril-aggregator/src/services/http_message.rs @@ -1,4 +1,4 @@ -//! This service is responsible of provinding HTTP server with messages as fast as possible. +//! This service is responsible of providing HTTP server with messages as fast as possible. use std::sync::Arc;