Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(torii-grpc): chunk schema joins to avoid sqlite join limit #2839

Merged
merged 5 commits into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 149 additions & 0 deletions crates/torii/core/src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,155 @@
Ok(())
}

#[allow(clippy::too_many_arguments)]
pub async fn fetch_entities(
pool: &Pool<sqlx::Sqlite>,
schemas: &[Ty],
table_name: &str,
model_relation_table: &str,
entity_relation_column: &str,
where_clause: Option<&str>,
having_clause: Option<&str>,
order_by: Option<&str>,
limit: Option<u32>,
offset: Option<u32>,
bind_values: Vec<String>,
) -> Result<(Vec<sqlx::sqlite::SqliteRow>, u32), Error> {
// Helper function to collect columns (existing implementation)
fn collect_columns(table_prefix: &str, path: &str, ty: &Ty, selections: &mut Vec<String>) {
match ty {
Ty::Struct(s) => {
for child in &s.children {
let new_path = if path.is_empty() {
child.name.clone()
} else {
format!("{}.{}", path, child.name)
};
collect_columns(table_prefix, &new_path, &child.ty, selections);
}
}
Ty::Tuple(t) => {
for (i, child) in t.iter().enumerate() {
let new_path =

Check warning on line 430 in crates/torii/core/src/model.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/model.rs#L430

Added line #L430 was not covered by tests
if path.is_empty() { format!("{}", i) } else { format!("{}.{}", path, i) };
collect_columns(table_prefix, &new_path, child, selections);
}
}
Ty::Enum(e) => {
// Add the enum variant column with table prefix and alias
selections.push(format!("[{table_prefix}].[{path}] as \"{table_prefix}.{path}\"",));

// Add columns for each variant's value (if not empty tuple)
for option in &e.options {
if let Ty::Tuple(t) = &option.ty {
if t.is_empty() {
continue;
}
}
let variant_path = format!("{}.{}", path, option.name);
collect_columns(table_prefix, &variant_path, &option.ty, selections);

Check warning on line 447 in crates/torii/core/src/model.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/model.rs#L447

Added line #L447 was not covered by tests
}
}
Ty::Array(_) | Ty::Primitive(_) | Ty::ByteArray(_) => {
selections.push(format!("[{table_prefix}].[{path}] as \"{table_prefix}.{path}\"",));
}
}
}
Comment on lines +415 to +454
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Extract duplicated helper function to a shared location.

The collect_columns helper function is duplicated from build_sql_query. Consider extracting it to a module-level function to maintain DRY principles.

+impl ModelSQLReader {
+    fn collect_columns(table_prefix: &str, path: &str, ty: &Ty, selections: &mut Vec<String>) {
+        // Move the existing implementation here
+    }
+}

-fn collect_columns(table_prefix: &str, path: &str, ty: &Ty, selections: &mut Vec<String>) {
-    // Remove duplicated implementation
-}

Committable suggestion skipped: line range outside the PR's diff.


const MAX_JOINS: usize = 64;
let schema_chunks = schemas.chunks(MAX_JOINS);
let mut total_count = 0;
let mut all_rows = Vec::new();

for chunk in schema_chunks {
let mut selections = Vec::new();
let mut joins = Vec::new();

// Add base table columns
selections.push(format!("{}.id", table_name));
selections.push(format!("{}.keys", table_name));
selections.push(format!("group_concat({model_relation_table}.model_id) as model_ids"));

// Process each model schema in the chunk
for model in chunk {
let model_table = model.name();
joins.push(format!(
"LEFT JOIN [{model_table}] ON {table_name}.id = \
[{model_table}].{entity_relation_column}"
));
collect_columns(&model_table, "", model, &mut selections);
}

joins.push(format!(
"JOIN {model_relation_table} ON {table_name}.id = {model_relation_table}.entity_id"
));

let selections_clause = selections.join(", ");
let joins_clause = joins.join(" ");

// Build count query
let count_query = format!(
"SELECT COUNT(*) FROM (SELECT {}.id, group_concat({}.model_id) as model_ids FROM [{}] \
{} {} GROUP BY {}.id {})",
table_name,
model_relation_table,
table_name,
joins_clause,
where_clause.map_or(String::new(), |w| format!(" WHERE {}", w)),
table_name,
having_clause.map_or(String::new(), |h| format!(" HAVING {}", h))
);

// Execute count query
let mut count_stmt = sqlx::query_scalar(&count_query);
for value in &bind_values {
count_stmt = count_stmt.bind(value);
}
let chunk_count: u32 = count_stmt.fetch_one(pool).await?;
total_count += chunk_count;

if chunk_count > 0 {
// Build main query
let mut query =
format!("SELECT {} FROM [{}] {}", selections_clause, table_name, joins_clause);

if let Some(where_clause) = where_clause {
query += &format!(" WHERE {}", where_clause);
}

query += &format!(" GROUP BY {table_name}.id");

if let Some(having_clause) = having_clause {
query += &format!(" HAVING {}", having_clause);
}

if let Some(order_clause) = order_by {
query += &format!(" ORDER BY {}", order_clause);

Check warning on line 524 in crates/torii/core/src/model.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/model.rs#L524

Added line #L524 was not covered by tests
} else {
query += &format!(" ORDER BY {}.event_id DESC", table_name);
}

if let Some(limit) = limit {
query += &format!(" LIMIT {}", limit);
}

if let Some(offset) = offset {
query += &format!(" OFFSET {}", offset);

Check warning on line 534 in crates/torii/core/src/model.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/model.rs#L534

Added line #L534 was not covered by tests
}

// Execute main query
let mut stmt = sqlx::query(&query);
for value in &bind_values {
stmt = stmt.bind(value);
}
let chunk_rows = stmt.fetch_all(pool).await?;
all_rows.extend(chunk_rows);
}

Check warning on line 544 in crates/torii/core/src/model.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/model.rs#L544

Added line #L544 was not covered by tests
}

Ok((all_rows, total_count))
}

#[cfg(test)]
mod tests {
use dojo_types::schema::{Enum, EnumOption, Member, Struct, Ty};
Expand Down
Loading
Loading