diff --git a/Cargo.lock b/Cargo.lock index 943975229..f827919dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -375,7 +375,7 @@ dependencies = [ "env_logger", "ff", "halo2_proofs", - "halo2aggregator-s", + "halo2aggregator-s 1.1.0 (git+https://github.com/DelphinusLab/halo2aggregator-s.git?tag=on-prove-pairing-0.6.4)", "hex", "lazy_static", "log", @@ -412,7 +412,7 @@ dependencies = [ "env_logger", "ff", "halo2_proofs", - "halo2aggregator-s", + "halo2aggregator-s 1.1.0 (git+https://github.com/DelphinusLab/halo2aggregator-s.git?tag=on-prove-pairing-0.6.4)", "hex", "lazy_static", "log", @@ -1166,6 +1166,30 @@ dependencies = [ "subtle", ] +[[package]] +name = "halo2aggregator-s" +version = "1.1.0" +source = "git+https://github.com/DelphinusLab/halo2aggregator-s.git?tag=on-prove-pairing-0.6.2#f699aac4e726b7c27936741846975a5c199fce5a" +dependencies = [ + "ark-ff", + "ark-std", + "blake2b_simd", + "ff", + "halo2_proofs", + "halo2ecc-s 0.3.2 (git+https://github.com/DelphinusLab/halo2ecc-s.git?tag=on-prove-pairing-0.6.2)", + "lazy_static", + "num-bigint", + "num-integer", + "num-traits", + "poseidon", + "rand_core", + "serde", + "serde_json", + "sha2", + "sha3", + "tera", +] + [[package]] name = "halo2aggregator-s" version = "1.1.0" @@ -1176,7 +1200,7 @@ dependencies = [ "blake2b_simd", "ff", "halo2_proofs", - "halo2ecc-s", + "halo2ecc-s 0.3.2 (git+https://github.com/DelphinusLab/halo2ecc-s.git?tag=on-prove-pairing-0.6.4)", "lazy_static", "num-bigint", "num-integer", @@ -1190,6 +1214,19 @@ dependencies = [ "tera", ] +[[package]] +name = "halo2ecc-s" +version = "0.3.2" +source = "git+https://github.com/DelphinusLab/halo2ecc-s.git?tag=on-prove-pairing-0.6.2#15b3baefac2f950e448ff1dca7c8562bf10ba8a0" +dependencies = [ + "ark-std", + "halo2_proofs", + "num-bigint", + "num-integer", + "num-traits", + "rayon", +] + [[package]] name = "halo2ecc-s" version = "0.3.2" @@ -3406,6 +3443,7 @@ dependencies = [ "delphinus-zkwasm", "env_logger", "halo2_proofs", + "halo2aggregator-s 1.1.0 (git+https://github.com/DelphinusLab/halo2aggregator-s.git?tag=on-prove-pairing-0.6.2)", "hex", "indicatif", "log", @@ -3430,7 +3468,7 @@ dependencies = [ "clap", "ff", "halo2_proofs", - "halo2ecc-s", + "halo2ecc-s 0.3.2 (git+https://github.com/DelphinusLab/halo2ecc-s.git?tag=on-prove-pairing-0.6.4)", "hex", "itertools", "lazy_static", diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index b8edd820a..1c7607329 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [dependencies] env_logger = "0.9.3" +halo2aggregator-s = { git = "https://github.com/DelphinusLab/halo2aggregator-s.git", tag="on-prove-pairing-0.6.2" } log = "0.4.17" md5 = "0.7.0" sha2 = "0.10.6" diff --git a/crates/cli/src/config.rs b/crates/cli/src/config.rs index b171cfd46..f36077516 100644 --- a/crates/cli/src/config.rs +++ b/crates/cli/src/config.rs @@ -343,6 +343,9 @@ impl Config { println!("skip first {} slice(s)", skip); } + #[cfg(feature = "continuation")] + let mut last_post_image_table_commitment: Option<(String, String)> = None; + let mut slices = Slices::new(self.k, tables, padding)? .into_iter() .enumerate() @@ -408,12 +411,14 @@ impl Config { transcript: name_of_transcript(&self.name, index), }; + let pkey = &cached_proving_key.as_ref().unwrap().1; + let proof = match circuit { ZkWasmCircuit::Ongoing(circuit) => proof_piece_info.create_proof::( &circuit, &vec![instances.clone()], ¶ms, - &cached_proving_key.as_ref().unwrap().1, + pkey, proof_load_info.hashtype, self.scheme.into(), ), @@ -422,12 +427,43 @@ impl Config { &circuit, &vec![instances.clone()], ¶ms, - &cached_proving_key.as_ref().unwrap().1, + pkey, proof_load_info.hashtype, self.scheme.into(), ), }; + #[cfg(feature = "continuation")] + { + use crate::utils::get_named_advice_commitment; + use delphinus_zkwasm::circuits::image_table::IMAGE_COL_NAME; + use delphinus_zkwasm::circuits::post_image_table::POST_IMAGE_TABLE; + + // checks pre image col equals to last's post image col commitment + let pre_image_table_msm = + get_named_advice_commitment(pkey.get_vk(), &proof, IMAGE_COL_NAME); + + let last_post_image_table_msm = last_post_image_table_commitment.take(); + if let Some(last_post_image_table_msm) = last_post_image_table_msm { + assert_eq!( + pre_image_table_msm.x.to_string(), + last_post_image_table_msm.0 + ); + assert_eq!( + pre_image_table_msm.y.to_string(), + last_post_image_table_msm.1 + ); + } + + let post_image_table_msm = + get_named_advice_commitment(pkey.get_vk(), &proof, POST_IMAGE_TABLE); + + last_post_image_table_commitment = Some(( + post_image_table_msm.x.to_string(), + post_image_table_msm.y.to_string(), + )); + } + proof_piece_info.save_proof_data(&vec![instances.clone()], &proof, output_dir); proof_load_info.append_single_proof(proof_piece_info); @@ -450,6 +486,8 @@ impl Config { } pub(crate) fn verify(self, params_dir: &Path, output_dir: &PathBuf) -> anyhow::Result<()> { + let mut maximal_public_inputs_size = 0; + let mut proofs = { println!( "{} Reading proofs from {:?}", @@ -463,6 +501,16 @@ impl Config { let proofs: Vec> = ProofInfo::load_proof(output_dir, params_dir, &proof_load_info); + for proof in &proofs { + maximal_public_inputs_size = usize::max( + maximal_public_inputs_size, + proof + .instances + .iter() + .fold(0, |acc, x| usize::max(acc, x.len())), + ); + } + proofs } .into_iter() @@ -474,18 +522,13 @@ impl Config { proofs.len() ); + let params_verifier = { + let params = self.read_params(params_dir)?; + params.verifier(maximal_public_inputs_size)? + }; + let progress_bar = ProgressBar::new(proofs.len() as u64); while let Some(proof) = proofs.next() { - let params_verifier = { - let public_inputs_size = proof - .instances - .iter() - .fold(0, |acc, x| usize::max(acc, x.len())); - - let params = self.read_params(params_dir)?; - params.verifier(public_inputs_size)? - }; - { let mut buf = Vec::new(); proof.vkey.write(&mut Cursor::new(&mut buf))?; @@ -514,42 +557,6 @@ impl Config { .verify_proof(¶ms_verifier, self.scheme.into()) .unwrap(); - // TODO: handle checksum sanity check - // #[cfg(feature = "uniform-circuit")] - // { - // use delphinus_zkwasm::circuits::image_table::IMAGE_COL_NAME; - // use halo2_proofs::plonk::get_advice_commitments_from_transcript; - // use halo2aggregator_s::transcript::poseidon::PoseidonRead; - - // let _img_col_idx = proof - // .vkey - // .cs - // .named_advices - // .iter() - // .find(|(k, _)| k == IMAGE_COL_NAME) - // .unwrap() - // .1; - // let _img_col_commitment: Vec = - // get_advice_commitments_from_transcript::( - // &proof.vkey, - // &mut PoseidonRead::init(&proof.transcripts[..]), - // ) - // .unwrap(); - - // assert!( - // vec![_img_col_commitment[_img_col_idx as usize]][0] - // .x - // .to_string() - // == self.checksum.0 - // ); - // assert!( - // vec![_img_col_commitment[_img_col_idx as usize]][0] - // .y - // .to_string() - // == self.checksum.1 - // ); - // } - progress_bar.inc(1); } progress_bar.finish_and_clear(); diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index bdd9574e0..a17e1d15f 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -17,6 +17,7 @@ use delphinus_zkwasm::runtime::host::default_env::ExecutionArg; use args::HostMode; use config::Config; use delphinus_zkwasm::runtime::host::HostEnvBuilder; +use delphinus_zkwasm::zkwasm_host_circuits::host::db::MongoDB; use file_backend::FileBackendBuilder; use names::name_of_config; use specs::args::parse_args; @@ -29,6 +30,8 @@ mod config; mod file_backend; mod names; +pub mod utils; + const TRIVIAL_WASM: &str = r#" (module (func (export "zkmain")) @@ -84,7 +87,7 @@ fn main() -> Result<()> { private_inputs, context_inputs, indexed_witness: Rc::new(RefCell::new(HashMap::default())), - tree_db: None, + tree_db: Some(Rc::new(RefCell::new(MongoDB::new([0; 32], None)))), }, arg.running_arg.context_output, arg.instruction_limit, @@ -121,7 +124,7 @@ fn main() -> Result<()> { private_inputs, context_inputs, indexed_witness: Rc::new(RefCell::new(HashMap::default())), - tree_db: None, + tree_db: Some(Rc::new(RefCell::new(MongoDB::new([0; 32], None)))), }, arg.running_arg.context_output, arg.mock_test, @@ -142,7 +145,7 @@ fn main() -> Result<()> { private_inputs, context_inputs, indexed_witness: Rc::new(RefCell::new(HashMap::default())), - tree_db: None, + tree_db: Some(Rc::new(RefCell::new(MongoDB::new([0; 32], None)))), }, arg.running_arg.context_output, arg.mock_test, diff --git a/crates/cli/src/utils.rs b/crates/cli/src/utils.rs new file mode 100644 index 000000000..372682531 --- /dev/null +++ b/crates/cli/src/utils.rs @@ -0,0 +1,22 @@ +use halo2_proofs::pairing::bn256::Bn256; +use halo2_proofs::pairing::bn256::G1Affine; +use halo2_proofs::plonk::get_advice_commitments_from_transcript; +use halo2_proofs::plonk::VerifyingKey; +use halo2aggregator_s::transcript::poseidon::PoseidonRead; + +pub fn get_named_advice_commitment( + vkey: &VerifyingKey, + proof: &[u8], + named_advice: &str, +) -> G1Affine { + let img_col_idx = vkey + .cs + .named_advices + .iter() + .find(|(k, _)| k == named_advice) + .unwrap() + .1; + + get_advice_commitments_from_transcript::(vkey, &mut PoseidonRead::init(proof)) + .unwrap()[img_col_idx as usize] +} diff --git a/crates/host/src/host/ecc_helper/jubjub/mod.rs b/crates/host/src/host/ecc_helper/jubjub/mod.rs index 2f9e95176..8a2e17b82 100644 --- a/crates/host/src/host/ecc_helper/jubjub/mod.rs +++ b/crates/host/src/host/ecc_helper/jubjub/mod.rs @@ -1,17 +1,25 @@ -pub mod sum; -use ark_std::Zero; +use delphinus_zkwasm::runtime::monitor::plugins::table::Command; +use halo2_proofs::pairing::bn256::Fr; use halo2_proofs::pairing::bn256::Fr as BabyJubjubFq; use num_bigint::BigUint; use num_traits::FromPrimitive; +use num_traits::Zero; use std::ops::AddAssign; use std::ops::Shl; +use zkwasm_host_circuits::circuits::babyjub::AltJubChip; +use zkwasm_host_circuits::circuits::host::HostOpSelector; use zkwasm_host_circuits::host::jubjub; - -const LIMBSZ: usize = 64; -const LIMBNB: usize = 4; +use zkwasm_host_circuits::host::ForeignInst; +use zkwasm_host_circuits::proof::OpType; use super::bn_to_field; use super::field_to_bn; +use crate::PluginFlushStrategy; + +pub mod sum; + +const LIMBSZ: usize = 64; +const LIMBNB: usize = 4; pub fn fetch_fq(limbs: &[u64], index: usize) -> BabyJubjubFq { let mut bn = BigUint::zero(); @@ -42,3 +50,71 @@ pub fn babyjubjub_fq_to_limbs(result_limbs: &mut Vec, f: BabyJubjubFq) { result_limbs.append(&mut vec![value]); } } + +pub(crate) struct JubJubFlushStrategy { + current: usize, + group: usize, + maximal_group: usize, + new_msm: bool, +} + +impl JubJubFlushStrategy { + pub(crate) fn new(k: u32) -> Self { + Self { + current: 0, + group: 0, + maximal_group: AltJubChip::::max_rounds(k as usize), + new_msm: true, + } + } + + fn group_size() -> usize { + // new + scalar + point + result point + 1 + 4 + 8 + 8 + } +} + +impl PluginFlushStrategy for JubJubFlushStrategy { + fn notify(&mut self, op: &ForeignInst, value: Option) -> Vec { + let op_type = OpType::JUBJUBSUM as usize; + + self.current += 1; + + if *op as usize == ForeignInst::JubjubSumNew as usize { + let value = value.unwrap(); + assert!(value == 0 || value == 1); + + self.new_msm = value == 1; + + if self.new_msm { + return vec![Command::Finalize(op_type), Command::Start(op_type)]; + } else { + return vec![Command::Start(op_type)]; + } + } + + if self.current == JubJubFlushStrategy::group_size() { + self.current = 0; + self.group += 1; + + let mut commands = vec![Command::Commit(op_type, self.new_msm)]; + + if self.group >= self.maximal_group { + commands.push(Command::Abort); + } + + return commands; + } + + vec![Command::Noop] + } + + fn reset(&mut self) { + self.current = 0; + self.group = 0; + } + + fn maximal_group(&self) -> Option { + Some(self.maximal_group) + } +} diff --git a/crates/host/src/host/hash_helper/poseidon.rs b/crates/host/src/host/hash_helper/poseidon.rs index fc4dc2660..e54532d29 100644 --- a/crates/host/src/host/hash_helper/poseidon.rs +++ b/crates/host/src/host/hash_helper/poseidon.rs @@ -1,11 +1,14 @@ use delphinus_zkwasm::runtime::host::host_env::HostEnv; use delphinus_zkwasm::runtime::host::ForeignContext; use delphinus_zkwasm::runtime::host::ForeignStatics; +use delphinus_zkwasm::runtime::monitor::plugins::table::Command; use ff::PrimeField; use halo2_proofs::pairing::bn256::Fr; use poseidon::Poseidon; use std::rc::Rc; pub use zkwasm_host_circuits::host::poseidon::POSEIDON_HASHER; +use zkwasm_host_circuits::host::ForeignInst; +use zkwasm_host_circuits::proof::OpType; use zkwasm_host_circuits::host::Reduce; use zkwasm_host_circuits::host::ReduceRule; @@ -123,6 +126,8 @@ impl ForeignContext for PoseidonContext { } use specs::external_host_call_table::ExternalHostCallSignature; + +use crate::PluginFlushStrategy; pub fn register_poseidon_foreign(env: &mut HostEnv) { let foreign_poseidon_plugin = env .external_env @@ -170,3 +175,71 @@ pub fn register_poseidon_foreign(env: &mut HostEnv) { ), ); } + +pub(crate) struct PoseidonFlushStrategy { + current: usize, + group: usize, + maximal_group: usize, + new_hasher: bool, +} + +impl PoseidonFlushStrategy { + pub(crate) fn new(k: u32) -> Self { + Self { + current: 0, + group: 0, + maximal_group: PoseidonChip::max_rounds(k as usize), + new_hasher: true, + } + } + + fn group_size() -> usize { + // new + push + result + 1 + 4 * 8 + 4 + } +} + +impl PluginFlushStrategy for PoseidonFlushStrategy { + fn notify(&mut self, op: &ForeignInst, value: Option) -> Vec { + let op_type = OpType::POSEIDONHASH as usize; + + self.current += 1; + + if *op as usize == ForeignInst::PoseidonNew as usize { + let value = value.unwrap(); + assert!(value == 0 || value == 1); + + self.new_hasher = value == 1; + + if self.new_hasher { + return vec![Command::Finalize(op_type), Command::Start(op_type)]; + } else { + return vec![Command::Start(op_type)]; + } + } + + if self.current == PoseidonFlushStrategy::group_size() { + self.current = 0; + self.group += 1; + + let mut commands = vec![Command::Commit(op_type, self.new_hasher)]; + + if self.group >= self.maximal_group { + commands.push(Command::Abort); + } + + return commands; + } + + vec![Command::Noop] + } + + fn reset(&mut self) { + self.current = 0; + self.group = 0; + } + + fn maximal_group(&self) -> Option { + Some(self.maximal_group) + } +} diff --git a/crates/host/src/host/merkle_helper/mod.rs b/crates/host/src/host/merkle_helper/mod.rs index d6b71bb10..555634281 100644 --- a/crates/host/src/host/merkle_helper/mod.rs +++ b/crates/host/src/host/merkle_helper/mod.rs @@ -1,2 +1,85 @@ +use delphinus_zkwasm::runtime::monitor::plugins::table::Command; +use halo2_proofs::pairing::bn256::Fr; +use zkwasm_host_circuits::circuits::host::HostOpSelector; +use zkwasm_host_circuits::circuits::merkle::MerkleChip; +use zkwasm_host_circuits::host::ForeignInst; +use zkwasm_host_circuits::proof::OpType; + +use crate::PluginFlushStrategy; +use crate::MERKLE_TREE_HEIGHT; + pub mod datacache; pub mod merkle; + +pub(crate) struct MerkleFlushStrategy { + current: usize, + group: usize, + maximal_group: usize, + is_set: bool, +} + +impl MerkleFlushStrategy { + pub(crate) fn new(k: u32) -> Self { + Self { + current: 0, + group: 0, + maximal_group: MerkleChip::::max_rounds(k as usize), + is_set: false, + } + } + + fn group_size() -> usize { + // address + set_root + get/set + get_root + 1 + 4 + 4 + 4 + } +} + +impl PluginFlushStrategy for MerkleFlushStrategy { + fn notify(&mut self, op: &ForeignInst, _value: Option) -> Vec { + let op_type = OpType::MERKLE as usize; + + self.current += 1; + + if *op as usize == ForeignInst::MerkleAddress as usize { + self.is_set = false; + + return vec![Command::Start(op_type)]; + } + + if *op as usize == ForeignInst::MerkleSet as usize { + self.is_set = true; + } + + if *op as usize == ForeignInst::MerkleGet as usize { + return vec![Command::Finalize(op_type), Command::Noop]; + } + + if self.current == MerkleFlushStrategy::group_size() { + self.current = 0; + self.group += 1; + + let mut commands = if self.is_set { + vec![Command::Commit(op_type, false), Command::Finalize(op_type)] + } else { + vec![Command::Commit(op_type, true)] + }; + + if self.group >= self.maximal_group { + commands.push(Command::Abort); + } + + return commands; + } + + vec![Command::Noop] + } + + fn reset(&mut self) { + self.current = 0; + self.group = 0; + } + + fn maximal_group(&self) -> Option { + Some(self.maximal_group) + } +} diff --git a/crates/host/src/lib.rs b/crates/host/src/lib.rs index fcd9b67db..70174f078 100644 --- a/crates/host/src/lib.rs +++ b/crates/host/src/lib.rs @@ -1,33 +1,33 @@ #![deny(warnings)] -pub mod host; -use num_traits::FromPrimitive; -use std::cell::RefCell; -use std::rc::Rc; - use delphinus_zkwasm::foreign::context::runtime::register_context_foreign; use delphinus_zkwasm::foreign::log_helper::register_log_foreign; use delphinus_zkwasm::foreign::require_helper::register_require_foreign; use delphinus_zkwasm::foreign::wasm_input_helper::runtime::register_wasm_input_foreign; use delphinus_zkwasm::runtime::host::default_env::ExecutionArg; - use delphinus_zkwasm::runtime::host::host_env::HostEnv; use delphinus_zkwasm::runtime::host::HostEnvBuilder; +use delphinus_zkwasm::runtime::monitor::plugins::table::transaction::TransactionId; use delphinus_zkwasm::runtime::monitor::plugins::table::Command; use delphinus_zkwasm::runtime::monitor::plugins::table::Event; use delphinus_zkwasm::runtime::monitor::plugins::table::FlushStrategy; -use halo2_proofs::pairing::bn256::Fr; +use host::ecc_helper::jubjub::JubJubFlushStrategy; +use host::hash_helper::poseidon::PoseidonFlushStrategy; +use host::merkle_helper::MerkleFlushStrategy; +use num_traits::FromPrimitive; use serde::Deserialize; use serde::Serialize; +use std::cell::RefCell; use std::collections::HashMap; -use zkwasm_host_circuits::circuits::babyjub::AltJubChip; -use zkwasm_host_circuits::circuits::host::HostOpSelector; -use zkwasm_host_circuits::circuits::merkle::MerkleChip; -use zkwasm_host_circuits::circuits::poseidon::PoseidonChip; +use std::rc::Rc; use zkwasm_host_circuits::host::db::TreeDB; use zkwasm_host_circuits::host::ForeignInst; use zkwasm_host_circuits::proof::OpType; +pub mod host; + +// TODO: move into zkwasm-host-circuits repo + #[derive(Serialize, Deserialize, Debug)] pub struct HostEnvConfig { pub ops: Vec, @@ -90,6 +90,42 @@ impl GroupedForeign for ForeignInst { } } +trait GroupedForeignPlugin { + fn new_plugin_flush_strategy(&self, k: u32) -> Box; +} + +impl GroupedForeignPlugin for OpType { + fn new_plugin_flush_strategy(&self, k: u32) -> Box { + match self { + OpType::POSEIDONHASH => Box::new(PoseidonFlushStrategy::new(k)), + OpType::MERKLE => Box::new(MerkleFlushStrategy::new(k)), + OpType::JUBJUBSUM => Box::new(JubJubFlushStrategy::new(k)), + _ => Box::new(TrivialPluginFlushStrategy {}), + } + } +} + +trait PluginFlushStrategy { + fn notify(&mut self, op: &ForeignInst, value: Option) -> Vec; + fn reset(&mut self); + + fn maximal_group(&self) -> Option; +} + +struct TrivialPluginFlushStrategy {} + +impl PluginFlushStrategy for TrivialPluginFlushStrategy { + fn notify(&mut self, _op: &ForeignInst, _value: Option) -> Vec { + vec![Command::Noop] + } + + fn reset(&mut self) {} + + fn maximal_group(&self) -> Option { + None + } +} + impl StandardHostEnvBuilder { pub fn new(k: u32) -> Self { Self { @@ -108,69 +144,60 @@ impl StandardHostEnvBuilder { #[derive(Default)] struct StandardHostEnvFlushStrategy { k: u32, - ops: HashMap, + ops: HashMap>, } -trait OpTypeFlushHelper { - fn get_group_size(&self) -> usize; - fn get_max_bound(&self, k: usize) -> usize; -} +impl FlushStrategy for StandardHostEnvFlushStrategy { + fn notify(&mut self, op: Event) -> Vec { + match op { + Event::HostCall(op, value) => { + let inst = ForeignInst::from_usize(op); + if inst.is_none() { + return vec![Command::Noop]; + } -impl OpTypeFlushHelper for OpType { - fn get_group_size(&self) -> usize { - match self { - OpType::MERKLE => 1 + 4 + 4 + 4, // address + set_root + get/set + get_root - OpType::JUBJUBSUM => 1 + 4 + 8 + 8, // new + scalar + point + result point - OpType::POSEIDONHASH => 1 + 4 * 8 + 4, // new + push + result - _ => unreachable!(), - } - } + let inst = inst.unwrap(); + let op_type = inst.get_optype(); + if op_type.is_none() { + return vec![Command::Noop]; + } - fn get_max_bound(&self, k: usize) -> usize { - match self { - OpType::MERKLE => MerkleChip::::max_rounds(k), - OpType::JUBJUBSUM => AltJubChip::::max_rounds(k), - OpType::POSEIDONHASH => PoseidonChip::max_rounds(k), - _ => unreachable!(), - } - } -} + let op_type = op_type.unwrap(); + let plugin = self + .ops + .entry(op_type.clone() as usize) + .or_insert_with(|| op_type.new_plugin_flush_strategy(self.k)); -impl FlushStrategy for StandardHostEnvFlushStrategy { - fn notify(&mut self, op: Event) -> Command { - match op { - Event::HostCall(op) => { - let op_type = ForeignInst::from_usize(op).unwrap().get_optype(); - if let Some(optype) = op_type { - // cargo clippy false positive - #[allow(clippy::redundant_clone)] - let (count, total) = self.ops.entry(optype.clone() as usize).or_insert((0, 0)); - - *count += 1; - - if *count == 1 { - Command::Start(optype as usize) - } else if *count == optype.get_group_size() { - *total += 1; - *count = 0; - - if *total >= optype.get_max_bound(self.k as usize) { - Command::CommitAndAbort(optype as usize) - } else { - Command::Commit(optype as usize) - } - } else { - Command::Noop - } - } else { - Command::Noop + plugin.notify(&inst, value) + } + Event::Reset() => { + for (_, plugin) in self.ops.iter_mut() { + plugin.reset(); } + + vec![Command::Noop] } - Event::Reset => { - self.ops.clear(); - Command::Noop + } + } + + fn maximal_group(&self, transaction: TransactionId) -> Option { + // FIXME: add usize to zkwasm-host-circuits repo + fn optype_from_usize(index: usize) -> OpType { + match index { + 0 => OpType::BLS381PAIR, + 1 => OpType::BLS381SUM, + 2 => OpType::BN256PAIR, + 3 => OpType::BN256SUM, + 4 => OpType::POSEIDONHASH, + 5 => OpType::KECCAKHASH, + 6 => OpType::MERKLE, + 7 => OpType::JUBJUBSUM, + _ => unreachable!(), } } + optype_from_usize(transaction) + .new_plugin_flush_strategy(self.k) + .maximal_group() } } diff --git a/crates/zkwasm/src/runtime/host/default_env.rs b/crates/zkwasm/src/runtime/host/default_env.rs index cf24dfea3..0e96b5aaa 100644 --- a/crates/zkwasm/src/runtime/host/default_env.rs +++ b/crates/zkwasm/src/runtime/host/default_env.rs @@ -8,6 +8,7 @@ use crate::foreign::context::runtime::register_context_foreign; use crate::foreign::log_helper::register_log_foreign; use crate::foreign::require_helper::register_require_foreign; use crate::foreign::wasm_input_helper::runtime::register_wasm_input_foreign; +use crate::runtime::monitor::plugins::table::transaction::TransactionId; use crate::runtime::monitor::plugins::table::Command; use crate::runtime::monitor::plugins::table::Event; use crate::runtime::monitor::plugins::table::FlushStrategy; @@ -41,8 +42,12 @@ impl DefaultHostEnvBuilder { struct DefaultFlushStrategy; impl FlushStrategy for DefaultFlushStrategy { - fn notify(&mut self, _event: Event) -> Command { - Command::Noop + fn notify(&mut self, _event: Event) -> Vec { + vec![Command::Noop] + } + + fn maximal_group(&self, _transaction: TransactionId) -> Option { + None } } diff --git a/crates/zkwasm/src/runtime/monitor/plugins/table/mod.rs b/crates/zkwasm/src/runtime/monitor/plugins/table/mod.rs index 7b9afa717..b24b6d15f 100644 --- a/crates/zkwasm/src/runtime/monitor/plugins/table/mod.rs +++ b/crates/zkwasm/src/runtime/monitor/plugins/table/mod.rs @@ -22,10 +22,9 @@ use specs::types::ValueType; use specs::CompilationTable; use specs::ExecutionTable; use specs::Tables; -use transaction::HostTransaction; use transaction::TransactionId; +use transaction::TransactionSlicer; use wasmi::func::FuncInstanceInternal; - use wasmi::isa::Instruction; use wasmi::isa::Keep; use wasmi::memory_units::Pages; @@ -66,27 +65,33 @@ pub use specs::slice_backend::InMemoryBackendSlice; const DEFAULT_MEMORY_INDEX: u32 = 0; const DEFAULT_TABLE_INDEX: u32 = 0; +type HostTransaction = transaction::v2::HostTransaction; + #[derive(PartialEq)] pub enum Command { + /* Control transaction(start, commit) */ + // Do nothing but insert event Noop, - // Start a new transaction from current instruction + // Start a transaction Start(TransactionId), - // Commit the transaction including the current instruction - Commit(TransactionId), - // Flush the table at next host call instruction + // Commit a transaction with optional automatically finalizing timer + Commit(TransactionId, bool), + + /* Control slice */ Abort, - // Commit the transaction with current instruction and flush the table - // at next host call instruction - CommitAndAbort(TransactionId), + + /* Control dependencies */ + Finalize(TransactionId), } pub enum Event { - HostCall(usize), - Reset, + HostCall(usize, Option), + Reset(), } pub trait FlushStrategy { - fn notify(&mut self, op: Event) -> Command; + fn notify(&mut self, op: Event) -> Vec; + fn maximal_group(&self, transaction: TransactionId) -> Option; } pub struct TablePlugin { @@ -142,7 +147,7 @@ impl TablePlugin { context_output_table: vec![], host_transaction: HostTransaction::::new( - capacity, + capacity as usize, slice_backend_builder, flush_strategy, ), @@ -183,8 +188,7 @@ impl TablePlugin { configure_table, initial_frame_table: Arc::new( self.host_transaction - .slice_builder - .frame_table_builder + .frame_table_builder_get() .build_initial_frame_table(), ), initialization_state, @@ -193,7 +197,7 @@ impl TablePlugin { pub fn into_tables(self) -> Tables { let compilation_tables = self.into_compilation_table(); - let slice_backend = self.host_transaction.finalized(); + let slice_backend = self.host_transaction.finalize(); Tables { compilation_tables, @@ -234,7 +238,7 @@ impl TablePlugin { step_info, }; - self.host_transaction.insert(event); + self.host_transaction.push_event(event); } fn push_frame(&mut self, frame_id: u32) { @@ -360,14 +364,12 @@ impl Monitor for TablePlugin { }; self.host_transaction - .slice_builder - .frame_table_builder + .frame_table_builder_get_mut() .push_static_entry(*zkmain_idx as u32, 0, 0); if let Some(start_idx) = module.start_section() { self.host_transaction - .slice_builder - .frame_table_builder + .frame_table_builder_get_mut() .push_static_entry(start_idx, *zkmain_idx as u32, 0); self.start_fid = Some(start_idx); diff --git a/crates/zkwasm/src/runtime/monitor/plugins/table/slice_builder.rs b/crates/zkwasm/src/runtime/monitor/plugins/table/slice_builder.rs index 62a831c24..8df2780c8 100644 --- a/crates/zkwasm/src/runtime/monitor/plugins/table/slice_builder.rs +++ b/crates/zkwasm/src/runtime/monitor/plugins/table/slice_builder.rs @@ -6,7 +6,7 @@ use specs::slice_backend::Slice; use super::frame_table_builder::FrameTableBuilder; -pub(super) struct SliceBuilder { +pub struct SliceBuilder { pub(super) frame_table_builder: FrameTableBuilder, } diff --git a/crates/zkwasm/src/runtime/monitor/plugins/table/transaction/mod.rs b/crates/zkwasm/src/runtime/monitor/plugins/table/transaction/mod.rs new file mode 100644 index 000000000..8b4bb51ff --- /dev/null +++ b/crates/zkwasm/src/runtime/monitor/plugins/table/transaction/mod.rs @@ -0,0 +1,16 @@ +use specs::etable::EventTableEntry; +use specs::slice_backend::SliceBackendBuilder; + +use super::frame_table_builder::FrameTableBuilder; + +pub(super) mod v1; +pub(super) mod v2; + +pub type TransactionId = usize; + +pub(super) trait TransactionSlicer { + fn push_event(&mut self, event: EventTableEntry); + fn frame_table_builder_get(&self) -> &FrameTableBuilder; + fn frame_table_builder_get_mut(&mut self) -> &mut FrameTableBuilder; + fn finalize(self) -> Vec; +} diff --git a/crates/zkwasm/src/runtime/monitor/plugins/table/transaction.rs b/crates/zkwasm/src/runtime/monitor/plugins/table/transaction/v1.rs similarity index 62% rename from crates/zkwasm/src/runtime/monitor/plugins/table/transaction.rs rename to crates/zkwasm/src/runtime/monitor/plugins/table/transaction/v1.rs index b589fd436..fabf2c9d8 100644 --- a/crates/zkwasm/src/runtime/monitor/plugins/table/transaction.rs +++ b/crates/zkwasm/src/runtime/monitor/plugins/table/transaction/v1.rs @@ -5,13 +5,14 @@ use specs::etable::EventTableEntry; use specs::slice_backend::SliceBackendBuilder; use specs::step::StepInfo; +use crate::runtime::monitor::plugins::table::frame_table_builder::FrameTableBuilder; +use crate::runtime::monitor::plugins::table::slice_builder::SliceBuilder; +use crate::runtime::monitor::plugins::table::Command; use crate::runtime::monitor::plugins::table::Event; +use crate::runtime::monitor::plugins::table::FlushStrategy; -use super::slice_builder::SliceBuilder; -use super::Command; -use super::FlushStrategy; - -pub(crate) type TransactionId = usize; +use super::TransactionId; +use super::TransactionSlicer; struct Checkpoint { // transaction start index @@ -19,12 +20,12 @@ struct Checkpoint { } struct SafelyAbortPosition { - capacity: u32, + capacity: usize, cursor: Option, } impl SafelyAbortPosition { - fn new(capacity: u32) -> Self { + fn new(capacity: usize) -> Self { Self { capacity, cursor: None, @@ -40,14 +41,14 @@ impl SafelyAbortPosition { } fn finalize(&self) -> usize { - self.cursor.unwrap_or(self.capacity as usize) + self.cursor.unwrap_or(self.capacity) } } -pub(super) struct HostTransaction { +pub struct HostTransaction { slice_backend_builder: B, slices: Vec, - capacity: u32, + capacity: usize, safely_abort_position: SafelyAbortPosition, logs: Vec, @@ -55,12 +56,13 @@ pub(super) struct HostTransaction { controller: Box, host_is_full: bool, - pub(crate) slice_builder: SliceBuilder, + pub slice_builder: SliceBuilder, } impl HostTransaction { - pub(super) fn new( - capacity: u32, + #[allow(dead_code)] + pub fn new( + capacity: usize, slice_backend_builder: B, controller: Box, ) -> Self { @@ -137,63 +139,70 @@ impl HostTransaction { // controller should be reset and we will replay the remaining logs { - let command = self.controller.notify(Event::Reset); - assert!(command == Command::Noop); + let command = self.controller.notify(Event::Reset()); + assert!(command == vec![Command::Noop]); self.replay(logs); } } - - pub(super) fn finalized(mut self) -> Vec { - self.abort(); - - self.slices - } } impl HostTransaction { fn replay(&mut self, logs: Vec) { for log in logs { - self.insert(log); + self.push_event(log); } } +} - pub(crate) fn insert(&mut self, log: EventTableEntry) { - if self.logs.len() == self.capacity as usize { +impl TransactionSlicer for HostTransaction { + fn push_event(&mut self, event: EventTableEntry) { + if self.logs.len() == self.capacity { self.abort(); } - let command = match log.step_info { - StepInfo::ExternalHostCall { op, .. } => { + let commands = match event.step_info { + StepInfo::ExternalHostCall { op, value, .. } => { if self.host_is_full { self.abort(); } - self.controller.notify(Event::HostCall(op)) + self.controller.notify(Event::HostCall(op, value)) } - _ => Command::Noop, + _ => vec![Command::Noop], }; - match command { - Command::Noop => { - self.logs.push(log); - } - Command::Start(id) => { - self.start(id); - self.logs.push(log); - } - Command::Commit(id) => { - self.logs.push(log); - self.commit(id); - } - Command::Abort => { - self.insert(log); - self.host_is_full = true; - } - Command::CommitAndAbort(id) => { - self.logs.push(log); - self.commit(id); - self.host_is_full = true; + for command in commands { + match command { + Command::Noop => { + self.logs.push(event.clone()); + } + Command::Start(id) => { + self.start(id); + self.logs.push(event.clone()); + } + Command::Commit(id, _) => { + self.logs.push(event.clone()); + self.commit(id); + } + Command::Abort => { + self.host_is_full = true; + } + Command::Finalize(_) => (), } } } + + fn finalize(mut self) -> Vec { + self.abort(); + + self.slices + } + + fn frame_table_builder_get(&self) -> &FrameTableBuilder { + &self.slice_builder.frame_table_builder + } + + fn frame_table_builder_get_mut(&mut self) -> &mut FrameTableBuilder { + &mut self.slice_builder.frame_table_builder + } } diff --git a/crates/zkwasm/src/runtime/monitor/plugins/table/transaction/v2.rs b/crates/zkwasm/src/runtime/monitor/plugins/table/transaction/v2.rs new file mode 100644 index 000000000..3defe8fe9 --- /dev/null +++ b/crates/zkwasm/src/runtime/monitor/plugins/table/transaction/v2.rs @@ -0,0 +1,619 @@ +use std::cell::RefCell; +use std::cmp::Ordering; +use std::collections::HashMap; +use std::collections::LinkedList; +use std::rc::Rc; + +use log::warn; +use specs::etable::EventTableEntry; +use specs::slice_backend::SliceBackendBuilder; +use specs::step::StepInfo; + +use crate::runtime::monitor::plugins::table::frame_table_builder::FrameTableBuilder; +use crate::runtime::monitor::plugins::table::slice_builder::SliceBuilder; +use crate::runtime::monitor::plugins::table::Command; +use crate::runtime::monitor::plugins::table::Event; +use crate::runtime::monitor::plugins::table::FlushStrategy; + +use super::TransactionId; +use super::TransactionSlicer; + +const MAX_SLICES_IN_MEMORY: usize = 3; +const TIMER_DELAY: usize = 2; + +#[derive(Clone)] +struct Range { + start: usize, + // Inclusive bound + end: Option, +} + +impl Range { + fn contains(&self, offset: usize) -> bool { + if let Some(end) = self.end { + self.start <= offset && offset <= end + } else { + self.start <= offset + } + } + + fn before(&self, offset: usize) -> bool { + self.end.map(|end| end < offset).unwrap_or(false) + } + + fn after(&self, offset: usize) -> bool { + self.start > offset + } + + fn contains_or_after(&self, offset: usize) -> bool { + self.contains(offset) || self.after(offset) + } +} + +#[derive(Clone)] +struct Checkpoint { + range: Range, + weak_dependencies: HashMap, + // how many transactions it includes + transactions_group_number: HashMap, +} + +impl Checkpoint { + fn has_dependencies(&self) -> bool { + self.weak_dependencies + .iter() + .any(|(_tx, count)| *count != 0) + } + + // return value: + // Ordering::Greater: at least one of transaction is overflow + // Ordering::Equal: all transactions are full + // Ordering::Less : no transaction overflow and at least one of transacion is not full + fn transactions_group_number_ordering( + &self, + applied_transactions_group_number: &HashMap, + flush_strategy_controller: &dyn FlushStrategy, + ) -> Ordering { + let mut ordering = Ordering::Equal; + + for (tx, group_number) in &self.transactions_group_number { + if let Some(limit) = flush_strategy_controller.maximal_group(*tx) { + let applied = applied_transactions_group_number + .get(tx) + .cloned() + .unwrap_or_default(); + + let number = *group_number - applied; + match number.cmp(&limit) { + Ordering::Less => ordering = Ordering::Less, + Ordering::Equal => {} + Ordering::Greater => return Ordering::Greater, + } + } + } + + ordering + } +} + +struct WeakCommittedTransaction { + first_tx_start: usize, + last: usize, + count: usize, +} + +struct Checkpoints { + slice_capacity: usize, + + // the group number of the transaction which applied in slice + applied_transactions_group_number: HashMap, + // how many committed transactions exist now + total_transactions_group_number: HashMap, + + // uncommitted transaction and its start offset + transactions: HashMap, + // committed weak transactions + weak_committed: HashMap, + + checkpoints: Vec, +} + +impl Checkpoints { + fn new(slice_capacity: usize) -> Self { + Self { + slice_capacity, + applied_transactions_group_number: HashMap::default(), + total_transactions_group_number: HashMap::default(), + transactions: HashMap::default(), + weak_committed: HashMap::default(), + checkpoints: vec![Checkpoint { + range: Range { + start: 0, + end: None, + }, + weak_dependencies: HashMap::default(), + transactions_group_number: HashMap::default(), + }], + } + } + + fn start(&mut self, tx: TransactionId, offset: usize) { + // end the lastest checkpoint + if self.transactions.is_empty() { + if let Some(checkpoint) = self.checkpoints.last_mut() { + checkpoint.range.end = Some(offset); + } + } + + let old = self.transactions.insert(tx, offset); + assert!(old.is_none(), "recursive transaction is not supported yet"); + } + + fn commit(&mut self, tx: TransactionId, offset: usize) -> usize { + let start = self + .transactions + .remove(&tx) + .unwrap_or_else(|| panic!("commit a not existing transaction {}", tx)); + + let committed_tx = self.total_transactions_group_number.entry(tx).or_default(); + *committed_tx += 1; + + self.weak_committed + .entry(tx) + .and_modify(|tx| { + tx.count += 1; + tx.last = offset; + }) + .or_insert(WeakCommittedTransaction { + first_tx_start: start, + last: offset, + count: 1, + }); + + if self.transactions.is_empty() { + self.insert_checkpoint(offset); + } + + start + } + + // finalize all transactions of 'tx' to now, it is active called by host + fn finalize(&mut self, tx: TransactionId) { + let desc = self.weak_committed.remove(&tx); + + if let Some(desc) = desc { + if desc.last - desc.first_tx_start > self.slice_capacity { + panic!( + "Transactions (transaction id: {}, count: {}) cannot be placed in \ + a slice because the first transaction(start at {}) is more than \ + the slice size away from the last transaction(commit at {})", + tx, desc.count, desc.first_tx_start, desc.last + ); + } + + self.active_release_weak_dependencies_for_checkpoint_from(tx, desc.last); + } + } + + // called due to termination + fn finalize_all(&mut self) { + let weak_committed = self.weak_committed.keys().cloned().collect::>(); + + for tx in weak_committed { + self.finalize(tx); + } + } + + // Only finalized checkpoint can be applied + // release 'n' weak transactions after 'from'' + // fn force_commit_weak_n(&mut self, tx: TransactionId, from: usize, n: usize) { + // let have_started_new = + // self.passive_release_weak_dependencies_for_checkpoint_from(tx, from, n); + + // if !have_started_new { + // let desc = self.weak_committed.get_mut(&tx).unwrap(); + // desc.count = desc.count.checked_sub(n).unwrap(); + + // if desc.count == 0 { + // self.weak_committed.remove(&tx); + // } + // } + // } + + fn insert_checkpoint(&mut self, offset: usize) { + let weak_dependencies = self + .weak_committed + .iter() + .filter_map(|(tx, desc)| { + if desc.count > 0 { + Some((*tx, desc.count)) + } else { + None + } + }) + .collect(); + + self.checkpoints.push(Checkpoint { + range: Range { + start: offset, + end: None, + }, + weak_dependencies, + transactions_group_number: self.total_transactions_group_number.clone(), + }); + } + + // Only finalized checkpoint can be applied + // return value indicates whether a new id transaction have started + // fn passive_release_weak_dependencies_for_checkpoint_from( + // &mut self, + // tx: TransactionId, + // from: usize, + // n: usize, + // ) -> bool { + // assert!(n > 0); + + // for checkpoint in &mut self.checkpoints[from..] { + // let desc = checkpoint.weak_dependencies.get_mut(&tx); + + // match desc { + // Some(count) if *count >= n => { + // *count -= n; + // } + // // already finalized + // Some(count) => { + // assert_eq!(*count, 0); + // return true; + // } + // // already finalized + // None => return true, + // } + // } + + // false + // } + + // return value indicates whether a new id transaction have started + fn active_release_weak_dependencies_for_checkpoint_from( + &mut self, + tx: TransactionId, + from: usize, + ) { + for checkpoint in self.checkpoints.iter_mut().rev() { + if checkpoint.range.contains_or_after(from) { + checkpoint.weak_dependencies.remove(&tx); + } else { + break; + } + } + } + + fn apply_checkpoint(&mut self, index: usize, upper_bound: usize) -> usize { + let checkpoint = self.checkpoints[index].clone(); + + if checkpoint.has_dependencies() { + unreachable!("only firnalized checkpoint can be applied due to filter function"); + + // warn!( + // "Create a slice with weak transactions: {:?}, this may fail host circuits", + // checkpoint.weak_dependencies + // ); + + // for (tx, n) in &checkpoint.weak_dependencies { + // self.force_commit_weak_n(*tx, index, *n); + // } + } + + for (tx, n) in checkpoint.transactions_group_number { + self.applied_transactions_group_number + .entry(tx) + .and_modify(|count| *count = n) + .or_insert(n); + } + + let checkpoint = self.checkpoints[index].clone(); + + if checkpoint.range.before(upper_bound) { + self.checkpoints.drain(0..=index); + + checkpoint.range.end.unwrap() + } else { + let position = upper_bound; + + // if split + if Some(position) != checkpoint.range.end { + let checkpoint = self.checkpoints.get_mut(index).unwrap(); + checkpoint.range.start = position; + self.checkpoints.drain(0..index); + } else { + self.checkpoints.drain(0..=index); + } + + position + } + } + + fn find( + &self, + upper_bound: usize, + filter: impl Fn(&Checkpoint) -> bool, + flush_strategy_controller: &dyn FlushStrategy, + ) -> Option { + let last = self.checkpoints.binary_search_by(|checkpoint| { + let group_number_ordering = checkpoint.transactions_group_number_ordering( + &self.applied_transactions_group_number, + flush_strategy_controller, + ); + + if checkpoint.range.after(upper_bound) || group_number_ordering.is_gt() { + return Ordering::Greater; + } + + if checkpoint.range.contains(upper_bound) && group_number_ordering.is_eq() { + return Ordering::Equal; + } + + Ordering::Less + }); + let last = match last { + Ok(index) => index, + Err(index) => { + assert!(index > 0); + index - 1 + } + }; + + self.checkpoints[..=last] + .iter() + .enumerate() + .rev() + .find(|(_, checkpoint)| filter(checkpoint)) + .map(|(index, _)| index) + } + + fn checkpoint( + &mut self, + upper_bound: usize, + flush_strategy_controller: &dyn FlushStrategy, + ) -> usize { + // prefer checkpoint without weak dependencies + let index = self.find( + upper_bound, + |checkpoint| !checkpoint.has_dependencies(), + flush_strategy_controller, + ); + + self.apply_checkpoint( + index.expect("cannot find a checkpoint to meet host circuit"), + upper_bound, + ) + } +} + +struct Timer { + tx: TransactionId, + deadline: usize, + // since Rust LinkedList doesn't support remove by node + disabled: bool, +} + +pub struct HostTransaction { + capacity: usize, + last_committed_event_cursor: usize, + events: Vec, + checkpoints: Checkpoints, + controller: Box, + + timers: LinkedList>>, + transaction_to_timer: HashMap>>, + + slices: Vec, + slice_backend_builder: B, + slice_builder: SliceBuilder, +} + +impl HostTransaction { + #[allow(dead_code)] + pub fn new( + capacity: usize, + slice_backend_builder: B, + controller: Box, + ) -> Self { + Self { + capacity, + last_committed_event_cursor: 0, + events: Vec::with_capacity(capacity * MAX_SLICES_IN_MEMORY), + checkpoints: Checkpoints::new(capacity), + controller, + + timers: LinkedList::new(), + transaction_to_timer: HashMap::default(), + + slices: Vec::default(), + slice_backend_builder, + slice_builder: SliceBuilder::new(), + } + } + + fn _push_event(&mut self, event: EventTableEntry) { + self.events.push(event); + + if self.events.len() == self.capacity * MAX_SLICES_IN_MEMORY { + self.commit_slice(); + } + } + + fn commit_slice(&mut self) { + // Find a checkpoint so that the size of the slice does not exceed capacity + // return checkpoint, obliterated weak committed transactions + let checkpoint = self.checkpoints.checkpoint( + usize::min( + self.last_committed_event_cursor + self.capacity, + self.next_event_offset(), + ), + &*self.controller, + ); + + assert!( + checkpoint != self.last_committed_event_cursor, + "failed to select checkpoint" + ); + + // create slice + let event_entries = self + .events + .drain(0..(checkpoint - self.last_committed_event_cursor)) + .collect(); + let slice = self.slice_builder.build(event_entries); + self.slices.push(self.slice_backend_builder.build(slice)); + + // reset + self.last_committed_event_cursor = checkpoint; + } + + fn next_event_offset(&self) -> usize { + self.last_committed_event_cursor + self.events.len() + } + + fn start(&mut self, tx: TransactionId) { + let offset = self.next_event_offset(); + + self.checkpoints.start(tx, offset); + } + + fn commit(&mut self, tx: TransactionId, auto_finalize: bool) { + let now = self.next_event_offset(); + + let start = self.checkpoints.commit(tx, now); + + if now - start > self.capacity { + panic!( + "an overloaded transaction {} cannot be committed in a slice", + tx + ); + } + + if auto_finalize { + self.start_timer(tx, start + self.capacity * TIMER_DELAY); + } + } + + fn finalize(&mut self, tx: TransactionId) { + self.stop_timer(tx); + self.checkpoints.finalize(tx); + } + + fn finalize_all(&mut self) { + let txs = self + .transaction_to_timer + .keys() + .cloned() + .collect::>(); + + for tx in txs { + self.stop_timer(tx); + } + self.checkpoints.finalize_all(); + } + + fn start_timer(&mut self, tx: TransactionId, deadline: usize) { + // stop previous timer if exists + self.stop_timer(tx); + + let timer = Rc::new(RefCell::new(Timer { + tx, + deadline, + disabled: false, + })); + + self.timers.push_back(timer.clone()); + self.transaction_to_timer.insert(tx, timer); + } + + fn stop_timer(&mut self, tx: TransactionId) { + let timer = self.transaction_to_timer.remove(&tx); + + if let Some(timer) = timer { + let mut timer = timer.borrow_mut(); + timer.disabled = true; + } + } + + fn tick(&mut self) { + let mut delete = false; + + if let Some(timer) = self.timers.front() { + let now = self.next_event_offset(); + let timer = timer.borrow(); + let tx = timer.tx; + + if timer.deadline != now { + return; + } + + delete = true; + + if !timer.disabled { + drop(timer); + + warn!("Automatically commit overloaded weak transaction {}", tx); + self.finalize(tx); + } + } + + if delete { + self.timers.pop_front(); + } + } +} + +impl TransactionSlicer for HostTransaction { + fn push_event(&mut self, event: EventTableEntry) { + self.tick(); + + let commands = match event.step_info { + StepInfo::ExternalHostCall { op, value, .. } => { + self.controller.notify(Event::HostCall(op, value)) + } + _ => vec![Command::Noop], + }; + + for command in commands { + match command { + Command::Noop => self._push_event(event.clone()), + Command::Start(tx) => { + self.start(tx); + self._push_event(event.clone()); + } + Command::Commit(tx, timer) => { + self._push_event(event.clone()); + self.commit(tx, timer); + } + Command::Abort => { + // V2 doesn't care abort from host + } + Command::Finalize(tx) => { + self.finalize(tx); + } + } + } + } + + fn finalize(mut self) -> Vec { + self.finalize_all(); + + while !self.events.is_empty() { + self.commit_slice(); + } + + self.slices + } + + fn frame_table_builder_get(&self) -> &FrameTableBuilder { + &self.slice_builder.frame_table_builder + } + + fn frame_table_builder_get_mut(&mut self) -> &mut FrameTableBuilder { + &mut self.slice_builder.frame_table_builder + } +}