diff --git a/bitacross-worker/Cargo.lock b/bitacross-worker/Cargo.lock index a6f8424a51..7820d81aff 100644 --- a/bitacross-worker/Cargo.lock +++ b/bitacross-worker/Cargo.lock @@ -612,6 +612,7 @@ dependencies = [ "parking_lot 0.12.1", "parse_duration", "prometheus", + "rayon", "regex 1.9.5", "scale-info", "serde 1.0.193", @@ -5755,9 +5756,9 @@ checksum = "60a357793950651c4ed0f3f52338f53b2f809f32d83a07f72909fa13e4c6c1e3" [[package]] name = "rayon" -version = "1.7.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d2df5196e37bcc87abebc0053e20787d73847bb33134a69841207dd0a47f03b" +checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" dependencies = [ "either", "rayon-core", @@ -5765,14 +5766,12 @@ dependencies = [ [[package]] name = "rayon-core" -version = "1.11.0" +version = "1.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b8f95bd6966f5c87776639160a66bd8ab9895d9d4ab01ddba9fc60661aebe8d" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" dependencies = [ - "crossbeam-channel", "crossbeam-deque", "crossbeam-utils", - "num_cpus", ] [[package]] diff --git a/bitacross-worker/core-primitives/node-api/api-client-extensions/src/chain.rs b/bitacross-worker/core-primitives/node-api/api-client-extensions/src/chain.rs index 89321b0034..0333c41f66 100644 --- a/bitacross-worker/core-primitives/node-api/api-client-extensions/src/chain.rs +++ b/bitacross-worker/core-primitives/node-api/api-client-extensions/src/chain.rs @@ -49,6 +49,10 @@ pub trait ChainApi { from: Self::BlockNumber, to: Self::BlockNumber, ) -> ApiResult>>; + fn get_block_by_number( + &self, + block: Self::BlockNumber, + ) -> ApiResult>>; fn is_grandpa_available(&self) -> ApiResult; fn grandpa_authorities(&self, hash: Option) -> ApiResult; fn grandpa_authorities_proof(&self, hash: Option) -> ApiResult; @@ -101,6 +105,16 @@ where Ok(blocks) } + fn get_block_by_number( + &self, + block_number: Self::BlockNumber, + ) -> ApiResult>> { + match self.get_signed_block_by_num(Some(block_number))? { + Some(block) => Ok(Some(block.into())), + None => Ok(None), + } + } + fn is_grandpa_available(&self) -> ApiResult { let genesis_hash = Some(self.get_genesis_hash().expect("Failed to get genesis hash")); Ok(self diff --git a/bitacross-worker/service/Cargo.toml b/bitacross-worker/service/Cargo.toml index 68049051a9..eaa51566fe 100644 --- a/bitacross-worker/service/Cargo.toml +++ b/bitacross-worker/service/Cargo.toml @@ -20,6 +20,7 @@ log = "0.4" parking_lot = "0.12.1" parse_duration = "2.1.1" prometheus = { version = "0.13.0", features = ["process"], default-features = false } # Enabling std lead to protobuf dependency conflicts with substrate, and we don't need it. +rayon = "1.10.0" regex = "1.9.5" scale-info = { version = "2.10.0", default-features = false, features = ["derive"] } serde = "1.0" diff --git a/bitacross-worker/service/src/parentchain_handler.rs b/bitacross-worker/service/src/parentchain_handler.rs index ca1557cb33..a87f927f10 100644 --- a/bitacross-worker/service/src/parentchain_handler.rs +++ b/bitacross-worker/service/src/parentchain_handler.rs @@ -31,6 +31,7 @@ use itp_storage::StorageProof; use itp_time_utils::duration_now; use itp_types::ShardIdentifier; use log::*; +use rayon::prelude::*; use sp_consensus_grandpa::VersionedAuthorityList; use sp_runtime::traits::Header as HeaderTrait; use std::{cmp::min, sync::Arc, time::Duration}; @@ -204,10 +205,27 @@ where } loop { - let block_chunk_to_sync = self.parentchain_api.get_blocks( - start_block, - min(start_block + BLOCK_SYNC_BATCH_SIZE, curr_block_number), - )?; + let chunk_range = + start_block..min(start_block + BLOCK_SYNC_BATCH_SIZE, curr_block_number); + + let start_fetch_time = duration_now(); + + let block_chunk_to_sync = chunk_range + .into_par_iter() + .filter_map(|block_number| { + self.parentchain_api + .get_block_by_number(block_number) + .expect("failed to get block") + }) + .collect::>(); + + debug!( + "[{:?}] Fetched {} blocks in {}", + id, + block_chunk_to_sync.len(), + format_duration(duration_now().saturating_sub(start_fetch_time)) + ); + if block_chunk_to_sync.len() == BLOCK_SYNC_BATCH_SIZE as usize { let now = duration_now(); let total_blocks = curr_block_number.saturating_sub(last_synced_header_number); @@ -247,7 +265,7 @@ where vec![] } else { let evs = block_chunk_to_sync - .iter() + .par_iter() .map(|block| { self.parentchain_api.get_events_for_block(Some(block.block.header.hash())) }) @@ -260,13 +278,15 @@ where vec![] } else { block_chunk_to_sync - .iter() + .par_iter() .map(|block| { self.parentchain_api.get_events_value_proof(Some(block.block.header.hash())) }) .collect::, _>>()? }; + let sync_start_time = duration_now(); + self.enclave_api.sync_parentchain( block_chunk_to_sync.as_slice(), events_chunk_to_sync.as_slice(), @@ -275,14 +295,16 @@ where immediate_import, )?; + info!( + "[{:?}] Synced parentchain batch in {}", + id, + format_duration(duration_now().saturating_sub(sync_start_time)) + ); + let api_client_until_synced_header = block_chunk_to_sync .last() .map(|b| b.block.header.clone()) .ok_or(Error::EmptyChunk)?; - debug!( - "[{:?}] Synced {} out of {} finalized parentchain blocks", - id, api_client_until_synced_header.number, curr_block_number, - ); // #TODO: #1451: fix api/client types until_synced_header = diff --git a/bitacross-worker/service/src/tests/mocks/parentchain_api_mock.rs b/bitacross-worker/service/src/tests/mocks/parentchain_api_mock.rs index cfd1e891ba..b383c2412c 100644 --- a/bitacross-worker/service/src/tests/mocks/parentchain_api_mock.rs +++ b/bitacross-worker/service/src/tests/mocks/parentchain_api_mock.rs @@ -101,4 +101,11 @@ impl ChainApi for ParentchainApiMock { fn get_events_for_block(&self, _block_hash: Option) -> ApiResult> { Ok(Default::default()) } + + fn get_block_by_number( + &self, + block: Self::BlockNumber, + ) -> ApiResult>> { + Ok(self.parentchain.get(block as usize).cloned()) + } } diff --git a/tee-worker/Cargo.lock b/tee-worker/Cargo.lock index 05c62a3cd4..3670bfc070 100644 --- a/tee-worker/Cargo.lock +++ b/tee-worker/Cargo.lock @@ -5156,6 +5156,7 @@ dependencies = [ "parse_duration", "primitive-types", "prometheus", + "rayon", "regex 1.9.5", "scale-info", "serde 1.0.193", @@ -6838,9 +6839,9 @@ checksum = "60a357793950651c4ed0f3f52338f53b2f809f32d83a07f72909fa13e4c6c1e3" [[package]] name = "rayon" -version = "1.7.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d2df5196e37bcc87abebc0053e20787d73847bb33134a69841207dd0a47f03b" +checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" dependencies = [ "either", "rayon-core", @@ -6848,14 +6849,12 @@ dependencies = [ [[package]] name = "rayon-core" -version = "1.11.0" +version = "1.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b8f95bd6966f5c87776639160a66bd8ab9895d9d4ab01ddba9fc60661aebe8d" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" dependencies = [ - "crossbeam-channel", "crossbeam-deque", "crossbeam-utils", - "num_cpus 1.16.0", ] [[package]] diff --git a/tee-worker/cli/lit_test_failed_parentchain_extrinsic.sh b/tee-worker/cli/lit_test_failed_parentchain_extrinsic.sh index b6ce22ee31..e55f73afd6 100755 --- a/tee-worker/cli/lit_test_failed_parentchain_extrinsic.sh +++ b/tee-worker/cli/lit_test_failed_parentchain_extrinsic.sh @@ -46,7 +46,7 @@ echo "New Account created: ${FIRST_NEW_ACCOUNT}" echo "Linking identity to Bob" OUTPUT=$(${CLIENT} link-identity //Bob did:litentry:substrate:${FIRST_NEW_ACCOUNT} litentry) || { echo "Link identity command failed"; exit 1; } echo "Finished Linking identity to Bob" -sleep 30 +sleep 60 echo "Capturing IDGraph Hash of Bob" INITIAL_ID_GRAPH_HASH=$(${CLIENT} id-graph-hash did:litentry:substrate:0x8eaf04151687736326c9fea17e25fc5287613693c912909cb226aa4794f26a48) || { echo "Failed to get ID Graph hash"; exit 1; } @@ -58,7 +58,7 @@ echo "New Account created: ${SECOND_NEW_ACCOUNT}" echo "Linking new identity to Bob with Eve as delegate signer" OUTPUT=$(${CLIENT} link-identity //Bob "did:litentry:substrate:${SECOND_NEW_ACCOUNT}" litentry -d //Eve) || { echo "Link identity command failed"; exit 1; } echo "Finished Linking identity to Bob" -sleep 30 +sleep 60 echo "Capturing IDGraph Hash of Bob" FINAL_ID_GRAPH_HASH=$(${CLIENT} id-graph-hash did:litentry:substrate:0x8eaf04151687736326c9fea17e25fc5287613693c912909cb226aa4794f26a48) || { echo "Failed to get ID Graph hash"; exit 1; } diff --git a/tee-worker/core-primitives/node-api/api-client-extensions/src/chain.rs b/tee-worker/core-primitives/node-api/api-client-extensions/src/chain.rs index 89321b0034..0333c41f66 100644 --- a/tee-worker/core-primitives/node-api/api-client-extensions/src/chain.rs +++ b/tee-worker/core-primitives/node-api/api-client-extensions/src/chain.rs @@ -49,6 +49,10 @@ pub trait ChainApi { from: Self::BlockNumber, to: Self::BlockNumber, ) -> ApiResult>>; + fn get_block_by_number( + &self, + block: Self::BlockNumber, + ) -> ApiResult>>; fn is_grandpa_available(&self) -> ApiResult; fn grandpa_authorities(&self, hash: Option) -> ApiResult; fn grandpa_authorities_proof(&self, hash: Option) -> ApiResult; @@ -101,6 +105,16 @@ where Ok(blocks) } + fn get_block_by_number( + &self, + block_number: Self::BlockNumber, + ) -> ApiResult>> { + match self.get_signed_block_by_num(Some(block_number))? { + Some(block) => Ok(Some(block.into())), + None => Ok(None), + } + } + fn is_grandpa_available(&self) -> ApiResult { let genesis_hash = Some(self.get_genesis_hash().expect("Failed to get genesis hash")); Ok(self diff --git a/tee-worker/service/Cargo.toml b/tee-worker/service/Cargo.toml index c875c05259..ab861c5243 100644 --- a/tee-worker/service/Cargo.toml +++ b/tee-worker/service/Cargo.toml @@ -20,6 +20,7 @@ log = "0.4" parking_lot = "0.12.1" parse_duration = "2.1.1" prometheus = { version = "0.13.0", features = ["process"], default-features = false } # Enabling std lead to protobuf dependency conflicts with substrate, and we don't need it. +rayon = "1.10.0" regex = "1.9.5" scale-info = { version = "2.10.0", default-features = false, features = ["derive"] } serde = "1.0" diff --git a/tee-worker/service/src/parentchain_handler.rs b/tee-worker/service/src/parentchain_handler.rs index d4f59fd3bb..6aa9f7069a 100644 --- a/tee-worker/service/src/parentchain_handler.rs +++ b/tee-worker/service/src/parentchain_handler.rs @@ -31,6 +31,7 @@ use itp_storage::StorageProof; use itp_time_utils::duration_now; use itp_types::ShardIdentifier; use log::*; +use rayon::prelude::*; use sp_consensus_grandpa::VersionedAuthorityList; use sp_runtime::traits::Header as HeaderTrait; use std::{cmp::min, sync::Arc, time::Duration}; @@ -204,10 +205,27 @@ where } loop { - let block_chunk_to_sync = self.parentchain_api.get_blocks( - start_block, - min(start_block + BLOCK_SYNC_BATCH_SIZE, curr_block_number), - )?; + let chunk_range = + start_block..min(start_block + BLOCK_SYNC_BATCH_SIZE, curr_block_number); + + let start_fetch_time = duration_now(); + + let block_chunk_to_sync = chunk_range + .into_par_iter() + .filter_map(|block_number| { + self.parentchain_api + .get_block_by_number(block_number) + .expect("failed to get block") + }) + .collect::>(); + + debug!( + "[{:?}] Fetched {} blocks in {}", + id, + block_chunk_to_sync.len(), + format_duration(duration_now().saturating_sub(start_fetch_time)) + ); + if block_chunk_to_sync.len() == BLOCK_SYNC_BATCH_SIZE as usize { let now = duration_now(); let total_blocks = curr_block_number.saturating_sub(last_synced_header_number); @@ -247,7 +265,7 @@ where vec![] } else { let evs = block_chunk_to_sync - .iter() + .par_iter() .map(|block| { self.parentchain_api.get_events_for_block(Some(block.block.header.hash())) }) @@ -260,13 +278,15 @@ where vec![] } else { block_chunk_to_sync - .iter() + .par_iter() .map(|block| { self.parentchain_api.get_events_value_proof(Some(block.block.header.hash())) }) .collect::, _>>()? }; + let sync_start_time = duration_now(); + self.enclave_api.sync_parentchain( block_chunk_to_sync.as_slice(), events_chunk_to_sync.as_slice(), @@ -275,14 +295,16 @@ where immediate_import, )?; + info!( + "[{:?}] Synced parentchain batch in {}", + id, + format_duration(duration_now().saturating_sub(sync_start_time)) + ); + let api_client_until_synced_header = block_chunk_to_sync .last() .map(|b| b.block.header.clone()) .ok_or(Error::EmptyChunk)?; - debug!( - "[{:?}] Synced {} out of {} finalized parentchain blocks", - id, api_client_until_synced_header.number, curr_block_number, - ); // #TODO: #1451: fix api/client types until_synced_header = diff --git a/tee-worker/service/src/tests/mocks/parentchain_api_mock.rs b/tee-worker/service/src/tests/mocks/parentchain_api_mock.rs index cfd1e891ba..b383c2412c 100644 --- a/tee-worker/service/src/tests/mocks/parentchain_api_mock.rs +++ b/tee-worker/service/src/tests/mocks/parentchain_api_mock.rs @@ -101,4 +101,11 @@ impl ChainApi for ParentchainApiMock { fn get_events_for_block(&self, _block_hash: Option) -> ApiResult> { Ok(Default::default()) } + + fn get_block_by_number( + &self, + block: Self::BlockNumber, + ) -> ApiResult>> { + Ok(self.parentchain.get(block as usize).cloned()) + } }