Skip to content

Commit

Permalink
Merge #2249
Browse files Browse the repository at this point in the history
2249: Speculative execution support r=damip a=damip

# Intro

This is a rewrite of the execution system following the spec we already agreed upon, as well as good practices and with the big refactoring constraints in mind.

# Goals

* [x] enable speculative execution
* [x] get ready for ledger unification

# Practices

* [x] no async because it's not needed
* [x] short functions (max 50 lines of code)
* [x] no panics, unless described
* crates split between worker and exports
  * [x] execution
  * [x] ledger: it will be refactored for on-disk storage, we will split it then => #2342
* thorough documentation
  * [x] function docs
  * [x] algorithm description
  * [x] file-level documentation
  * [x] crate-level documentation
* [x] test exports
* [x] unit and functional tests: to be added in a followup #2296
* [x] use genericity whenever possible
* [x] clippy lints

# Checklist

* [x] implement speculative execution
* [x] split execution into worker/exports crates
* [x] create massa-ledger crate
* [x] integrate execution and ledger into the existing program
* [x] repair bootstrap tests
* [x] repair consensus tests
* [x] improve documentation
* [x] try on labnet
* [x] reactivate execution tests => will be done in a followup #2296
* [x] add specific tests => will be done in the followup #2296


Co-authored-by: Damir Vodenicarevic <[email protected]>
Co-authored-by: damip <[email protected]>
  • Loading branch information
