Skip to content

Commit

Permalink
Fix missing gaps in database
Browse files Browse the repository at this point in the history
  • Loading branch information
sunce86 committed Nov 4, 2024
1 parent 8a3f524 commit 69ae2f3
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 41 deletions.
12 changes: 4 additions & 8 deletions src/database_solver_competition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ pub struct RichSolverCompetition {
/// Entries are fetched going from higher auction_id to lower auction_id.
pub async fn fetch_batch(
ex: &mut PgConnection,
auction_id: i64,
batch_size: i64,
) -> Result<Vec<RichSolverCompetition>, sqlx::Error> {
const QUERY: &str = r#"
Expand All @@ -95,15 +94,12 @@ pub async fn fetch_batch(
FROM solver_competitions sc
LEFT JOIN settlement_scores ss ON sc.id = ss.auction_id
LEFT JOIN surplus_capturing_jit_order_owners jit ON sc.id = jit.auction_id
WHERE sc.id < $1
LEFT JOIN competition_auctions ca ON sc.id = ca.id
WHERE ca.id IS NULL
ORDER BY sc.id DESC
LIMIT $2;"#;
LIMIT $1;"#;

sqlx::query_as(QUERY)
.bind(auction_id)
.bind(batch_size)
.fetch_all(ex)
.await
sqlx::query_as(QUERY).bind(batch_size).fetch_all(ex).await
}

#[derive(Debug, Clone, PartialEq, sqlx::FromRow)]
Expand Down
146 changes: 113 additions & 33 deletions src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
};
use anyhow::{Context, Result};
use clap::Parser;
use std::{num::NonZero, ops::DerefMut};
use std::num::NonZero;

pub async fn start(args: impl Iterator<Item = String>) {
let args = crate::arguments::Arguments::parse_from(args);
Expand All @@ -14,57 +14,140 @@ pub async fn start(args: impl Iterator<Item = String>) {
.await
.unwrap();

populate_historic_auctions(&db).await.unwrap();
fix_missing_historic_auctions(&db).await.unwrap();

// sleep for 10 minutes
std::thread::sleep(std::time::Duration::from_secs(600));
}

pub async fn populate_historic_auctions(db: &Postgres) -> Result<()> {
println!("starting data migration for auction data");
// pub async fn populate_historic_auctions(db: &Postgres) -> Result<()> {
// println!("starting data migration for auction data");

// const BATCH_SIZE: i64 = 10;

// let mut ex = db.pool.begin().await?;

// // find entry in `competition_auctions` with the lowest auction_id, as a
// // starting point
// let current_auction_id: Option<i64> =
// sqlx::query_scalar::<_, Option<i64>>("SELECT MIN(id) FROM competition_auctions;")
// .fetch_one(ex.deref_mut())
// .await
// .context("fetch lowest auction id")?;

// let Some(mut current_auction_id) = current_auction_id else {
// println!("competition_auctions is empty, nothing to process");
// return Ok(());
// };

// let starting_auction_number = current_auction_id;

// loop {
// // fetch the next batch of auctions
// let competitions: Vec<RichSolverCompetition> =
// fetch_batch(&mut ex, BATCH_SIZE).await?;

// if competitions.is_empty() {
// println!("no more auctions to process");
// break;
// }

// println!("processing {} auctions, first one: {}", competitions.len(), competitions.first().map(|c| c.id).unwrap_or(0));

// for solver_competition in &competitions {
// let competition: SolverCompetitionDB =
// serde_json::from_value(solver_competition.json.clone())
// .context("deserialize SolverCompetitionDB")?;

// // populate historic auctions
// let auction = Auction {
// id: solver_competition.id,
// block: i64::try_from(competition.auction_start_block).context("block overflow")?,
// deadline: solver_competition.deadline,
// order_uids: competition
// .auction
// .orders
// .iter()
// .map(|order| ByteArray(order.0))
// .collect(),
// price_tokens: competition
// .auction
// .prices
// .keys()
// .map(|token| ByteArray(token.0))
// .collect(),
// price_values: competition
// .auction
// .prices
// .values()
// .map(crate::database_solver_competition::u256_to_big_decimal)
// .collect(),
// surplus_capturing_jit_order_owners: solver_competition
// .surplus_capturing_jit_order_owners
// .clone(),
// };

// if let Err(err) = crate::database_solver_competition::save(&mut ex, auction).await {
// println!(
// "failed to save auction: {:?}, auction: {}",
// err, solver_competition.id
// );
// }
// }

// // commit each batch separately
// ex.commit().await?;

// // sleep for 50ms
// std::thread::sleep(std::time::Duration::from_millis(50));

// ex = db.pool.begin().await?;

// // update the current auction id
// current_auction_id = competitions.last().unwrap().id;
// }

// Ok(())
// }

pub async fn fix_missing_historic_auctions(db: &Postgres) -> Result<()> {
println!("starting data migration fix for auction data");

const BATCH_SIZE: i64 = 10;

let mut ex = db.pool.begin().await?;

// find entry in `competition_auctions` with the lowest auction_id, as a
// starting point
let current_auction_id: Option<i64> =
sqlx::query_scalar::<_, Option<i64>>("SELECT MIN(id) FROM competition_auctions;")
.fetch_one(ex.deref_mut())
.await
.context("fetch lowest auction id")?;
// there is a gap of missing entries in `competition_auctions` that needs to be filled

let Some(mut current_auction_id) = current_auction_id else {
println!("competition_auctions is empty, nothing to process");
return Ok(());
};

let starting_auction_number = current_auction_id;
// we identify this gap by looking at the `solver_competitions` table

loop {
println!(
"populating historic auctions from auction {}, executed in percent: {}",
current_auction_id,
(starting_auction_number - current_auction_id) as f64 / starting_auction_number as f64
* 100.0
);

// fetch the next batch of auctions
let competitions: Vec<RichSolverCompetition> =
fetch_batch(&mut ex, current_auction_id, BATCH_SIZE).await?;
let competitions: Vec<RichSolverCompetition> = fetch_batch(&mut ex, BATCH_SIZE).await?;

if competitions.is_empty() {
println!("no more auctions to process");
break;
}

println!("processing {} auctions", competitions.len());
println!(
"processing {} auctions, first one {}",
competitions.len(),
competitions.last().map(|c| c.id).unwrap_or(0)
);

for solver_competition in &competitions {
let competition: SolverCompetitionDB =
serde_json::from_value(solver_competition.json.clone())
.context("deserialize SolverCompetitionDB")?;
let competition =
serde_json::from_value::<SolverCompetitionDB>(solver_competition.json.clone())
.context("deserialize SolverCompetitionDB");

let Ok(competition) = competition else {
println!(
"failed to deserialize SolverCompetitionDB, auction: {}",
solver_competition.id
);
continue;
};

// populate historic auctions
let auction = Auction {
Expand Down Expand Up @@ -109,9 +192,6 @@ pub async fn populate_historic_auctions(db: &Postgres) -> Result<()> {
std::thread::sleep(std::time::Duration::from_millis(50));

ex = db.pool.begin().await?;

// update the current auction id
current_auction_id = competitions.last().unwrap().id;
}

Ok(())
Expand Down

0 comments on commit 69ae2f3

Please sign in to comment.