diff --git a/Cargo.lock b/Cargo.lock index 4d76e8b..7cb6200 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -378,7 +378,7 @@ dependencies = [ [[package]] name = "ckb-bitcoin-spv-verifier" version = "0.1.0" -source = "git+https://github.com/ckb-cell/ckb-bitcoin-spv?rev=837a307#837a307f5b1b9193621f032c2370bb9e91506b81" +source = "git+https://github.com/ckb-cell/ckb-bitcoin-spv?rev=a6fce4b#a6fce4b5333967905a6d3859c56babc985ef89e9" dependencies = [ "bitcoin", "bitcoin_hashes", diff --git a/Cargo.toml b/Cargo.toml index 7de0920..947c1e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,7 +39,7 @@ ckb-hash = "0.114" [dependencies.ckb-bitcoin-spv-verifier] version = "0.1.0" git = "https://github.com/ckb-cell/ckb-bitcoin-spv" -rev = "837a307" +rev = "a6fce4b" [features] default = ["default-tls"] diff --git a/src/cli/deploy.rs b/src/cli/deploy.rs index 814d8a0..87d6b9a 100644 --- a/src/cli/deploy.rs +++ b/src/cli/deploy.rs @@ -62,10 +62,10 @@ pub struct Args { impl Args { // TODO Deploy the Bitcoin SPV contract as type script. pub fn execute(&self) -> Result<()> { - log::info!("Try to deploy a contract on CKB."); + log::info!("Try to deploy a contract on CKB"); if self.contract_owner.network() != self.ckb.network { - let msg = "The input addresses and the selected network are not matched."; + let msg = "The input addresses and the selected network are not matched"; return Err(Error::Cli(msg.to_owned())); } @@ -74,7 +74,7 @@ impl Args { Error::other(msg) })?; log::info!( - "The contract requires {} CKBytes for its data.", + "The contract requires {} CKBytes for its data", HumanCapacity::from(contract_data_capacity.as_u64()) ); @@ -101,13 +101,13 @@ impl Args { let address = CkbAddress::new(self.ckb.network, payload, true); (address, sk) })?; - log::info!("The contract deployer is {deployer}."); + log::info!("The contract deployer is {deployer}"); let iterator = InputIterator::new_with_address(&[deployer], &network_info); let mut builder = SimpleTransactionBuilder::new(configuration, iterator); builder.add_output_and_data(output, self.contract_data.pack()); let data_hash = packed::CellOutput::calc_data_hash(&self.contract_data); - log::info!("The contract data hash is {data_hash:#x}."); + log::info!("The contract data hash is {data_hash:#x}"); let mut tx_with_groups = builder.build(&Default::default())?; diff --git a/src/cli/init.rs b/src/cli/init.rs index bbf5005..d1e155f 100644 --- a/src/cli/init.rs +++ b/src/cli/init.rs @@ -59,6 +59,8 @@ pub struct Args { /// The start height of the new Bitcoin SPV instance. /// /// This height should be multiples of number 2016. + /// + // TODO Input hash rather than height. #[arg(long, required = true)] pub(crate) bitcoin_start_height: u32, @@ -103,7 +105,7 @@ pub struct Args { impl Args { // TODO Split this method into several smaller methods. pub fn execute(&self) -> Result<()> { - log::info!("Try to initialize a Bitcoin SPV instance on CKB."); + log::info!("Try to initialize a Bitcoin SPV instance on CKB"); self.check_inputs()?; log::info!("The bitcoin start height is {}", self.bitcoin_start_height); @@ -132,7 +134,7 @@ impl Args { let address = CkbAddress::new(self.ckb.network, payload, true); (address, sk) })?; - log::info!("The contract deployer is {deployer}."); + log::info!("The contract deployer is {deployer}"); let spv_outputs_data = { let spv_info = packed::SpvInfo::new_builder().build(); @@ -159,7 +161,7 @@ impl Args { .build(); tx_builder.cell_dep(spv_contract_cell_dep.clone()); - log::debug!("Try to find the first live cell for {deployer}."); + log::debug!("Try to find the first live cell for {deployer}"); let input0 = iterator .next() .transpose() @@ -168,7 +170,7 @@ impl Args { Error::other(msg) })? .ok_or_else(|| { - let msg = format!("{deployer} has no live cell."); + let msg = format!("{deployer} has no live cell"); Error::other(msg) })?; @@ -311,7 +313,7 @@ impl Args { let mut check_result = None; for (mut input_index, input) in iterator.enumerate() { input_index += 1; // The first input has been handled. - log::debug!("Try to find the {input_index}-th live cell for {deployer}."); + log::debug!("Try to find the {input_index}-th live cell for {deployer}"); let input = input.map_err(|err| { let msg = format!( "failed to find {input_index}-th live cell for {deployer} since {err}" @@ -356,7 +358,7 @@ impl Args { check_result } .ok_or_else(|| { - let msg = format!("{deployer}'s live cells are not enough."); + let msg = format!("{deployer}'s live cells are not enough"); Error::other(msg) })?; @@ -375,7 +377,7 @@ impl Args { fn check_inputs(&self) -> Result<()> { if self.spv_owner.network() != self.ckb.network { - let msg = "The input addresses and the selected network are not matched."; + let msg = "The input addresses and the selected network are not matched"; return Err(Error::cli(msg)); } @@ -401,7 +403,7 @@ impl Args { fn check_remotes(&self) -> Result<()> { if self.spv_owner.network() != self.ckb.network { - let msg = "The input addresses and the selected network are not matched."; + let msg = "The input addresses and the selected network are not matched"; return Err(Error::cli(msg)); } diff --git a/src/cli/serve.rs b/src/cli/serve.rs index b21f9c5..9e4f476 100644 --- a/src/cli/serve.rs +++ b/src/cli/serve.rs @@ -29,7 +29,9 @@ use clap::Parser; use secp256k1::SecretKey; use crate::{ - components::{ApiServiceConfig, SpvClientCell, SpvInfoCell, SpvService, Storage}, + components::{ + ApiServiceConfig, SpvOperation, SpvReorgInput, SpvService, SpvUpdateInput, Storage, + }, prelude::*, result::{Error, Result}, }; @@ -60,11 +62,6 @@ pub struct Args { #[arg(long, default_value = "30")] pub(crate) interval: u64, - /// Take a break after download some bitcoin headers, - /// to avoid some API limits. - #[arg(long, default_value = "10")] - pub(crate) bitcoin_headers_download_limit: u32, - /// Don't update all headers in one CKB transaction, /// to avoid size limit or cycles limit. #[arg(long, default_value = "10")] @@ -77,7 +74,7 @@ pub struct Args { impl Args { pub fn execute(&self) -> Result<()> { - log::info!("Starting the Bitcoin SPV service."); + log::info!("Starting the Bitcoin SPV service"); let storage = Storage::new(&self.data_dir)?; if !storage.is_initialized()? { @@ -98,25 +95,11 @@ impl Args { let _api_service = ApiServiceConfig::new(self.listen_address).start(spv_service.clone()); - let (mut stg_tip_height, _) = storage.tip_state()?; - log::info!("Tip height in local storage is {stg_tip_height}"); - let mut prev_tx_hash: Option = None; loop { - let btc_tip_height = btc_cli.get_tip_height()?; - log::info!("Tip height from Bitcoin endpoint is {btc_tip_height}"); - - let required_download = stg_tip_height < btc_tip_height; - if required_download { - let end_height_limit = stg_tip_height + self.bitcoin_headers_download_limit; - let end_height = if end_height_limit < btc_tip_height { - end_height_limit - } else { - btc_tip_height - }; - let headers = btc_cli.get_headers(stg_tip_height + 1, end_height)?; - (stg_tip_height, _) = storage.append_headers(headers)?; + if !spv_service.sync_storage()? { + continue; } if let Some(ref tx_hash) = prev_tx_hash { @@ -136,33 +119,49 @@ impl Args { } } - let (spv_info, spv_client_curr, spv_client_next) = - spv_service.find_spv_cells_for_update()?; - log::info!("Tip SPV client is {}", spv_client_curr.client.id); - let spv_tip_height = spv_client_curr.client.headers_mmr_root.max_height; - log::info!("Tip height in Bitcoin SPV instance is {spv_tip_height}"); + let (stg_tip_height, stg_tip_header) = spv_service.storage.tip_state()?; + let stg_tip_hash = stg_tip_header.block_hash(); + log::info!("[storage] header#{stg_tip_height:07}, {stg_tip_hash:#x}; tip"); + + match spv_service.select_operation()? { + SpvOperation::Update(input) => { + let spv_tip_height = input.curr.client.headers_mmr_root.max_height; + + match stg_tip_height.cmp(&spv_tip_height) { + Ordering::Less | Ordering::Equal => { + log::info!("No updates, sleep for a while"); + self.take_a_break(); + continue; + } + Ordering::Greater => {} + } + + log::info!("Try to update SPV instance"); + + let (spv_client, spv_update) = storage.generate_spv_client_and_spv_update( + spv_tip_height, + self.spv_headers_update_limit, + )?; - match stg_tip_height.cmp(&spv_tip_height) { - Ordering::Less | Ordering::Equal => { - self.take_a_break(); - continue; + let tx_hash = + self.update_spv_cells(&spv_service, input, spv_client, spv_update)?; + + prev_tx_hash = Some(tx_hash); } - Ordering::Greater => {} - } + SpvOperation::Reorg(input) => { + log::info!("Try to reorg SPV instance"); + + let spv_tip_height = input.curr.client.headers_mmr_root.max_height; - let (spv_client, spv_update) = storage.generate_spv_client_and_spv_update( - spv_tip_height, - self.spv_headers_update_limit, - )?; + let (spv_client, spv_update) = + storage.generate_spv_client_and_spv_update(spv_tip_height, u32::MAX)?; - let tx_hash = self.update_spv_cells( - &spv_service, - (spv_info, spv_client_curr, spv_client_next), - spv_client, - spv_update, - )?; + let tx_hash = + self.reorg_spv_cells(&spv_service, input, spv_client, spv_update)?; - prev_tx_hash = Some(tx_hash); + prev_tx_hash = Some(tx_hash); + } + } } // TODO Handle Ctrl-C and clean resources before exit. @@ -171,12 +170,10 @@ impl Args { pub(crate) fn update_spv_cells( &self, spv: &SpvService, - cells: (SpvInfoCell, SpvClientCell, SpvClientCell), + update_input: SpvUpdateInput, mut spv_client: SpvClient, spv_update: packed::SpvUpdate, ) -> Result { - let (spv_info_cell, spv_client_curr_cell, spv_client_next_cell) = cells; - let network_info = NetworkInfo::new(self.ckb.network, self.ckb.ckb_endpoint.as_str().to_owned()); let configuration = { @@ -192,17 +189,17 @@ impl Args { let address = CkbAddress::new(self.ckb.network, payload, true); (address, sk) })?; - log::debug!("The SPV cells will be updated by {deployer}."); + log::debug!("The SPV cells will be updated by {deployer}"); let iterator = InputIterator::new_with_address(&[deployer.clone()], &network_info); let mut tx_builder = TransactionBuilder::default(); let spv_inputs = { let spv_info_input = CellInput::new_builder() - .previous_output(spv_info_cell.cell.out_point.clone()) + .previous_output(update_input.info.cell.out_point.clone()) .build(); let spv_client_input = CellInput::new_builder() - .previous_output(spv_client_next_cell.cell.out_point.clone()) + .previous_output(update_input.next.cell.out_point.clone()) .build(); vec![spv_info_input, spv_client_input] }; @@ -213,18 +210,18 @@ impl Args { tx_builder.cell_dep(spv_contract_cell_dep); tx_builder.cell_dep(lock_contract_cell_dep); let spv_client_curr_cell_dep = CellDep::new_builder() - .out_point(spv_client_curr_cell.cell.out_point) + .out_point(update_input.curr.cell.out_point) .dep_type(DepType::Code.into()) .build(); tx_builder.cell_dep(spv_client_curr_cell_dep); let spv_outputs: Vec = vec![ - spv_info_cell.cell.output.clone(), - spv_client_next_cell.cell.output.clone(), + update_input.info.cell.output.clone(), + update_input.next.cell.output.clone(), ]; let spv_outputs_data = { - spv_client.id = spv_client_next_cell.client.id; - let mut spv_info = spv_info_cell.info; + spv_client.id = update_input.next.client.id; + let mut spv_info = update_input.info.info; spv_info.tip_client_id = spv_client.id; let packed_spv_info: packed::SpvInfo = spv_info.pack(); let packed_spv_client: packed::SpvClient = spv_client.pack(); @@ -263,11 +260,11 @@ impl Args { change_builder.init(&mut tx_builder); { let spv_info_input = TransactionInput { - live_cell: spv_info_cell.cell.clone(), + live_cell: update_input.info.cell.clone(), since: 0, }; let spv_client_input = TransactionInput { - live_cell: spv_client_next_cell.cell.clone(), + live_cell: update_input.next.cell.clone(), since: 0, }; let _ = change_builder.check_balance(spv_info_input, &mut tx_builder); @@ -279,7 +276,192 @@ impl Args { let mut check_result = None; for (mut input_index, input) in iterator.enumerate() { input_index += 2; // The first 2 inputs are SPV cells. - log::debug!("Try to find the {input_index}-th live cell for {deployer}."); + log::debug!("Try to find the {input_index}-th live cell for {deployer}"); + let input = input.map_err(|err| { + let msg = format!( + "failed to find {input_index}-th live cell for {deployer} since {err}" + ); + Error::other(msg) + })?; + tx_builder.input(input.cell_input()); + tx_builder.witness(PackedBytes::default()); + + let previous_output = input.previous_output(); + let lock_script = previous_output.lock(); + lock_groups + .entry(lock_script.calc_script_hash()) + .or_insert_with(|| ScriptGroup::from_lock_script(&lock_script)) + .input_indices + .push(input_index); + + if change_builder.check_balance(input, &mut tx_builder) { + let mut script_groups: Vec = lock_groups + .into_values() + .chain(type_groups.into_values()) + .collect(); + for script_group in script_groups.iter_mut() { + for handler in configuration.get_script_handlers() { + for context in &contexts.contexts { + if handler.build_transaction( + &mut tx_builder, + script_group, + context.as_ref(), + )? { + break; + } + } + } + } + let tx_view = change_builder.finalize(tx_builder); + + check_result = Some(TransactionWithScriptGroups::new(tx_view, script_groups)); + break; + } + } + check_result + } + .ok_or_else(|| { + let msg = format!("{deployer}'s live cells are not enough"); + Error::other(msg) + })?; + + TransactionSigner::new(&network_info).sign_transaction( + &mut tx_with_groups, + &SignContexts::new_sighash(vec![deployer_key]), + )?; + + let tx_json = TransactionView::from(tx_with_groups.get_tx_view().clone()); + let tx_hash = self + .ckb + .client() + .send_transaction_ext(tx_json, self.dry_run)?; + + Ok(tx_hash) + } + + pub(crate) fn reorg_spv_cells( + &self, + spv: &SpvService, + reorg_input: SpvReorgInput, + mut spv_client: SpvClient, + spv_update: packed::SpvUpdate, + ) -> Result { + let network_info = + NetworkInfo::new(self.ckb.network, self.ckb.ckb_endpoint.as_str().to_owned()); + let configuration = { + let mut tmp = TransactionBuilderConfiguration::new_with_network(network_info.clone())?; + tmp.fee_rate = self.ckb.fee_rate; + tmp + }; + + let (deployer, deployer_key) = SecretKey::from_slice(&self.common.private_key.as_ref()[..]) + .map(|sk| { + let pk = sk.public_key(&SECP256K1); + let payload = CkbAddressPayload::from_pubkey(&pk); + let address = CkbAddress::new(self.ckb.network, payload, true); + (address, sk) + })?; + log::debug!("The SPV cells will be updated by {deployer}"); + + let iterator = InputIterator::new_with_address(&[deployer.clone()], &network_info); + let mut tx_builder = TransactionBuilder::default(); + + let spv_inputs = { + let spv_info_input = CellInput::new_builder() + .previous_output(reorg_input.info.cell.out_point.clone()) + .build(); + let mut inputs = vec![spv_info_input]; + for client in &reorg_input.stale { + let spv_client_input = CellInput::new_builder() + .previous_output(client.cell.out_point.clone()) + .build(); + inputs.push(spv_client_input); + } + inputs + }; + tx_builder.inputs(spv_inputs); + + let spv_contract_cell_dep = spv.storage.spv_contract_cell_dep()?; + let lock_contract_cell_dep = spv.storage.lock_contract_cell_dep()?; + tx_builder.cell_dep(spv_contract_cell_dep); + tx_builder.cell_dep(lock_contract_cell_dep); + let spv_client_curr_cell_dep = CellDep::new_builder() + .out_point(reorg_input.curr.cell.out_point) + .dep_type(DepType::Code.into()) + .build(); + tx_builder.cell_dep(spv_client_curr_cell_dep); + + let spv_outputs = { + let mut outputs = vec![reorg_input.info.cell.output.clone()]; + for client in &reorg_input.stale { + outputs.push(client.cell.output.clone()); + } + outputs + }; + let spv_outputs_data = { + let mut spv_info = reorg_input.info.info.clone(); + spv_info.tip_client_id = reorg_input.info.next_tip_client_id(); + let packed_spv_info: packed::SpvInfo = spv_info.pack(); + let mut outputs_data = vec![packed_spv_info.as_bytes()]; + for client in &reorg_input.stale { + spv_client.id = client.client.id; + let packed_spv_client: packed::SpvClient = spv_client.pack(); + outputs_data.push(packed_spv_client.as_bytes()); + } + outputs_data + }; + tx_builder.outputs(spv_outputs); + tx_builder.outputs_data(spv_outputs_data.iter().map(Pack::pack)); + + #[allow(clippy::mutable_key_type)] + let mut lock_groups: HashMap = HashMap::default(); + #[allow(clippy::mutable_key_type)] + let mut type_groups: HashMap = HashMap::default(); + + for (output_idx, output) in tx_builder.get_outputs().clone().iter().enumerate() { + if let Some(type_script) = &output.type_().to_opt() { + type_groups + .entry(type_script.calc_script_hash()) + .or_insert_with(|| ScriptGroup::from_type_script(type_script)) + .output_indices + .push(output_idx); + } + } + + let witness = { + let type_args = BytesOpt::new_builder() + .set(Some(Pack::pack(spv_update.as_slice()))) + .build(); + let witness_args = WitnessArgs::new_builder().output_type(type_args).build(); + Pack::pack(&witness_args.as_bytes()) + }; + tx_builder.witness(witness); + tx_builder.witnesses(vec![PackedBytes::default(); reorg_input.stale.len()]); + + let mut change_builder = + DefaultChangeBuilder::new(&configuration, (&deployer).into(), Vec::new()); + change_builder.init(&mut tx_builder); + { + let spv_info_input = TransactionInput { + live_cell: reorg_input.info.cell.clone(), + since: 0, + }; + let _ = change_builder.check_balance(spv_info_input, &mut tx_builder); + for client in &reorg_input.stale { + let spv_client_input = TransactionInput { + live_cell: client.cell.clone(), + since: 0, + }; + let _ = change_builder.check_balance(spv_client_input, &mut tx_builder); + } + }; + let contexts = HandlerContexts::default(); + + let mut tx_with_groups = { + let mut check_result = None; + for (mut input_index, input) in iterator.enumerate() { + input_index += 1 + reorg_input.stale.len(); // info + stale clients + log::debug!("Try to find the {input_index}-th live cell for {deployer}"); let input = input.map_err(|err| { let msg = format!( "failed to find {input_index}-th live cell for {deployer} since {err}" @@ -324,7 +506,7 @@ impl Args { check_result } .ok_or_else(|| { - let msg = format!("{deployer}'s live cells are not enough."); + let msg = format!("{deployer}'s live cells are not enough"); Error::other(msg) })?; diff --git a/src/components/api_service/error.rs b/src/components/api_service/error.rs new file mode 100644 index 0000000..b67f4c9 --- /dev/null +++ b/src/components/api_service/error.rs @@ -0,0 +1,26 @@ +use std::fmt; + +use jsonrpc_core::{Error as RpcError, ErrorCode as RpcErrorCode}; + +#[repr(i64)] +pub enum ApiErrorCode { + // Bitcoin: 21xxx + // Storage: 23xxx + StorageTxTooNew = 23101, + StorageTxUnconfirmed, + StorageHeaderMissing = 23301, + StorageHeaderUnmatched, + // Onchain: 25xxx + OnchainTxUnconfirmed = 25101, + OnchainReorgRequired = 25901, +} + +impl ApiErrorCode { + pub fn with_desc(self, desc: D) -> RpcError { + RpcError { + code: RpcErrorCode::ServerError(self as i64), + message: desc.to_string(), + data: None, + } + } +} diff --git a/src/components/api_service.rs b/src/components/api_service/mod.rs similarity index 65% rename from src/components/api_service.rs rename to src/components/api_service/mod.rs index eed889e..fa067d8 100644 --- a/src/components/api_service.rs +++ b/src/components/api_service/mod.rs @@ -21,6 +21,10 @@ use crate::{ result::{Error, Result}, }; +mod error; + +pub use error::ApiErrorCode; + pub struct ApiServiceConfig { listen_address: SocketAddr, } @@ -88,9 +92,9 @@ impl SpvRpc for SpvRpcImpl { tokio::task::block_in_place(|| -> RpcResult<(u32, Hash, Vec)> { let (merkle_block, raw_tx_out_proof) = spv.btc_cli.get_tx_out_proof(txid).map_err(|err| { - let message = format!( - "failed to get tx out proof for {txid:#x} from remote since {err}" - ); + let message = + format!("failed to get tx out proof for {txid:#x} from remote"); + log::error!("{message} since {err}"); RpcError { code: RpcErrorCode::InternalError, message, @@ -100,7 +104,9 @@ impl SpvRpc for SpvRpcImpl { let block_hash = merkle_block.header.block_hash(); log::trace!(">>> the input tx in header {block_hash:#x}"); let block_height = spv.btc_cli.get_block_height(block_hash).map_err(|err| { - let message = format!("failed to get block height from remote since {err}"); + let message = + format!("failed to get block height for {block_hash:#x} from remote"); + log::error!("{message} since {err}"); RpcError { code: RpcErrorCode::InternalError, message, @@ -112,8 +118,8 @@ impl SpvRpc for SpvRpcImpl { })?; let (stg_tip_height, _) = spv.storage.tip_state().map_err(|err| { - let message = - format!("failed to read tip bitcoin height from local storage since {err}"); + let message = "failed to read tip bitcoin height from local storage".to_owned(); + log::error!("{message} since {err}"); RpcError { code: RpcErrorCode::InternalError, message, @@ -124,55 +130,42 @@ impl SpvRpc for SpvRpcImpl { // TODO Define server errors with enum. if stg_tip_height < target_height { - let message = format!( + let desc = format!( "target transaction is in header#{target_height}, \ - but the tip header in server is header#{stg_tip_height}" + but the tip header in local storage is header#{stg_tip_height}" ); - return Err(RpcError { - code: RpcErrorCode::ServerError(-1), - message, - data: None, - }); + return Err(ApiErrorCode::StorageTxTooNew.with_desc(desc)); } if stg_tip_height < target_height + confirmations { - let message = format!( + let desc = format!( "target transaction is in header#{target_height} \ and it requires {confirmations} confirmations, \ - but the tip header in server is header#{stg_tip_height}" + but the tip header in local storage is header#{stg_tip_height}" ); - return Err(RpcError { - code: RpcErrorCode::ServerError(-2), - message, - data: None, - }); + return Err(ApiErrorCode::StorageTxUnconfirmed.with_desc(desc)); } let stg_target_hash = spv .storage .bitcoin_header_hash(target_height) .map_err(|err| { - let message = format!("server doesn't have header#{target_height} since {err}"); - RpcError { - code: RpcErrorCode::ServerError(-3), - message, - data: None, - } + let desc = format!("local storage doesn't have header#{target_height}"); + log::error!("{desc} since {err}"); + ApiErrorCode::StorageHeaderMissing.with_desc(desc) })?; if target_hash != stg_target_hash { - let message = format!( + let desc = format!( "target transaction is in header#{target_height}, \ the header hash from remote is {target_hash:#x}, \ - its hash in server is {stg_target_hash:#x}" + its hash in local storage is {stg_target_hash:#x}" ); - return Err(RpcError { - code: RpcErrorCode::ServerError(-4), - message, - data: None, - }); + return Err(ApiErrorCode::StorageHeaderUnmatched.with_desc(desc)); } let spv_client_cell = tokio::task::block_in_place(|| -> RpcResult { spv.find_best_spv_client(stg_tip_height).map_err(|err| { - let message = format!("failed to get SPV cell from remote since {err}"); + let message = + format!("failed to get SPV cell base on height {stg_tip_height} from chain"); + log::error!("{message} since {err}"); RpcError { code: RpcErrorCode::InternalError, message, @@ -182,18 +175,39 @@ impl SpvRpc for SpvRpcImpl { })?; log::trace!(">>> the best SPV client is {}", spv_client_cell.client); - if spv_client_cell.client.headers_mmr_root.max_height < target_height + confirmations { - let message = format!( + let spv_header_root = &spv_client_cell.client.headers_mmr_root; + + let spv_best_height = spv_header_root.max_height; + if spv_best_height < target_height + confirmations { + let desc = format!( "target transaction is in header#{target_height} \ and it requires {confirmations} confirmations, \ - but the best SPV header is header#{}", - spv_client_cell.client.headers_mmr_root.max_height + but the best SPV header is header#{spv_best_height}", ); - return Err(RpcError { - code: RpcErrorCode::ServerError(-5), - message, - data: None, - }); + return Err(ApiErrorCode::OnchainTxUnconfirmed.with_desc(desc)); + } + + let packed_stg_header_root = + spv.storage + .generate_headers_root(spv_best_height) + .map_err(|err| { + let message = + format!("failed to generate headers MMR root for height {spv_best_height}"); + log::error!("{message} since {err}"); + RpcError { + code: RpcErrorCode::InternalError, + message, + data: None, + } + })?; + let packed_spv_header_root = spv_header_root.pack(); + + if packed_stg_header_root.as_slice() != packed_spv_header_root.as_slice() { + log::warn!("[onchain] header#{spv_best_height}; mmr-root {spv_header_root}"); + let stg_header_root = packed_stg_header_root.unpack(); + log::warn!("[storage] header#{spv_best_height}; mmr-root {stg_header_root}"); + let desc = "the SPV instance on chain is not unknown, reorg is required"; + return Err(ApiErrorCode::OnchainReorgRequired.with_desc(desc)); } let header_proof = spv @@ -203,7 +217,8 @@ impl SpvRpc for SpvRpcImpl { vec![target_height], ) .map_err(|err| { - let message = format!("failed to generate headers MMR proof since {err}"); + let message = "failed to generate headers MMR proof".to_owned(); + log::error!("{message} since {err}"); RpcError { code: RpcErrorCode::InternalError, message, diff --git a/src/components/bitcoin_client.rs b/src/components/bitcoin_client.rs index 0fb2da3..f7fe87b 100644 --- a/src/components/bitcoin_client.rs +++ b/src/components/bitcoin_client.rs @@ -150,6 +150,13 @@ impl BitcoinClient { .and_then(|hash| self.get_block_height(hash)) } + pub fn get_tip_state(&self) -> BtcRpcResult<(u32, Header)> { + self.get_best_block_hash().and_then(|hash| { + self.get_block_height(hash) + .and_then(|height| self.get_block_header(hash).map(|header| (height, header))) + }) + } + pub fn get_block_hash(&self, height: u32) -> BtcRpcResult { let params = serialize_parameters!(height); self.post("getblockhash", params) @@ -248,13 +255,32 @@ impl BitcoinClient { Ok(header) } - pub fn get_headers(&self, start: u32, end: u32) -> Result> { - log::trace!("Download headers from {start} to {end}"); + pub fn get_headers( + &self, + start: u32, + end: u32, + mut expected_prev_hash: BlockHash, + ) -> Result>> { + log::info!("Download headers from {start} to {end} base on {expected_prev_hash:#x}"); let mut headers = Vec::new(); for height in start..=end { let header = self.get_block_header_by_height(height)?; + let block_hash = header.block_hash(); + log::trace!( + "[download] header#{height:07}, {block_hash:#x}; tip; prev {}", + header.prev_blockhash + ); + if header.prev_blockhash != expected_prev_hash { + log::warn!( + "[download] reorg at {height} when download headers from {start} to {end}\ + expect {expected_prev_hash:#x} but got {:#x}", + header.prev_blockhash + ); + return Ok(None); + } + expected_prev_hash = block_hash; headers.push(header); } - Ok(headers) + Ok(Some(headers)) } } diff --git a/src/components/mod.rs b/src/components/mod.rs index 67ab1fb..ecc61e9 100644 --- a/src/components/mod.rs +++ b/src/components/mod.rs @@ -10,5 +10,5 @@ mod spv_service; pub use api_service::ApiServiceConfig; pub use bitcoin_client::BitcoinClient; pub use ckb_client::CkbRpcClientExtension; -pub use spv_service::{SpvClientCell, SpvInfoCell, SpvService}; +pub use spv_service::{SpvClientCell, SpvOperation, SpvReorgInput, SpvService, SpvUpdateInput}; pub use storage::{Error as StorageError, Storage}; diff --git a/src/components/spv_service.rs b/src/components/spv_service.rs index ea653b0..04c88cd 100644 --- a/src/components/spv_service.rs +++ b/src/components/spv_service.rs @@ -3,9 +3,9 @@ use std::collections::HashMap; use ckb_bitcoin_spv_verifier::types::{ - core::{SpvClient, SpvInfo}, + core::{Hash, SpvClient, SpvInfo}, packed, - prelude::Unpack as VUnpack, + prelude::{Pack as VPack, Unpack as VUnpack}, }; use ckb_sdk::{ rpc::{ @@ -42,6 +42,28 @@ pub struct SpvClientCell { pub(crate) cell: LiveCell, } +pub struct SpvInstance { + pub(crate) info: SpvInfoCell, + pub(crate) clients: HashMap, +} + +pub struct SpvUpdateInput { + pub(crate) info: SpvInfoCell, + pub(crate) curr: SpvClientCell, + pub(crate) next: SpvClientCell, +} + +pub struct SpvReorgInput { + pub(crate) info: SpvInfoCell, + pub(crate) curr: SpvClientCell, + pub(crate) stale: Vec, +} + +pub enum SpvOperation { + Update(SpvUpdateInput), + Reorg(SpvReorgInput), +} + impl SpvInfoCell { pub(crate) fn prev_tip_client_id(&self) -> u8 { let current = self.info.tip_client_id; @@ -63,54 +85,112 @@ impl SpvInfoCell { } impl SpvService { - pub(crate) fn find_spv_cells_for_update( - &self, - ) -> Result<(SpvInfoCell, SpvClientCell, SpvClientCell)> { - let (spv_info, spv_clients) = self.find_spv_cells()?; - let spv_client_curr = spv_clients - .get(&spv_info.info.tip_client_id) + pub(crate) fn find_best_spv_client(&self, height: u32) -> Result { + let SpvInstance { mut info, clients } = self.find_spv_cells()?; + for _ in 0..clients.len() { + let cell = clients.get(&info.info.tip_client_id).ok_or_else(|| { + let msg = format!( + "the SPV client (id={}) is not found", + info.info.tip_client_id + ); + Error::other(msg) + })?; + if cell.client.headers_mmr_root.max_height <= height { + return Ok(cell.to_owned()); + } + info.info.tip_client_id = info.prev_tip_client_id(); + } + let msg = format!("all SPV clients have better heights than server has (height: {height})"); + Err(Error::other(msg)) + } + + pub(crate) fn select_operation(&self) -> Result { + let ins = self.find_spv_cells()?; + let spv_client_curr = ins + .clients + .get(&ins.info.info.tip_client_id) .ok_or_else(|| { let msg = format!( "the current tip SPV client (id={}) is not found", - spv_info.info.tip_client_id + ins.info.info.tip_client_id ); Error::other(msg) })? .to_owned(); - let next_tip_client_id = spv_info.next_tip_client_id(); - let spv_client_next = spv_clients + log::trace!("[onchain] tip SPV client {}", spv_client_curr.client); + + let spv_header_root_curr = &spv_client_curr.client.headers_mmr_root; + let spv_height_curr = spv_header_root_curr.max_height; + let packed_stg_header_root_curr = self.storage.generate_headers_root(spv_height_curr)?; + let packed_spv_header_root_curr = spv_header_root_curr.pack(); + + if packed_stg_header_root_curr.as_slice() != packed_spv_header_root_curr.as_slice() { + log::warn!("[onchain] header#{spv_height_curr}; mmr-root {spv_header_root_curr}"); + let stg_header_root_curr = packed_stg_header_root_curr.unpack(); + log::warn!("[storage] header#{spv_height_curr}; mmr-root {stg_header_root_curr}"); + let input = self.prepare_reorg_input(ins)?; + return Ok(SpvOperation::Reorg(input)); + } + + let next_tip_client_id = ins.info.next_tip_client_id(); + let spv_client_next = ins + .clients .get(&next_tip_client_id) .ok_or_else(|| { - let msg = - format!("the next tip SPV client (id={next_tip_client_id}) is not found",); + let msg = format!("the next tip SPV client (id={next_tip_client_id}) is not found"); Error::other(msg) })? .to_owned(); - Ok((spv_info, spv_client_curr, spv_client_next)) + log::trace!( + "[onchain] old SPV client {} (will be next)", + spv_client_next.client + ); + let input = SpvUpdateInput { + info: ins.info, + curr: spv_client_curr, + next: spv_client_next, + }; + Ok(SpvOperation::Update(input)) } - pub(crate) fn find_best_spv_client(&self, height: u32) -> Result { - let (mut spv_info, spv_clients) = self.find_spv_cells()?; - for _ in 0..spv_clients.len() { - let spv_client = spv_clients - .get(&spv_info.info.tip_client_id) - .ok_or_else(|| { - let msg = format!( - "the current tip SPV client (id={}) is not found", - spv_info.info.tip_client_id - ); - Error::other(msg) - })?; - if spv_client.client.headers_mmr_root.max_height <= height { - return Ok(spv_client.to_owned()); + pub(crate) fn prepare_reorg_input(&self, ins: SpvInstance) -> Result { + let SpvInstance { mut info, clients } = ins; + let mut stale = Vec::new(); + for _ in 0..clients.len() { + let cell = clients.get(&info.info.tip_client_id).ok_or_else(|| { + let msg = format!( + "the SPV client (id={}) is not found", + info.info.tip_client_id + ); + Error::other(msg) + })?; + + let spv_header_root = &cell.client.headers_mmr_root; + let spv_height = spv_header_root.max_height; + let packed_stg_header_root = self.storage.generate_headers_root(spv_height)?; + let packed_spv_header_root = spv_header_root.pack(); + + if packed_stg_header_root.as_slice() == packed_spv_header_root.as_slice() { + let input = SpvReorgInput { + info, + curr: cell.clone(), + stale, + }; + return Ok(input); } - spv_info.info.tip_client_id = spv_info.prev_tip_client_id(); + + log::trace!("[onchain] header#{spv_height}; mmr-root {spv_header_root}"); + let stg_header_root = packed_stg_header_root.unpack(); + log::trace!("[storage] header#{spv_height}; mmr-root {stg_header_root}"); + + stale.push(cell.clone()); + info.info.tip_client_id = info.prev_tip_client_id(); } - let msg = format!("all SPV clients have better heights than server has (height: {height})"); + let msg = "failed to reorg since no common parent between SPV instance and storage"; Err(Error::other(msg)) } - pub(crate) fn find_spv_cells(&self) -> Result<(SpvInfoCell, HashMap)> { + pub(crate) fn find_spv_cells(&self) -> Result { let cells = self.find_raw_spv_cells()?; parse_raw_spv_cells(cells) } @@ -146,9 +226,83 @@ impl SpvService { } }) } + + pub(crate) fn sync_storage(&self) -> Result { + let spv = &self; + let (stg_tip_height, stg_tip_header) = spv.storage.tip_state()?; + let stg_tip_hash = stg_tip_header.block_hash(); + log::info!("[storage] header#{stg_tip_height:07}, {stg_tip_hash:#x}; tip"); + + let (btc_tip_height, btc_tip_header) = spv.btc_cli.get_tip_state()?; + log::info!( + "[bitcoin] header#{btc_tip_height:07}, {:#x}; tip; prev {:#x}", + btc_tip_header.block_hash(), + btc_tip_header.prev_blockhash + ); + + if stg_tip_height >= btc_tip_height { + return Ok(true); + } + + let btc_header = spv.btc_cli.get_block_header_by_height(stg_tip_height)?; + let btc_hash = btc_header.block_hash(); + if stg_tip_hash == btc_hash { + let headers = if let Some(headers) = + spv.btc_cli + .get_headers(stg_tip_height + 1, btc_tip_height, stg_tip_hash)? + { + headers + } else { + return Ok(false); + }; + let _ = spv.storage.append_headers(headers)?; + return Ok(true); + } + + log::info!("Try to find the height when fork happened."); + let (stg_base_height, _) = spv.storage.base_state()?; + let mut fork_point = None; + + for height in (stg_base_height..stg_tip_height).rev() { + let stg_hash = spv.storage.bitcoin_header_hash(height)?; + log::debug!("[storage] header#{height:07}, {stg_hash:#x}"); + let btc_header = spv.btc_cli.get_block_header_by_height(height)?; + let btc_hash: Hash = btc_header.block_hash().into(); + log::debug!("[bitcoin] header#{height:07}, {btc_hash:#x}"); + + if stg_hash == btc_hash { + log::info!("Fork happened at height {height}."); + fork_point = Some((height, btc_hash)); + } + } + + if fork_point.is_none() { + let msg = format!( + "reorg failed since the fork point is ahead than \ + local start height {stg_base_height}" + ); + return Err(Error::other(msg)); + } + let (fork_height, fork_hash) = fork_point.unwrap(); + + log::warn!("The chain in storage rollback to header#{fork_height:07}, {fork_hash:#x}"); + spv.storage.rollback_to(Some(fork_height))?; + + let headers = if let Some(headers) = + spv.btc_cli + .get_headers(fork_height + 1, btc_tip_height, fork_hash.into())? + { + headers + } else { + return Ok(false); + }; + let _ = spv.storage.append_headers(headers)?; + + Ok(true) + } } -fn parse_raw_spv_cells(cells: Vec) -> Result<(SpvInfoCell, HashMap)> { +fn parse_raw_spv_cells(cells: Vec) -> Result { let mut spv_info_opt = None; let mut spv_clients = HashMap::new(); let clients_count = (cells.len() - 1) as u8; // Checked when fetch SPV cells. @@ -177,7 +331,11 @@ fn parse_raw_spv_cells(cells: Vec) -> Result<(SpvInfoCell, HashMap Result { + let (_, mmr) = self.chain_root_mmr(tip_height)?; + let mmr_root = mmr.get_root()?; + Ok(mmr_root) + } fn generate_headers_proof(&self, tip_height: u32, heights: Vec) -> Result { let (base_height, mmr) = self.chain_root_mmr(tip_height)?; let positions = heights @@ -259,6 +264,15 @@ pub(crate) trait BitcoinSpvStorage: InternalBitcoinSpvStorage { Ok(()) } + fn base_state(&self) -> Result<(u32, Header)> { + self.get_base_bitcoin_height() + .and_then(|opt| opt.ok_or_else(|| Error::not_found("base bitcoin height"))) + .and_then(|height| { + self.get_bitcoin_header(height) + .map(|header| (height, header)) + }) + } + fn tip_state(&self) -> Result<(u32, Header)> { self.get_tip_bitcoin_height().and_then(|height| { self.get_bitcoin_header(height)