Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Reorg handling #27

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions src/bitcoind/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,39 @@ impl BitcoinD {
self.make_node_request_failible("sendrawtransaction", &params!(tx_hex))
.map(|_| ())
}

/// Get some stats (previousblockhash, confirmations, height) about this block
pub fn get_block_stats(&self, blockhash: BlockHash) -> Result<BlockStats, BitcoindError> {
let res = self.make_node_request(
"getblockheader",
&params!(Json::String(blockhash.to_string()),),
);
let confirmations = res
.get("confirmations")
.map(|a| a.as_i64())
.flatten()
.expect("Invalid confirmations in `getblockheader` response: not an i64")
as i32;
let previous_block_str = res
.get("previousblockhash")
.map(|a| a.as_str())
.flatten()
.expect("Invalid previousblockhash in `getblockheader` response: not a string");
let previous_blockhash = BlockHash::from_str(previous_block_str)
.expect("Invalid previousblockhash hex in `getblockheader` response");
let height = res
.get("height")
.map(|a| a.as_i64())
.flatten()
.expect("Invalid height in `getblockheader` response: not an u32")
as i32;
Ok(BlockStats {
confirmations,
previous_blockhash,
height,
blockhash,
})
}
}

/// Info about bitcoind's sync state
Expand All @@ -436,3 +469,11 @@ pub struct UtxoInfo {
pub bestblock: BlockHash,
pub value: Amount,
}

#[derive(Debug, Clone, PartialEq)]
pub struct BlockStats {
pub confirmations: i32,
pub previous_blockhash: BlockHash,
pub blockhash: BlockHash,
pub height: i32,
}
19 changes: 19 additions & 0 deletions src/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,25 @@ pub fn db_cancel_signatures(
db_sigs_by_type(db_path, vault_id, SigTxType::Cancel)
}

/// Set the unvault_height and spent_height to NULL, if the unvault and spent
/// txs weren't confirmed at the ancestor_height
pub fn db_update_heights_after_reorg(
db_path: &path::Path,
ancestor_height: i32,
) -> Result<(), DatabaseError> {
db_exec(&db_path, |db_tx| {
db_tx.execute(
"UPDATE vaults SET unvault_height = NULL WHERE unvault_height > (?1)",
params![ancestor_height],
)?;
db_tx.execute(
"UPDATE vaults SET spent_height = NULL WHERE spent_height > (?1)",
params![ancestor_height],
)?;
Ok(())
})
}

