diff --git a/operator/control-plane/src/aws.rs b/operator/control-plane/src/aws.rs index b0417942..0f67c719 100644 --- a/operator/control-plane/src/aws.rs +++ b/operator/control-plane/src/aws.rs @@ -852,7 +852,6 @@ EOF &self, job: &JobId, instance_type: InstanceType, - image_url: &str, family: &str, architecture: &str, region: &str, @@ -1343,7 +1342,6 @@ EOF pub async fn spin_up_instance( &self, - image_url: &str, job: &JobId, instance_type: &str, family: &str, @@ -1397,7 +1395,7 @@ EOF return Err(anyhow!("Required memory or vcpus are more than available")); } let instance = self - .launch_instance(job, instance_type, image_url, family, &architecture, region) + .launch_instance(job, instance_type, family, &architecture, region) .await .context("could not launch instance")?; sleep(Duration::from_secs(100)).await; @@ -1538,7 +1536,6 @@ EOF impl InfraProvider for Aws { async fn spin_up( &mut self, - eif_url: &str, job: &JobId, instance_type: &str, family: &str, @@ -1548,15 +1545,7 @@ impl InfraProvider for Aws { _bandwidth: u64, ) -> Result { let instance = self - .spin_up_instance( - eif_url, - job, - instance_type, - family, - region, - req_mem, - req_vcpu, - ) + .spin_up_instance(job, instance_type, family, region, req_mem, req_vcpu) .await .context("could not spin up instance")?; Ok(instance) diff --git a/operator/control-plane/src/market.rs b/operator/control-plane/src/market.rs index 3dd324f2..d3d299ff 100644 --- a/operator/control-plane/src/market.rs +++ b/operator/control-plane/src/market.rs @@ -15,7 +15,23 @@ use tokio::time::{Duration, Instant}; use tokio_stream::StreamExt; use tracing::{error, info, info_span, Instrument}; -// IMPORTANT: do not import SystemTime, use the now_timestamp helper +// IMPORTANT: do not import SystemTime, use a SystemContext + +// Trait to encapsulate behaviour that should be simulated in tests +trait SystemContext { + fn now_timestamp(&self) -> Duration; +} + +struct RealSystemContext {} + +impl SystemContext for RealSystemContext { + fn now_timestamp(&self) -> Duration { + use std::time::SystemTime; + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + } +} // Basic architecture: // One future listening to new jobs @@ -34,7 +50,6 @@ pub struct JobId { pub trait InfraProvider { fn spin_up( &mut self, - eif_url: &str, job: &JobId, instance_type: &str, family: &str, @@ -100,7 +115,6 @@ where { async fn spin_up( &mut self, - eif_url: &str, job: &JobId, instance_type: &str, family: &str, @@ -111,7 +125,6 @@ where ) -> Result { (**self) .spin_up( - eif_url, job, instance_type, family, @@ -455,6 +468,7 @@ async fn job_manager( let job_stream = std::pin::pin!(res.unwrap()); let res = job_manager_once( + RealSystemContext {}, job_stream, infra_provider.clone(), job_id.clone(), @@ -511,6 +525,9 @@ fn whitelist_blacklist_check( } struct JobState<'a> { + // NOTE: not sure if dyn is a good idea, revisit later + context: &'a (dyn SystemContext + Send + Sync), + job_id: JobId, launch_delay: u64, allowed_regions: &'a [String], @@ -541,15 +558,21 @@ struct JobState<'a> { } impl<'a> JobState<'a> { - fn new(job_id: JobId, launch_delay: u64, allowed_regions: &[String]) -> JobState { + fn new( + context: &'a (dyn SystemContext + Send + Sync), + job_id: JobId, + launch_delay: u64, + allowed_regions: &'a [String], + ) -> JobState<'a> { // solvency metrics // default of 60s JobState { + context, job_id, launch_delay, allowed_regions, balance: U256::from(360), - last_settled: now_timestamp(), + last_settled: context.now_timestamp(), rate: U256::from(1), original_rate: U256::from(1), instance_id: String::new(), @@ -571,7 +594,7 @@ impl<'a> JobState<'a> { } fn insolvency_duration(&self) -> Duration { - let now_ts = now_timestamp(); + let now_ts = self.context.now_timestamp(); if self.rate == U256::ZERO { Duration::from_secs(0) @@ -733,7 +756,6 @@ impl<'a> JobState<'a> { info!("Launching new instance"); let res = infra_provider .spin_up( - self.eif_url.as_str(), &self.job_id, self.instance_type.as_str(), self.family.as_str(), @@ -1205,6 +1227,8 @@ impl<'a> JobState<'a> { } self.eif_url = url.to_string(); self.eif_update = true; + // WARN: this effectively delays the launch + // should revisit and see if it is desirable self.schedule_launch(self.launch_delay); } else { error!(topic = ?log.topics()[0], "Unknown event"); @@ -1218,6 +1242,7 @@ impl<'a> JobState<'a> { // manage the complete lifecycle of a job // returns true if "done" async fn job_manager_once( + context: impl SystemContext + Send + Sync, mut job_stream: impl StreamExt + Unpin, mut infra_provider: impl InfraProvider + Send + Sync, job_id: JobId, @@ -1228,7 +1253,7 @@ async fn job_manager_once( address_whitelist: &[String], address_blacklist: &[String], ) -> i8 { - let mut state = JobState::new(job_id, aws_delay_duration, allowed_regions); + let mut state = JobState::new(&context, job_id, aws_delay_duration, allowed_regions); let res = 'event: loop { // compute time to insolvency @@ -1346,32 +1371,13 @@ async fn job_logs( Ok(stream) } -#[cfg(not(test))] -fn now_timestamp() -> Duration { - // import here to ensure it is used only through this function - use std::time::SystemTime; - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() -} - -#[cfg(test)] -static START: std::sync::OnceLock = std::sync::OnceLock::new(); - -#[cfg(test)] -fn now_timestamp() -> Duration { - Instant::now() - *START.get().unwrap() -} - // -------------------------------------------------------------------------------------------------------------------------------------------------------- // TESTS // -------------------------------------------------------------------------------------------------------------------------------------------------------- #[cfg(test)] mod tests { - use std::str::FromStr; - - use alloy::hex::ToHexExt; + use alloy::hex::{FromHex, ToHexExt}; use alloy::primitives::{Bytes, B256, U256}; use alloy::rpc::types::eth::Log; use alloy::sol_types::SolValue; @@ -1379,19 +1385,48 @@ mod tests { use tokio_stream::StreamExt; use crate::market; - use crate::test::{self, Action, TestAws, TestAwsOutcome}; + use crate::test::{ + self, compute_address_word, compute_instance_id, Action, TestAws, TestAwsOutcome, + }; - #[tokio::test(start_paused = true)] - async fn test_instance_launch_after_delay_on_spin_up() { - let _ = market::START.set(Instant::now()); + use super::SystemContext; - let job_num = U256::from(1).into(); - let job_logs: Vec<(u64, Log)> = vec![ - (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,market::now_timestamp().as_secs()).abi_encode_sequence()), - (301, Action::Close, [].into()), - ].into_iter().map(|x| (x.0, test::get_log(x.1, Bytes::from(x.2), job_num))).collect(); + struct TestSystemContext { + start: Instant, + } + + impl SystemContext for TestSystemContext { + fn now_timestamp(&self) -> Duration { + Instant::now() - self.start + } + } + + struct JobManagerParams { + job_id: market::JobId, + allowed_regions: Vec, + address_whitelist: Vec, + address_blacklist: Vec, + } + + struct TestResults { + res: i8, + outcomes: Vec, + } + + async fn run_test( + start_time: Instant, + logs: Vec<(u64, Action, Vec)>, + job_manager_params: JobManagerParams, + test_results: TestResults, + ) { + let context = TestSystemContext { start: start_time }; + + let job_num = B256::from_hex(&job_manager_params.job_id.id).unwrap(); + let job_logs: Vec<(u64, Log)> = logs + .into_iter() + .map(|x| (x.0, test::get_log(x.1, Bytes::from(x.2), job_num))) + .collect(); - let start_time = Instant::now(); // pending stream appended so job stream never ends let job_stream = std::pin::pin!(tokio_stream::iter(job_logs.into_iter()) .then(|(moment, log)| async move { @@ -1400,1170 +1435,892 @@ mod tests { log }) .chain(tokio_stream::pending())); + let mut aws: TestAws = Default::default(); let res = market::job_manager_once( + context, job_stream, &mut aws, - market::JobId { - id: job_num.encode_hex_with_prefix(), - operator: "abc".into(), - contract: "xyz".into(), - chain: "123".into(), - }, - &["ap-south-1".into()], + job_manager_params.job_id, + &job_manager_params.allowed_regions, 300, &test::get_rates(), &test::get_gb_rates(), - &Vec::new(), - &Vec::new(), + &job_manager_params.address_whitelist, + &job_manager_params.address_blacklist, ) .await; - // job manager should have finished successfully - assert_eq!(res, 0); - let spin_up_tv_sec: Instant; - let instance_id: String; - if let TestAwsOutcome::SpinUp(out) = &aws.outcomes[0] { - spin_up_tv_sec = out.time; - instance_id = out.instance_id.clone(); - assert!(B256::from_str(&out.job).unwrap() == job_num); - assert!(out.instance_type == "c6a.xlarge"); - assert!(out.family == "salmon"); - assert!(out.region == "ap-south-1"); - assert!(out.req_mem == 4096); - assert!(out.req_vcpu == 2); - assert!(out.bandwidth == 76); - assert!(out.eif_url == "https://example.com/enclave.eif"); - assert!(out.contract_address == "xyz"); - assert!(out.chain_id == "123"); - } else { - panic!(); - }; + assert!(aws.instances.is_empty()); - if let TestAwsOutcome::RunEnclave(out) = &aws.outcomes[1] { - assert!(B256::from_str(&out.job).unwrap() == job_num); - assert!(out.instance_id == instance_id); - assert!(out.family == "salmon"); - assert!(out.region == "ap-south-1"); - assert!(out.req_mem == 4096); - assert!(out.req_vcpu == 2); - assert!(out.bandwidth == 76); - assert!(out.eif_url == "https://example.com/enclave.eif"); - assert!(out.debug == false); - } else { - panic!(); + assert_eq!(res, test_results.res); + assert_eq!(aws.outcomes, test_results.outcomes); + } + + #[tokio::test(start_paused = true)] + async fn test_instance_launch_after_delay_on_spin_up() { + let start_time = Instant::now(); + let job_id = format!("{:064x}", 1); + + let logs = vec![ + (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,0).abi_encode_sequence()), + (301, Action::Close, [].into()), + ]; + + let job_manager_params = JobManagerParams { + job_id: market::JobId { + id: job_id.clone(), + operator: "abc".into(), + contract: "xyz".into(), + chain: "123".into(), + }, + allowed_regions: vec!["ap-south-1".to_owned()], + address_whitelist: vec![], + address_blacklist: vec![], }; - if let TestAwsOutcome::SpinDown(out) = &aws.outcomes[2] { - assert_eq!((out.time - spin_up_tv_sec).as_secs(), 1); - assert!(B256::from_str(&out.job).unwrap() == job_num); - assert!(out.region == *"ap-south-1"); - } else { - panic!(); + let test_results = TestResults { + res: 0, + outcomes: vec![ + TestAwsOutcome::SpinUp(test::SpinUpOutcome { + time: start_time + Duration::from_secs(300), + job: job_id.clone(), + instance_type: "c6a.xlarge".into(), + family: "salmon".into(), + region: "ap-south-1".into(), + req_mem: 4096, + req_vcpu: 2, + bandwidth: 76, + contract_address: "xyz".into(), + chain_id: "123".into(), + instance_id: compute_instance_id(0), + }), + TestAwsOutcome::RunEnclave(test::RunEnclaveOutcome { + time: start_time + Duration::from_secs(300), + job: job_id.clone(), + family: "salmon".into(), + region: "ap-south-1".into(), + req_mem: 4096, + req_vcpu: 2, + bandwidth: 76, + instance_id: compute_instance_id(0), + eif_url: "https://example.com/enclave.eif".into(), + debug: false, + }), + TestAwsOutcome::SpinDown(test::SpinDownOutcome { + time: start_time + Duration::from_secs(301), + job: job_id, + instance_id: compute_instance_id(0), + region: "ap-south-1".into(), + }), + ], }; - assert!(!aws.instances.contains_key(&job_num.to_string())) + run_test(start_time, logs, job_manager_params, test_results).await; } #[tokio::test(start_paused = true)] async fn test_instance_launch_with_debug_mode_on_spin_up() { - let _ = market::START.set(Instant::now()); + let start_time = Instant::now(); + let job_id = format!("{:064x}", 1); - let job_num = U256::from(1).into(); - let job_logs: Vec<(u64, Log)> = vec![ - (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2,\"debug\":true}".to_string(),31000000000000u64,31000u64,market::now_timestamp().as_secs()).abi_encode_sequence()), + let logs = vec![ + (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2,\"debug\":true}".to_string(),31000000000000u64,31000u64,0).abi_encode_sequence()), (301, Action::Close, [].into()), - ].into_iter().map(|x| (x.0, test::get_log(x.1, Bytes::from(x.2), job_num))).collect(); + ]; - let start_time = Instant::now(); - // pending stream appended so job stream never ends - let job_stream = std::pin::pin!(tokio_stream::iter(job_logs.into_iter()) - .then(|(moment, log)| async move { - let delay = start_time + Duration::from_secs(moment) - Instant::now(); - sleep(delay).await; - log - }) - .chain(tokio_stream::pending())); - let mut aws: TestAws = Default::default(); - let res = market::job_manager_once( - job_stream, - &mut aws, - market::JobId { - id: job_num.encode_hex_with_prefix(), + let job_manager_params = JobManagerParams { + job_id: market::JobId { + id: job_id.clone(), operator: "abc".into(), contract: "xyz".into(), chain: "123".into(), }, - &["ap-south-1".into()], - 300, - &test::get_rates(), - &test::get_gb_rates(), - &Vec::new(), - &Vec::new(), - ) - .await; - - // job manager should have finished successfully - assert_eq!(res, 0); - let spin_up_tv_sec: Instant; - let instance_id: String; - if let TestAwsOutcome::SpinUp(out) = &aws.outcomes[0] { - spin_up_tv_sec = out.time; - instance_id = out.instance_id.clone(); - assert!(B256::from_str(&out.job).unwrap() == job_num); - assert!(out.instance_type == "c6a.xlarge"); - assert!(out.family == "salmon"); - assert!(out.region == "ap-south-1"); - assert!(out.req_mem == 4096); - assert!(out.req_vcpu == 2); - assert!(out.bandwidth == 76); - assert!(out.eif_url == "https://example.com/enclave.eif"); - assert!(out.contract_address == "xyz"); - assert!(out.chain_id == "123"); - } else { - panic!(); + allowed_regions: vec!["ap-south-1".to_owned()], + address_whitelist: vec![], + address_blacklist: vec![], }; - if let TestAwsOutcome::RunEnclave(out) = &aws.outcomes[1] { - assert!(B256::from_str(&out.job).unwrap() == job_num); - assert!(out.instance_id == instance_id); - assert!(out.family == "salmon"); - assert!(out.region == "ap-south-1"); - assert!(out.req_mem == 4096); - assert!(out.req_vcpu == 2); - assert!(out.bandwidth == 76); - assert!(out.eif_url == "https://example.com/enclave.eif"); - assert!(out.debug == true); - } else { - panic!(); - }; - - if let TestAwsOutcome::SpinDown(out) = &aws.outcomes[2] { - assert_eq!((out.time - spin_up_tv_sec).as_secs(), 1); - assert!(B256::from_str(&out.job).unwrap() == job_num); - assert!(out.region == *"ap-south-1"); - } else { - panic!(); + let test_results = TestResults { + res: 0, + outcomes: vec![ + TestAwsOutcome::SpinUp(test::SpinUpOutcome { + time: start_time + Duration::from_secs(300), + job: job_id.clone(), + instance_type: "c6a.xlarge".into(), + family: "salmon".into(), + region: "ap-south-1".into(), + req_mem: 4096, + req_vcpu: 2, + bandwidth: 76, + contract_address: "xyz".into(), + chain_id: "123".into(), + instance_id: compute_instance_id(0), + }), + TestAwsOutcome::RunEnclave(test::RunEnclaveOutcome { + time: start_time + Duration::from_secs(300), + job: job_id.clone(), + family: "salmon".into(), + region: "ap-south-1".into(), + req_mem: 4096, + req_vcpu: 2, + bandwidth: 76, + instance_id: compute_instance_id(0), + eif_url: "https://example.com/enclave.eif".into(), + debug: true, + }), + TestAwsOutcome::SpinDown(test::SpinDownOutcome { + time: start_time + Duration::from_secs(301), + job: job_id, + instance_id: compute_instance_id(0), + region: "ap-south-1".into(), + }), + ], }; - assert!(!aws.instances.contains_key(&job_num.to_string())) + run_test(start_time, logs, job_manager_params, test_results).await; } #[tokio::test(start_paused = true)] async fn test_instance_launch_after_delay_on_spin_up_with_specific_family() { - let _ = market::START.set(Instant::now()); + let start_time = Instant::now(); + let job_id = format!("{:064x}", 1); - let job_num = U256::from(1).into(); - let job_logs: Vec<(u64, Log)> = vec![ - (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2,\"family\":\"tuna\"}".to_string(),31000000000000u64,31000u64,market::now_timestamp().as_secs()).abi_encode_sequence()), + let logs = vec![ + (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2,\"family\":\"tuna\"}".to_string(),31000000000000u64,31000u64,0).abi_encode_sequence()), (301, Action::Close, [].into()), - ].into_iter().map(|x| (x.0, test::get_log(x.1, Bytes::from(x.2), job_num))).collect(); + ]; - let start_time = Instant::now(); - // pending stream appended so job stream never ends - let job_stream = std::pin::pin!(tokio_stream::iter(job_logs.into_iter()) - .then(|(moment, log)| async move { - let delay = start_time + Duration::from_secs(moment) - Instant::now(); - sleep(delay).await; - log - }) - .chain(tokio_stream::pending())); - let mut aws: TestAws = Default::default(); - let res = market::job_manager_once( - job_stream, - &mut aws, - market::JobId { - id: job_num.encode_hex_with_prefix(), + let job_manager_params = JobManagerParams { + job_id: market::JobId { + id: job_id.clone(), operator: "abc".into(), contract: "xyz".into(), chain: "123".into(), }, - &["ap-south-1".into()], - 300, - &test::get_rates(), - &test::get_gb_rates(), - &Vec::new(), - &Vec::new(), - ) - .await; - - // job manager should have finished successfully - assert_eq!(res, 0); - let spin_up_tv_sec: Instant; - let instance_id: String; - if let TestAwsOutcome::SpinUp(out) = &aws.outcomes[0] { - spin_up_tv_sec = out.time; - instance_id = out.instance_id.clone(); - assert!(B256::from_str(&out.job).unwrap() == job_num); - assert!(out.instance_id == instance_id); - assert!(out.family == "tuna"); - assert!(out.region == "ap-south-1"); - assert!(out.req_mem == 4096); - assert!(out.req_vcpu == 2); - assert!(out.bandwidth == 76); - assert!(out.eif_url == "https://example.com/enclave.eif"); - } else { - panic!(); - }; - - if let TestAwsOutcome::RunEnclave(out) = &aws.outcomes[1] { - assert!(B256::from_str(&out.job).unwrap() == job_num); - assert!(out.instance_id == instance_id); - assert!(out.family == "tuna"); - assert!(out.region == "ap-south-1"); - assert!(out.req_mem == 4096); - assert!(out.req_vcpu == 2); - assert!(out.bandwidth == 76); - assert!(out.eif_url == "https://example.com/enclave.eif"); - assert!(out.debug == false); - } else { - panic!(); + allowed_regions: vec!["ap-south-1".to_owned()], + address_whitelist: vec![], + address_blacklist: vec![], }; - if let TestAwsOutcome::SpinDown(out) = &aws.outcomes[2] { - assert_eq!((out.time - spin_up_tv_sec).as_secs(), 1); - assert!(B256::from_str(&out.job).unwrap() == job_num); - assert!(out.region == *"ap-south-1"); - } else { - panic!(); + let test_results = TestResults { + res: 0, + outcomes: vec![ + TestAwsOutcome::SpinUp(test::SpinUpOutcome { + time: start_time + Duration::from_secs(300), + job: job_id.clone(), + instance_type: "c6a.xlarge".into(), + family: "tuna".into(), + region: "ap-south-1".into(), + req_mem: 4096, + req_vcpu: 2, + bandwidth: 76, + contract_address: "xyz".into(), + chain_id: "123".into(), + instance_id: compute_instance_id(0), + }), + TestAwsOutcome::RunEnclave(test::RunEnclaveOutcome { + time: start_time + Duration::from_secs(300), + job: job_id.clone(), + family: "tuna".into(), + region: "ap-south-1".into(), + req_mem: 4096, + req_vcpu: 2, + bandwidth: 76, + instance_id: compute_instance_id(0), + eif_url: "https://example.com/enclave.eif".into(), + debug: false, + }), + TestAwsOutcome::SpinDown(test::SpinDownOutcome { + time: start_time + Duration::from_secs(301), + job: job_id, + instance_id: compute_instance_id(0), + region: "ap-south-1".into(), + }), + ], }; - assert!(!aws.instances.contains_key(&job_num.to_string())) + run_test(start_time, logs, job_manager_params, test_results).await; } #[tokio::test(start_paused = true)] async fn test_deposit_withdraw_settle() { - let _ = market::START.set(Instant::now()); + let start_time = Instant::now(); + let job_id = format!("{:064x}", 1); - let job_num = U256::from(1).into(); - let job_logs: Vec<(u64, Log)> = vec![ - (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,market::now_timestamp().as_secs()).abi_encode_sequence()), + let logs = vec![ + (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,0).abi_encode_sequence()), (40, Action::Deposit, 500.abi_encode()), (60, Action::Withdraw, 500.abi_encode()), (100, Action::Settle, (2, 6).abi_encode_sequence()), (505, Action::Close, [].into()), - ].into_iter().map(|x| (x.0, test::get_log(x.1, Bytes::from(x.2), job_num))).collect(); + ]; - let start_time = Instant::now(); - // pending stream appended so job stream never ends - let job_stream = std::pin::pin!(tokio_stream::iter(job_logs.into_iter()) - .then(|(moment, log)| async move { - let delay = start_time + Duration::from_secs(moment) - Instant::now(); - sleep(delay).await; - log - }) - .chain(tokio_stream::pending())); - let mut aws: TestAws = Default::default(); - let res = market::job_manager_once( - job_stream, - &mut aws, - market::JobId { - id: job_num.encode_hex_with_prefix(), + let job_manager_params = JobManagerParams { + job_id: market::JobId { + id: job_id.clone(), operator: "abc".into(), contract: "xyz".into(), chain: "123".into(), }, - &["ap-south-1".into()], - 300, - &test::get_rates(), - &test::get_gb_rates(), - &Vec::new(), - &Vec::new(), - ) - .await; - - // job manager should have finished successfully - assert_eq!(res, 0); - let spin_up_tv_sec: Instant; - let instance_id: String; - if let TestAwsOutcome::SpinUp(out) = &aws.outcomes[0] { - spin_up_tv_sec = out.time; - instance_id = out.instance_id.clone(); - assert!(B256::from_str(&out.job).unwrap() == job_num); - assert!(out.instance_type == "c6a.xlarge"); - assert!(out.family == "salmon"); - assert!(out.region == "ap-south-1"); - assert!(out.req_mem == 4096); - assert!(out.req_vcpu == 2); - assert!(out.bandwidth == 76); - assert!(out.eif_url == "https://example.com/enclave.eif"); - assert!(out.contract_address == "xyz"); - assert!(out.chain_id == "123"); - } else { - panic!(); + allowed_regions: vec!["ap-south-1".to_owned()], + address_whitelist: vec![], + address_blacklist: vec![], }; - if let TestAwsOutcome::RunEnclave(out) = &aws.outcomes[1] { - assert!(B256::from_str(&out.job).unwrap() == job_num); - assert!(out.instance_id == instance_id); - assert!(out.family == "salmon"); - assert!(out.region == "ap-south-1"); - assert!(out.req_mem == 4096); - assert!(out.req_vcpu == 2); - assert!(out.bandwidth == 76); - assert!(out.eif_url == "https://example.com/enclave.eif"); - assert!(out.debug == false); - } else { - panic!(); - }; - - if let TestAwsOutcome::SpinDown(out) = &aws.outcomes[2] { - assert_eq!((out.time - spin_up_tv_sec).as_secs(), 205); - assert!(B256::from_str(&out.job).unwrap() == job_num); - assert!(out.region == *"ap-south-1"); - } else { - panic!(); + let test_results = TestResults { + res: 0, + outcomes: vec![ + TestAwsOutcome::SpinUp(test::SpinUpOutcome { + time: start_time + Duration::from_secs(300), + job: job_id.clone(), + instance_type: "c6a.xlarge".into(), + family: "salmon".into(), + region: "ap-south-1".into(), + req_mem: 4096, + req_vcpu: 2, + bandwidth: 76, + contract_address: "xyz".into(), + chain_id: "123".into(), + instance_id: compute_instance_id(0), + }), + TestAwsOutcome::RunEnclave(test::RunEnclaveOutcome { + time: start_time + Duration::from_secs(300), + job: job_id.clone(), + family: "salmon".into(), + region: "ap-south-1".into(), + req_mem: 4096, + req_vcpu: 2, + bandwidth: 76, + instance_id: compute_instance_id(0), + eif_url: "https://example.com/enclave.eif".into(), + debug: false, + }), + TestAwsOutcome::SpinDown(test::SpinDownOutcome { + time: start_time + Duration::from_secs(505), + job: job_id, + instance_id: compute_instance_id(0), + region: "ap-south-1".into(), + }), + ], }; - assert!(!aws.instances.contains_key(&job_num.to_string())) + run_test(start_time, logs, job_manager_params, test_results).await; } #[tokio::test(start_paused = true)] async fn test_revise_rate_cancel() { - let _ = market::START.set(Instant::now()); + let start_time = Instant::now(); + let job_id = format!("{:064x}", 1); - let job_num = U256::from(1).into(); - let job_logs: Vec<(u64, Log)> = vec![ - (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,market::now_timestamp().as_secs()).abi_encode_sequence()), + let logs = vec![ + (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,0).abi_encode_sequence()), (50, Action::ReviseRateInitiated, 32000000000000u64.abi_encode()), (100, Action::ReviseRateFinalized, 32000000000000u64.abi_encode()), (150, Action::ReviseRateInitiated, 60000000000000u64.abi_encode()), (200, Action::ReviseRateCancelled, [].into()), (505, Action::Close, [].into()), - ].into_iter().map(|x| (x.0, test::get_log(x.1, Bytes::from(x.2), job_num))).collect(); + ]; - let start_time = Instant::now(); - // pending stream appended so job stream never ends - let job_stream = std::pin::pin!(tokio_stream::iter(job_logs.into_iter()) - .then(|(moment, log)| async move { - let delay = start_time + Duration::from_secs(moment) - Instant::now(); - sleep(delay).await; - log - }) - .chain(tokio_stream::pending())); - let mut aws: TestAws = Default::default(); - let res = market::job_manager_once( - job_stream, - &mut aws, - market::JobId { - id: job_num.encode_hex_with_prefix(), + let job_manager_params = JobManagerParams { + job_id: market::JobId { + id: job_id.clone(), operator: "abc".into(), contract: "xyz".into(), chain: "123".into(), }, - &["ap-south-1".into()], - 300, - &test::get_rates(), - &test::get_gb_rates(), - &Vec::new(), - &Vec::new(), - ) - .await; - - // job manager should have finished successfully - assert_eq!(res, 0); - let spin_up_tv_sec: Instant; - let instance_id: String; - if let TestAwsOutcome::SpinUp(out) = &aws.outcomes[0] { - spin_up_tv_sec = out.time; - instance_id = out.instance_id.clone(); - assert!(B256::from_str(&out.job).unwrap() == job_num); - assert!(out.instance_type == "c6a.xlarge"); - assert!(out.family == "salmon"); - assert!(out.region == "ap-south-1"); - assert!(out.req_mem == 4096); - assert!(out.req_vcpu == 2); - assert!(out.bandwidth == 76); - assert!(out.eif_url == "https://example.com/enclave.eif"); - assert!(out.contract_address == "xyz"); - assert!(out.chain_id == "123"); - } else { - panic!(); - }; - - if let TestAwsOutcome::RunEnclave(out) = &aws.outcomes[1] { - assert!(B256::from_str(&out.job).unwrap() == job_num); - assert!(out.instance_id == instance_id); - assert!(out.family == "salmon"); - assert!(out.region == "ap-south-1"); - assert!(out.req_mem == 4096); - assert!(out.req_vcpu == 2); - assert!(out.bandwidth == 76); - assert!(out.eif_url == "https://example.com/enclave.eif"); - assert!(out.debug == false); - } else { - panic!(); + allowed_regions: vec!["ap-south-1".to_owned()], + address_whitelist: vec![], + address_blacklist: vec![], }; - if let TestAwsOutcome::SpinDown(out) = &aws.outcomes[2] { - assert_eq!((out.time - spin_up_tv_sec).as_secs(), 205); - assert!(B256::from_str(&out.job).unwrap() == job_num); - assert!(out.region == *"ap-south-1"); - } else { - panic!(); + let test_results = TestResults { + res: 0, + outcomes: vec![ + TestAwsOutcome::SpinUp(test::SpinUpOutcome { + time: start_time + Duration::from_secs(300), + job: job_id.clone(), + instance_type: "c6a.xlarge".into(), + family: "salmon".into(), + region: "ap-south-1".into(), + req_mem: 4096, + req_vcpu: 2, + bandwidth: 76, + contract_address: "xyz".into(), + chain_id: "123".into(), + instance_id: compute_instance_id(0), + }), + TestAwsOutcome::RunEnclave(test::RunEnclaveOutcome { + time: start_time + Duration::from_secs(300), + job: job_id.clone(), + family: "salmon".into(), + region: "ap-south-1".into(), + req_mem: 4096, + req_vcpu: 2, + bandwidth: 76, + instance_id: compute_instance_id(0), + eif_url: "https://example.com/enclave.eif".into(), + debug: false, + }), + TestAwsOutcome::SpinDown(test::SpinDownOutcome { + time: start_time + Duration::from_secs(505), + job: job_id, + instance_id: compute_instance_id(0), + region: "ap-south-1".into(), + }), + ], }; - assert!(!aws.instances.contains_key(&job_num.to_string())) + run_test(start_time, logs, job_manager_params, test_results).await; } #[tokio::test(start_paused = true)] async fn test_unsupported_region() { - let _ = market::START.set(Instant::now()); + let start_time = Instant::now(); + let job_id = format!("{:064x}", 1); - let job_num = U256::from(1).into(); - let job_logs: Vec<(u64, Log)> = vec![ - (0, Action::Open, ("{\"region\":\"ap-east-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,market::now_timestamp().as_secs()).abi_encode_sequence()), + let logs = vec![ + (0, Action::Open, ("{\"region\":\"ap-east-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,0).abi_encode_sequence()), (505, Action::Close, [].into()), - ].into_iter().map(|x| (x.0, test::get_log(x.1, Bytes::from(x.2), job_num))).collect(); + ]; - let start_time = Instant::now(); - // pending stream appended so job stream never ends - let job_stream = std::pin::pin!(tokio_stream::iter(job_logs.into_iter()) - .then(|(moment, log)| async move { - let delay = start_time + Duration::from_secs(moment) - Instant::now(); - sleep(delay).await; - log - }) - .chain(tokio_stream::pending())); - let mut aws: TestAws = Default::default(); - let res = market::job_manager_once( - job_stream, - &mut aws, - market::JobId { - id: job_num.encode_hex_with_prefix(), + let job_manager_params = JobManagerParams { + job_id: market::JobId { + id: job_id.clone(), operator: "abc".into(), contract: "xyz".into(), chain: "123".into(), }, - &["ap-south-1".into()], - 300, - &test::get_rates(), - &test::get_gb_rates(), - &Vec::new(), - &Vec::new(), - ) - .await; + allowed_regions: vec!["ap-south-1".to_owned()], + address_whitelist: vec![], + address_blacklist: vec![], + }; + + let test_results = TestResults { + res: -2, + outcomes: vec![], + }; - // job manager should have finished successfully - assert_eq!(res, -2); - assert!(aws.outcomes.is_empty()); - assert!(!aws.instances.contains_key(&job_num.to_string())) + run_test(start_time, logs, job_manager_params, test_results).await; } #[tokio::test(start_paused = true)] async fn test_region_not_found() { - let _ = market::START.set(Instant::now()); + let start_time = Instant::now(); + let job_id = format!("{:064x}", 1); - let job_num = U256::from(1).into(); - let job_logs: Vec<(u64, Log)> = vec![ - (0, Action::Open, ("{\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,market::now_timestamp().as_secs()).abi_encode_sequence()), + let logs = vec![ + (0, Action::Open, ("{\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,0).abi_encode_sequence()), (505, Action::Close, [].into()), - ].into_iter().map(|x| (x.0, test::get_log(x.1, Bytes::from(x.2), job_num))).collect(); + ]; - let start_time = Instant::now(); - // pending stream appended so job stream never ends - let job_stream = std::pin::pin!(tokio_stream::iter(job_logs.into_iter()) - .then(|(moment, log)| async move { - let delay = start_time + Duration::from_secs(moment) - Instant::now(); - sleep(delay).await; - log - }) - .chain(tokio_stream::pending())); - let mut aws: TestAws = Default::default(); - let res = market::job_manager_once( - job_stream, - &mut aws, - market::JobId { - id: job_num.encode_hex_with_prefix(), + let job_manager_params = JobManagerParams { + job_id: market::JobId { + id: job_id.clone(), operator: "abc".into(), contract: "xyz".into(), chain: "123".into(), }, - &["ap-south-1".into()], - 300, - &test::get_rates(), - &test::get_gb_rates(), - &Vec::new(), - &Vec::new(), - ) - .await; + allowed_regions: vec!["ap-south-1".to_owned()], + address_whitelist: vec![], + address_blacklist: vec![], + }; + + let test_results = TestResults { + res: -2, + outcomes: vec![], + }; - // job manager should have finished successfully - assert_eq!(res, -2); - assert!(aws.outcomes.is_empty()); - assert!(!aws.instances.contains_key(&job_num.to_string())) + run_test(start_time, logs, job_manager_params, test_results).await; } #[tokio::test(start_paused = true)] async fn test_instance_type_not_found() { - let _ = market::START.set(Instant::now()); + let start_time = Instant::now(); + let job_id = format!("{:064x}", 1); - let job_num = U256::from(1).into(); - let job_logs: Vec<(u64, Log)> = vec![ - (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,market::now_timestamp().as_secs()).abi_encode_sequence()), + let logs = vec![ + (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,0).abi_encode_sequence()), (505, Action::Close, [].into()), - ].into_iter().map(|x| (x.0, test::get_log(x.1, Bytes::from(x.2), job_num))).collect(); + ]; - let start_time = Instant::now(); - // pending stream appended so job stream never ends - let job_stream = std::pin::pin!(tokio_stream::iter(job_logs.into_iter()) - .then(|(moment, log)| async move { - let delay = start_time + Duration::from_secs(moment) - Instant::now(); - sleep(delay).await; - log - }) - .chain(tokio_stream::pending())); - let mut aws: TestAws = Default::default(); - let res = market::job_manager_once( - job_stream, - &mut aws, - market::JobId { - id: job_num.encode_hex_with_prefix(), + let job_manager_params = JobManagerParams { + job_id: market::JobId { + id: job_id.clone(), operator: "abc".into(), contract: "xyz".into(), chain: "123".into(), }, - &["ap-south-1".into()], - 300, - &test::get_rates(), - &test::get_gb_rates(), - &Vec::new(), - &Vec::new(), - ) - .await; + allowed_regions: vec!["ap-south-1".to_owned()], + address_whitelist: vec![], + address_blacklist: vec![], + }; + + let test_results = TestResults { + res: -2, + outcomes: vec![], + }; - // job manager should have finished successfully - assert_eq!(res, -2); - assert!(aws.outcomes.is_empty()); - assert!(!aws.instances.contains_key(&job_num.to_string())) + run_test(start_time, logs, job_manager_params, test_results).await; } #[tokio::test(start_paused = true)] async fn test_unsupported_instance() { - let _ = market::START.set(Instant::now()); + let start_time = Instant::now(); + let job_id = format!("{:064x}", 1); - let job_num = U256::from(1).into(); - let job_logs: Vec<(u64, Log)> = vec![ - (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.vsmall\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,market::now_timestamp().as_secs()).abi_encode_sequence()), + let logs = vec![ + (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.vsmall\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,0).abi_encode_sequence()), (505, Action::Close, [].into()), - ].into_iter().map(|x| (x.0, test::get_log(x.1, Bytes::from(x.2), job_num))).collect(); + ]; - let start_time = Instant::now(); - // pending stream appended so job stream never ends - let job_stream = std::pin::pin!(tokio_stream::iter(job_logs.into_iter()) - .then(|(moment, log)| async move { - let delay = start_time + Duration::from_secs(moment) - Instant::now(); - sleep(delay).await; - log - }) - .chain(tokio_stream::pending())); - let mut aws: TestAws = Default::default(); - let res = market::job_manager_once( - job_stream, - &mut aws, - market::JobId { - id: job_num.encode_hex_with_prefix(), + let job_manager_params = JobManagerParams { + job_id: market::JobId { + id: job_id.clone(), operator: "abc".into(), contract: "xyz".into(), chain: "123".into(), }, - &["ap-south-1".into()], - 300, - &test::get_rates(), - &test::get_gb_rates(), - &Vec::new(), - &Vec::new(), - ) - .await; + allowed_regions: vec!["ap-south-1".to_owned()], + address_whitelist: vec![], + address_blacklist: vec![], + }; - // job manager should have finished successfully - assert_eq!(res, -2); - assert!(aws.outcomes.is_empty()); - assert!(!aws.instances.contains_key(&job_num.to_string())) + let test_results = TestResults { + res: -2, + outcomes: vec![], + }; + + run_test(start_time, logs, job_manager_params, test_results).await; } #[tokio::test(start_paused = true)] async fn test_eif_url_not_found() { - let _ = market::START.set(Instant::now()); + let start_time = Instant::now(); + let job_id = format!("{:064x}", 1); - let job_num = U256::from(1).into(); - let job_logs: Vec<(u64, Log)> = vec![ - (0, Action::Open, ("{\"region\":\"ap-south-1\",\"instance\":\"c6a.vsmall\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,market::now_timestamp().as_secs()).abi_encode_sequence()), + let logs = vec![ + (0, Action::Open, ("{\"region\":\"ap-south-1\",\"instance\":\"c6a.vsmall\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,0).abi_encode_sequence()), (505, Action::Close, [].into()), - ].into_iter().map(|x| (x.0, test::get_log(x.1, Bytes::from(x.2), job_num))).collect(); + ]; - let start_time = Instant::now(); - // pending stream appended so job stream never ends - let job_stream = std::pin::pin!(tokio_stream::iter(job_logs.into_iter()) - .then(|(moment, log)| async move { - let delay = start_time + Duration::from_secs(moment) - Instant::now(); - sleep(delay).await; - log - }) - .chain(tokio_stream::pending())); - let mut aws: TestAws = Default::default(); - let res = market::job_manager_once( - job_stream, - &mut aws, - market::JobId { - id: job_num.encode_hex_with_prefix(), + let job_manager_params = JobManagerParams { + job_id: market::JobId { + id: job_id.clone(), operator: "abc".into(), contract: "xyz".into(), chain: "123".into(), }, - &["ap-south-1".into()], - 300, - &test::get_rates(), - &test::get_gb_rates(), - &Vec::new(), - &Vec::new(), - ) - .await; + allowed_regions: vec!["ap-south-1".to_owned()], + address_whitelist: vec![], + address_blacklist: vec![], + }; - // job manager should have finished successfully - assert_eq!(res, -2); - assert!(aws.outcomes.is_empty()); - assert!(!aws.instances.contains_key(&job_num.to_string())) + let test_results = TestResults { + res: -2, + outcomes: vec![], + }; + + run_test(start_time, logs, job_manager_params, test_results).await; } #[tokio::test(start_paused = true)] async fn test_min_rate() { - let _ = market::START.set(Instant::now()); + let start_time = Instant::now(); + let job_id = format!("{:064x}", 1); - let job_num = U256::from(1).into(); - let job_logs: Vec<(u64, Log)> = vec![ - (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),29000000000000u64,31000u64,market::now_timestamp().as_secs()).abi_encode_sequence()), + let logs = vec![ + (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),29000000000000u64,31000u64,0).abi_encode_sequence()), (505, Action::Close, [].into()), - ].into_iter().map(|x| (x.0, test::get_log(x.1, Bytes::from(x.2), job_num))).collect(); + ]; - let start_time = Instant::now(); - // pending stream appended so job stream never ends - let job_stream = std::pin::pin!(tokio_stream::iter(job_logs.into_iter()) - .then(|(moment, log)| async move { - let delay = start_time + Duration::from_secs(moment) - Instant::now(); - sleep(delay).await; - log - }) - .chain(tokio_stream::pending())); - let mut aws: TestAws = Default::default(); - let res = market::job_manager_once( - job_stream, - &mut aws, - market::JobId { - id: job_num.encode_hex_with_prefix(), + let job_manager_params = JobManagerParams { + job_id: market::JobId { + id: job_id.clone(), operator: "abc".into(), contract: "xyz".into(), chain: "123".into(), }, - &["ap-south-1".into()], - 300, - &test::get_rates(), - &test::get_gb_rates(), - &Vec::new(), - &Vec::new(), - ) - .await; + allowed_regions: vec!["ap-south-1".to_owned()], + address_whitelist: vec![], + address_blacklist: vec![], + }; - // job manager should have finished successfully - assert_eq!(res, 0); - assert!(aws.outcomes.is_empty()); - assert!(!aws.instances.contains_key(&job_num.to_string())) + let test_results = TestResults { + res: 0, + outcomes: vec![], + }; + + run_test(start_time, logs, job_manager_params, test_results).await; } #[tokio::test(start_paused = true)] async fn test_rate_exceed_balance() { - let _ = market::START.set(Instant::now()); + let start_time = Instant::now(); + let job_id = format!("{:064x}", 1); - let job_num = U256::from(1).into(); - let job_logs: Vec<(u64, Log)> = vec![ - (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,0u64,market::now_timestamp().as_secs()).abi_encode_sequence()), + let logs = vec![ + (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,0u64,0).abi_encode_sequence()), (505, Action::Close, [].into()), - ].into_iter().map(|x| (x.0, test::get_log(x.1, Bytes::from(x.2), job_num))).collect(); + ]; - let start_time = Instant::now(); - // pending stream appended so job stream never ends - let job_stream = std::pin::pin!(tokio_stream::iter(job_logs.into_iter()) - .then(|(moment, log)| async move { - let delay = start_time + Duration::from_secs(moment) - Instant::now(); - sleep(delay).await; - log - }) - .chain(tokio_stream::pending())); - let mut aws: TestAws = Default::default(); - let res = market::job_manager_once( - job_stream, - &mut aws, - market::JobId { - id: job_num.encode_hex_with_prefix(), + let job_manager_params = JobManagerParams { + job_id: market::JobId { + id: job_id.clone(), operator: "abc".into(), contract: "xyz".into(), chain: "123".into(), }, - &["ap-south-1".into()], - 300, - &test::get_rates(), - &test::get_gb_rates(), - &Vec::new(), - &Vec::new(), - ) - .await; + allowed_regions: vec!["ap-south-1".to_owned()], + address_whitelist: vec![], + address_blacklist: vec![], + }; + + let test_results = TestResults { + res: 0, + outcomes: vec![], + }; - // job manager should have finished successfully - assert_eq!(res, 0); - assert!(aws.outcomes.is_empty()); - assert!(!aws.instances.contains_key(&job_num.to_string())) + run_test(start_time, logs, job_manager_params, test_results).await; } + // NOTE: This scenario should be impossible based on how the contract should be written + // Nevertheless, the cp should handle it to be defensive, so we test #[tokio::test(start_paused = true)] async fn test_withdrawal_exceed_rate() { - let _ = market::START.set(Instant::now()); + let start_time = Instant::now(); + let job_id = format!("{:064x}", 1); - let job_num = U256::from(1).into(); - let job_logs: Vec<(u64, Log)> = vec![ - (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,market::now_timestamp().as_secs()).abi_encode_sequence()), + let logs = vec![ + (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,0).abi_encode_sequence()), (350, Action::Withdraw, 30000u64.abi_encode()), (500, Action::Close, [].into()), - ].into_iter().map(|x| (x.0, test::get_log(x.1, Bytes::from(x.2), job_num))).collect(); + ]; - let start_time = Instant::now(); - // pending stream appended so job stream never ends - let job_stream = std::pin::pin!(tokio_stream::iter(job_logs.into_iter()) - .then(|(moment, log)| async move { - let delay = start_time + Duration::from_secs(moment) - Instant::now(); - sleep(delay).await; - log - }) - .chain(tokio_stream::pending())); - let mut aws: TestAws = Default::default(); - let res = market::job_manager_once( - job_stream, - &mut aws, - market::JobId { - id: job_num.encode_hex_with_prefix(), + let job_manager_params = JobManagerParams { + job_id: market::JobId { + id: job_id.clone(), operator: "abc".into(), contract: "xyz".into(), chain: "123".into(), }, - &["ap-south-1".into()], - 300, - &test::get_rates(), - &test::get_gb_rates(), - &Vec::new(), - &Vec::new(), - ) - .await; - - // job manager should have finished successfully - assert_eq!(res, 0); - let spin_up_tv_sec: Instant; - let instance_id: String; - if let TestAwsOutcome::SpinUp(out) = &aws.outcomes[0] { - spin_up_tv_sec = out.time; - instance_id = out.instance_id.clone(); - assert!( - B256::from_str(&out.job).unwrap() == job_num - && out.instance_type == "c6a.xlarge" - && out.family == "salmon" - && out.region == "ap-south-1" - && out.req_mem == 4096 - && out.req_vcpu == 2 - && out.bandwidth == 76 - && out.eif_url == "https://example.com/enclave.eif" - && out.contract_address == "xyz" - && out.chain_id == "123" - ) - } else { - panic!(); + allowed_regions: vec!["ap-south-1".to_owned()], + address_whitelist: vec![], + address_blacklist: vec![], }; - if let TestAwsOutcome::RunEnclave(out) = &aws.outcomes[1] { - assert!( - B256::from_str(&out.job).unwrap() == job_num - && out.instance_id == instance_id - && out.family == "salmon" - && out.region == "ap-south-1" - && out.req_mem == 4096 - && out.req_vcpu == 2 - && out.bandwidth == 76 - && out.eif_url == "https://example.com/enclave.eif" - && out.debug == false - ) - } else { - panic!(); - }; - - if let TestAwsOutcome::SpinDown(out) = &aws.outcomes[2] { - assert_eq!((out.time - spin_up_tv_sec).as_secs(), 50); - assert!(B256::from_str(&out.job).unwrap() == job_num && out.region == *"ap-south-1") - } else { - panic!(); + let test_results = TestResults { + res: 0, + outcomes: vec![ + TestAwsOutcome::SpinUp(test::SpinUpOutcome { + time: start_time + Duration::from_secs(300), + job: job_id.clone(), + instance_type: "c6a.xlarge".into(), + family: "salmon".into(), + region: "ap-south-1".into(), + req_mem: 4096, + req_vcpu: 2, + bandwidth: 76, + contract_address: "xyz".into(), + chain_id: "123".into(), + instance_id: compute_instance_id(0), + }), + TestAwsOutcome::RunEnclave(test::RunEnclaveOutcome { + time: start_time + Duration::from_secs(300), + job: job_id.clone(), + family: "salmon".into(), + region: "ap-south-1".into(), + req_mem: 4096, + req_vcpu: 2, + bandwidth: 76, + instance_id: compute_instance_id(0), + eif_url: "https://example.com/enclave.eif".into(), + debug: false, + }), + TestAwsOutcome::SpinDown(test::SpinDownOutcome { + time: start_time + Duration::from_secs(350), + job: job_id, + instance_id: compute_instance_id(0), + region: "ap-south-1".into(), + }), + ], }; - assert!(!aws.instances.contains_key(&job_num.to_string())) + run_test(start_time, logs, job_manager_params, test_results).await; } #[tokio::test(start_paused = true)] async fn test_revise_rate_lower_higher() { - let _ = market::START.set(Instant::now()); + let start_time = Instant::now(); + let job_id = format!("{:064x}", 1); - let job_num = U256::from(1).into(); - let job_logs: Vec<(u64, Log)> = vec![ - (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,market::now_timestamp().as_secs()).abi_encode_sequence()), + let logs = vec![ + (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,0).abi_encode_sequence()), (350, Action::ReviseRateInitiated, 29000000000000u64.abi_encode()), (400, Action::ReviseRateFinalized, 29000000000000u64.abi_encode()), (450, Action::ReviseRateInitiated, 31000000000000u64.abi_encode()), (500, Action::ReviseRateFinalized, 31000000000000u64.abi_encode()), - ].into_iter().map(|x| (x.0, test::get_log(x.1, Bytes::from(x.2), job_num))).collect(); + ]; - let start_time = Instant::now(); - // pending stream appended so job stream never ends - let job_stream = std::pin::pin!(tokio_stream::iter(job_logs.into_iter()) - .then(|(moment, log)| async move { - let delay = start_time + Duration::from_secs(moment) - Instant::now(); - sleep(delay).await; - log - }) - .chain(tokio_stream::pending())); - let mut aws: TestAws = Default::default(); - let res = market::job_manager_once( - job_stream, - &mut aws, - market::JobId { - id: job_num.encode_hex_with_prefix(), + let job_manager_params = JobManagerParams { + job_id: market::JobId { + id: job_id.clone(), operator: "abc".into(), contract: "xyz".into(), chain: "123".into(), }, - &["ap-south-1".into()], - 300, - &test::get_rates(), - &test::get_gb_rates(), - &Vec::new(), - &Vec::new(), - ) - .await; - - // job manager should have finished successfully - assert_eq!(res, 0); - let spin_up_tv_sec: Instant; - let instance_id: String; - if let TestAwsOutcome::SpinUp(out) = &aws.outcomes[0] { - spin_up_tv_sec = out.time; - instance_id = out.instance_id.clone(); - assert!(B256::from_str(&out.job).unwrap() == job_num); - assert!(out.instance_type == "c6a.xlarge"); - assert!(out.family == "salmon"); - assert!(out.region == "ap-south-1"); - assert!(out.req_mem == 4096); - assert!(out.req_vcpu == 2); - assert!(out.bandwidth == 76); - assert!(out.eif_url == "https://example.com/enclave.eif"); - assert!(out.contract_address == "xyz"); - assert!(out.chain_id == "123"); - } else { - panic!(); - }; - - if let TestAwsOutcome::RunEnclave(out) = &aws.outcomes[1] { - assert!(B256::from_str(&out.job).unwrap() == job_num); - assert!(out.instance_id == instance_id); - assert!(out.family == "salmon"); - assert!(out.region == "ap-south-1"); - assert!(out.req_mem == 4096); - assert!(out.req_vcpu == 2); - assert!(out.bandwidth == 76); - assert!(out.eif_url == "https://example.com/enclave.eif"); - assert!(out.debug == false); - } else { - panic!(); + allowed_regions: vec!["ap-south-1".to_owned()], + address_whitelist: vec![], + address_blacklist: vec![], }; - if let TestAwsOutcome::SpinDown(out) = &aws.outcomes[2] { - assert_eq!((out.time - spin_up_tv_sec).as_secs(), 50); - assert!(B256::from_str(&out.job).unwrap() == job_num); - assert!(out.region == *"ap-south-1"); - } else { - panic!(); + let test_results = TestResults { + res: 0, + outcomes: vec![ + TestAwsOutcome::SpinUp(test::SpinUpOutcome { + time: start_time + Duration::from_secs(300), + job: job_id.clone(), + instance_type: "c6a.xlarge".into(), + family: "salmon".into(), + region: "ap-south-1".into(), + req_mem: 4096, + req_vcpu: 2, + bandwidth: 76, + contract_address: "xyz".into(), + chain_id: "123".into(), + instance_id: compute_instance_id(0), + }), + TestAwsOutcome::RunEnclave(test::RunEnclaveOutcome { + time: start_time + Duration::from_secs(300), + job: job_id.clone(), + family: "salmon".into(), + region: "ap-south-1".into(), + req_mem: 4096, + req_vcpu: 2, + bandwidth: 76, + instance_id: compute_instance_id(0), + eif_url: "https://example.com/enclave.eif".into(), + debug: false, + }), + TestAwsOutcome::SpinDown(test::SpinDownOutcome { + time: start_time + Duration::from_secs(350), + job: job_id, + instance_id: compute_instance_id(0), + region: "ap-south-1".into(), + }), + ], }; - assert!(!aws.instances.contains_key(&job_num.to_string())) + run_test(start_time, logs, job_manager_params, test_results).await; } #[tokio::test(start_paused = true)] async fn test_address_whitelisted() { - let _ = market::START.set(Instant::now()); + let start_time = Instant::now(); + let job_id = format!("{:064x}", 1); - let job_num = U256::from(1).into(); - let job_logs: Vec<(u64, Log)> = vec![ - (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,market::now_timestamp().as_secs()).abi_encode_sequence()), + let logs = vec![ + (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,0).abi_encode_sequence()), (500, Action::Close, [].into()), - ].into_iter().map(|x| (x.0, test::get_log(x.1, Bytes::from(x.2), job_num))).collect(); + ]; - let start_time = Instant::now(); - // pending stream appended so job stream never ends - let job_stream = std::pin::pin!(tokio_stream::iter(job_logs.into_iter()) - .then(|(moment, log)| async move { - let delay = start_time + Duration::from_secs(moment) - Instant::now(); - sleep(delay).await; - log - }) - .chain(tokio_stream::pending())); - let mut aws: TestAws = Default::default(); - let res = market::job_manager_once( - job_stream, - &mut aws, - market::JobId { - id: job_num.encode_hex_with_prefix(), + let job_manager_params = JobManagerParams { + job_id: market::JobId { + id: job_id.clone(), operator: "abc".into(), contract: "xyz".into(), chain: "123".into(), }, - &["ap-south-1".into()], - 300, - &test::get_rates(), - &test::get_gb_rates(), - &Vec::from([ - "0x0000000000000000000000000f5f91ba30a00bd43bd19466f020b3e5fc7a49ec".to_string(), - ]), - &Vec::new(), - ) - .await; - - // job manager should have finished successfully - assert_eq!(res, 0); - let spin_up_tv_sec: Instant; - let instance_id: String; - if let TestAwsOutcome::SpinUp(out) = &aws.outcomes[0] { - spin_up_tv_sec = out.time; - instance_id = out.instance_id.clone(); - assert!(B256::from_str(&out.job).unwrap() == job_num); - assert!(out.instance_type == "c6a.xlarge"); - assert!(out.family == "salmon"); - assert!(out.region == "ap-south-1"); - assert!(out.req_mem == 4096); - assert!(out.req_vcpu == 2); - assert!(out.bandwidth == 76); - assert!(out.eif_url == "https://example.com/enclave.eif"); - assert!(out.contract_address == "xyz"); - assert!(out.chain_id == "123"); - } else { - panic!(); + allowed_regions: vec!["ap-south-1".to_owned()], + address_whitelist: vec![compute_address_word("owner").encode_hex_with_prefix()], + address_blacklist: vec![], }; - if let TestAwsOutcome::RunEnclave(out) = &aws.outcomes[1] { - assert!(B256::from_str(&out.job).unwrap() == job_num); - assert!(out.instance_id == instance_id); - assert!(out.family == "salmon"); - assert!(out.region == "ap-south-1"); - assert!(out.req_mem == 4096); - assert!(out.req_vcpu == 2); - assert!(out.bandwidth == 76); - assert!(out.eif_url == "https://example.com/enclave.eif"); - assert!(out.debug == false); - } else { - panic!(); - }; - - if let TestAwsOutcome::SpinDown(out) = &aws.outcomes[2] { - assert_eq!((out.time - spin_up_tv_sec).as_secs(), 200); - assert!(B256::from_str(&out.job).unwrap() == job_num); - assert!(out.region == *"ap-south-1"); - } else { - panic!(); + // real owner of the job is compute_address_word("owner") + // expected to deploy + + let test_results = TestResults { + res: 0, + outcomes: vec![ + TestAwsOutcome::SpinUp(test::SpinUpOutcome { + time: start_time + Duration::from_secs(300), + job: job_id.clone(), + instance_type: "c6a.xlarge".into(), + family: "salmon".into(), + region: "ap-south-1".into(), + req_mem: 4096, + req_vcpu: 2, + bandwidth: 76, + contract_address: "xyz".into(), + chain_id: "123".into(), + instance_id: compute_instance_id(0), + }), + TestAwsOutcome::RunEnclave(test::RunEnclaveOutcome { + time: start_time + Duration::from_secs(300), + job: job_id.clone(), + family: "salmon".into(), + region: "ap-south-1".into(), + req_mem: 4096, + req_vcpu: 2, + bandwidth: 76, + instance_id: compute_instance_id(0), + eif_url: "https://example.com/enclave.eif".into(), + debug: false, + }), + TestAwsOutcome::SpinDown(test::SpinDownOutcome { + time: start_time + Duration::from_secs(500), + job: job_id, + instance_id: compute_instance_id(0), + region: "ap-south-1".into(), + }), + ], }; - assert!(!aws.instances.contains_key(&job_num.to_string())) + run_test(start_time, logs, job_manager_params, test_results).await; } #[tokio::test(start_paused = true)] async fn test_address_not_whitelisted() { - let _ = market::START.set(Instant::now()); + let start_time = Instant::now(); + let job_id = format!("{:064x}", 1); - let job_num = U256::from(1).into(); - let job_logs: Vec<(u64, Log)> = vec![ - (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,market::now_timestamp().as_secs()).abi_encode_sequence()), + let logs = vec![ + (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,0).abi_encode_sequence()), (500, Action::Close, [].into()), - ].into_iter().map(|x| (x.0, test::get_log(x.1, Bytes::from(x.2), job_num))).collect(); + ]; - let start_time = Instant::now(); - // pending stream appended so job stream never ends - let job_stream = std::pin::pin!(tokio_stream::iter(job_logs.into_iter()) - .then(|(moment, log)| async move { - let delay = start_time + Duration::from_secs(moment) - Instant::now(); - sleep(delay).await; - log - }) - .chain(tokio_stream::pending())); - let mut aws: TestAws = Default::default(); - let res = market::job_manager_once( - job_stream, - &mut aws, - market::JobId { - id: job_num.encode_hex_with_prefix(), + let job_manager_params = JobManagerParams { + job_id: market::JobId { + id: job_id.clone(), operator: "abc".into(), contract: "xyz".into(), chain: "123".into(), }, - &["ap-south-1".into()], - 300, - &test::get_rates(), - &test::get_gb_rates(), - &Vec::from([ - "0x0000000000000000000000000f5f91ba30a00bd43bd19466f020b3e5fc7a49ed".to_string(), - ]), - &Vec::new(), - ) - .await; + allowed_regions: vec!["ap-south-1".to_owned()], + address_whitelist: vec![compute_address_word("notowner").encode_hex_with_prefix()], + address_blacklist: vec![], + }; + + // real owner of the job is compute_address_word("owner") + // expected to not deploy + + let test_results = TestResults { + res: 0, + outcomes: vec![], + }; - // job manager should have finished successfully - assert_eq!(res, 0); - assert!(aws.outcomes.is_empty()); - assert!(!aws.instances.contains_key(&job_num.to_string())) + run_test(start_time, logs, job_manager_params, test_results).await; } #[tokio::test(start_paused = true)] async fn test_address_blacklisted() { - let _ = market::START.set(Instant::now()); + let start_time = Instant::now(); + let job_id = format!("{:064x}", 1); - let job_num = U256::from(1).into(); - let job_logs: Vec<(u64, Log)> = vec![ - (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,market::now_timestamp().as_secs()).abi_encode_sequence()), + let logs = vec![ + (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,0).abi_encode_sequence()), (500, Action::Close, [].into()), - ].into_iter().map(|x| (x.0, test::get_log(x.1, Bytes::from(x.2), job_num))).collect(); + ]; - let start_time = Instant::now(); - // pending stream appended so job stream never ends - let job_stream = std::pin::pin!(tokio_stream::iter(job_logs.into_iter()) - .then(|(moment, log)| async move { - let delay = start_time + Duration::from_secs(moment) - Instant::now(); - sleep(delay).await; - log - }) - .chain(tokio_stream::pending())); - let mut aws: TestAws = Default::default(); - let res = market::job_manager_once( - job_stream, - &mut aws, - market::JobId { - id: job_num.encode_hex_with_prefix(), + let job_manager_params = JobManagerParams { + job_id: market::JobId { + id: job_id.clone(), operator: "abc".into(), contract: "xyz".into(), chain: "123".into(), }, - &["ap-south-1".into()], - 300, - &test::get_rates(), - &test::get_gb_rates(), - &Vec::new(), - &Vec::from([ - "0x0000000000000000000000000f5f91ba30a00bd43bd19466f020b3e5fc7a49ec".to_string(), - ]), - ) - .await; + allowed_regions: vec!["ap-south-1".to_owned()], + address_whitelist: vec![], + address_blacklist: vec![compute_address_word("owner").encode_hex_with_prefix()], + }; + + // real owner of the job is compute_address_word("owner") + // expected to not deploy + + let test_results = TestResults { + res: 0, + outcomes: vec![], + }; - // job manager should have finished successfully - assert_eq!(res, 0); - assert!(aws.outcomes.is_empty()); - assert!(!aws.instances.contains_key(&job_num.to_string())) + run_test(start_time, logs, job_manager_params, test_results).await; } #[tokio::test(start_paused = true)] async fn test_address_not_blacklisted() { - let _ = market::START.set(Instant::now()); + let start_time = Instant::now(); + let job_id = format!("{:064x}", 1); - let job_num = U256::from(1).into(); - let job_logs: Vec<(u64, Log)> = vec![ - (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,market::now_timestamp().as_secs()).abi_encode_sequence()), + let logs = vec![ + (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,0).abi_encode_sequence()), (500, Action::Close, [].into()), - ].into_iter().map(|x| (x.0, test::get_log(x.1, Bytes::from(x.2), job_num))).collect(); + ]; - let start_time = Instant::now(); - // pending stream appended so job stream never ends - let job_stream = std::pin::pin!(tokio_stream::iter(job_logs.into_iter()) - .then(|(moment, log)| async move { - let delay = start_time + Duration::from_secs(moment) - Instant::now(); - sleep(delay).await; - log - }) - .chain(tokio_stream::pending())); - let mut aws: TestAws = Default::default(); - let res = market::job_manager_once( - job_stream, - &mut aws, - market::JobId { - id: job_num.encode_hex_with_prefix(), + let job_manager_params = JobManagerParams { + job_id: market::JobId { + id: job_id.clone(), operator: "abc".into(), contract: "xyz".into(), chain: "123".into(), }, - &["ap-south-1".into()], - 300, - &test::get_rates(), - &test::get_gb_rates(), - &Vec::new(), - &Vec::from([ - "0x0000000000000000000000000f5f91ba30a00bd43bd19466f020b3e5fc7a49ed".to_string(), - ]), - ) - .await; - - // job manager should have finished successfully - assert_eq!(res, 0); - let spin_up_tv_sec: Instant; - let instance_id: String; - if let TestAwsOutcome::SpinUp(out) = &aws.outcomes[0] { - spin_up_tv_sec = out.time; - instance_id = out.instance_id.clone(); - assert!(B256::from_str(&out.job).unwrap() == job_num); - assert!(out.instance_type == "c6a.xlarge"); - assert!(out.family == "salmon"); - assert!(out.region == "ap-south-1"); - assert!(out.req_mem == 4096); - assert!(out.req_vcpu == 2); - assert!(out.bandwidth == 76); - assert!(out.eif_url == "https://example.com/enclave.eif"); - assert!(out.contract_address == "xyz"); - assert!(out.chain_id == "123"); - } else { - panic!(); - }; - - if let TestAwsOutcome::RunEnclave(out) = &aws.outcomes[1] { - assert!(B256::from_str(&out.job).unwrap() == job_num); - assert!(out.instance_id == instance_id); - assert!(out.family == "salmon"); - assert!(out.region == "ap-south-1"); - assert!(out.req_mem == 4096); - assert!(out.req_vcpu == 2); - assert!(out.bandwidth == 76); - assert!(out.eif_url == "https://example.com/enclave.eif"); - assert!(out.debug == false); - } else { - panic!(); + allowed_regions: vec!["ap-south-1".to_owned()], + address_whitelist: vec![], + address_blacklist: vec![compute_address_word("notowner").encode_hex_with_prefix()], }; - if let TestAwsOutcome::SpinDown(out) = &aws.outcomes[2] { - assert_eq!((out.time - spin_up_tv_sec).as_secs(), 200); - assert!(B256::from_str(&out.job).unwrap() == job_num); - assert!(out.region == *"ap-south-1"); - } else { - panic!(); + // real owner of the job is compute_address_word("owner") + // expected to deploy + + let test_results = TestResults { + res: 0, + outcomes: vec![ + TestAwsOutcome::SpinUp(test::SpinUpOutcome { + time: start_time + Duration::from_secs(300), + job: job_id.clone(), + instance_type: "c6a.xlarge".into(), + family: "salmon".into(), + region: "ap-south-1".into(), + req_mem: 4096, + req_vcpu: 2, + bandwidth: 76, + contract_address: "xyz".into(), + chain_id: "123".into(), + instance_id: compute_instance_id(0), + }), + TestAwsOutcome::RunEnclave(test::RunEnclaveOutcome { + time: start_time + Duration::from_secs(300), + job: job_id.clone(), + family: "salmon".into(), + region: "ap-south-1".into(), + req_mem: 4096, + req_vcpu: 2, + bandwidth: 76, + instance_id: compute_instance_id(0), + eif_url: "https://example.com/enclave.eif".into(), + debug: false, + }), + TestAwsOutcome::SpinDown(test::SpinDownOutcome { + time: start_time + Duration::from_secs(500), + job: job_id, + instance_id: compute_instance_id(0), + region: "ap-south-1".into(), + }), + ], }; - assert!(!aws.instances.contains_key(&job_num.to_string())) + run_test(start_time, logs, job_manager_params, test_results).await; } // Tests for whitelist blacklist checks #[tokio::test] async fn test_whitelist_blacklist_check_no_list() { - let _ = market::START.set(Instant::now()); - let log = test::get_log(Action::Open, - Bytes::from(("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,market::now_timestamp().as_secs()).abi_encode_sequence()), + Bytes::from(("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,0).abi_encode_sequence()), B256::ZERO); let address_whitelist = vec![]; let address_blacklist = vec![]; + // real owner of the job is compute_address_word("owner") + assert!(market::whitelist_blacklist_check( log.clone(), &address_whitelist, @@ -2573,17 +2330,17 @@ mod tests { #[tokio::test] async fn test_whitelist_blacklist_check_whitelisted() { - let _ = market::START.set(Instant::now()); - let log = test::get_log(Action::Open, - Bytes::from(("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,market::now_timestamp().as_secs()).abi_encode_sequence()), + Bytes::from(("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,0).abi_encode_sequence()), B256::ZERO); let address_whitelist = vec![ - "0x0000000000000000000000000f5f91ba30a00bd43bd19466f020b3e5fc7a49ec".to_string(), - "0x0000000000000000000000000f5f91ba30a00bd43bd19466f020b3e5fc7a49ed".to_string(), + compute_address_word("owner").encode_hex_with_prefix(), + compute_address_word("notowner").encode_hex_with_prefix(), ]; let address_blacklist = vec![]; + // real owner of the job is compute_address_word("owner") + assert!(market::whitelist_blacklist_check( log.clone(), &address_whitelist, @@ -2593,17 +2350,17 @@ mod tests { #[tokio::test] async fn test_whitelist_blacklist_check_not_whitelisted() { - let _ = market::START.set(Instant::now()); - let log = test::get_log(Action::Open, - Bytes::from(("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,market::now_timestamp().as_secs()).abi_encode_sequence()), + Bytes::from(("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,0).abi_encode_sequence()), B256::ZERO); let address_whitelist = vec![ - "0x0000000000000000000000000f5f91ba30a00bd43bd19466f020b3e5fc7a49eb".to_string(), - "0x0000000000000000000000000f5f91ba30a00bd43bd19466f020b3e5fc7a49ed".to_string(), + compute_address_word("notownereither").encode_hex_with_prefix(), + compute_address_word("notowner").encode_hex_with_prefix(), ]; let address_blacklist = vec![]; + // real owner of the job is compute_address_word("owner") + assert!(!market::whitelist_blacklist_check( log.clone(), &address_whitelist, @@ -2613,17 +2370,17 @@ mod tests { #[tokio::test] async fn test_whitelist_blacklist_check_blacklisted() { - let _ = market::START.set(Instant::now()); - let log = test::get_log(Action::Open, - Bytes::from(("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,market::now_timestamp().as_secs()).abi_encode_sequence()), + Bytes::from(("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,0).abi_encode_sequence()), B256::ZERO); let address_whitelist = vec![]; let address_blacklist = vec![ - "0x0000000000000000000000000f5f91ba30a00bd43bd19466f020b3e5fc7a49ec".to_string(), - "0x0000000000000000000000000f5f91ba30a00bd43bd19466f020b3e5fc7a49ed".to_string(), + compute_address_word("owner").encode_hex_with_prefix(), + compute_address_word("notowner").encode_hex_with_prefix(), ]; + // real owner of the job is compute_address_word("owner") + assert!(!market::whitelist_blacklist_check( log.clone(), &address_whitelist, @@ -2633,17 +2390,17 @@ mod tests { #[tokio::test] async fn test_whitelist_blacklist_check_not_blacklisted() { - let _ = market::START.set(Instant::now()); - let log = test::get_log(Action::Open, - Bytes::from(("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,market::now_timestamp().as_secs()).abi_encode_sequence()), + Bytes::from(("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,0).abi_encode_sequence()), B256::ZERO); let address_whitelist = vec![]; let address_blacklist = vec![ - "0x0000000000000000000000000f5f91ba30a00bd43bd19466f020b3e5fc7a49eb".to_string(), - "0x0000000000000000000000000f5f91ba30a00bd43bd19466f020b3e5fc7a49ed".to_string(), + compute_address_word("notownereither").encode_hex_with_prefix(), + compute_address_word("notowner").encode_hex_with_prefix(), ]; + // real owner of the job is compute_address_word("owner") + assert!(market::whitelist_blacklist_check( log.clone(), &address_whitelist, @@ -2653,20 +2410,20 @@ mod tests { #[tokio::test] async fn test_whitelist_blacklist_check_neither() { - let _ = market::START.set(Instant::now()); - let log = test::get_log(Action::Open, - Bytes::from(("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,market::now_timestamp().as_secs()).abi_encode_sequence()), + Bytes::from(("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,0).abi_encode_sequence()), B256::ZERO); let address_whitelist = vec![ - "0x0000000000000000000000000f5f91ba30a00bd43bd19466f020b3e5fc7a49eb".to_string(), - "0x0000000000000000000000000f5f91ba30a00bd43bd19466f020b3e5fc7a49ed".to_string(), + compute_address_word("notownereither").encode_hex_with_prefix(), + compute_address_word("notowner").encode_hex_with_prefix(), ]; let address_blacklist = vec![ - "0x0000000000000000000000000f5f91ba30a00bd43bd19466f020b3e5fc7a49ea".to_string(), - "0x0000000000000000000000000f5f91ba30a00bd43bd19466f020b3e5fc7a49ee".to_string(), + compute_address_word("definitelynotownereither").encode_hex_with_prefix(), + compute_address_word("definitelynotowner").encode_hex_with_prefix(), ]; + // real owner of the job is compute_address_word("owner") + assert!(!market::whitelist_blacklist_check( log.clone(), &address_whitelist, @@ -2676,20 +2433,20 @@ mod tests { #[tokio::test] async fn test_whitelist_blacklist_check_both() { - let _ = market::START.set(Instant::now()); - let log = test::get_log(Action::Open, - Bytes::from(("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,market::now_timestamp().as_secs()).abi_encode_sequence()), + Bytes::from(("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,0).abi_encode_sequence()), B256::ZERO); let address_whitelist = vec![ - "0x0000000000000000000000000f5f91ba30a00bd43bd19466f020b3e5fc7a49ec".to_string(), - "0x0000000000000000000000000f5f91ba30a00bd43bd19466f020b3e5fc7a49ed".to_string(), + compute_address_word("owner").encode_hex_with_prefix(), + compute_address_word("notowner").encode_hex_with_prefix(), ]; let address_blacklist = vec![ - "0x0000000000000000000000000f5f91ba30a00bd43bd19466f020b3e5fc7a49ec".to_string(), - "0x0000000000000000000000000f5f91ba30a00bd43bd19466f020b3e5fc7a49eb".to_string(), + compute_address_word("owner").encode_hex_with_prefix(), + compute_address_word("definitelynotowner").encode_hex_with_prefix(), ]; + // real owner of the job is compute_address_word("owner") + assert!(!market::whitelist_blacklist_check( log.clone(), &address_whitelist, @@ -2753,178 +2510,128 @@ mod tests { #[tokio::test(start_paused = true)] async fn test_eif_update_after_spin_up() { - let _ = market::START.set(Instant::now()); + let start_time = Instant::now(); + let job_id = format!("{:064x}", 1); - let job_num = U256::from(1).into(); - let job_logs: Vec<(u64, Log)> = vec![ - (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,market::now_timestamp().as_secs()).abi_encode_sequence()), + let logs = vec![ + (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,0).abi_encode_sequence()), (100, Action::MetadataUpdated, "{\"region\":\"ap-south-1\",\"url\":\"https://example.com/updated-enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string().abi_encode()), (505, Action::Close, [].into()), - ].into_iter().map(|x| (x.0, test::get_log(x.1, Bytes::from(x.2), job_num))).collect(); + ]; - let start_time = Instant::now(); - // pending stream appended so job stream never ends - let job_stream = std::pin::pin!(tokio_stream::iter(job_logs.into_iter()) - .then(|(moment, log)| async move { - let delay = start_time + Duration::from_secs(moment) - Instant::now(); - sleep(delay).await; - log - }) - .chain(tokio_stream::pending())); - let mut aws: TestAws = Default::default(); - let res = market::job_manager_once( - job_stream, - &mut aws, - market::JobId { - id: job_num.encode_hex_with_prefix(), + let job_manager_params = JobManagerParams { + job_id: market::JobId { + id: job_id.clone(), operator: "abc".into(), contract: "xyz".into(), chain: "123".into(), }, - &["ap-south-1".into()], - 300, - &test::get_rates(), - &test::get_gb_rates(), - &Vec::new(), - &Vec::new(), - ) - .await; - - // job manager should have finished successfully - assert_eq!(res, 0); - let spin_up_tv_sec: Instant; - let instance_id: String; - if let TestAwsOutcome::SpinUp(out) = &aws.outcomes[0] { - spin_up_tv_sec = out.time; - instance_id = out.instance_id.clone(); - assert!(B256::from_str(&out.job).unwrap() == job_num); - assert!(out.instance_type == "c6a.xlarge"); - assert!(out.family == "salmon"); - assert!(out.region == "ap-south-1"); - assert!(out.req_mem == 4096); - assert!(out.req_vcpu == 2); - assert!(out.bandwidth == 76); - assert!(out.eif_url == "https://example.com/updated-enclave.eif"); - assert!(out.contract_address == "xyz"); - assert!(out.chain_id == "123"); - } else { - panic!(); + allowed_regions: vec!["ap-south-1".to_owned()], + address_whitelist: vec![], + address_blacklist: vec![], }; - if let TestAwsOutcome::RunEnclave(out) = &aws.outcomes[1] { - assert!(B256::from_str(&out.job).unwrap() == job_num); - assert!(out.instance_id == instance_id); - assert!(out.family == "salmon"); - assert!(out.region == "ap-south-1"); - assert!(out.req_mem == 4096); - assert!(out.req_vcpu == 2); - assert!(out.bandwidth == 76); - assert!(out.eif_url == "https://example.com/updated-enclave.eif"); - assert!(out.debug == false); - } else { - panic!(); - }; - - if let TestAwsOutcome::SpinDown(out) = &aws.outcomes[2] { - assert_eq!((out.time - spin_up_tv_sec).as_secs(), 105); - assert!(B256::from_str(&out.job).unwrap() == job_num); - assert!(out.region == *"ap-south-1"); - } else { - panic!(); + let test_results = TestResults { + res: 0, + outcomes: vec![ + TestAwsOutcome::SpinUp(test::SpinUpOutcome { + time: start_time + Duration::from_secs(400), + job: job_id.clone(), + instance_type: "c6a.xlarge".into(), + family: "salmon".into(), + region: "ap-south-1".into(), + req_mem: 4096, + req_vcpu: 2, + bandwidth: 76, + contract_address: "xyz".into(), + chain_id: "123".into(), + instance_id: compute_instance_id(0), + }), + TestAwsOutcome::RunEnclave(test::RunEnclaveOutcome { + time: start_time + Duration::from_secs(400), + job: job_id.clone(), + family: "salmon".into(), + region: "ap-south-1".into(), + req_mem: 4096, + req_vcpu: 2, + bandwidth: 76, + instance_id: compute_instance_id(0), + eif_url: "https://example.com/updated-enclave.eif".into(), + debug: false, + }), + TestAwsOutcome::SpinDown(test::SpinDownOutcome { + time: start_time + Duration::from_secs(505), + job: job_id, + instance_id: compute_instance_id(0), + region: "ap-south-1".into(), + }), + ], }; - assert!(!aws.instances.contains_key(&job_num.to_string())) + run_test(start_time, logs, job_manager_params, test_results).await; } #[tokio::test(start_paused = true)] async fn test_other_metadata_update_after_spin_up() { - let _ = market::START.set(Instant::now()); + let start_time = Instant::now(); + let job_id = format!("{:064x}", 1); - let job_num = U256::from(1).into(); - let job_logs: Vec<(u64, Log)> = vec![ - (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,market::now_timestamp().as_secs()).abi_encode_sequence()), + let logs = vec![ + (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,0).abi_encode_sequence()), // instance type has also been updated in the metadata. should fail this job. (100, Action::MetadataUpdated, "{\"region\":\"ap-south-1\",\"url\":\"https://example.com/updated-enclave.eif\",\"instance\":\"c6a.large\",\"memory\":4096,\"vcpu\":2}".to_string().abi_encode()), (505, Action::Close, [].into()), - ].into_iter().map(|x| (x.0, test::get_log(x.1, Bytes::from(x.2), job_num))).collect(); + ]; - let start_time = Instant::now(); - // pending stream appended so job stream never ends - let job_stream = std::pin::pin!(tokio_stream::iter(job_logs.into_iter()) - .then(|(moment, log)| async move { - let delay = start_time + Duration::from_secs(moment) - Instant::now(); - sleep(delay).await; - log - }) - .chain(tokio_stream::pending())); - let mut aws: TestAws = Default::default(); - let res = market::job_manager_once( - job_stream, - &mut aws, - market::JobId { - id: job_num.encode_hex_with_prefix(), + let job_manager_params = JobManagerParams { + job_id: market::JobId { + id: job_id.clone(), operator: "abc".into(), contract: "xyz".into(), chain: "123".into(), }, - &["ap-south-1".into()], - 300, - &test::get_rates(), - &test::get_gb_rates(), - &Vec::new(), - &Vec::new(), - ) - .await; + allowed_regions: vec!["ap-south-1".to_owned()], + address_whitelist: vec![], + address_blacklist: vec![], + }; - // job manager should have errored out - assert_eq!(res, -2); - assert!(aws.outcomes.is_empty()); - assert!(!aws.instances.contains_key(&job_num.to_string())) + let test_results = TestResults { + res: -2, + outcomes: vec![], + }; + + run_test(start_time, logs, job_manager_params, test_results).await; } + // TODO: Should this work like this? #[tokio::test(start_paused = true)] async fn test_metadata_update_event_with_no_updates_after_spin_up() { - let _ = market::START.set(Instant::now()); + let start_time = Instant::now(); + let job_id = format!("{:064x}", 1); - let job_num = U256::from(1).into(); - let job_logs: Vec<(u64, Log)> = vec![ - (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,market::now_timestamp().as_secs()).abi_encode_sequence()), - // instance type has also been updated in the metadata. should fail this job. + let logs = vec![ + (0, Action::Open, ("{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string(),31000000000000u64,31000u64,0).abi_encode_sequence()), (100, Action::MetadataUpdated, "{\"region\":\"ap-south-1\",\"url\":\"https://example.com/enclave.eif\",\"instance\":\"c6a.xlarge\",\"memory\":4096,\"vcpu\":2}".to_string().abi_encode()), (505, Action::Close, [].into()), - ].into_iter().map(|x| (x.0, test::get_log(x.1, Bytes::from(x.2), job_num))).collect(); + ]; - let start_time = Instant::now(); - // pending stream appended so job stream never ends - let job_stream = std::pin::pin!(tokio_stream::iter(job_logs.into_iter()) - .then(|(moment, log)| async move { - let delay = start_time + Duration::from_secs(moment) - Instant::now(); - sleep(delay).await; - log - }) - .chain(tokio_stream::pending())); - let mut aws: TestAws = Default::default(); - let res = market::job_manager_once( - job_stream, - &mut aws, - market::JobId { - id: job_num.encode_hex_with_prefix(), + let job_manager_params = JobManagerParams { + job_id: market::JobId { + id: job_id.clone(), operator: "abc".into(), contract: "xyz".into(), chain: "123".into(), }, - &["ap-south-1".into()], - 300, - &test::get_rates(), - &test::get_gb_rates(), - &Vec::new(), - &Vec::new(), - ) - .await; + allowed_regions: vec!["ap-south-1".to_owned()], + address_whitelist: vec![], + address_blacklist: vec![], + }; + + let test_results = TestResults { + res: -2, + outcomes: vec![], + }; - // job manager should have errored out - assert_eq!(res, -2); - assert!(aws.outcomes.is_empty()); - assert!(!aws.instances.contains_key(&job_num.to_string())) + run_test(start_time, logs, job_manager_params, test_results).await; } } diff --git a/operator/control-plane/src/server.rs b/operator/control-plane/src/server.rs index 9560a9c9..9dafa931 100644 --- a/operator/control-plane/src/server.rs +++ b/operator/control-plane/src/server.rs @@ -172,7 +172,7 @@ mod tests { for id in 1..4 { let temp_job_id = U256::from(id).to_be_bytes::<32>().encode_hex_with_prefix(); - let instance_metadata = InstanceMetadata::new(None, None).await; + let instance_metadata = InstanceMetadata::new(0).await; aws.instances .insert(temp_job_id.clone(), instance_metadata.clone()); @@ -228,7 +228,7 @@ mod tests { for id in 1..4 { let temp_job_id = U256::from(id).to_be_bytes::<32>().encode_hex_with_prefix(); - let instance_metadata = InstanceMetadata::new(None, None).await; + let instance_metadata = InstanceMetadata::new(0).await; aws.instances .insert(temp_job_id.clone(), instance_metadata.clone()); diff --git a/operator/control-plane/src/test.rs b/operator/control-plane/src/test.rs index b3fa176c..99f56d03 100644 --- a/operator/control-plane/src/test.rs +++ b/operator/control-plane/src/test.rs @@ -1,9 +1,8 @@ use std::collections::HashMap; +use std::hash::{DefaultHasher, Hasher}; use std::str::FromStr; -use alloy::hex::ToHexExt; -use alloy::primitives::B128; -use alloy::primitives::{keccak256, Address, Bytes, LogData, B256, B64, U256}; +use alloy::primitives::{keccak256, Address, Bytes, FixedBytes, LogData, B256, U256}; use alloy::providers::Provider; use alloy::pubsub::PubSubFrontend; use alloy::rpc::types::eth::Log; @@ -14,7 +13,7 @@ use tokio_stream::StreamExt; use crate::market::{GBRateCard, InfraProvider, JobId, LogsProvider, RateCard, RegionalRates}; #[cfg(test)] -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub struct SpinUpOutcome { pub time: Instant, pub job: String, @@ -24,14 +23,13 @@ pub struct SpinUpOutcome { pub req_mem: i64, pub req_vcpu: i32, pub bandwidth: u64, - pub eif_url: String, pub contract_address: String, pub chain_id: String, pub instance_id: String, } #[cfg(test)] -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub struct SpinDownOutcome { pub time: Instant, pub job: String, @@ -40,7 +38,7 @@ pub struct SpinDownOutcome { } #[cfg(test)] -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub struct RunEnclaveOutcome { pub time: Instant, pub job: String, @@ -55,11 +53,57 @@ pub struct RunEnclaveOutcome { } #[cfg(test)] -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] +pub struct UpdateEnclaveImageOutcome { + pub time: Instant, + pub instance_id: String, + pub region: String, + pub eif_url: String, + pub req_mem: i64, + pub req_vcpu: i32, +} + +#[cfg(test)] +#[derive(Clone, Debug, PartialEq)] pub enum TestAwsOutcome { SpinUp(SpinUpOutcome), SpinDown(SpinDownOutcome), RunEnclave(RunEnclaveOutcome), + UpdateEnclaveImage(UpdateEnclaveImageOutcome), +} + +pub fn compute_instance_id(counter: u64) -> String { + let mut hasher = DefaultHasher::new(); + hasher.write_u8(0); + hasher.write_u64(counter); + + let hash = hasher.finish(); + + format!("{:x}", hash) +} + +pub fn compute_instance_ip(counter: u64) -> String { + let mut hasher = DefaultHasher::new(); + hasher.write_u8(1); + hasher.write_u64(counter); + + let hash = hasher.finish(); + + hash.to_le_bytes() + .iter() + .map(|x| x.to_string()) + .reduce(|a, b| a + "." + &b) + .unwrap() +} + +pub fn compute_address_word(salt: &str) -> FixedBytes<32> { + let mut hasher = DefaultHasher::new(); + hasher.write_u8(2); + hasher.write(salt.as_bytes()); + + let hash = hasher.finish(); + + FixedBytes::<32>::from_slice(hash.to_le_bytes().repeat(4).as_slice()) } #[cfg(test)] @@ -71,17 +115,10 @@ pub struct InstanceMetadata { #[cfg(test)] impl InstanceMetadata { - pub async fn new(instance_id: Option, ip_address: Option) -> Self { - let instance_id = "i-".to_string() + &instance_id.unwrap_or(B128::random().encode_hex()); - - let ip_address = ip_address.unwrap_or( - B64::random() - .as_slice() - .iter() - .map(|x| x.to_string()) - .reduce(|a, b| a + "." + &b) - .unwrap(), - ); + pub async fn new(counter: u64) -> Self { + let instance_id = compute_instance_id(counter); + let ip_address = compute_instance_ip(counter); + Self { instance_id, ip_address, @@ -96,13 +133,14 @@ pub struct TestAws { // HashMap format - (Job, InstanceMetadata) pub instances: HashMap, + + counter: u64, } #[cfg(test)] impl InfraProvider for TestAws { async fn spin_up( &mut self, - eif_url: &str, job: &JobId, instance_type: &str, family: &str, @@ -122,7 +160,6 @@ impl InfraProvider for TestAws { req_mem, req_vcpu, bandwidth, - eif_url: eif_url.to_owned(), contract_address: job.contract.clone(), chain_id: job.chain.clone(), instance_id: x.1.instance_id.clone(), @@ -131,7 +168,9 @@ impl InfraProvider for TestAws { return Ok(x.1.instance_id.clone()); } - let instance_metadata: InstanceMetadata = InstanceMetadata::new(None, None).await; + let instance_metadata: InstanceMetadata = InstanceMetadata::new(self.counter).await; + self.counter += 1; + self.instances .insert(job.id.clone(), instance_metadata.clone()); @@ -144,7 +183,6 @@ impl InfraProvider for TestAws { req_mem, req_vcpu, bandwidth, - eif_url: eif_url.to_owned(), contract_address: job.contract.clone(), chain_id: job.chain.clone(), instance_id: instance_metadata.instance_id.clone(), @@ -229,51 +267,16 @@ impl InfraProvider for TestAws { req_vcpu: i32, req_mem: i64, ) -> Result<()> { - let job_id = self.instances.iter().find_map(|(key, val)| { - if val.instance_id == instance_id { - Some(key) - } else { - None - } - }); - if job_id.is_none() { - return Err(anyhow!( - "Instance not found for instance_id - {instance_id}" - )); - } - let job_id = job_id.unwrap(); - - let spin_up_outcome_index = - self.outcomes - .iter() - .enumerate() - .find_map(|(i, outcome)| match outcome { - TestAwsOutcome::SpinUp(spin_up) if spin_up.job == instance_id => Some(i), - _ => None, - }); - - if spin_up_outcome_index.is_none() { - return Err(anyhow!("Spin up outcome not found for job - {}", job_id)); - } - - let spin_up_outcome_index = spin_up_outcome_index.unwrap(); - - if let TestAwsOutcome::SpinUp(spin_up_outcome) = &mut self.outcomes[spin_up_outcome_index] { - if spin_up_outcome.region != region - || spin_up_outcome.req_vcpu != req_vcpu - || spin_up_outcome.req_mem != req_mem - { - return Err(anyhow!("Can only change EIF URL")); - } - - if spin_up_outcome.eif_url == eif_url { - return Err(anyhow!("Must input a different EIF URL")); - } - - eif_url.clone_into(&mut spin_up_outcome.eif_url); - } else { - panic!("Spin up outcome not found at proper index for the job.") - } + self.outcomes.push(TestAwsOutcome::UpdateEnclaveImage( + UpdateEnclaveImageOutcome { + time: Instant::now(), + instance_id: instance_id.to_owned(), + region: region.to_owned(), + eif_url: eif_url.to_owned(), + req_mem, + req_vcpu, + }, + )); Ok(()) } @@ -354,7 +357,7 @@ pub fn get_gb_rates() -> Vec { pub fn get_log(topic: Action, data: Bytes, idx: B256) -> Log { let mut log = Log { inner: alloy::primitives::Log { - address: Address::from_str("0x0F5F91BA30a00bD43Bd19466f020B3E5fc7a49ec").unwrap(), + address: Address::from_str("0x000000000000000000000000000000000000dead").unwrap(), data: LogData::new_unchecked(vec![], Bytes::new()), }, ..Default::default() @@ -365,8 +368,8 @@ pub fn get_log(topic: Action, data: Bytes, idx: B256) -> Log { vec![ keccak256("JobOpened(bytes32,string,address,address,uint256,uint256,uint256)"), idx, - log.address().into_word(), - log.address().into_word(), + compute_address_word("owner"), + compute_address_word("provider"), ], data, ); @@ -386,7 +389,7 @@ pub fn get_log(topic: Action, data: Bytes, idx: B256) -> Log { vec![ keccak256("JobDeposited(bytes32,address,uint256)"), idx, - log.address().into_word(), + compute_address_word("depositor"), ], data, ); @@ -396,7 +399,7 @@ pub fn get_log(topic: Action, data: Bytes, idx: B256) -> Log { vec![ keccak256("JobWithdrew(bytes32,address,uint256)"), idx, - log.address().into_word(), + compute_address_word("withdrawer"), ], data, );