Skip to content

Commit

Permalink
Merge pull request input-output-hk#1353 from input-output-hk/greg/132…
Browse files Browse the repository at this point in the history
…7/message_service

HTTP message service
  • Loading branch information
ghubertpalo authored Nov 21, 2023
2 parents b17533f + 9a20205 commit d02447f
Show file tree
Hide file tree
Showing 11 changed files with 306 additions and 62 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mithril-aggregator/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mithril-aggregator"
version = "0.4.11"
version = "0.4.12"
description = "A Mithril Aggregator server"
authors = { workspace = true }
edition = { workspace = true }
Expand Down
120 changes: 92 additions & 28 deletions mithril-aggregator/src/database/provider/certificate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ use mithril_common::{
HexEncodedAgregateVerificationKey, HexEncodedKey, ProtocolMessage, ProtocolParameters,
ProtocolVersion, StakeDistributionParty,
},
messages::{
CertificateListItemMessage, CertificateListItemMessageMetadata, CertificateMessage,
CertificateMetadataMessagePart,
},
sqlite::{
EntityCursor, HydrationError, Projection, Provider, SourceAlias, SqLiteEntity,
WhereCondition,
Expand Down Expand Up @@ -159,6 +163,57 @@ impl From<CertificateRecord> for Certificate {
}
}

impl From<CertificateRecord> for CertificateMessage {
fn from(value: CertificateRecord) -> Self {
let metadata = CertificateMetadataMessagePart {
protocol_version: value.protocol_version,
protocol_parameters: value.protocol_parameters,
initiated_at: value.initiated_at,
sealed_at: value.sealed_at,
signers: value.signers,
};
let (multi_signature, genesis_signature) = if value.parent_certificate_id.is_none() {
(String::new(), value.signature)
} else {
(value.signature, String::new())
};

CertificateMessage {
hash: value.certificate_id,
previous_hash: value.parent_certificate_id.unwrap_or_default(),
beacon: value.beacon,
metadata,
protocol_message: value.protocol_message,
signed_message: value.message,
aggregate_verification_key: value.aggregate_verification_key,
multi_signature,
genesis_signature,
}
}
}

impl From<CertificateRecord> for CertificateListItemMessage {
fn from(value: CertificateRecord) -> Self {
let metadata = CertificateListItemMessageMetadata {
protocol_version: value.protocol_version,
protocol_parameters: value.protocol_parameters,
initiated_at: value.initiated_at,
sealed_at: value.sealed_at,
total_signers: value.signers.len(),
};

CertificateListItemMessage {
hash: value.certificate_id,
previous_hash: value.parent_certificate_id.unwrap_or_default(),
beacon: value.beacon,
metadata,
protocol_message: value.protocol_message,
signed_message: value.message,
aggregate_verification_key: value.aggregate_verification_key,
}
}
}

