Reduce memory consumption of RecentBlockCache (#2102)
# Description
Our `RecentBlockCache` works like this:
1. somebody requests liquidity
2. the cache checks whether it's already known
3. if it's not in the cache, query the blockchain
4. store the result in the cache
5. remember the requested liquidity source so it can be updated in the background

Whenever we see a new block, we fetch the current liquidity for all
remembered liquidity sources and write it to the cache together with its
block number. We have a max cache duration; whenever the cached state
exceeds that duration, we remove the oldest entries.
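
To make the flow concrete, here is a minimal sketch with simplified types (`u64` keys, `String` values); `entries` and `recently_used` mirror field names of the real `RecentBlockCache`, but everything else is illustrative, not the actual implementation:

```rust
use std::collections::{BTreeMap, HashSet};

struct CacheSketch {
    /// Cached liquidity keyed by (block number, liquidity source key).
    entries: BTreeMap<(u64, u64), Vec<String>>,
    /// Keys to refresh in the background whenever a new block arrives.
    recently_used: HashSet<u64>,
}

impl CacheSketch {
    /// Steps 1-5 of the flow above.
    fn fetch(
        &mut self,
        key: u64,
        block: u64,
        fetch_onchain: impl Fn(u64) -> Vec<String>,
    ) -> Vec<String> {
        // 2. check if it's already known
        if let Some(values) = self.entries.get(&(block, key)) {
            return values.clone();
        }
        // 3. query the blockchain
        let values = fetch_onchain(key);
        // 4. store in cache
        self.entries.insert((block, key), values.clone());
        // 5. remember the source for background updates
        self.recently_used.insert(key);
        values
    }

    /// Background task: on a new block, refresh all remembered keys.
    fn update_on_new_block(&mut self, block: u64, fetch_onchain: impl Fn(u64) -> Vec<String>) {
        for key in &self.recently_used {
            self.entries.insert((block, *key), fetch_onchain(*key));
        }
    }
}
```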

This implementation uses more memory than necessary in 2 ways:
1. We can fetch liquidity for quotes. For those requests it's okay to
return liquidity that is not 100% up-to-date. However, we still remember
the requested liquidity source for future updates. This is not great
because we can receive quote requests for all sorts of random tokens
we'll never see again.
2. We cache state for the same liquidity source for multiple blocks. But
the cache only has 2 access patterns:
    * "Give me the most recent state available on the blockchain"
    * "Give me the most recent state available in the cache"
There is no access pattern "Give me cached liquidity specifically from
an older block with number X". That means it's enough to keep only the
most recent data for each liquidity source cached at any point in time
(see the sketch below).
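
Both patterns correspond to the `Block` argument of the cache's `fetch` seen in the tests below (`Block::Recent` vs. `Block::Number(n)`). A hedged sketch of how a lookup could resolve which block to read, reusing the simplified types from above plus a per-key record of the newest cached block (the real field is called `cached_most_recently_at_block`):

```rust
use std::collections::HashMap;

/// Which state the caller wants; mirrors the cache's two access patterns.
enum Block {
    /// "Most recent available in the cache" is good enough (e.g. quotes).
    Recent,
    /// "Most recent available on the blockchain", i.e. state at this block.
    Number(u64),
}

/// Resolve which block to look up in the (block, key) map. `Recent`
/// falls back to the newest block the key was cached at, which is why
/// older duplicates of the same key can never be read again.
fn resolve_block(
    requested: Block,
    cached_most_recently_at_block: &HashMap<u64, u64>,
    key: u64,
) -> Option<u64> {
    match requested {
        Block::Number(n) => Some(n),
        Block::Recent => cached_most_recently_at_block.get(&key).copied(),
    }
}
```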
    
We can see these 2 things at play with this
[log](https://production-6de61f.kb.eu-central-1.aws.cloud.es.io/app/discover#/?_g=(filters:!(),refreshInterval:(pause:!t,value:0),time:(from:'2023-11-28T16:18:27.243Z',to:'2023-11-28T17:55:08.577Z'))&_a=(columns:!(log),filters:!(),grid:(columns:('@timestamp':(width:164))),index:c0e240e0-d9b3-11ed-b0e6-e361adffce0b,interval:auto,query:(language:kuery,query:'mainnet%20and%20driver%20and%20(log:%20%22the%20cache%20now%20contains%20entries%22%20or%20log:%20%22fetched%20liquidity%20sources%22)'),sort:!(!('@timestamp',desc)))).
After ~1h of operation it shows a single `RecentBlockCache` holding ~20K
items. In an average auction we can fetch ~800 uni v2 sources, and we
currently have a configuration where we cache up to 10 blocks worth of
data. That means roughly 8K cache entries (~800 sources × 10 blocks) are
for liquidity needed in auctions, leaving ~12K entries that are only
needed for quotes. Also, this is only for a single UniV2-like liquidity
source; in total we have 4 different ones configured in our backend.

# Changes
We address `1` by no longer remembering the liquidity sources of quote
requests for background updates.
We address `2` by throwing away all the duplicated data during cache
eviction (illustrated below).
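
The core of change `2` is the deduplication in `remove_cached_blocks_older_than` in the diff below. As a self-contained illustration with the simplified types from the sketch above:

```rust
use std::collections::{BTreeMap, HashSet};

/// Walk entries from the newest block down and keep only the newest
/// copy of each key; older duplicates are emptied and then dropped.
fn keep_only_newest(entries: &mut BTreeMap<(u64, u64), Vec<String>>) {
    let mut cached_keys = HashSet::new();
    for ((_block, key), values) in entries.iter_mut().rev() {
        if !cached_keys.insert(*key) {
            // An entry for this key at a newer block already exists.
            values.clear();
        }
    }
    // Afterwards drop all entries that are now empty.
    entries.retain(|_, values| !values.is_empty());
}
```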

## How to test
I did a manual setup where I ran an autopilot locally in shadow mode
(fetching auctions from prod mainnet) and a driver with all liquidity
sources enabled.
I collected 3 graphs in total to measure the impact of this change on
memory.
1. This graph is the status quo (very noisy and not really reproducible
across runs)
<img width="1617" alt="0_current_no_optimizations"
src="https://github.com/cowprotocol/services/assets/19190235/0997b34f-8f30-43c4-a797-5e3cf7bccbbf">

2. This graph applies one optimization that is not part of this PR to
make the memory consumption more predictable across runs. I want to
merge that optimization as well but right now it's very hacky. However,
I will include this optimization in all my graphs because it makes the
impact of each optimization easier to spot.
<img width="1420" alt="1_with_univ2_call_optimization"
src="https://github.com/cowprotocol/services/assets/19190235/6f259fa4-4fcd-45dd-ba37-160962065ab3">

3. The effects of this PR's optimization. The memory usage is more
stable overall and grows less over time.
<img width="1607" alt="2_cache_eviction"
src="https://github.com/cowprotocol/services/assets/19190235/ec5b5712-e4e3-4c4e-8feb-dc46e2cfa3f3">
MartinquaXD authored Dec 1, 2023
1 parent 7d5397c commit 1fe4c44
Showing 2 changed files with 117 additions and 73 deletions.
2 changes: 2 additions & 0 deletions crates/e2e/tests/e2e/colocation_quoting.rs
@@ -77,6 +77,8 @@ async fn uses_stale_liquidity(web3: Web3) {

tracing::info!("waiting for liquidity state to update");
wait_for_condition(TIMEOUT, || async {
// Mint blocks until we evict the cached liquidity and fetch the new state.
onchain.mint_block().await;
let next = services.submit_quote(&quote).await.unwrap();
next.quote.buy_amount != first.quote.buy_amount
})
188 changes: 115 additions & 73 deletions crates/shared/src/recent_block_cache.rs
@@ -292,8 +292,14 @@

let mut mutexed = self.mutexed.lock().unwrap();
mutexed.insert(cache_miss_block, chunk.iter().cloned(), fetched);
for key in found_keys {
mutexed.recently_used.cache_set(key, ());
if block.is_some() {
// Only if a block number was specified did the caller actually care about the
// most accurate data for these keys. Only in that case do we want to be nice
// and remember the key for future background updates of the cached
// liquidity.
for key in found_keys {
mutexed.recently_used.cache_set(key, ());
}
}
}

@@ -336,6 +342,7 @@
}

fn get(&mut self, key: K, block: Option<u64>) -> Option<&[V]> {
let allow_background_updates = block.is_some();
let block = block.or_else(|| {
self.cached_most_recently_at_block
.get(&key)
@@ -345,7 +352,7 @@
})
})?;
let result = self.entries.get(&(block, key.clone())).map(Vec::as_slice);
if result.is_some_and(|values| !values.is_empty()) {
if allow_background_updates && result.is_some_and(|values| !values.is_empty()) {
self.recently_used.cache_set(key, ());
}
result
@@ -382,11 +389,27 @@
fn remove_cached_blocks_older_than(&mut self, oldest_to_keep: u64) {
tracing::debug!("dropping blocks older than {} from cache", oldest_to_keep);
self.entries = self.entries.split_off(&(oldest_to_keep, K::first_ord()));

// Iterate from newest block to oldest block and only keep the most recent
// liquidity around to reduce memory consumption.
let mut cached_keys = HashSet::new();
let mut items = 0;
for ((_block, key), values) in self.entries.iter_mut().rev() {
if !cached_keys.insert(key) {
*values = vec![];
} else {
items += values.len();
}
}
// Afterwards drop all entries that are now empty.
self.entries.retain(|_, values| !values.is_empty());

self.cached_most_recently_at_block
.retain(|_, block| *block >= oldest_to_keep);
tracing::debug!(
"the cache now contains entries for {} block-key combinations",
self.entries.len()
entries = self.entries.len(),
items,
"cache was updated and now contains",
);
}

@@ -469,7 +492,8 @@ mod tests {
let fetcher = FakeCacheFetcher::new(vec![
TestValue::new(0, "a"),
TestValue::new(1, "b"),
TestValue::new(2, "c"),
// no liquidity for key 2 on-chain
TestValue::new(3, "c"),
]);
let block_number = 10u64;
let block_stream = mock_single_block(BlockInfo {
@@ -478,7 +502,7 @@
});
let cache = RecentBlockCache::new(
CacheConfig {
number_of_entries_to_auto_update: NonZeroUsize::new(2).unwrap(),
number_of_entries_to_auto_update: NonZeroUsize::new(1).unwrap(),
..Default::default()
},
fetcher,
@@ -487,41 +511,40 @@
)
.unwrap();

let assert_keys_recently_used = |expected_keys: &[usize]| {
let cached_keys = cache
.mutexed
.lock()
.unwrap()
.keys_of_recently_used_entries()
.collect::<Vec<_>>();
let expected_keys: Vec<_> = expected_keys.iter().copied().map(TestKey).collect();
assert_eq!(cached_keys, expected_keys);
};

cache
.fetch(test_keys(0..1), Block::Recent)
.now_or_never()
.unwrap()
.fetch(test_keys(0..1), Block::Number(block_number))
.await
.unwrap();
assert_keys_recently_used(&[0]);

// Don't cache this because we didn't request the liquidity on a specific block.
cache.fetch(test_keys(1..2), Block::Recent).await.unwrap();
assert_keys_recently_used(&[0]);

// Don't cache this because there is no liquidity for this block on-chain.
cache
.fetch(test_keys(1..2), Block::Recent)
.now_or_never()
.unwrap()
.fetch(test_keys(2..3), Block::Number(block_number))
.await
.unwrap();
let keys = cache
.mutexed
.lock()
.unwrap()
.keys_of_recently_used_entries()
.collect::<HashSet<_>>();
assert_eq!(keys, test_keys(0..2).collect());
assert_keys_recently_used(&[0]);

// 1 is already cached, 2 isn't.
// Additionally 3 will never yield any data. We don't consider these
// keys as recently used. That's because we update data for recently used keys
// in the background. If we considered keys without data to be recently used
// we'd issue a lot of useless update requests.
// Cache the new key but evict the other key because we have a limited capacity.
cache
.fetch(test_keys(1..3), Block::Recent)
.now_or_never()
.unwrap()
.fetch(test_keys(3..4), Block::Number(block_number))
.await
.unwrap();
let keys = cache
.mutexed
.lock()
.unwrap()
.keys_of_recently_used_entries()
.collect::<HashSet<_>>();
assert_eq!(keys, test_keys(1..3).collect());
assert_keys_recently_used(&[3]);
}

#[tokio::test]
@@ -535,7 +558,7 @@
});
let cache = RecentBlockCache::new(
CacheConfig {
number_of_entries_to_auto_update: NonZeroUsize::new(2).unwrap(),
number_of_entries_to_auto_update: NonZeroUsize::new(4).unwrap(),
..Default::default()
},
fetcher,
@@ -544,34 +567,45 @@
)
.unwrap();

let initial_values = vec![TestValue::new(0, "hello"), TestValue::new(1, "ether")];
// Initial state on the blockchain.
let initial_values = vec![
TestValue::new(0, "1"),
TestValue::new(1, "1"),
TestValue::new(2, "1"),
TestValue::new(3, "1"),
];
*values.lock().unwrap() = initial_values.clone();

let result = cache
.fetch(test_keys(0..2), Block::Recent)
.now_or_never()
.unwrap()
.fetch(test_keys(0..2), Block::Number(block_number))
.await
.unwrap();
assert_eq!(result.len(), 2);

let updated_values = vec![TestValue::new(0, "hello_1"), TestValue::new(1, "ether_1")];
let result = cache.fetch(test_keys(0..4), Block::Recent).await.unwrap();
// We can fetch data for keys with `Recent` but we don't schedule them for auto
// updates.
assert_eq!(result.len(), 4);

// New state on the blockchain on the next block.
let updated_values = vec![
TestValue::new(0, "2"),
TestValue::new(1, "2"),
TestValue::new(2, "2"),
TestValue::new(3, "2"),
];
*values.lock().unwrap() = updated_values.clone();
cache
.update_cache_at_block(block_number)
.now_or_never()
.unwrap()
.unwrap();
cache.update_cache_at_block(block_number).await.unwrap();
values.lock().unwrap().clear();

let result = cache
.fetch(test_keys(0..2), Block::Recent)
.now_or_never()
.unwrap()
.unwrap();
assert_eq!(result.len(), 2);
for value in updated_values {
assert!(result.contains(&value));
}
let result = cache.fetch(test_keys(0..4), Block::Recent).await.unwrap();
assert_eq!(result.len(), 4);
// These keys were scheduled for background updates and show the new value.
assert!(result.contains(&updated_values[0]));
assert!(result.contains(&updated_values[1]));
// These keys were NOT scheduled for background updates and show the old value.
assert!(result.contains(&initial_values[2]));
assert!(result.contains(&initial_values[3]));
}

#[tokio::test]
Expand Down Expand Up @@ -699,16 +733,16 @@ mod tests {

#[tokio::test]
async fn evicts_old_blocks_from_cache() {
let values = (0..10).map(|key| TestValue::new(key, "")).collect();
let values = (0..=12).map(|key| TestValue::new(key, "")).collect();
let fetcher = FakeCacheFetcher::new(values);
let block_number = 10u64;
let block_stream = mock_single_block(BlockInfo {
number: block_number,
let block = |number| BlockInfo {
number,
..Default::default()
});
};
let (block_sender, block_stream) = tokio::sync::watch::channel(block(10));
let cache = RecentBlockCache::new(
CacheConfig {
number_of_blocks_to_cache: NonZeroU64::new(5).unwrap(),
number_of_blocks_to_cache: NonZeroU64::new(2).unwrap(),
number_of_entries_to_auto_update: NonZeroUsize::new(2).unwrap(),
..Default::default()
},
@@ -718,24 +752,32 @@
)
.unwrap();

// Fetch 10 keys on block 10; but we only have capacity to update 2 of those in
// the background.
cache
.fetch(test_keys(0..10), Block::Number(10))
.now_or_never()
.unwrap()
.await
.unwrap();
assert_eq!(cache.mutexed.lock().unwrap().entries.len(), 10);
cache
.update_cache_at_block(14)
.now_or_never()
.unwrap()
.unwrap();

block_sender.send(block(11)).unwrap();
// Fetch updated liquidity for 2 of the initial 10 keys
cache.update_cache_at_block(11).await.unwrap();
// Fetch 2 new keys which are NOT scheduled for background updates
cache.fetch(test_keys(10..12), Block::Recent).await.unwrap();
assert_eq!(cache.mutexed.lock().unwrap().entries.len(), 12);
cache
.update_cache_at_block(15)
.now_or_never()
.unwrap()
.unwrap();

block_sender.send(block(12)).unwrap();
// Fetch updated liquidity for 2 of the initial 10 keys
cache.update_cache_at_block(12).await.unwrap();
assert_eq!(cache.mutexed.lock().unwrap().entries.len(), 4);

block_sender.send(block(13)).unwrap();
// Update 2 blocks in background but now it's time to evict the 2 additional
// keys we fetched with `Block::Recent` because we are only allowed to
// keep state that is up to 2 blocks old.
cache.update_cache_at_block(13).await.unwrap();
assert_eq!(cache.mutexed.lock().unwrap().entries.len(), 2);
}

#[tokio::test]
