Skip to content

Commit

Permalink
Update settlement contract balances in background task (#2108)
Browse files Browse the repository at this point in the history
# Description
Addresses the bug that we currently fetch settlement buffer balances
once per token and don't update them in between auctions, which can lead
to solutions relying on outdate internal buffer balance information and
thus incorrectly marking interactions as internalizable causing
simulation to fail.

# Changes
- [ ] Add an e2e showcasing the problematic behavior
- [ ] Spawn a task when creating a new token fetcher, which on arrival
of new blocks update the token balances of all cached tokens.

This approach is a bit wasteful, as token balances only really change
when a settlement trading this token is included on-chain. However,
getting that information in this component is non-trivial. Moreover,
some tokens exhibit custom balance logic (e.g. rebasing tokens) which
could update the contract balance even without a settlement taking
place.

If we deem this approach too inefficient, I would suggest we restrict
the number of tokens we cache (using an LRU cache) and re-fetch all
relevant information (including decimals and symbol) in case of a cache
miss.
If we don't want to lose caching of decimals and symbols for some
tokens, I'd suggest splitting the fetcher into two components, one for
static information, and one more `RecentBlockCache` like component for
the settlement balances.

Let me know what you think.

## How to test
Added an end to end test which fails without the changes in
`driver::infra::tokens`

## Related Issues

Fixes #2093
  • Loading branch information
fleupold authored Dec 4, 2023
1 parent 0c1fcb0 commit 39f4f4a
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 3 deletions.
69 changes: 67 additions & 2 deletions crates/driver/src/infra/tokens.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ use {
domain::eth,
infra::{blockchain, Ethereum},
},
ethrpc::current_block::{self, CurrentBlockStream},
futures::StreamExt,
std::{
collections::{HashMap, HashSet},
sync::{Arc, RwLock},
},
tracing::Instrument,
};

#[derive(Clone, Debug)]
Expand All @@ -22,10 +25,16 @@ pub struct Fetcher(Arc<Inner>);

impl Fetcher {
pub fn new(eth: Ethereum) -> Self {
Self(Arc::new(Inner {
let block_stream = eth.current_block().clone();
let inner = Arc::new(Inner {
eth,
cache: RwLock::new(HashMap::new()),
}))
});
tokio::task::spawn(
update_task(block_stream, Arc::downgrade(&inner))
.instrument(tracing::info_span!("token_fetcher")),
);
Self(inner)
}

/// Returns the `Metadata` for the given tokens. Note that the result will
Expand All @@ -39,6 +48,62 @@ impl Fetcher {
}
}

/// Runs a single cache update cycle whenever a new block arrives until the
/// fetcher is dropped.
async fn update_task(blocks: CurrentBlockStream, inner: std::sync::Weak<Inner>) {
let mut stream = current_block::into_stream(blocks);
while stream.next().await.is_some() {
let inner = match inner.upgrade() {
Some(inner) => inner,
// Fetcher was dropped, stop update task.
None => break,
};
if let Err(err) = update_balances(inner).await {
tracing::warn!(?err, "error updating token cache");
}
}
}

/// Updates the settlement contract's balance for every cached token.
async fn update_balances(inner: Arc<Inner>) -> Result<(), blockchain::Error> {
let settlement = inner.eth.contracts().settlement().address().into();
let futures = {
let cache = inner.cache.read().unwrap();
let tokens = cache.keys().cloned().collect::<Vec<_>>();
tracing::debug!(
tokens = tokens.len(),
"updating settlement contract balances"
);
tokens.into_iter().map(|token| {
let erc20 = inner.eth.erc20(token);
async move {
Ok::<(eth::TokenAddress, eth::TokenAmount), blockchain::Error>((
token,
erc20.balance(settlement).await?,
))
}
})
};

// Don't hold on to the lock while fetching balances to allow concurrent
// updates. This may lead to new entries arriving in the meantime, however
// their balances should already be up-to-date.
let mut balances = futures::future::try_join_all(futures)
.await?
.into_iter()
.collect::<HashMap<_, _>>();

let mut cache = inner.cache.write().unwrap();
for (key, entry) in cache.iter_mut() {
if let Some(balance) = balances.remove(key) {
entry.balance = balance;
} else {
tracing::info!(?key, "key without balance update");
}
}
Ok(())
}

