Skip to content

Commit

Permalink
Configurable order_events insert batch size (#2166)
Browse files Browse the repository at this point in the history
# Description
#2138 showed a reduction in
the insert time. The batch size should now be configurable.

# Changes
Adds a config to the `Postgres` struct.

## How to test
Passes e2e tests
  • Loading branch information
squadgazzz authored Dec 13, 2023
1 parent 868e977 commit 0bdbfdc
Show file tree
Hide file tree
Showing 14 changed files with 90 additions and 56 deletions.
4 changes: 4 additions & 0 deletions crates/autopilot/src/arguments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ pub struct Arguments {
#[clap(long, env, default_value = "postgresql://")]
pub db_url: Url,

/// The number of order events to insert in a single batch.
#[clap(long, env, default_value = "500")]
pub order_events_insert_batch_size: NonZeroUsize,

/// Skip syncing past events (useful for local deployments)
#[clap(long, env, action = clap::ArgAction::Set, default_value = "false")]
pub skip_event_sync: bool,
Expand Down
40 changes: 30 additions & 10 deletions crates/autopilot/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,38 +13,58 @@ pub mod recent_settlements;

use {
sqlx::{Executor, PgConnection, PgPool},
std::time::Duration,
std::{num::NonZeroUsize, time::Duration},
tracing::Instrument,
};

#[derive(Debug, Clone)]
pub struct Postgres(pub PgPool);
/// Runtime configuration for the [`Postgres`] database wrapper.
pub struct Config {
    /// Number of order events to insert per batch when writing
    /// order events to the database (see `store_order_events`).
    pub order_events_insert_batch_size: NonZeroUsize,
}

#[derive(Debug, Clone)]
/// Handle to the Postgres database: the `sqlx` connection pool plus
/// the configuration used by the insert logic.
pub struct Postgres {
    /// The underlying `sqlx` Postgres connection pool.
    pub pool: PgPool,
    /// Tunable settings, e.g. the order-events insert batch size.
    pub config: Config,
}

impl Postgres {
pub async fn new(url: &str) -> sqlx::Result<Self> {
Ok(Self(PgPool::connect(url).await?))
/// Connects to the database at `url` and stores the given batch size
/// used when inserting order events.
///
/// # Errors
/// Returns the `sqlx` error if the connection pool cannot be created.
pub async fn new(
    url: &str,
    order_events_insert_batch_size: NonZeroUsize,
) -> sqlx::Result<Self> {
    let pool = PgPool::connect(url).await?;
    let config = Config {
        order_events_insert_batch_size,
    };
    Ok(Self { pool, config })
}

/// Convenience constructor connecting to the default local database
/// URL with the default order-events insert batch size of 500.
pub async fn with_defaults() -> sqlx::Result<Self> {
    // 500 is a non-zero literal, so the unwrap cannot fail.
    let default_batch_size = NonZeroUsize::new(500).unwrap();
    Self::new("postgresql://", default_batch_size).await
}

pub async fn update_database_metrics(&self) -> sqlx::Result<()> {
let metrics = Metrics::get();

// update table row metrics
for &table in database::TABLES {
let mut ex = self.0.acquire().await?;
let mut ex = self.pool.acquire().await?;
let count = count_rows_in_table(&mut ex, table).await?;
metrics.table_rows.with_label_values(&[table]).set(count);
}

// update table row metrics
for &table in database::LARGE_TABLES {
let mut ex = self.0.acquire().await?;
let mut ex = self.pool.acquire().await?;
let count = estimate_rows_in_table(&mut ex, table).await?;
metrics.table_rows.with_label_values(&[table]).set(count);
}

// update unused app data metric
{
let mut ex = self.0.acquire().await?;
let mut ex = self.pool.acquire().await?;
let count = count_unused_app_data(&mut ex).await?;
metrics.unused_app_data.set(count);
}
Expand All @@ -54,7 +74,7 @@ impl Postgres {

pub async fn update_large_tables_stats(&self) -> sqlx::Result<()> {
for &table in database::LARGE_TABLES {
let mut ex = self.0.acquire().await?;
let mut ex = self.pool.acquire().await?;
analyze_table(&mut ex, table).await?;
}

Expand Down Expand Up @@ -148,8 +168,8 @@ mod tests {
#[tokio::test]
#[ignore]
async fn postgres_count_rows_in_table_() {
let db = Postgres::new("postgresql://").await.unwrap();
let mut ex = db.0.begin().await.unwrap();
let db = Postgres::with_defaults().await.unwrap();
let mut ex = db.pool.begin().await.unwrap();
database::clear_DANGER_(&mut ex).await.unwrap();

let count = count_rows_in_table(&mut ex, "orders").await.unwrap();
Expand Down
10 changes: 5 additions & 5 deletions crates/autopilot/src/database/auction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl QuoteStoring for Postgres {
.with_label_values(&["save_quote"])
.start_timer();

let mut ex = self.0.acquire().await?;
let mut ex = self.pool.acquire().await?;
let row = create_quote_row(data);
let id = database::quotes::save(&mut ex, &row).await?;
Ok(id)
Expand All @@ -41,7 +41,7 @@ impl QuoteStoring for Postgres {
.with_label_values(&["get_quote"])
.start_timer();

let mut ex = self.0.acquire().await?;
let mut ex = self.pool.acquire().await?;
let quote = database::quotes::get(&mut ex, id).await?;
quote.map(TryFrom::try_from).transpose()
}
Expand All @@ -56,7 +56,7 @@ impl QuoteStoring for Postgres {
.with_label_values(&["find_quote"])
.start_timer();

let mut ex = self.0.acquire().await?;
let mut ex = self.pool.acquire().await?;
let params = create_db_search_parameters(params, expiration);
let quote = database::quotes::find(&mut ex, &params)
.await
Expand All @@ -74,7 +74,7 @@ impl Postgres {
.with_label_values(&["solvable_orders"])
.start_timer();

let mut ex = self.0.begin().await?;
let mut ex = self.pool.begin().await?;
// Set the transaction isolation level to REPEATABLE READ
// so the both SELECT queries below are executed in the same database snapshot
// taken at the moment before the first query is executed.
Expand Down Expand Up @@ -103,7 +103,7 @@ impl Postgres {
.start_timer();

let data = serde_json::to_value(auction)?;
let mut ex = self.0.begin().await?;
let mut ex = self.pool.begin().await?;
database::auction::delete_all_auctions(&mut ex).await?;
let id = database::auction::save(&mut ex, &data).await?;
ex.commit().await?;
Expand Down
6 changes: 3 additions & 3 deletions crates/autopilot/src/database/auction_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl super::Postgres {
.with_label_values(&["update_settlement_tx_info"])
.start_timer();

let mut ex = self.0.acquire().await.context("acquire")?;
let mut ex = self.pool.acquire().await.context("acquire")?;
database::auction_transaction::insert_settlement_tx_info(
&mut ex,
block_number,
Expand All @@ -40,7 +40,7 @@ impl super::Postgres {
.with_label_values(&["get_settlement_event_without_tx_info"])
.start_timer();

let mut ex = self.0.acquire().await?;
let mut ex = self.pool.acquire().await?;
database::auction_transaction::get_settlement_event_without_tx_info(
&mut ex,
max_block_number,
Expand All @@ -58,7 +58,7 @@ impl super::Postgres {
.with_label_values(&["get_auction_id"])
.start_timer();

let mut ex = self.0.acquire().await?;
let mut ex = self.pool.acquire().await?;
database::auction_transaction::get_auction_id(&mut ex, &ByteArray(tx_from.0), tx_nonce)
.await
}
Expand Down
2 changes: 1 addition & 1 deletion crates/autopilot/src/database/competition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl super::Postgres {

let json = &serde_json::to_value(&competition.competition_table)?;

let mut ex = self.0.begin().await.context("begin")?;
let mut ex = self.pool.begin().await.context("begin")?;

database::solver_competition::save(&mut ex, competition.auction_id, json)
.await
Expand Down
6 changes: 3 additions & 3 deletions crates/autopilot/src/database/ethflow_events/event_storing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type EthFlowEvent = contracts::cowswap_eth_flow::Event;
#[async_trait::async_trait]
impl EventStoring<EthFlowEvent> for Postgres {
async fn last_event_block(&self) -> Result<u64> {
let mut ex = self.0.acquire().await?;
let mut ex = self.pool.acquire().await?;
let block = database::ethflow_orders::last_indexed_block(&mut ex).await?;
Ok(block.unwrap_or_default() as u64)
}
Expand All @@ -52,7 +52,7 @@ impl EventStoring<EthFlowEvent> for Postgres {
.database_queries
.with_label_values(&["append_ethflow_refund_events"])
.start_timer();
let mut ex = self.0.begin().await?;
let mut ex = self.pool.begin().await?;
database::ethflow_orders::insert_refund_tx_hashes(&mut ex, &refunds).await?;
ex.commit().await?;
Ok(())
Expand All @@ -68,7 +68,7 @@ impl EventStoring<EthFlowEvent> for Postgres {
.database_queries
.with_label_values(&["replace_ethflow_refund_events"])
.start_timer();
let mut ex = self.0.begin().await?;
let mut ex = self.pool.begin().await?;
database::ethflow_orders::delete_refunds(
&mut ex,
*range.start() as i64,
Expand Down
6 changes: 3 additions & 3 deletions crates/autopilot/src/database/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl EventStoring<ContractEvent> for Postgres {
.with_label_values(&["last_event_block"])
.start_timer();

let mut con = self.0.acquire().await?;
let mut con = self.pool.acquire().await?;
let block_number = database::events::last_block(&mut con)
.await
.context("block_number_of_most_recent_event failed")?;
Expand All @@ -66,7 +66,7 @@ impl EventStoring<ContractEvent> for Postgres {
.start_timer();

let events = contract_to_db_events(events)?;
let mut transaction = self.0.begin().await?;
let mut transaction = self.pool.begin().await?;
database::events::append(&mut transaction, &events)
.await
.context("append_events")?;
Expand All @@ -85,7 +85,7 @@ impl EventStoring<ContractEvent> for Postgres {
.start_timer();

let events = contract_to_db_events(events)?;
let mut transaction = self.0.begin().await?;
let mut transaction = self.pool.begin().await?;
database::events::delete(&mut transaction, *range.start() as i64)
.await
.context("delete_events failed")?;
Expand Down
15 changes: 11 additions & 4 deletions crates/autopilot/src/database/onchain_order_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ impl<T: Sync + Send + Clone, W: Sync + Send + Clone> EventStoring<ContractEvent>
.with_label_values(&["replace_onchain_order_events"])
.start_timer();

let mut transaction = self.db.0.begin().await?;
let mut transaction = self.db.pool.begin().await?;

database::onchain_broadcasted_orders::mark_as_reorged(
&mut transaction,
Expand Down Expand Up @@ -252,7 +252,7 @@ impl<T: Sync + Send + Clone, W: Sync + Send + Clone> EventStoring<ContractEvent>
.database_queries
.with_label_values(&["append_onchain_order_events"])
.start_timer();
let mut transaction = self.db.0.begin().await?;
let mut transaction = self.db.pool.begin().await?;

database::onchain_invalidations::insert_onchain_invalidations(
&mut transaction,
Expand Down Expand Up @@ -296,7 +296,7 @@ impl<T: Sync + Send + Clone, W: Sync + Send + Clone> EventStoring<ContractEvent>
.with_label_values(&["last_event_block"])
.start_timer();

let mut con = self.db.0.acquire().await?;
let mut con = self.db.pool.acquire().await?;
let block_number = database::onchain_broadcasted_orders::last_block(&mut con)
.await
.context("block_number_of_most_recent_event failed")?;
Expand Down Expand Up @@ -724,6 +724,7 @@ impl Metrics {
mod test {
use {
super::*,
crate::database::Config,
contracts::cowswap_onchain_orders::event_data::OrderPlacement as ContractOrderPlacement,
database::{byte_array::ByteArray, onchain_broadcasted_orders::OnchainOrderPlacement},
ethcontract::{Bytes, EventMetadata, H160, U256},
Expand All @@ -748,6 +749,7 @@ mod test {
order_quoting::{FindQuoteError, MockOrderQuoting, Quote, QuoteData},
},
sqlx::PgPool,
std::num::NonZeroUsize,
};

#[test]
Expand Down Expand Up @@ -1372,7 +1374,12 @@ mod test {
.returning(|_, _, _, _| 1u8);
let web3 = Web3::new(create_env_test_transport());
let onchain_order_parser = OnchainOrderParser {
db: Postgres(PgPool::connect_lazy("postgresql://").unwrap()),
db: Postgres {
pool: PgPool::connect_lazy("postgresql://").unwrap(),
config: Config {
order_events_insert_batch_size: NonZeroUsize::new(500).unwrap(),
},
},
web3,
quoter: Arc::new(order_quoter),
custom_onchain_data_parser: Box::new(custom_onchain_order_parser),
Expand Down
10 changes: 3 additions & 7 deletions crates/autopilot/src/database/order_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,17 @@ impl super::Postgres {

/// Deletes events before the provided timestamp.
pub async fn delete_order_events_before(&self, timestamp: DateTime<Utc>) -> Result<u64, Error> {
order_events::delete_order_events_before(&self.0, timestamp).await
order_events::delete_order_events_before(&self.pool, timestamp).await
}
}

// Temporarily hardcoded. Will migrate to a config file in case the batches
// approach has a positive impact.
const DEFAULT_BATCH_SIZE: usize = 500;

async fn store_order_events(
db: &super::Postgres,
events: &[(OrderUid, OrderEventLabel)],
timestamp: DateTime<Utc>,
) -> Result<()> {
let mut ex = db.0.begin().await.context("begin transaction")?;
for chunk in events.chunks(DEFAULT_BATCH_SIZE) {
let mut ex = db.pool.begin().await.context("begin transaction")?;
for chunk in events.chunks(db.config.order_events_insert_batch_size.get()) {
let batch = chunk.iter().map(|(uid, label)| OrderEvent {
order_uid: ByteArray(uid.0),
timestamp,
Expand Down
2 changes: 1 addition & 1 deletion crates/autopilot/src/database/quotes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ impl Postgres {
.with_label_values(&["remove_expired_quotes"])
.start_timer();

let mut ex = self.0.acquire().await?;
let mut ex = self.pool.acquire().await?;
database::quotes::remove_expired_quotes(&mut ex, max_expiry).await?;
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion crates/autopilot/src/database/recent_settlements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ impl super::Postgres {
.with_label_values(&["recent_settlement_tx_hashes"])
.start_timer();

let mut ex = self.0.acquire().await.context("acquire")?;
let mut ex = self.pool.acquire().await.context("acquire")?;
let hashes = database::settlements::recent_settlement_tx_hashes(&mut ex, block_range)
.await
.context("recent_settlement_tx_hashes")?;
Expand Down
Loading

0 comments on commit 0bdbfdc

Please sign in to comment.