Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into solver-competition-pr…
Browse files Browse the repository at this point in the history
…ices
  • Loading branch information
fleupold committed Dec 8, 2023
2 parents 23edd31 + 54da168 commit 27de708
Show file tree
Hide file tree
Showing 16 changed files with 215 additions and 258 deletions.
14 changes: 13 additions & 1 deletion crates/autopilot/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,19 @@ impl Postgres {
let metrics = Metrics::get();

// update table row metrics
for &table in database::ALL_TABLES {
for &table in database::TABLES {
let mut ex = self.0.acquire().await?;
let count = count_rows_in_table(&mut ex, table).await?;
metrics.table_rows.with_label_values(&[table]).set(count);
}

// update table row metrics
for &table in database::LARGE_TABLES {
let mut ex = self.0.acquire().await?;
let count = estimate_rows_in_table(&mut ex, table).await?;
metrics.table_rows.with_label_values(&[table]).set(count);
}

// update unused app data metric
{
let mut ex = self.0.acquire().await?;
Expand All @@ -50,6 +57,11 @@ async fn count_rows_in_table(ex: &mut PgConnection, table: &str) -> sqlx::Result
sqlx::query_scalar(&query).fetch_one(ex).await
}

async fn estimate_rows_in_table(ex: &mut PgConnection, table: &str) -> sqlx::Result<i64> {
let query = format!("SELECT reltuples::bigint FROM pg_class WHERE relname='{table}';");
sqlx::query_scalar(&query).fetch_one(ex).await
}

async fn count_unused_app_data(ex: &mut PgConnection) -> sqlx::Result<i64> {
let query = r#"
SELECT
Expand Down
2 changes: 1 addition & 1 deletion crates/autopilot/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ pub async fn run(args: Arguments) {
.await;
let run = RunLoop {
solvable_orders_cache,
database: db,
database: Arc::new(db),
drivers: args.drivers.into_iter().map(Driver::new).collect(),
current_block: current_block_stream,
web3,
Expand Down
50 changes: 38 additions & 12 deletions crates/autopilot/src/run_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use {

pub struct RunLoop {
pub solvable_orders_cache: Arc<SolvableOrdersCache>,
pub database: Postgres,
pub database: Arc<Postgres>,
pub drivers: Vec<Driver>,
pub current_block: CurrentBlockStream,
pub web3: Web3,
Expand Down Expand Up @@ -299,6 +299,12 @@ impl RunLoop {
tracing::warn!(?err, driver = %driver.name, "settlement failed");
}
}
let unsettled_orders: Vec<_> = solutions
.iter()
.flat_map(|p| p.solution.orders.keys())
.filter(|uid| !solution.orders.contains_key(uid))
.collect();
Metrics::matched_unsettled(driver, unsettled_orders.as_slice());
}
}

Expand All @@ -313,17 +319,22 @@ impl RunLoop {
);
let request = &request;

let start = Instant::now();
self.database
.store_order_events(
&auction
.orders
.iter()
.map(|o| (o.metadata.uid, OrderEventLabel::Ready))
.collect_vec(),
)
.await;
tracing::debug!(elapsed=?start.elapsed(), aution_id=%id, "storing order events took");
let db = self.database.clone();
let events = auction
.orders
.iter()
.map(|o| (o.metadata.uid, OrderEventLabel::Ready))
.collect_vec();
// insert into `order_events` table operations takes a while and the result is
// ignored, so we run it in the background
tokio::spawn(
async move {
let start = Instant::now();
db.store_order_events(&events).await;
tracing::debug!(elapsed=?start.elapsed(), events_count=events.len(), "stored order events");
}
.instrument(tracing::Span::current()),
);

let start = Instant::now();
futures::future::join_all(self.drivers.iter().map(|driver| async move {
Expand Down Expand Up @@ -641,6 +652,11 @@ struct Metrics {
/// Tracks the result of driver `/settle` requests.
#[metric(labels("driver", "result"))]
settle: prometheus::IntCounterVec,

/// Tracks the number of orders that were part of some but not the winning
/// solution together with the winning driver that did't include it.
#[metric(labels("ignored_by"))]
matched_unsettled: prometheus::IntCounterVec,
}

impl Metrics {
Expand Down Expand Up @@ -719,4 +735,14 @@ impl Metrics {
.with_label_values(&[&driver.name, label])
.inc();
}

fn matched_unsettled(winning: &Driver, unsettled: &[&OrderUid]) {
if !unsettled.is_empty() {
tracing::debug!(?unsettled, "some orders were matched but not settled");
}
Self::get()
.matched_unsettled
.with_label_values(&[&winning.name])
.inc_by(unsettled.len() as u64);
}
}
14 changes: 10 additions & 4 deletions crates/database/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,9 @@ use {

pub type PgTransaction<'a> = sqlx::Transaction<'a, sqlx::Postgres>;

/// The names of all tables we use in the db.
pub const ALL_TABLES: &[&str] = &[
/// The names of tables we use in the db.
pub const TABLES: &[&str] = &[
"orders",
"order_events",
"trades",
"invalidations",
"quotes",
Expand All @@ -69,10 +68,17 @@ pub const ALL_TABLES: &[&str] = &[
"app_data",
];

/// The names of potentially big volume tables we use in the db.
pub const LARGE_TABLES: &[&str] = &["order_events"];

pub fn all_tables() -> impl Iterator<Item = &'static str> {
TABLES.iter().copied().chain(LARGE_TABLES.iter().copied())
}

/// Delete all data in the database. Only used by tests.
#[allow(non_snake_case)]
pub async fn clear_DANGER_(ex: &mut PgTransaction<'_>) -> sqlx::Result<()> {
for table in ALL_TABLES {
for table in all_tables() {
ex.execute(format!("TRUNCATE {table};").as_str()).await?;
}
Ok(())
Expand Down
14 changes: 12 additions & 2 deletions crates/driver/src/domain/competition/solution/settlement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,18 @@ impl Settlement {
.fold(Default::default(), |mut acc, solution| {
for trade in solution.user_trades() {
let order = acc.entry(trade.order().uid).or_default();
order.sell = trade.sell_amount(&solution.prices).unwrap_or_default();
order.buy = trade.buy_amount(&solution.prices).unwrap_or_default();
order.sell = trade.sell_amount(&solution.prices, solution.weth).unwrap_or_else(|| {
// This should never happen, returning 0 is better than panicking, but we
// should still alert.
tracing::error!(uid = ?trade.order().uid, "could not compute sell_amount");
0.into()
});
order.buy = trade.buy_amount(&solution.prices, solution.weth).unwrap_or_else(|| {
// This should never happen, returning 0 is better than panicking, but we
// should still alert.
tracing::error!(uid = ?trade.order().uid, "could not compute buy_amount");
0.into()
});
}
acc
})
Expand Down
10 changes: 6 additions & 4 deletions crates/driver/src/domain/competition/solution/trade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,15 @@ impl Fulfillment {
pub fn sell_amount(
&self,
prices: &HashMap<eth::TokenAddress, eth::U256>,
weth: eth::WethAddress,
) -> Option<eth::TokenAmount> {
let before_fee = match self.order.side {
order::Side::Sell => self.executed.0,
order::Side::Buy => self
.executed
.0
.checked_mul(*prices.get(&self.order.buy.token)?)?
.checked_div(*prices.get(&self.order.sell.token)?)?,
.checked_mul(*prices.get(&self.order.buy.token.wrap(weth))?)?
.checked_div(*prices.get(&self.order.sell.token.wrap(weth))?)?,
};
Some(eth::TokenAmount(
before_fee.checked_add(self.solver_fee().0)?,
Expand All @@ -110,14 +111,15 @@ impl Fulfillment {
pub fn buy_amount(
&self,
prices: &HashMap<eth::TokenAddress, eth::U256>,
weth: eth::WethAddress,
) -> Option<eth::TokenAmount> {
let amount = match self.order.side {
order::Side::Buy => self.executed.0,
order::Side::Sell => self
.executed
.0
.checked_mul(*prices.get(&self.order.sell.token)?)?
.checked_div(*prices.get(&self.order.buy.token)?)?,
.checked_mul(*prices.get(&self.order.sell.token.wrap(weth))?)?
.checked_div(*prices.get(&self.order.buy.token.wrap(weth))?)?,
};
Some(eth::TokenAmount(amount))
}
Expand Down
2 changes: 1 addition & 1 deletion crates/driver/src/infra/api/routes/solve/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async fn route(
.tap_err(|err| {
observe::invalid_dto(err, "auction");
})?;
tracing::debug!(elapsed=?start.elapsed(), auction_id=%auction_id, "auction task execution time");
tracing::debug!(elapsed = ?start.elapsed(), "auction task execution time");
let auction = state.pre_processor().prioritize(auction).await;
let competition = state.competition();
let result = competition.solve(&auction).await;
Expand Down
3 changes: 3 additions & 0 deletions crates/driver/src/infra/observe/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ pub struct Metrics {
/// The results of the quoting process.
#[metric(labels("solver", "result"))]
pub quotes: prometheus::IntCounterVec,
/// The results of the mempool submission.
#[metric(labels("mempool", "result"))]
pub mempool_submission: prometheus::IntCounterVec,
}

/// Setup the metrics registry.
Expand Down
9 changes: 9 additions & 0 deletions crates/driver/src/infra/observe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,15 @@ pub fn mempool_executed(
);
}
}
let result = match res {
Ok(_) => "Success",
Err(mempools::Error::Revert(_) | mempools::Error::SimulationRevert) => "Revert",
Err(mempools::Error::Other(_)) => "Other",
};
metrics::get()
.mempool_submission
.with_label_values(&[&mempool.to_string(), result])
.inc();
}

/// Observe that an invalid DTO was received.
Expand Down
Loading

0 comments on commit 27de708

Please sign in to comment.