// Create the db file with RW permissions only for the user
fn create_db_file(db_path: &path::Path) -> Result<(), DatabaseError> {
let mut options = fs::OpenOptions::new();
Expand Down
68 changes: 47 additions & 21 deletions src/plugins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::{
os::unix::fs::MetadataExt,
path,
process::{Command, Stdio},
collections::HashMap,
};

use serde::{Deserialize, Serialize, Serializer};
Expand Down Expand Up @@ -111,26 +112,14 @@ impl Plugin {
self.path.to_string_lossy()
}

/// Takes updates about our vaults' status and returns which should be Canceled, if any.
///
/// This will start a plugin process and write a JSON request to its stding containing:
/// - the block height of the last block
/// - the info of vaults that were unvaulted
/// - the deposit outpoint of the vaults that were succesfully spent
/// - the deposit outpoint of the unvaulted vaults that were revaulted
/// It will then read a JSON response from its stdout containing a list of deposit outpoints
/// of vaults that should be canceled, and expect the plugin process to terminate.
pub fn poll(
&self,
block_height: i32,
block_info: &NewBlockInfo,
) -> Result<Vec<OutPoint>, PluginError> {
let query = serde_json::json!({
"method": "new_block",
"config": self.config,
"block_height": block_height,
"block_info": block_info,
});
/// This will start a plugin process and write a JSON request to its stding containing
/// the query.
/// It will then read a JSON response from its stdout containing <T>,
/// and expect the plugin process to terminate.
fn send_message<T>(&self, query: &serde_json::Value) -> Result<T, PluginError>
where
T: serde::de::DeserializeOwned,
{
let mut query_ser =
serde_json::to_vec(&query).expect("No string in map and Serialize won't fail");
query_ser.push(b'\n');
Expand Down Expand Up @@ -171,11 +160,48 @@ impl Plugin {
self.path.as_path(),
String::from_utf8_lossy(&output.stdout)
);
let resp: NewBlockResponse = serde_json::from_slice(&output.stdout)
let resp: T = serde_json::from_slice(&output.stdout)
.map_err(|e| PluginError::Deserialization(self.path.clone(), e.to_string()))?;

Ok(resp)
}

/// Takes updates about our vaults' status and returns which should be Canceled, if any.
///
/// This will send to the plugin:
/// - the block height of the last block
/// - the info of vaults that were unvaulted
/// - the deposit outpoint of the vaults that were succesfully spent
/// - the deposit outpoint of the unvaulted vaults that were revaulted
/// and will receive a list of deposit outpoints of vaults that should be
/// canceled.
pub fn poll(
&self,
block_height: i32,
block_info: &NewBlockInfo,
) -> Result<Vec<OutPoint>, PluginError> {
let query = serde_json::json!({
"method": "new_block",
"config": self.config,
"block_height": block_height,
"block_info": block_info,
});
let resp: NewBlockResponse = self.send_message(&query)?;

Ok(resp.revault)
}

/// Informs the plugin that it should wipe every information it has
/// regarding the block at height `block_height` and the following ones
pub fn invalidate_block(&self, block_height: i32) -> Result<HashMap<(), ()>, PluginError> {
let query = serde_json::json!({
"method": "invalidate_block",
"config": self.config,
"block_height": block_height,
});

self.send_message(&query)
}
}

#[cfg(test)]
Expand Down
56 changes: 53 additions & 3 deletions src/poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
database::{
db_blank_vaults, db_cancel_signatures, db_del_vault, db_instance, db_should_cancel_vault,
db_should_not_cancel_vault, db_unvault_spender_confirmed, db_unvaulted_vaults,
db_update_tip, db_vault, schema::DbVault, DatabaseError,
db_update_heights_after_reorg, db_update_tip, db_vault, schema::DbVault, DatabaseError,
},
plugins::{NewBlockInfo, VaultInfo},
};
Expand Down Expand Up @@ -536,6 +536,48 @@ fn new_block(
Ok(())
}

fn handle_reorg(
db_path: &path::Path,
config: &Config,
bitcoind: &BitcoinD,
old_tip: &ChainTip,
) -> Result<(), PollerError> {
// Finding the common ancestor
let mut stats = bitcoind.get_block_stats(old_tip.hash)?;
let mut ancestor = ChainTip {
hash: old_tip.hash,
height: old_tip.height,
};

while stats.confirmations == -1 {
stats = bitcoind.get_block_stats(stats.previous_blockhash)?;
ancestor = ChainTip {
hash: stats.blockhash,
height: stats.height,
};
}

log::debug!(
"Unwinding the state until the common ancestor: {:?} {:?}",
ancestor.hash,
ancestor.height,
);

db_update_heights_after_reorg(db_path, ancestor.height)?;

for plugin in &config.plugins {
plugin
.invalidate_block(ancestor.height + 1)
.map(|_| ())
.unwrap_or_else(|e|
// FIXME: should we crash instead?
log::error!("Error when invalidating block in plugin: '{}'", e));
}
db_update_tip(db_path, ancestor.height, ancestor.hash)?;

Ok(())
}

