Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
1

3

Alter domain-operator - cargo.toml

add block requests

add block requests

Disable light-state error for sync

Cargo.lock: add block requests

add block requests

Archiver encode-decode hack

Remove empty bundle check.

Networking tweak

Tweak segment header downloader

Modify snap-sync

Update synchronizer

More synchronizer

Before getting domain block

got domain block

domain state imported

block impoted with “invalid block number”

Add LigthState sync mode

domain block import sync

Import blocks before regular sync!!

- more block types
- more aux writes

fixed wait_for_block_import

2
  • Loading branch information
shamil-gadelshin committed Sep 3, 2024
1 parent a41f16f commit ac089fe
Show file tree
Hide file tree
Showing 20 changed files with 823 additions and 106 deletions.
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,14 @@ fn main() -> Result<(), Error> {

let keystore = partial_components.keystore_container.keystore();

let consensus_chain_node = subspace_service::new_full::<PosTable, _>(
let consensus_chain_node =
subspace_service::new_full::<PosTable, _>(
consensus_chain_config,
partial_components,
None,
true,
SlotProportion::new(3f32 / 4f32),
None,
)
.await
.map_err(|error| {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ impl DomainInstanceStarter {
evm_domain_runtime::RuntimeApi,
AccountId20,
_,
>(domain_params)
>(domain_params, None, Box::new(()))
.await?;

let malicious_bundle_producer = MaliciousBundleProducer::new(
Expand Down Expand Up @@ -231,7 +231,7 @@ impl DomainInstanceStarter {
auto_id_domain_runtime::RuntimeApi,
AccountId32,
_,
>(domain_params)
>(domain_params, None, Box::new(()))
.await?;

let malicious_bundle_producer = MaliciousBundleProducer::new(
Expand Down
92 changes: 62 additions & 30 deletions crates/subspace-node/src/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,19 @@ use domain_runtime_primitives::opaque::Block as DomainBlock;
use futures::FutureExt;
use sc_cli::Signals;
use sc_consensus_slots::SlotProportion;
use sc_service::{BlocksPruning, PruningMode};
use sc_service::PruningMode;
use sc_storage_monitor::StorageMonitorService;
use sc_transaction_pool_api::OffchainTransactionPoolFactory;
use sc_utils::mpsc::tracing_unbounded;
use sp_core::traits::SpawnEssentialNamed;
use sp_messenger::messages::ChainId;
use std::env;
use std::sync::Arc;
use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter};
use subspace_runtime::{Block, RuntimeApi};
use subspace_service::config::ChainSyncMode;
use subspace_service::domains::LastDomainBlockInfoReceiver;
use subspace_service::sync_from_dsn::synchronizer::Synchronizer;
use tracing::{debug, error, info, info_span, warn};

/// Options for running a node
Expand Down Expand Up @@ -93,6 +96,7 @@ pub async fn run(run_options: RunOptions) -> Result<(), Error> {
} = create_consensus_chain_configuration(consensus, enable_color, domain_options.is_some())?;

let maybe_domain_configuration = domain_options
.clone()
.map(|domain_options| {
create_domain_configuration(&subspace_configuration, dev, domain_options, enable_color)
})
Expand All @@ -102,6 +106,17 @@ pub async fn run(run_options: RunOptions) -> Result<(), Error> {

let base_path = subspace_configuration.base_path.path().to_path_buf();

// TODO
let synchronizer = if let Some(ref domain_opt) = domain_options {
if domain_opt.domain_sync {
Some(Arc::new(Synchronizer::new()))
} else {
None
}
} else {
None
};

info!("Subspace");
info!("✌️ version {}", env!("SUBSTRATE_CLI_IMPL_VERSION"));
info!("❤️ by {}", env!("CARGO_PKG_AUTHORS"));
Expand All @@ -112,27 +127,27 @@ pub async fn run(run_options: RunOptions) -> Result<(), Error> {
info!("🏷 Node name: {}", subspace_configuration.network.node_name);
info!("💾 Node path: {}", base_path.display());

if maybe_domain_configuration.is_some() && subspace_configuration.sync == ChainSyncMode::Snap {
return Err(Error::Other(
"Snap sync mode is not supported for domains, use full sync".to_string(),
));
}

if maybe_domain_configuration.is_some()
&& (matches!(
subspace_configuration.blocks_pruning,
BlocksPruning::Some(_)
) || matches!(
subspace_configuration.state_pruning,
Some(PruningMode::Constrained(_))
))
{
return Err(Error::Other(
"Running an operator requires both `--blocks-pruning` and `--state-pruning` to be set \
to either `archive` or `archive-canonical`"
.to_string(),
));
}
// if maybe_domain_configuration.is_some() && subspace_configuration.sync == ChainSyncMode::Snap {
// return Err(Error::Other(
// "Snap sync mode is not supported for domains".to_string(),
// ));
// }
//
// if maybe_domain_configuration.is_some()
// && (matches!(
// subspace_configuration.blocks_pruning,
// BlocksPruning::Some(_)
// ) || matches!(
// subspace_configuration.state_pruning,
// Some(PruningMode::Constrained(_))
// ))
// {
// return Err(Error::Other(
// "Running an operator requires both `--blocks-pruning` and `--state-pruning` to be set \
// to either `archive` or `archive-canonical`"
// .to_string(),
// ));
// }

let mut task_manager = {
let consensus_chain_node = {
Expand Down Expand Up @@ -184,6 +199,7 @@ pub async fn run(run_options: RunOptions) -> Result<(), Error> {
}),
true,
SlotProportion::new(3f32 / 4f32),
synchronizer.clone(),
);

full_node_fut.await.map_err(|error| {
Expand Down Expand Up @@ -291,25 +307,27 @@ pub async fn run(run_options: RunOptions) -> Result<(), Error> {
};

let domain_start_options = DomainStartOptions {
consensus_client: consensus_chain_node.client,
consensus_client: consensus_chain_node.client.clone(),
consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory::new(
consensus_chain_node.transaction_pool,
),
consensus_network: consensus_chain_node.network_service,
consensus_network: consensus_chain_node.network_service.clone(),
block_importing_notification_stream: consensus_chain_node
.block_importing_notification_stream,
pot_slot_info_stream: consensus_chain_node.pot_slot_info_stream,
consensus_network_sync_oracle: consensus_chain_node.sync_service,
consensus_network_sync_oracle: consensus_chain_node.sync_service.clone(),
domain_message_receiver,
gossip_message_sink,
};

consensus_chain_node
.task_manager
.spawn_essential_handle()
.spawn_essential_blocking(
"domain",
Some("domains"),
.spawn_essential_blocking("domain", Some("domains"), {
let consensus_chain_client = consensus_chain_node.client.clone();
let consensus_chain_network_service =
consensus_chain_node.network_service.clone();
let consensus_chain_sync_service = consensus_chain_node.sync_service.clone();
Box::pin(async move {
let span = info_span!("Domain");
let _enter = span.enter();
Expand All @@ -318,6 +336,8 @@ pub async fn run(run_options: RunOptions) -> Result<(), Error> {
&*domain_start_options.consensus_client,
domain_configuration.domain_id,
);

println!("before bootstrap_result_fut");
let bootstrap_result = match bootstrap_result_fut.await {
Ok(bootstrap_result) => bootstrap_result,
Err(error) => {
Expand All @@ -326,17 +346,29 @@ pub async fn run(run_options: RunOptions) -> Result<(), Error> {
}
};

println!("before run_domain");

let receipt_provider = LastDomainBlockInfoReceiver::new(
domain_configuration.domain_id,
None,
consensus_chain_client,
consensus_chain_network_service,
consensus_chain_sync_service,
);

let start_domain = run_domain(
bootstrap_result,
domain_configuration,
domain_start_options,
synchronizer,
Box::new(receipt_provider),
);

if let Err(error) = start_domain.await {
error!(%error, "Domain starter exited with an error");
}
}),
);
})
});
};

consensus_chain_node.network_starter.start_network();
Expand Down
58 changes: 34 additions & 24 deletions crates/subspace-node/src/commands/run/domain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,13 @@ pub(super) struct DomainOptions {
/// Additional args for domain.
#[clap(raw = true)]
additional_args: Vec<String>,

// TODO: remove
#[arg(long, default_value_t = false)]
pub domain_sync: bool, // TODO
}

#[derive(Debug)]
pub(super) struct DomainConfiguration {
pub(super) domain_config: Configuration,
pub(super) domain_id: DomainId,
Expand All @@ -157,6 +162,7 @@ pub(super) fn create_domain_configuration(
keystore_options,
pool_config,
additional_args,
..
} = domain_options;

let domain_id;
Expand Down Expand Up @@ -389,6 +395,8 @@ pub(super) async fn run_domain(
bootstrap_result: BootstrapResult<CBlock>,
domain_configuration: DomainConfiguration,
domain_start_options: DomainStartOptions,
synchronizer: Option<Arc<Synchronizer>>,
execution_receipt_provider: Box<dyn LastDomainBlockReceiptProvider<DomainBlock, CBlock>>,
) -> Result<(), Error> {
let BootstrapResult {
domain_instance_data,
Expand Down Expand Up @@ -496,18 +504,19 @@ pub(super) async fn run_domain(
confirmation_depth_k: chain_constants.confirmation_depth_k(),
};

let mut domain_node = domain_service::new_full::<
_,
_,
_,
_,
_,
_,
evm_domain_runtime::RuntimeApi,
AccountId20,
_,
>(domain_params)
.await?;
let mut domain_node =
domain_service::new_full::<
_,
_,
_,
_,
_,
_,
evm_domain_runtime::RuntimeApi,
AccountId20,
_,
>(domain_params, synchronizer, execution_receipt_provider)
.await?;

domain_node.network_starter.start_network();

Expand All @@ -534,18 +543,19 @@ pub(super) async fn run_domain(
confirmation_depth_k: chain_constants.confirmation_depth_k(),
};

let mut domain_node = domain_service::new_full::<
_,
_,
_,
_,
_,
_,
auto_id_domain_runtime::RuntimeApi,
AccountId32,
_,
>(domain_params)
.await?;
let mut domain_node =
domain_service::new_full::<
_,
_,
_,
_,
_,
_,
auto_id_domain_runtime::RuntimeApi,
AccountId32,
_,
>(domain_params, synchronizer, execution_receipt_provider)
.await?;

domain_node.network_starter.start_network();

Expand Down
10 changes: 9 additions & 1 deletion crates/subspace-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
)]

pub mod config;
pub(crate) mod domains;
pub mod domains; // TODO: pub(crate)
pub mod dsn;
mod metrics;
pub(crate) mod mmr;
Expand All @@ -43,6 +43,7 @@ use crate::metrics::NodeMetrics;
use crate::mmr::request_handler::MmrRequestHandler;
use crate::sync_from_dsn::piece_validator::SegmentCommitmentPieceValidator;
use crate::sync_from_dsn::snap_sync::snap_sync;
use crate::sync_from_dsn::synchronizer::Synchronizer;
use crate::transaction_pool::FullPool;
use core::sync::atomic::{AtomicU32, Ordering};
use cross_domain_message_gossip::xdm_gossip_peers_set_config;
Expand Down Expand Up @@ -727,6 +728,7 @@ pub async fn new_full<PosTable, RuntimeApi>(
prometheus_registry: Option<&mut Registry>,
enable_rpc_extensions: bool,
block_proposal_slot_portion: SlotProportion,
synchronizer: Option<Arc<Synchronizer>>,
) -> Result<FullNode<RuntimeApi>, Error>
where
PosTable: Table,
Expand Down Expand Up @@ -1024,6 +1026,7 @@ where
Arc::clone(&network_service),
sync_service.clone(),
subspace_link.erasure_coding().clone(),
synchronizer.clone(),
);

let (observer, worker) = sync_from_dsn::create_observer_and_worker(
Expand Down Expand Up @@ -1052,6 +1055,11 @@ where
snap_sync_task.await;
}

if let Some(synchronizer) = synchronizer {
println!("Waiting for resuming consensus sync");
synchronizer.resuming_consensus_sync_allowed().await;
}

if let Err(error) = worker.await {
error!(%error, "Sync from DSN exited with an error");
}
Expand Down
Loading

0 comments on commit ac089fe

Please sign in to comment.