diff --git a/crates/e2e/tests/e2e/colocation_quoting.rs b/crates/e2e/tests/e2e/colocation_quoting.rs index 6721557ede..5e1152c35a 100644 --- a/crates/e2e/tests/e2e/colocation_quoting.rs +++ b/crates/e2e/tests/e2e/colocation_quoting.rs @@ -77,6 +77,8 @@ async fn uses_stale_liquidity(web3: Web3) { tracing::info!("waiting for liquidity state to update"); wait_for_condition(TIMEOUT, || async { + // Mint blocks until we evict the cached liquidty and fetch the new state. + onchain.mint_block().await; let next = services.submit_quote("e).await.unwrap(); next.quote.buy_amount != first.quote.buy_amount }) diff --git a/crates/shared/src/recent_block_cache.rs b/crates/shared/src/recent_block_cache.rs index 96af84abd2..4ecbaff8b1 100644 --- a/crates/shared/src/recent_block_cache.rs +++ b/crates/shared/src/recent_block_cache.rs @@ -292,8 +292,14 @@ where let mut mutexed = self.mutexed.lock().unwrap(); mutexed.insert(cache_miss_block, chunk.iter().cloned(), fetched); - for key in found_keys { - mutexed.recently_used.cache_set(key, ()); + if block.is_some() { + // Only if a block number was specified the caller actually cared about the most + // accurate data for these keys. Only in that case we want to be nice and + // remember the key for future background updates of the cached + // liquidity. + for key in found_keys { + mutexed.recently_used.cache_set(key, ()); + } } } @@ -336,6 +342,7 @@ where } fn get(&mut self, key: K, block: Option) -> Option<&[V]> { + let allow_background_udpates = block.is_some(); let block = block.or_else(|| { self.cached_most_recently_at_block .get(&key) @@ -345,7 +352,7 @@ where }) })?; let result = self.entries.get(&(block, key.clone())).map(Vec::as_slice); - if result.is_some_and(|values| !values.is_empty()) { + if allow_background_udpates && result.is_some_and(|values| !values.is_empty()) { self.recently_used.cache_set(key, ()); } result @@ -382,11 +389,27 @@ where fn remove_cached_blocks_older_than(&mut self, oldest_to_keep: u64) { tracing::debug!("dropping blocks older than {} from cache", oldest_to_keep); self.entries = self.entries.split_off(&(oldest_to_keep, K::first_ord())); + + // Iterate from newest block to oldest block and only keep the most recent + // liquidity around to reduce memory consumption. + let mut cached_keys = HashSet::new(); + let mut items = 0; + for ((_block, key), values) in self.entries.iter_mut().rev() { + if !cached_keys.insert(key) { + *values = vec![]; + } else { + items += values.len(); + } + } + // Afterwards drop all entries that are now empty. + self.entries.retain(|_, values| !values.is_empty()); + self.cached_most_recently_at_block .retain(|_, block| *block >= oldest_to_keep); tracing::debug!( - "the cache now contains entries for {} block-key combinations", - self.entries.len() + entries = self.entries.len(), + items, + "cache was updated and now contains", ); } @@ -469,7 +492,8 @@ mod tests { let fetcher = FakeCacheFetcher::new(vec![ TestValue::new(0, "a"), TestValue::new(1, "b"), - TestValue::new(2, "c"), + // no liquidity for key 2 on-chain + TestValue::new(3, "c"), ]); let block_number = 10u64; let block_stream = mock_single_block(BlockInfo { @@ -478,7 +502,7 @@ mod tests { }); let cache = RecentBlockCache::new( CacheConfig { - number_of_entries_to_auto_update: NonZeroUsize::new(2).unwrap(), + number_of_entries_to_auto_update: NonZeroUsize::new(1).unwrap(), ..Default::default() }, fetcher, @@ -487,41 +511,40 @@ mod tests { ) .unwrap(); + let assert_keys_recently_used = |expected_keys: &[usize]| { + let cached_keys = cache + .mutexed + .lock() + .unwrap() + .keys_of_recently_used_entries() + .collect::>(); + let expected_keys: Vec<_> = expected_keys.iter().copied().map(TestKey).collect(); + assert_eq!(cached_keys, expected_keys); + }; + cache - .fetch(test_keys(0..1), Block::Recent) - .now_or_never() - .unwrap() + .fetch(test_keys(0..1), Block::Number(block_number)) + .await .unwrap(); + assert_keys_recently_used(&[0]); + + // Don't cache this because we didn't request the liquidity on a specific block. + cache.fetch(test_keys(1..2), Block::Recent).await.unwrap(); + assert_keys_recently_used(&[0]); + + // Don't cache this because there is no liquidity for this block on-chain. cache - .fetch(test_keys(1..2), Block::Recent) - .now_or_never() - .unwrap() + .fetch(test_keys(2..3), Block::Number(block_number)) + .await .unwrap(); - let keys = cache - .mutexed - .lock() - .unwrap() - .keys_of_recently_used_entries() - .collect::>(); - assert_eq!(keys, test_keys(0..2).collect()); + assert_keys_recently_used(&[0]); - // 1 is already cached, 2 isn't. - // Additionally 3 will never yield any data. We don't consider these - // keys as recently used. That's because we update data for recently used keys - // in the background. If we would consider keys without data to be recently used - // we'd issue a lot of useless update reqeusts. + // Cache the new key but evict the other key because we have a limited capacity. cache - .fetch(test_keys(1..3), Block::Recent) - .now_or_never() - .unwrap() + .fetch(test_keys(3..4), Block::Number(block_number)) + .await .unwrap(); - let keys = cache - .mutexed - .lock() - .unwrap() - .keys_of_recently_used_entries() - .collect::>(); - assert_eq!(keys, test_keys(1..3).collect()); + assert_keys_recently_used(&[3]); } #[tokio::test] @@ -535,7 +558,7 @@ mod tests { }); let cache = RecentBlockCache::new( CacheConfig { - number_of_entries_to_auto_update: NonZeroUsize::new(2).unwrap(), + number_of_entries_to_auto_update: NonZeroUsize::new(4).unwrap(), ..Default::default() }, fetcher, @@ -544,34 +567,45 @@ mod tests { ) .unwrap(); - let initial_values = vec![TestValue::new(0, "hello"), TestValue::new(1, "ether")]; + // Initial state on the block chain. + let initial_values = vec![ + TestValue::new(0, "1"), + TestValue::new(1, "1"), + TestValue::new(2, "1"), + TestValue::new(3, "1"), + ]; *values.lock().unwrap() = initial_values.clone(); let result = cache - .fetch(test_keys(0..2), Block::Recent) - .now_or_never() - .unwrap() + .fetch(test_keys(0..2), Block::Number(block_number)) + .await .unwrap(); assert_eq!(result.len(), 2); - let updated_values = vec![TestValue::new(0, "hello_1"), TestValue::new(1, "ether_1")]; + let result = cache.fetch(test_keys(0..4), Block::Recent).await.unwrap(); + // We can fetch data for keys with `Recent` but we don't schedule them for auto + // updates. + assert_eq!(result.len(), 4); + + // New state on the block chain on the next block. + let updated_values = vec![ + TestValue::new(0, "2"), + TestValue::new(1, "2"), + TestValue::new(2, "2"), + TestValue::new(3, "2"), + ]; *values.lock().unwrap() = updated_values.clone(); - cache - .update_cache_at_block(block_number) - .now_or_never() - .unwrap() - .unwrap(); + cache.update_cache_at_block(block_number).await.unwrap(); values.lock().unwrap().clear(); - let result = cache - .fetch(test_keys(0..2), Block::Recent) - .now_or_never() - .unwrap() - .unwrap(); - assert_eq!(result.len(), 2); - for value in updated_values { - assert!(result.contains(&value)); - } + let result = cache.fetch(test_keys(0..4), Block::Recent).await.unwrap(); + assert_eq!(result.len(), 4); + // These keys were scheduled for background updates and show the new value. + assert!(result.contains(&updated_values[0])); + assert!(result.contains(&updated_values[1])); + // These keys were NOT scheduled for background updates and show the old value. + assert!(result.contains(&initial_values[2])); + assert!(result.contains(&initial_values[3])); } #[tokio::test] @@ -699,16 +733,16 @@ mod tests { #[tokio::test] async fn evicts_old_blocks_from_cache() { - let values = (0..10).map(|key| TestValue::new(key, "")).collect(); + let values = (0..=12).map(|key| TestValue::new(key, "")).collect(); let fetcher = FakeCacheFetcher::new(values); - let block_number = 10u64; - let block_stream = mock_single_block(BlockInfo { - number: block_number, + let block = |number| BlockInfo { + number, ..Default::default() - }); + }; + let (block_sender, block_stream) = tokio::sync::watch::channel(block(10)); let cache = RecentBlockCache::new( CacheConfig { - number_of_blocks_to_cache: NonZeroU64::new(5).unwrap(), + number_of_blocks_to_cache: NonZeroU64::new(2).unwrap(), number_of_entries_to_auto_update: NonZeroUsize::new(2).unwrap(), ..Default::default() }, @@ -718,24 +752,32 @@ mod tests { ) .unwrap(); + // Fetch 10 keys on block 10; but we only have capacity to update 2 of those in + // the background. cache .fetch(test_keys(0..10), Block::Number(10)) - .now_or_never() - .unwrap() + .await .unwrap(); assert_eq!(cache.mutexed.lock().unwrap().entries.len(), 10); - cache - .update_cache_at_block(14) - .now_or_never() - .unwrap() - .unwrap(); + + block_sender.send(block(11)).unwrap(); + // Fetch updated liquidity for 2 of the initial 10 keys + cache.update_cache_at_block(11).await.unwrap(); + // Fetch 2 new keys which are NOT scheduled for background updates + cache.fetch(test_keys(10..12), Block::Recent).await.unwrap(); assert_eq!(cache.mutexed.lock().unwrap().entries.len(), 12); - cache - .update_cache_at_block(15) - .now_or_never() - .unwrap() - .unwrap(); + + block_sender.send(block(12)).unwrap(); + // Fetch updated liquidity for 2 of the initial 10 keys + cache.update_cache_at_block(12).await.unwrap(); assert_eq!(cache.mutexed.lock().unwrap().entries.len(), 4); + + block_sender.send(block(13)).unwrap(); + // Update 2 blocks in background but now it's time to evict the 2 additional + // keys we fetched with `Block::Recent` because we are only allowed to + // keep state that is up to 2 blocks old. + cache.update_cache_at_block(13).await.unwrap(); + assert_eq!(cache.mutexed.lock().unwrap().entries.len(), 2); } #[tokio::test]