impl SqLiteEntity for CertificateRecord {
fn hydrate(row: sqlite::Row) -> Result<Self, HydrationError>
where
Expand Down Expand Up @@ -549,15 +604,21 @@ impl CertificateRepository {
}

/// Return the certificate corresponding to the given hash if any.
pub async fn get_certificate(&self, hash: &str) -> StdResult<Option<Certificate>> {
pub async fn get_certificate<T>(&self, hash: &str) -> StdResult<Option<T>>
where
T: From<CertificateRecord>,
{
let provider = CertificateRecordProvider::new(&self.connection);
let mut cursor = provider.get_by_certificate_id(hash)?;

Ok(cursor.next().map(|v| v.into()))
}

/// Return the latest certificates.
pub async fn get_latest_certificates(&self, last_n: usize) -> StdResult<Vec<Certificate>> {
pub async fn get_latest_certificates<T>(&self, last_n: usize) -> StdResult<Vec<T>>
where
T: From<CertificateRecord>,
{
let provider = CertificateRecordProvider::new(&self.connection);
let cursor = provider.get_all()?;

Expand All @@ -567,10 +628,10 @@ impl CertificateRepository {
/// Return the first certificate signed per epoch as the reference
/// certificate for this Epoch. This will be the parent certificate for all
/// other certificates issued within this Epoch.
pub async fn get_master_certificate_for_epoch(
&self,
epoch: Epoch,
) -> StdResult<Option<Certificate>> {
pub async fn get_master_certificate_for_epoch<T>(&self, epoch: Epoch) -> StdResult<Option<T>>
where
T: From<CertificateRecord>,
{
let provider = MasterCertificateProvider::new(&self.connection);
let mut cursor = provider.find(provider.get_master_certificate_condition(epoch))?;

Expand Down Expand Up @@ -1037,12 +1098,15 @@ protocol_message, signers, initiated_at, sealed_at) values \
}
}

let repository = CertificateRepository::new(connection);
let certificate = repository.get_certificate("whatever").await.unwrap();
let repository: CertificateRepository = CertificateRepository::new(connection);
let certificate = repository
.get_certificate::<Certificate>("whatever")
.await
.unwrap();
assert!(certificate.is_none());

let certificate = repository
.get_certificate(&expected_hash)
.get_certificate::<Certificate>(&expected_hash)
.await
.unwrap()
.expect("The certificate exist and should be returned.");
Expand Down Expand Up @@ -1090,9 +1154,9 @@ protocol_message, signers, initiated_at, sealed_at) values \
let certificates = vec![];
insert_certificate_records(connection.clone(), certificates).await;

let repository = CertificateRepository::new(connection);
let repository: CertificateRepository = CertificateRepository::new(connection);
let certificate = repository
.get_master_certificate_for_epoch(Epoch(1))
.get_master_certificate_for_epoch::<Certificate>(Epoch(1))
.await
.unwrap();

Expand All @@ -1107,9 +1171,9 @@ protocol_message, signers, initiated_at, sealed_at) values \
let expected_certificate: Certificate = certificate.clone().into();
insert_certificate_records(connection.clone(), vec![certificate]).await;

let repository = CertificateRepository::new(connection);
let repository: CertificateRepository = CertificateRepository::new(connection);
let certificate = repository
.get_master_certificate_for_epoch(Epoch(1))
.get_master_certificate_for_epoch::<Certificate>(Epoch(1))
.await
.unwrap()
.expect("This should return a certificate.");
Expand All @@ -1130,9 +1194,9 @@ protocol_message, signers, initiated_at, sealed_at) values \
let expected_certificate: Certificate = certificates.first().unwrap().clone().into();
insert_certificate_records(connection.clone(), certificates).await;

let repository = CertificateRepository::new(connection);
let repository: CertificateRepository = CertificateRepository::new(connection);
let certificate = repository
.get_master_certificate_for_epoch(Epoch(1))
.get_master_certificate_for_epoch::<Certificate>(Epoch(1))
.await
.unwrap()
.expect("This should return a certificate.");
Expand All @@ -1153,9 +1217,9 @@ protocol_message, signers, initiated_at, sealed_at) values \
let expected_certificate: Certificate = certificates.first().unwrap().clone().into();
insert_certificate_records(connection.clone(), certificates).await;

let repository = CertificateRepository::new(connection);
let repository: CertificateRepository = CertificateRepository::new(connection);
let certificate = repository
.get_master_certificate_for_epoch(Epoch(2))
.get_master_certificate_for_epoch::<Certificate>(Epoch(2))
.await
.unwrap()
.expect("This should return a certificate.");
Expand All @@ -1177,9 +1241,9 @@ protocol_message, signers, initiated_at, sealed_at) values \
let expected_certificate: Certificate = certificates.last().unwrap().clone().into();
insert_certificate_records(connection.clone(), certificates).await;

let repository = CertificateRepository::new(connection);
let repository: CertificateRepository = CertificateRepository::new(connection);
let certificate = repository
.get_master_certificate_for_epoch(Epoch(2))
.get_master_certificate_for_epoch::<Certificate>(Epoch(2))
.await
.unwrap()
.expect("This should return a certificate.");
Expand All @@ -1203,7 +1267,7 @@ protocol_message, signers, initiated_at, sealed_at) values \
let expected_certificate: Certificate = certificates.get(3).unwrap().clone().into();
insert_certificate_records(connection.clone(), certificates).await;

let repository = CertificateRepository::new(connection);
let repository: CertificateRepository = CertificateRepository::new(connection);
let certificate = repository
.get_master_certificate_for_epoch(Epoch(2))
.await
Expand All @@ -1224,9 +1288,9 @@ protocol_message, signers, initiated_at, sealed_at) values \
];
insert_certificate_records(connection.clone(), certificates).await;

let repository = CertificateRepository::new(connection);
let repository: CertificateRepository = CertificateRepository::new(connection);
let certificate = repository
.get_master_certificate_for_epoch(Epoch(3))
.get_master_certificate_for_epoch::<Certificate>(Epoch(3))
.await
.unwrap();

Expand All @@ -1247,7 +1311,7 @@ protocol_message, signers, initiated_at, sealed_at) values \
let expected_certificate: Certificate = certificates.last().unwrap().clone().into();
insert_certificate_records(connection.clone(), certificates).await;

let repository = CertificateRepository::new(connection);
let repository: CertificateRepository = CertificateRepository::new(connection);
let certificate = repository
.get_master_certificate_for_epoch(Epoch(2))
.await
Expand All @@ -1273,7 +1337,7 @@ protocol_message, signers, initiated_at, sealed_at) values \
let expected_certificate: Certificate = certificates.last().unwrap().clone().into();
insert_certificate_records(connection.clone(), certificates).await;

let repository = CertificateRepository::new(connection);
let repository: CertificateRepository = CertificateRepository::new(connection);
let certificate = repository
.get_master_certificate_for_epoch(Epoch(2))
.await
Expand All @@ -1297,7 +1361,7 @@ protocol_message, signers, initiated_at, sealed_at) values \
let expected_certificate: Certificate = certificates.last().unwrap().clone().into();
insert_certificate_records(connection.clone(), certificates).await;