bors[bot] and damip authored Mar 1, 2022
2 parents 00464b8 + 68c6eb2 commit 5c8f019
Show file tree
Hide file tree
Showing 82 changed files with 5,048 additions and 3,354 deletions.
302 changes: 219 additions & 83 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ members = [
"massa-client",
"massa-consensus-exports",
"massa-consensus-worker",
"massa-execution",
"massa-execution-exports",
"massa-execution-worker",
"massa-graph",
"massa-hash",
"massa-logging",
Expand All @@ -19,6 +20,7 @@ members = [
"massa-signature",
"massa-time",
"massa-wallet",
"massa-ledger"
]
resolver = "2"

Expand Down
4 changes: 2 additions & 2 deletions massa-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ tokio = { version = "1.15", features = ["full"] }
tracing = "0.1"
# custom modules
massa_consensus_exports = { path = "../massa-consensus-exports" }
massa_execution = { path = "../massa-execution" }
massa_execution_exports = { path = "../massa-execution-exports" }
massa_graph = { path = "../massa-graph" }
massa_hash = { path = "../massa-hash" }
massa_models = { path = "../massa-models" }
Expand All @@ -27,4 +27,4 @@ massa_signature = { path = "../massa-signature" }
massa_time = { path = "../massa-time" }

[features]
instrument = ["tokio/tracing", "massa_consensus_exports/instrument", "massa_execution/instrument", "massa_graph/instrument", "massa_models/instrument", "massa_network/instrument", "massa_pool/instrument", "massa_protocol_exports/instrument", "massa_time/instrument"]
instrument = ["tokio/tracing", "massa_consensus_exports/instrument", "massa_graph/instrument", "massa_models/instrument", "massa_network/instrument", "massa_pool/instrument", "massa_protocol_exports/instrument", "massa_time/instrument"]
2 changes: 1 addition & 1 deletion massa-api/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use displaydoc::Display;
use massa_consensus_exports::error::ConsensusError;
use massa_execution::ExecutionError;
use massa_execution_exports::ExecutionError;
use massa_hash::MassaHashError;
use massa_models::ModelsError;
use massa_network::NetworkError;
Expand Down
10 changes: 5 additions & 5 deletions massa-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use jsonrpc_core::{BoxFuture, IoHandler, Value};
use jsonrpc_derive::rpc;
use jsonrpc_http_server::{CloseHandle, ServerBuilder};
use massa_consensus_exports::{ConsensusCommandSender, ConsensusConfig};
use massa_execution::ExecutionCommandSender;
use massa_execution_exports::ExecutionController;
use massa_models::api::{
APISettings, AddressInfo, BlockInfo, BlockSummary, EndorsementInfo, EventFilter, NodeStatus,
OperationInfo, ReadOnlyExecution, TimeInterval,
Expand Down Expand Up @@ -36,7 +36,7 @@ mod public;

pub struct Public {
pub consensus_command_sender: ConsensusCommandSender,
pub execution_command_sender: ExecutionCommandSender,
pub execution_controller: Box<dyn ExecutionController>,
pub pool_command_sender: PoolCommandSender,
pub consensus_config: ConsensusConfig,
pub api_settings: &'static APISettings,
Expand All @@ -50,7 +50,7 @@ pub struct Public {
pub struct Private {
pub consensus_command_sender: ConsensusCommandSender,
pub network_command_sender: NetworkCommandSender,
execution_command_sender: ExecutionCommandSender,
pub execution_controller: Box<dyn ExecutionController>,
pub consensus_config: ConsensusConfig,
pub api_settings: &'static APISettings,
pub stop_node_channel: mpsc::Sender<()>,
Expand Down Expand Up @@ -116,8 +116,8 @@ pub trait Endpoints {
#[rpc(name = "execute_read_only_request")]
fn execute_read_only_request(
&self,
_: ReadOnlyExecution,
) -> BoxFuture<Result<ExecuteReadOnlyResponse, ApiError>>;
_: Vec<ReadOnlyExecution>,
) -> BoxFuture<Result<Vec<ExecuteReadOnlyResponse>, ApiError>>;

/// Remove a vec of addresses used to stake.
/// No confirmation to expect.
Expand Down
23 changes: 6 additions & 17 deletions massa-api/src/private.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{Endpoints, Private, RpcServer, StopHandle, API};
use jsonrpc_core::BoxFuture;
use jsonrpc_http_server::tokio::sync::mpsc;
use massa_consensus_exports::{ConsensusCommandSender, ConsensusConfig};
use massa_execution::ExecutionCommandSender;
use massa_execution_exports::ExecutionController;
use massa_models::api::{
APISettings, AddressInfo, BlockInfo, BlockSummary, EndorsementInfo, EventFilter, NodeStatus,
OperationInfo, ReadOnlyExecution, TimeInterval,
Expand All @@ -24,7 +24,7 @@ impl API<Private> {
pub fn new(
consensus_command_sender: ConsensusCommandSender,
network_command_sender: NetworkCommandSender,
execution_command_sender: ExecutionCommandSender,
execution_controller: Box<dyn ExecutionController>,
api_settings: &'static APISettings,
consensus_settings: ConsensusConfig,
) -> (Self, mpsc::Receiver<()>) {
Expand All @@ -33,7 +33,7 @@ impl API<Private> {
API(Private {
consensus_command_sender,
network_command_sender,
execution_command_sender,
execution_controller,
consensus_config: consensus_settings,
api_settings,
stop_node_channel,
Expand Down Expand Up @@ -76,20 +76,9 @@ impl Endpoints for API<Private> {

fn execute_read_only_request(
&self,
ReadOnlyExecution {
max_gas,
simulated_gas_price,
bytecode,
address,
}: ReadOnlyExecution,
) -> BoxFuture<Result<ExecuteReadOnlyResponse, ApiError>> {
let cmd_sender = self.0.execution_command_sender.clone();
let closure = async move || {
Ok(cmd_sender
.execute_read_only_request(max_gas, simulated_gas_price, bytecode, address)
.await?)
};
Box::pin(closure())
_: Vec<ReadOnlyExecution>,
) -> BoxFuture<Result<Vec<ExecuteReadOnlyResponse>, ApiError>> {
crate::wrong_api::<Vec<ExecuteReadOnlyResponse>>()
}

fn remove_staking_addresses(&self, keys: Vec<Address>) -> BoxFuture<Result<(), ApiError>> {
Expand Down
136 changes: 109 additions & 27 deletions massa-api/src/public.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@ use crate::{Endpoints, Public, RpcServer, StopHandle, API};
use futures::{stream::FuturesUnordered, StreamExt};
use jsonrpc_core::BoxFuture;
use massa_consensus_exports::{ConsensusCommandSender, ConsensusConfig};
use massa_execution::ExecutionCommandSender;
use massa_execution_exports::{
ExecutionController, ExecutionStackElement, ReadOnlyExecutionRequest,
};
use massa_graph::{DiscardReason, ExportBlockStatus};
use massa_models::api::SCELedgerInfo;
use massa_models::execution::ReadOnlyResult;
use massa_models::{
api::{
APISettings, AddressInfo, BlockInfo, BlockInfoContent, BlockSummary, EndorsementInfo,
Expand All @@ -23,14 +27,14 @@ use massa_models::{
};
use massa_network::{NetworkCommandSender, NetworkSettings};
use massa_pool::PoolCommandSender;
use massa_signature::PrivateKey;
use massa_signature::{derive_public_key, generate_random_private_key, PrivateKey};
use massa_time::MassaTime;
use std::net::{IpAddr, SocketAddr};

impl API<Public> {
pub fn new(
consensus_command_sender: ConsensusCommandSender,
execution_command_sender: ExecutionCommandSender,
execution_controller: Box<dyn ExecutionController>,
api_settings: &'static APISettings,
consensus_settings: ConsensusConfig,
pool_command_sender: PoolCommandSender,
Expand All @@ -50,7 +54,7 @@ impl API<Public> {
network_command_sender,
compensation_millis,
node_id,
execution_command_sender,
execution_controller,
})
}
}
Expand All @@ -77,9 +81,63 @@ impl Endpoints for API<Public> {

fn execute_read_only_request(
&self,
_: ReadOnlyExecution,
) -> BoxFuture<Result<ExecuteReadOnlyResponse, ApiError>> {
crate::wrong_api::<ExecuteReadOnlyResponse>()
reqs: Vec<ReadOnlyExecution>,
) -> BoxFuture<Result<Vec<ExecuteReadOnlyResponse>, ApiError>> {
if reqs.len() > self.0.api_settings.max_arguments as usize {
let closure =
async move || Err(ApiError::TooManyArguments("too many arguments".into()));
return Box::pin(closure());
}

let mut res: Vec<ExecuteReadOnlyResponse> = Vec::with_capacity(reqs.len());
for ReadOnlyExecution {
max_gas,
address,
simulated_gas_price,
bytecode,
} in reqs
{
let address = address.unwrap_or_else(|| {
// if no addr provided, use a random one
Address::from_public_key(&derive_public_key(&generate_random_private_key()))
});

// TODO:
// * set a maximum gas value for read-only executions to prevent attacks
// * stop mapping request and result, reuse execution's structures
// * remove async stuff

// translate request
let req = ReadOnlyExecutionRequest {
max_gas,
simulated_gas_price,
bytecode,
call_stack: vec![ExecutionStackElement {
address,
coins: Default::default(),
owned_addresses: vec![address],
}],
};

// run
let result = self.0.execution_controller.execute_readonly_request(req);

// map result
let result = ExecuteReadOnlyResponse {
executed_at: result.as_ref().map_or_else(|_| Slot::new(0, 0), |v| v.slot),
result: result.as_ref().map_or_else(
|err| ReadOnlyResult::Error(format!("readonly call failed: {}", err)),
|_| ReadOnlyResult::Ok,
),
output_events: result.map_or_else(|_| Default::default(), |v| v.events.export()),
};

res.push(result);
}

// return result
let closure = async move || Ok(res);
Box::pin(closure())
}

fn remove_staking_addresses(&self, _: Vec<Address>) -> BoxFuture<Result<(), ApiError>> {
Expand Down Expand Up @@ -356,11 +414,37 @@ impl Endpoints for API<Public> {
let api_cfg = self.0.api_settings;
let pool_command_sender = self.0.pool_command_sender.clone();
let compensation_millis = self.0.compensation_millis;
let sce_command_sender = self.0.execution_command_sender.clone();
let closure = async move || {
if addresses.len() as u64 > api_cfg.max_arguments {
return Err(ApiError::TooManyArguments("too many arguments".into()));

// todo make better use of SCE ledger info

// map SCE ledger info and check for address length
let sce_ledger_info = if addresses.len() as u64 > api_cfg.max_arguments {
Err(ApiError::TooManyArguments("too many arguments".into()))
} else {
// get SCE ledger info
let mut sce_ledger_info: Map<Address, SCELedgerInfo> =
Map::with_capacity_and_hasher(addresses.len(), BuildMap::default());
for addr in &addresses {
let active_entry = match self
.0
.execution_controller
.get_final_and_active_ledger_entry(addr)
.1
{
None => continue,
Some(v) => SCELedgerInfo {
balance: v.parallel_balance,
module: Some(v.bytecode),
datastore: v.datastore.into_iter().collect(),
},
};
sce_ledger_info.insert(*addr, active_entry);
}
Ok(sce_ledger_info)
};

let closure = async move || {
let sce_ledger_info = sce_ledger_info?;

let mut res = Vec::with_capacity(addresses.len());

Expand All @@ -383,11 +467,10 @@ impl Endpoints for API<Public> {

// roll and balance info
let states = cmd_sender.get_addresses_info(addresses.iter().copied().collect());
let sce_info = sce_command_sender.get_sce_ledger_for_addresses(addresses.clone());

// wait for both simultaneously
let (next_draws, states, sce_info) = tokio::join!(next_draws, states, sce_info);
let (next_draws, mut states, sce_info) = (next_draws?, states?, sce_info?);
let (next_draws, states) = tokio::join!(next_draws, states);
let (next_draws, mut states) = (next_draws?, states?);

// operations block and endorsement info
let mut operations: Map<Address, Set<OperationId>> =
Expand Down Expand Up @@ -469,7 +552,7 @@ impl Endpoints for API<Public> {
.remove(&address)
.ok_or(ApiError::NotFound)?,
production_stats: state.production_stats,
sce_ledger_info: sce_info.get(&address).cloned().unwrap_or_default(),
sce_ledger_info: sce_ledger_info.get(&address).cloned().unwrap_or_default(),
})
}
Ok(res)
Expand Down Expand Up @@ -514,18 +597,17 @@ impl Endpoints for API<Public> {
original_operation_id,
}: EventFilter,
) -> BoxFuture<Result<Vec<SCOutputEvent>, ApiError>> {
let execution_command_sender = self.0.execution_command_sender.clone();
let closure = async move || {
Ok(execution_command_sender
.get_filtered_sc_output_event(
start,
end,
emitter_address,
original_caller_address,
original_operation_id,
)
.await?)
};
// get events
let events = self.0.execution_controller.get_filtered_sc_output_event(
start,
end,
emitter_address,
original_caller_address,
original_operation_id,
);

// TODO get rid of the async part
let closure = async move || Ok(events);
Box::pin(closure())
}
}
13 changes: 8 additions & 5 deletions massa-bootstrap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
tokio = { version = "1.15", features = ["full"] }
tracing = "0.1"
# custom modules
parking_lot = "0.12"
tokio = { version = "1.11", features = ["full"] }
tracing = "0.1"# custom modules
massa_consensus_exports = { path = "../massa-consensus-exports" }
massa_execution = { path = "../massa-execution" }
massa_ledger = { path = "../massa-ledger" }
# custom modules
massa_graph = { path = "../massa-graph" }
massa_hash = { path = "../massa-hash" }
massa_logging = { path = "../massa-logging" }
Expand All @@ -33,6 +34,8 @@ massa_time = { path = "../massa-time" }
bitvec = { version = "0.22", features = ["serde"] }
pretty_assertions = "1.0"
serial_test = "0.5"
massa_ledger = { path = "../massa-ledger", features=["testing"] }


[features]
instrument = ["tokio/tracing", "massa_consensus_exports/instrument", "massa_execution/instrument", "massa_graph/instrument", "massa_models/instrument", "massa_network/instrument", "massa_proof_of_stake_exports/instrument", "massa_time/instrument"]
instrument = ["tokio/tracing", "massa_consensus_exports/instrument", "massa_graph/instrument", "massa_models/instrument", "massa_network/instrument", "massa_proof_of_stake_exports/instrument", "massa_time/instrument"]
6 changes: 3 additions & 3 deletions massa-bootstrap/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
use crate::messages::BootstrapMessage;
use displaydoc::Display;
use massa_consensus_exports::error::ConsensusError;
use massa_execution::ExecutionError;
use massa_hash::MassaHashError;
use massa_ledger::LedgerError;
use massa_network::NetworkError;
use massa_time::TimeError;
use thiserror::Error;
Expand All @@ -30,8 +30,8 @@ pub enum BootstrapError {
ConsensusError(#[from] ConsensusError),
/// network error: {0}
NetworkError(#[from] NetworkError),
/// execution error: {0}
ExecutionError(#[from] ExecutionError),
/// ledger error: {0}
LedgerError(#[from] LedgerError),
/// join error: {0}
JoinError(#[from] tokio::task::JoinError),
/// missing private key file
Expand Down
Loading

0 comments on commit 5c8f019

Please sign in to comment.