/// Provides metadata of tokens.
struct Inner {
eth: Ethereum,
Expand Down
2 changes: 1 addition & 1 deletion crates/e2e/src/setup/colocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub async fn start_solver(weth: H160) -> Url {
r#"
weth = "{weth:?}"
base-tokens = []
max-hops = 0
max-hops = 1
max-partial-attempts = 5
risk-parameters = [0,0,0,0]
"#,
Expand Down
108 changes: 108 additions & 0 deletions crates/e2e/tests/e2e/colocation_buffers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
use {
e2e::{setup::*, tx},
ethcontract::prelude::U256,
model::{
order::{OrderCreation, OrderKind},
signature::EcdsaSigningScheme,
},
secp256k1::SecretKey,
shared::ethrpc::Web3,
web3::signing::SecretKeyRef,
};

#[tokio::test]
#[ignore]
async fn local_node_buffers() {
run_test(onchain_settlement_without_liquidity).await;
}

async fn onchain_settlement_without_liquidity(web3: Web3) {
let mut onchain = OnchainComponents::deploy(web3).await;

let [solver] = onchain.make_solvers(to_wei(1)).await;
let [trader] = onchain.make_accounts(to_wei(1)).await;
let [token_a, token_b] = onchain
.deploy_tokens_with_weth_uni_v2_pools(to_wei(1_000), to_wei(1_000))
.await;

// Fund trader, settlement accounts, and pool creation
token_a.mint(trader.address(), to_wei(100)).await;
token_b
.mint(onchain.contracts().gp_settlement.address(), to_wei(5))
.await;
token_a.mint(solver.address(), to_wei(1000)).await;
token_b.mint(solver.address(), to_wei(1000)).await;

// Approve GPv2 for trading
tx!(
trader.account(),
token_a.approve(onchain.contracts().allowance, to_wei(100))
);

// Start system
let solver_endpoint = colocation::start_solver(onchain.contracts().weth.address()).await;
colocation::start_driver(onchain.contracts(), &solver_endpoint, &solver);

let services = Services::new(onchain.contracts()).await;
services.start_autopilot(vec![
"--enable-colocation=true".to_string(),
format!(
"--trusted-tokens={weth:#x},{token_a:#x},{token_b:#x}",
weth = onchain.contracts().weth.address(),
token_a = token_a.address(),
token_b = token_b.address()
),
"--drivers=test_solver|http://localhost:11088/test_solver".to_string(),
]);
services.start_api(vec![]).await;

// Place Order
let order = OrderCreation {
sell_token: token_a.address(),
sell_amount: to_wei(9),
fee_amount: to_wei(1),
buy_token: token_b.address(),
buy_amount: to_wei(5),
valid_to: model::time::now_in_epoch_seconds() + 300,
kind: OrderKind::Buy,
..Default::default()
}
.sign(
EcdsaSigningScheme::Eip712,
&onchain.contracts().domain_separator,
SecretKeyRef::from(&SecretKey::from_slice(trader.private_key()).unwrap()),
);
services.create_order(&order).await.unwrap();

tracing::info!("waiting for first trade");
let trade_happened =
|| async { token_b.balance_of(trader.address()).call().await.unwrap() == order.buy_amount };
wait_for_condition(TIMEOUT, trade_happened).await.unwrap();

// Check that settlement buffers were traded.
let settlement_contract_balance = token_b
.balance_of(onchain.contracts().gp_settlement.address())
.call()
.await
.unwrap();
// Check that internal buffers were used
assert!(settlement_contract_balance == 0.into());

// Same order can trade again with external liquidity
let order = OrderCreation {
valid_to: model::time::now_in_epoch_seconds() + 301,
..order
}
.sign(
EcdsaSigningScheme::Eip712,
&onchain.contracts().domain_separator,
SecretKeyRef::from(&SecretKey::from_slice(trader.private_key()).unwrap()),
);
services.create_order(&order).await.unwrap();

tracing::info!("waiting for second trade");
let trade_happened = || async {
token_b.balance_of(trader.address()).call().await.unwrap() == order.buy_amount * 2
};
wait_for_condition(TIMEOUT, trade_happened).await.unwrap();
}
1 change: 1 addition & 0 deletions crates/e2e/tests/e2e/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
// Each of the following modules contains tests.
mod app_data;
mod app_data_signer;
mod colocation_buffers;
mod colocation_ethflow;
mod colocation_hooks;
mod colocation_partial_fill;
Expand Down

0 comments on commit 39f4f4a

Please sign in to comment.