From 69ae2f34c3f8f7e9b1aa09e74275fc636ea12186 Mon Sep 17 00:00:00 2001 From: sunce86 Date: Mon, 4 Nov 2024 12:03:14 +0100 Subject: [PATCH] Fix missing gaps in database --- src/database_solver_competition.rs | 12 +-- src/run.rs | 146 ++++++++++++++++++++++------- 2 files changed, 117 insertions(+), 41 deletions(-) diff --git a/src/database_solver_competition.rs b/src/database_solver_competition.rs index 0b0f0f8..be6a6fe 100644 --- a/src/database_solver_competition.rs +++ b/src/database_solver_competition.rs @@ -83,7 +83,6 @@ pub struct RichSolverCompetition { /// Entries are fetched going from higher auction_id to lower auction_id. pub async fn fetch_batch( ex: &mut PgConnection, - auction_id: i64, batch_size: i64, ) -> Result, sqlx::Error> { const QUERY: &str = r#" @@ -95,15 +94,12 @@ pub async fn fetch_batch( FROM solver_competitions sc LEFT JOIN settlement_scores ss ON sc.id = ss.auction_id LEFT JOIN surplus_capturing_jit_order_owners jit ON sc.id = jit.auction_id - WHERE sc.id < $1 + LEFT JOIN competition_auctions ca ON sc.id = ca.id + WHERE ca.id IS NULL ORDER BY sc.id DESC - LIMIT $2;"#; + LIMIT $1;"#; - sqlx::query_as(QUERY) - .bind(auction_id) - .bind(batch_size) - .fetch_all(ex) - .await + sqlx::query_as(QUERY).bind(batch_size).fetch_all(ex).await } #[derive(Debug, Clone, PartialEq, sqlx::FromRow)] diff --git a/src/run.rs b/src/run.rs index f2a9c33..d38cf08 100644 --- a/src/run.rs +++ b/src/run.rs @@ -5,7 +5,7 @@ use crate::{ }; use anyhow::{Context, Result}; use clap::Parser; -use std::{num::NonZero, ops::DerefMut}; +use std::num::NonZero; pub async fn start(args: impl Iterator) { let args = crate::arguments::Arguments::parse_from(args); @@ -14,57 +14,140 @@ pub async fn start(args: impl Iterator) { .await .unwrap(); - populate_historic_auctions(&db).await.unwrap(); + fix_missing_historic_auctions(&db).await.unwrap(); // sleep for 10 minutes std::thread::sleep(std::time::Duration::from_secs(600)); } -pub async fn populate_historic_auctions(db: &Postgres) -> Result<()> { - println!("starting data migration for auction data"); +// pub async fn populate_historic_auctions(db: &Postgres) -> Result<()> { +// println!("starting data migration for auction data"); + +// const BATCH_SIZE: i64 = 10; + +// let mut ex = db.pool.begin().await?; + +// // find entry in `competition_auctions` with the lowest auction_id, as a +// // starting point +// let current_auction_id: Option = +// sqlx::query_scalar::<_, Option>("SELECT MIN(id) FROM competition_auctions;") +// .fetch_one(ex.deref_mut()) +// .await +// .context("fetch lowest auction id")?; + +// let Some(mut current_auction_id) = current_auction_id else { +// println!("competition_auctions is empty, nothing to process"); +// return Ok(()); +// }; + +// let starting_auction_number = current_auction_id; + +// loop { +// // fetch the next batch of auctions +// let competitions: Vec = +// fetch_batch(&mut ex, BATCH_SIZE).await?; + +// if competitions.is_empty() { +// println!("no more auctions to process"); +// break; +// } + +// println!("processing {} auctions, first one: {}", competitions.len(), competitions.first().map(|c| c.id).unwrap_or(0)); + +// for solver_competition in &competitions { +// let competition: SolverCompetitionDB = +// serde_json::from_value(solver_competition.json.clone()) +// .context("deserialize SolverCompetitionDB")?; + +// // populate historic auctions +// let auction = Auction { +// id: solver_competition.id, +// block: i64::try_from(competition.auction_start_block).context("block overflow")?, +// deadline: solver_competition.deadline, +// order_uids: competition +// .auction +// .orders +// .iter() +// .map(|order| ByteArray(order.0)) +// .collect(), +// price_tokens: competition +// .auction +// .prices +// .keys() +// .map(|token| ByteArray(token.0)) +// .collect(), +// price_values: competition +// .auction +// .prices +// .values() +// .map(crate::database_solver_competition::u256_to_big_decimal) +// .collect(), +// surplus_capturing_jit_order_owners: solver_competition +// .surplus_capturing_jit_order_owners +// .clone(), +// }; + +// if let Err(err) = crate::database_solver_competition::save(&mut ex, auction).await { +// println!( +// "failed to save auction: {:?}, auction: {}", +// err, solver_competition.id +// ); +// } +// } + +// // commit each batch separately +// ex.commit().await?; + +// // sleep for 50ms +// std::thread::sleep(std::time::Duration::from_millis(50)); + +// ex = db.pool.begin().await?; + +// // update the current auction id +// current_auction_id = competitions.last().unwrap().id; +// } + +// Ok(()) +// } + +pub async fn fix_missing_historic_auctions(db: &Postgres) -> Result<()> { + println!("starting data migration fix for auction data"); const BATCH_SIZE: i64 = 10; let mut ex = db.pool.begin().await?; - // find entry in `competition_auctions` with the lowest auction_id, as a - // starting point - let current_auction_id: Option = - sqlx::query_scalar::<_, Option>("SELECT MIN(id) FROM competition_auctions;") - .fetch_one(ex.deref_mut()) - .await - .context("fetch lowest auction id")?; + // there is a gap of entries in `competition_auctions` that need to be filled - let Some(mut current_auction_id) = current_auction_id else { - println!("competition_auctions is empty, nothing to process"); - return Ok(()); - }; - - let starting_auction_number = current_auction_id; + // we identify this gap by looking at the `solver_competitions` table loop { - println!( - "populating historic auctions from auction {}, executed in percent: {}", - current_auction_id, - (starting_auction_number - current_auction_id) as f64 / starting_auction_number as f64 - * 100.0 - ); - // fetch the next batch of auctions - let competitions: Vec = - fetch_batch(&mut ex, current_auction_id, BATCH_SIZE).await?; + let competitions: Vec = fetch_batch(&mut ex, BATCH_SIZE).await?; if competitions.is_empty() { println!("no more auctions to process"); break; } - println!("processing {} auctions", competitions.len()); + println!( + "processing {} auctions, first one {}", + competitions.len(), + competitions.last().map(|c| c.id).unwrap_or(0) + ); for solver_competition in &competitions { - let competition: SolverCompetitionDB = - serde_json::from_value(solver_competition.json.clone()) - .context("deserialize SolverCompetitionDB")?; + let competition = + serde_json::from_value::(solver_competition.json.clone()) + .context("deserialize SolverCompetitionDB"); + + let Ok(competition) = competition else { + println!( + "failed to deserialize SolverCompetitionDB, auction: {}", + solver_competition.id + ); + continue; + }; // populate historic auctions let auction = Auction { @@ -109,9 +192,6 @@ pub async fn populate_historic_auctions(db: &Postgres) -> Result<()> { std::thread::sleep(std::time::Duration::from_millis(50)); ex = db.pool.begin().await?; - - // update the current auction id - current_auction_id = competitions.last().unwrap().id; } Ok(())