pub fn main_loop(
db_path: &path::Path,
secp: &secp256k1::Secp256k1<impl secp256k1::Verification>,
Expand All @@ -549,7 +591,11 @@ pub fn main_loop(
if bitcoind_tip.height > db_instance.tip_blockheight {
let curr_tip_hash = bitcoind.block_hash(db_instance.tip_blockheight);
if db_instance.tip_blockheight != 0 && curr_tip_hash != db_instance.tip_blockhash {
panic!("No reorg handling yet");
let tip = ChainTip {
hash: db_instance.tip_blockhash,
height: db_instance.tip_blockheight,
};
handle_reorg(db_path, config, bitcoind, &tip)?;
}

match new_block(db_path, secp, config, bitcoind, &bitcoind_tip) {
Expand All @@ -559,7 +605,11 @@ pub fn main_loop(
Err(e) => return Err(e),
}
} else if bitcoind_tip.hash != db_instance.tip_blockhash {
panic!("No reorg handling yet");
let tip = ChainTip {
hash: db_instance.tip_blockhash,
height: db_instance.tip_blockheight,
};
handle_reorg(db_path, config, bitcoind, &tip)?;
}

thread::sleep(config.bitcoind_config.poll_interval_secs);
Expand Down
47 changes: 28 additions & 19 deletions tests/plugins/max_value_in_flight.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,28 +51,37 @@ def recorded_attempts(config):
req = read_request()
config = req["config"]
assert "data_dir" in config and "max_value" in config
block_info = req["block_info"]
maybe_create_data_dir(config)
assert DATASTORE_FNAME in os.listdir(config["data_dir"])

# First update the recorded attempts with the new and pass attempts.
in_flight = recorded_attempts(config)
for op in block_info["successful_attempts"] + block_info["revaulted_attempts"]:
del in_flight[op]
for v in block_info["new_attempts"]:
in_flight[v["deposit_outpoint"]] = v["value"]
update_in_flight(config, in_flight)

# Did we get above the threshold? Note we might stay a bit above the threshold
# for the time that the vaults we told the WT to revault previously actually get
# their Cancel transaction confirmed.
resp = {"revault": []}
value_in_flight = sum([in_flight[k] for k in in_flight])
while value_in_flight > config["max_value"] and len(block_info["new_attempts"]) > 0:
v = block_info["new_attempts"].pop(0)
resp["revault"].append(v["deposit_outpoint"])
value_in_flight -= v["value"]
continue
resp = {}
if req["method"] == "new_block":
block_info = req["block_info"]
# First update the recorded attempts with the new and pass attempts.
in_flight = recorded_attempts(config)
for op in block_info["successful_attempts"] + block_info["revaulted_attempts"]:
del in_flight[op]
for v in block_info["new_attempts"]:
in_flight[v["deposit_outpoint"]] = v["value"]
update_in_flight(config, in_flight)

# Did we get above the threshold? Note we might stay a bit above the threshold
# for the time that the vaults we told the WT to revault previously actually get
# their Cancel transaction confirmed.
resp = {"revault": []}
value_in_flight = sum([in_flight[k] for k in in_flight])
while value_in_flight > config["max_value"] and len(block_info["new_attempts"]) > 0:
v = block_info["new_attempts"].pop(0)
resp["revault"].append(v["deposit_outpoint"])
value_in_flight -= v["value"]
continue
elif req["method"] == "invalidate_block":
# We don't really care
pass
else:
# TODO: maybe we should reply saying that we don't know what
# they're talking about?
pass

sys.stdout.write(json.dumps(resp))
sys.stdout.flush()
13 changes: 11 additions & 2 deletions tests/plugins/revault_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,16 @@ def read_request():

if __name__ == "__main__":
req = read_request()
block_info = req["block_info"]
resp = {"revault": [v["deposit_outpoint"] for v in block_info["new_attempts"]]}
resp = {}
if req["method"] == "new_block":
block_info = req["block_info"]
resp = {"revault": [v["deposit_outpoint"] for v in block_info["new_attempts"]]}
elif req["method"] == "invalidate_block":
# We don't really care
pass
else:
# TODO: maybe we should reply saying that we don't know what
# they're talking about?
pass
sys.stdout.write(json.dumps(resp))
sys.stdout.flush()
11 changes: 10 additions & 1 deletion tests/plugins/revault_nothing.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ def read_request():

if __name__ == "__main__":
req = read_request()
resp = {"revault": []}
resp = {}
if req["method"] == "new_block":
resp = {"revault": []}
elif req["method"] == "invalidate_block":
# We don't really care
pass
else:
# TODO: maybe we should reply saying that we don't know what
# they're talking about?
pass
sys.stdout.write(json.dumps(resp))
sys.stdout.flush()