diff --git a/src/bitcoind/interface.rs b/src/bitcoind/interface.rs index 09dd8e8..edbb952 100644 --- a/src/bitcoind/interface.rs +++ b/src/bitcoind/interface.rs @@ -413,6 +413,39 @@ impl BitcoinD { self.make_node_request_failible("sendrawtransaction", ¶ms!(tx_hex)) .map(|_| ()) } + + /// Get some stats (previousblockhash, confirmations, height) about this block + pub fn get_block_stats(&self, blockhash: BlockHash) -> Result { + let res = self.make_node_request( + "getblockheader", + ¶ms!(Json::String(blockhash.to_string()),), + ); + let confirmations = res + .get("confirmations") + .map(|a| a.as_i64()) + .flatten() + .expect("Invalid confirmations in `getblockheader` response: not an i64") + as i32; + let previous_block_str = res + .get("previousblockhash") + .map(|a| a.as_str()) + .flatten() + .expect("Invalid previousblockhash in `getblockheader` response: not a string"); + let previous_blockhash = BlockHash::from_str(previous_block_str) + .expect("Invalid previousblockhash hex in `getblockheader` response"); + let height = res + .get("height") + .map(|a| a.as_i64()) + .flatten() + .expect("Invalid height in `getblockheader` response: not an u32") + as i32; + Ok(BlockStats { + confirmations, + previous_blockhash, + height, + blockhash, + }) + } } /// Info about bitcoind's sync state @@ -436,3 +469,11 @@ pub struct UtxoInfo { pub bestblock: BlockHash, pub value: Amount, } + +#[derive(Debug, Clone, PartialEq)] +pub struct BlockStats { + pub confirmations: i32, + pub previous_blockhash: BlockHash, + pub blockhash: BlockHash, + pub height: i32, +} diff --git a/src/database/mod.rs b/src/database/mod.rs index 6db615e..f28cffe 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -342,6 +342,25 @@ pub fn db_cancel_signatures( db_sigs_by_type(db_path, vault_id, SigTxType::Cancel) } +/// Set the unvault_height and spent_height to NULL, if the unvault and spent +/// txs weren't confirmed at the ancestor_height +pub fn db_update_heights_after_reorg( + db_path: &path::Path, + ancestor_height: i32, +) -> Result<(), DatabaseError> { + db_exec(&db_path, |db_tx| { + db_tx.execute( + "UPDATE vaults SET unvault_height = NULL WHERE unvault_height > (?1)", + params![ancestor_height], + )?; + db_tx.execute( + "UPDATE vaults SET spent_height = NULL WHERE spent_height > (?1)", + params![ancestor_height], + )?; + Ok(()) + }) +} + // Create the db file with RW permissions only for the user fn create_db_file(db_path: &path::Path) -> Result<(), DatabaseError> { let mut options = fs::OpenOptions::new(); diff --git a/src/plugins.rs b/src/plugins.rs index 98a70f2..61e01af 100644 --- a/src/plugins.rs +++ b/src/plugins.rs @@ -10,6 +10,7 @@ use std::{ os::unix::fs::MetadataExt, path, process::{Command, Stdio}, + collections::HashMap, }; use serde::{Deserialize, Serialize, Serializer}; @@ -111,26 +112,14 @@ impl Plugin { self.path.to_string_lossy() } - /// Takes updates about our vaults' status and returns which should be Canceled, if any. - /// - /// This will start a plugin process and write a JSON request to its stding containing: - /// - the block height of the last block - /// - the info of vaults that were unvaulted - /// - the deposit outpoint of the vaults that were succesfully spent - /// - the deposit outpoint of the unvaulted vaults that were revaulted - /// It will then read a JSON response from its stdout containing a list of deposit outpoints - /// of vaults that should be canceled, and expect the plugin process to terminate. - pub fn poll( - &self, - block_height: i32, - block_info: &NewBlockInfo, - ) -> Result, PluginError> { - let query = serde_json::json!({ - "method": "new_block", - "config": self.config, - "block_height": block_height, - "block_info": block_info, - }); + /// This will start a plugin process and write a JSON request to its stding containing + /// the query. + /// It will then read a JSON response from its stdout containing , + /// and expect the plugin process to terminate. + fn send_message(&self, query: &serde_json::Value) -> Result + where + T: serde::de::DeserializeOwned, + { let mut query_ser = serde_json::to_vec(&query).expect("No string in map and Serialize won't fail"); query_ser.push(b'\n'); @@ -171,11 +160,48 @@ impl Plugin { self.path.as_path(), String::from_utf8_lossy(&output.stdout) ); - let resp: NewBlockResponse = serde_json::from_slice(&output.stdout) + let resp: T = serde_json::from_slice(&output.stdout) .map_err(|e| PluginError::Deserialization(self.path.clone(), e.to_string()))?; + Ok(resp) + } + + /// Takes updates about our vaults' status and returns which should be Canceled, if any. + /// + /// This will send to the plugin: + /// - the block height of the last block + /// - the info of vaults that were unvaulted + /// - the deposit outpoint of the vaults that were succesfully spent + /// - the deposit outpoint of the unvaulted vaults that were revaulted + /// and will receive a list of deposit outpoints of vaults that should be + /// canceled. + pub fn poll( + &self, + block_height: i32, + block_info: &NewBlockInfo, + ) -> Result, PluginError> { + let query = serde_json::json!({ + "method": "new_block", + "config": self.config, + "block_height": block_height, + "block_info": block_info, + }); + let resp: NewBlockResponse = self.send_message(&query)?; + Ok(resp.revault) } + + /// Informs the plugin that it should wipe every information it has + /// regarding the block at height `block_height` and the following ones + pub fn invalidate_block(&self, block_height: i32) -> Result, PluginError> { + let query = serde_json::json!({ + "method": "invalidate_block", + "config": self.config, + "block_height": block_height, + }); + + self.send_message(&query) + } } #[cfg(test)] diff --git a/src/poller.rs b/src/poller.rs index 62d757a..1a6cdc3 100644 --- a/src/poller.rs +++ b/src/poller.rs @@ -7,7 +7,7 @@ use crate::{ database::{ db_blank_vaults, db_cancel_signatures, db_del_vault, db_instance, db_should_cancel_vault, db_should_not_cancel_vault, db_unvault_spender_confirmed, db_unvaulted_vaults, - db_update_tip, db_vault, schema::DbVault, DatabaseError, + db_update_heights_after_reorg, db_update_tip, db_vault, schema::DbVault, DatabaseError, }, plugins::{NewBlockInfo, VaultInfo}, }; @@ -536,6 +536,48 @@ fn new_block( Ok(()) } +fn handle_reorg( + db_path: &path::Path, + config: &Config, + bitcoind: &BitcoinD, + old_tip: &ChainTip, +) -> Result<(), PollerError> { + // Finding the common ancestor + let mut stats = bitcoind.get_block_stats(old_tip.hash)?; + let mut ancestor = ChainTip { + hash: old_tip.hash, + height: old_tip.height, + }; + + while stats.confirmations == -1 { + stats = bitcoind.get_block_stats(stats.previous_blockhash)?; + ancestor = ChainTip { + hash: stats.blockhash, + height: stats.height, + }; + } + + log::debug!( + "Unwinding the state until the common ancestor: {:?} {:?}", + ancestor.hash, + ancestor.height, + ); + + db_update_heights_after_reorg(db_path, ancestor.height)?; + + for plugin in &config.plugins { + plugin + .invalidate_block(ancestor.height + 1) + .map(|_| ()) + .unwrap_or_else(|e| + // FIXME: should we crash instead? + log::error!("Error when invalidating block in plugin: '{}'", e)); + } + db_update_tip(db_path, ancestor.height, ancestor.hash)?; + + Ok(()) +} + pub fn main_loop( db_path: &path::Path, secp: &secp256k1::Secp256k1, @@ -549,7 +591,11 @@ pub fn main_loop( if bitcoind_tip.height > db_instance.tip_blockheight { let curr_tip_hash = bitcoind.block_hash(db_instance.tip_blockheight); if db_instance.tip_blockheight != 0 && curr_tip_hash != db_instance.tip_blockhash { - panic!("No reorg handling yet"); + let tip = ChainTip { + hash: db_instance.tip_blockhash, + height: db_instance.tip_blockheight, + }; + handle_reorg(db_path, config, bitcoind, &tip)?; } match new_block(db_path, secp, config, bitcoind, &bitcoind_tip) { @@ -559,7 +605,11 @@ pub fn main_loop( Err(e) => return Err(e), } } else if bitcoind_tip.hash != db_instance.tip_blockhash { - panic!("No reorg handling yet"); + let tip = ChainTip { + hash: db_instance.tip_blockhash, + height: db_instance.tip_blockheight, + }; + handle_reorg(db_path, config, bitcoind, &tip)?; } thread::sleep(config.bitcoind_config.poll_interval_secs); diff --git a/tests/plugins/max_value_in_flight.py b/tests/plugins/max_value_in_flight.py index 2e14398..2d5ab09 100755 --- a/tests/plugins/max_value_in_flight.py +++ b/tests/plugins/max_value_in_flight.py @@ -51,28 +51,37 @@ def recorded_attempts(config): req = read_request() config = req["config"] assert "data_dir" in config and "max_value" in config - block_info = req["block_info"] maybe_create_data_dir(config) assert DATASTORE_FNAME in os.listdir(config["data_dir"]) - # First update the recorded attempts with the new and pass attempts. - in_flight = recorded_attempts(config) - for op in block_info["successful_attempts"] + block_info["revaulted_attempts"]: - del in_flight[op] - for v in block_info["new_attempts"]: - in_flight[v["deposit_outpoint"]] = v["value"] - update_in_flight(config, in_flight) - - # Did we get above the threshold? Note we might stay a bit above the threshold - # for the time that the vaults we told the WT to revault previously actually get - # their Cancel transaction confirmed. - resp = {"revault": []} - value_in_flight = sum([in_flight[k] for k in in_flight]) - while value_in_flight > config["max_value"] and len(block_info["new_attempts"]) > 0: - v = block_info["new_attempts"].pop(0) - resp["revault"].append(v["deposit_outpoint"]) - value_in_flight -= v["value"] - continue + resp = {} + if req["method"] == "new_block": + block_info = req["block_info"] + # First update the recorded attempts with the new and pass attempts. + in_flight = recorded_attempts(config) + for op in block_info["successful_attempts"] + block_info["revaulted_attempts"]: + del in_flight[op] + for v in block_info["new_attempts"]: + in_flight[v["deposit_outpoint"]] = v["value"] + update_in_flight(config, in_flight) + + # Did we get above the threshold? Note we might stay a bit above the threshold + # for the time that the vaults we told the WT to revault previously actually get + # their Cancel transaction confirmed. + resp = {"revault": []} + value_in_flight = sum([in_flight[k] for k in in_flight]) + while value_in_flight > config["max_value"] and len(block_info["new_attempts"]) > 0: + v = block_info["new_attempts"].pop(0) + resp["revault"].append(v["deposit_outpoint"]) + value_in_flight -= v["value"] + continue + elif req["method"] == "invalidate_block": + # We don't really care + pass + else: + # TODO: maybe we should reply saying that we don't know what + # they're talking about? + pass sys.stdout.write(json.dumps(resp)) sys.stdout.flush() diff --git a/tests/plugins/revault_all.py b/tests/plugins/revault_all.py index e0c9d00..dd692a1 100755 --- a/tests/plugins/revault_all.py +++ b/tests/plugins/revault_all.py @@ -15,7 +15,16 @@ def read_request(): if __name__ == "__main__": req = read_request() - block_info = req["block_info"] - resp = {"revault": [v["deposit_outpoint"] for v in block_info["new_attempts"]]} + resp = {} + if req["method"] == "new_block": + block_info = req["block_info"] + resp = {"revault": [v["deposit_outpoint"] for v in block_info["new_attempts"]]} + elif req["method"] == "invalidate_block": + # We don't really care + pass + else: + # TODO: maybe we should reply saying that we don't know what + # they're talking about? + pass sys.stdout.write(json.dumps(resp)) sys.stdout.flush() diff --git a/tests/plugins/revault_nothing.py b/tests/plugins/revault_nothing.py index 4ff66a3..64b9ee3 100755 --- a/tests/plugins/revault_nothing.py +++ b/tests/plugins/revault_nothing.py @@ -15,6 +15,15 @@ def read_request(): if __name__ == "__main__": req = read_request() - resp = {"revault": []} + resp = {} + if req["method"] == "new_block": + resp = {"revault": []} + elif req["method"] == "invalidate_block": + # We don't really care + pass + else: + # TODO: maybe we should reply saying that we don't know what + # they're talking about? + pass sys.stdout.write(json.dumps(resp)) sys.stdout.flush()