let repository = CertificateRepository::new(connection);
let repository: CertificateRepository = CertificateRepository::new(connection);
let certificate = repository
.get_master_certificate_for_epoch(Epoch(2))
.await
Expand All @@ -1322,9 +1386,9 @@ protocol_message, signers, initiated_at, sealed_at) values \
}
}

let repository = CertificateRepository::new(connection);
let repository: CertificateRepository = CertificateRepository::new(connection);
let certificate = repository
.get_master_certificate_for_epoch(*epoch)
.get_master_certificate_for_epoch::<Certificate>(*epoch)
.await
.unwrap()
.expect("This should return a certificate.");
Expand All @@ -1337,7 +1401,7 @@ protocol_message, signers, initiated_at, sealed_at) values \
let (certificates, _) = setup_certificate_chain(5, 3);
let mut deps = DependenciesBuilder::new(Configuration::new_sample());
let connection = deps.get_sqlite_connection().await.unwrap();
let repository = CertificateRepository::new(connection);
let repository: CertificateRepository = CertificateRepository::new(connection);
let certificate = repository
.create_certificate(certificates[4].clone())
.await
Expand Down
32 changes: 28 additions & 4 deletions mithril-aggregator/src/dependency_injection/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ use crate::{
event_store::{EventMessage, EventStore, TransmitterService},
http_server::routes::router,
services::{
CertifierService, MithrilCertifierService, MithrilEpochService, MithrilSignedEntityService,
MithrilStakeDistributionService, MithrilTickerService, SignedEntityService,
StakeDistributionService, TickerService,
CertifierService, HttpMessageService, MithrilCertifierService, MithrilEpochService,
MithrilHttpMessageService, MithrilSignedEntityService, MithrilStakeDistributionService,
MithrilTickerService, SignedEntityService, StakeDistributionService, TickerService,
},
tools::{CExplorerSignerRetriever, GcpFileUploader, GenesisToolsDependency, SignersImporter},
AggregatorConfig, AggregatorRunner, AggregatorRuntime, CertificatePendingStore,
Expand Down Expand Up @@ -185,6 +185,9 @@ pub struct DependenciesBuilder {

/// Signed Entity storer
pub signed_entity_storer: Option<Arc<dyn SignedEntityStorer>>,

/// HTTP Message service
pub http_message_service: Option<Arc<dyn HttpMessageService>>,
}

impl DependenciesBuilder {
Expand Down Expand Up @@ -225,6 +228,7 @@ impl DependenciesBuilder {
certifier_service: None,
epoch_service: None,
signed_entity_storer: None,
http_message_service: None,
}
}

Expand Down Expand Up @@ -1023,6 +1027,7 @@ impl DependenciesBuilder {
ticker_service: self.get_ticker_service().await?,
signed_entity_storer: self.get_signed_entity_storer().await?,
signer_getter: self.get_signer_store().await?,
http_message_service: self.get_http_message_service().await?,
};

Ok(dependency_manager)
Expand Down Expand Up @@ -1208,7 +1213,26 @@ impl DependenciesBuilder {
Ok(self.certifier_service.as_ref().cloned().unwrap())
}

/// Remove the dependencies builder from memory to release Arc.
/// build HTTP message service
pub async fn build_http_message_service(&mut self) -> Result<Arc<dyn HttpMessageService>> {
let certificate_repository = Arc::new(CertificateRepository::new(
self.get_sqlite_connection().await?,
));
let service = MithrilHttpMessageService::new(certificate_repository);

Ok(Arc::new(service))
}

/// [HttpMessageService] service
pub async fn get_http_message_service(&mut self) -> Result<Arc<dyn HttpMessageService>> {
if self.http_message_service.is_none() {
self.http_message_service = Some(self.build_http_message_service().await?);
}

Ok(self.http_message_service.as_ref().cloned().unwrap())
}

/// Remove the dependencies builder from memory to release Arc instances.
pub async fn vanish(self) {
self.drop_sqlite_connection().await;
}
Expand Down
5 changes: 4 additions & 1 deletion mithril-aggregator/src/dependency_injection/containers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use mithril_common::{
BeaconProvider,
};

use crate::services::EpochService;
use crate::services::{EpochService, HttpMessageService};
use crate::{
configuration::*,
database::provider::{CertificateRepository, SignedEntityStorer, SignerGetter, StakePoolStore},
Expand Down Expand Up @@ -132,6 +132,9 @@ pub struct DependencyContainer {

/// Signer getter service
pub signer_getter: Arc<dyn SignerGetter>,

/// HTTP message service
pub http_message_service: Arc<dyn HttpMessageService>,
}

#[doc(hidden)]
Expand Down
Loading

0 comments on commit d02447f

Please sign in to comment.