From 1840dc5edc870eb515dab2bd53205892ea28a8c4 Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Mon, 20 Jan 2025 16:28:23 +0700 Subject: [PATCH] UBERF-9172: Fix $lookup order by (#7714) --- pods/server/src/server.ts | 9 +- server/core/src/dbAdapterManager.ts | 4 +- server/postgres/src/index.ts | 2 +- server/postgres/src/storage.ts | 122 ++++++++++++++------------- server/postgres/src/utils.ts | 65 +++++++------- workers/transactor/package.json | 2 +- workers/transactor/src/transactor.ts | 44 +++++----- workers/transactor/wrangler.toml | 4 +- 8 files changed, 128 insertions(+), 124 deletions(-) diff --git a/pods/server/src/server.ts b/pods/server/src/server.ts index 3a45c340fca..88504849624 100644 --- a/pods/server/src/server.ts +++ b/pods/server/src/server.ts @@ -41,7 +41,7 @@ import { createPostgreeDestroyAdapter, createPostgresAdapter, createPostgresTxAdapter, - setDbUnsafePrepareOptions + setDBExtraOptions } from '@hcengineering/postgres' import { readFileSync } from 'node:fs' const model = JSON.parse(readFileSync(process.env.MODEL_JSON ?? 'model.json').toString()) as Tx[] @@ -83,11 +83,8 @@ export function start ( const usePrepare = process.env.DB_PREPARE === 'true' - setDbUnsafePrepareOptions({ - find: usePrepare, - model: false, - update: usePrepare, - upload: usePrepare + setDBExtraOptions({ + prepare: usePrepare // We override defaults }) registerServerPlugins() diff --git a/server/core/src/dbAdapterManager.ts b/server/core/src/dbAdapterManager.ts index 6119fe0e3ee..0bb863ea522 100644 --- a/server/core/src/dbAdapterManager.ts +++ b/server/core/src/dbAdapterManager.ts @@ -109,7 +109,7 @@ export class DbAdapterManagerImpl implements DBAdapterManager { } } - async initAdapters (): Promise { + async initAdapters (ctx: MeasureContext): Promise { for (const [key, adapter] of this.adapters) { // already initialized if (key !== this.conf.domains[DOMAIN_TX] && adapter.init !== undefined) { @@ -130,7 +130,7 @@ export class DbAdapterManagerImpl implements DBAdapterManager { } } } - await adapter?.init?.(this.metrics, domains, excludeDomains) + await ctx.with(`init adapter ${key}`, {}, (ctx) => adapter?.init?.(ctx, domains, excludeDomains)) } } } diff --git a/server/postgres/src/index.ts b/server/postgres/src/index.ts index 995a03a45c0..7d339eb9cea 100644 --- a/server/postgres/src/index.ts +++ b/server/postgres/src/index.ts @@ -19,7 +19,7 @@ import { getDBClient, retryTxn } from './utils' export { getDocFieldsByDomains, translateDomain } from './schemas' export * from './storage' -export { convertDoc, createTables, getDBClient, retryTxn, setDBExtraOptions, setDbUnsafePrepareOptions } from './utils' +export { convertDoc, createTables, getDBClient, retryTxn, setDBExtraOptions, setExtraOptions } from './utils' export function createPostgreeDestroyAdapter (url: string): WorkspaceDestroyAdapter { return { diff --git a/server/postgres/src/storage.ts b/server/postgres/src/storage.ts index 3fdb7d997da..2af395a2bd9 100644 --- a/server/postgres/src/storage.ts +++ b/server/postgres/src/storage.ts @@ -80,8 +80,8 @@ import { createTables, DBCollectionHelper, type DBDoc, - dbUnsafePrepareOptions, getDBClient, + getPrepare, inferType, isDataField, isOwner, @@ -325,14 +325,15 @@ class ValuesVariables { add (value: any, type: string = ''): string { // Compact value if string and same - if (typeof value === 'string') { - const v = this.valueHashes.get(value + ':' + type) + if (typeof value === 'string' || typeof value === 'number' || typeof value === 'boolean') { + const vkey = `${value}:${type}` + const v = 
this.valueHashes.get(vkey) if (v !== undefined) { return v } this.values.push(value) const idx = type !== '' ? `$${this.index++}${type}` : `$${this.index++}` - this.valueHashes.set(value + ':' + type, idx) + this.valueHashes.set(vkey, idx) return idx } else { this.values.push(value) @@ -359,6 +360,33 @@ class ValuesVariables { }) return vv } + + injectVars (sql: string): string { + const escQuote = (d: any | any[]): string => { + if (d == null) { + return 'NULL' + } + if (Array.isArray(d)) { + return 'ARRAY[' + d.map(escQuote).join(',') + ']' + } + switch (typeof d) { + case 'number': + if (isNaN(d) || !isFinite(d)) { + throw new Error('Invalid number value') + } + return d.toString() + case 'boolean': + return d ? 'TRUE' : 'FALSE' + case 'string': + return `'${d.replace(/'/g, "''")}'` + default: + throw new Error(`Unsupported value type: ${typeof d}`) + } + } + return sql.replaceAll(/(\$\d+)/g, (_, v) => { + return escQuote(this.getValues()[parseInt(v.substring(1)) - 1] ?? v) + }) + } } abstract class PostgresAdapterBase implements DbAdapter { @@ -457,9 +485,7 @@ abstract class PostgresAdapterBase implements DbAdapter { } const finalSql: string = [select, ...sqlChunks].join(' ') const result: DBDoc[] = await this.mgr.retry(undefined, (client) => - client.unsafe(finalSql, vars.getValues(), { - prepare: dbUnsafePrepareOptions.find - }) + client.unsafe(finalSql, vars.getValues(), getPrepare()) ) return result.map((p) => parseDocWithProjection(p, domain, options?.projection)) } @@ -519,9 +545,7 @@ abstract class PostgresAdapterBase implements DbAdapter { const res = await client.unsafe( `SELECT * FROM ${translateDomain(domain)} WHERE ${translatedQuery} FOR UPDATE`, vars.getValues(), - { - prepare: dbUnsafePrepareOptions.find - } + getPrepare() ) const docs = res.map((p) => parseDoc(p as any, schemaFields.schema)) for (const doc of docs) { @@ -553,9 +577,7 @@ abstract class PostgresAdapterBase implements DbAdapter { WHERE "workspaceId" = ${params.add(this.workspaceId.name, '::uuid')} AND _id = ${params.add(doc._id, '::text')}`, params.getValues(), - { - prepare: dbUnsafePrepareOptions.update - } + getPrepare() ) } }) @@ -593,9 +615,7 @@ abstract class PostgresAdapterBase implements DbAdapter { await client.unsafe( `UPDATE ${translateDomain(domain)} SET ${updates.join(', ')} WHERE ${translatedQuery};`, vars.getValues(), - { - prepare: dbUnsafePrepareOptions.find - } + getPrepare() ) }) } @@ -604,9 +624,11 @@ abstract class PostgresAdapterBase implements DbAdapter { const vars = new ValuesVariables() const translatedQuery = this.buildRawQuery(vars, domain, query) await this.mgr.retry(undefined, async (client) => { - await client.unsafe(`DELETE FROM ${translateDomain(domain)} WHERE ${translatedQuery}`, vars.getValues(), { - prepare: dbUnsafePrepareOptions.update - }) + await client.unsafe( + `DELETE FROM ${translateDomain(domain)} WHERE ${translatedQuery}`, + vars.getValues(), + getPrepare() + ) }) } @@ -670,18 +692,15 @@ abstract class PostgresAdapterBase implements DbAdapter { if (options?.total === true) { const totalReq = `SELECT COUNT(${domain}._id) as count FROM ${domain}` const totalSql = [totalReq, ...totalSqlChunks].join(' ') - const totalResult = await connection.unsafe(totalSql, vars.getValues(), { - prepare: dbUnsafePrepareOptions.find - }) + const totalResult = await connection.unsafe(totalSql, vars.getValues(), getPrepare()) const parsed = Number.parseInt(totalResult[0].count) total = Number.isNaN(parsed) ? 
0 : parsed } const finalSql: string = [select, ...sqlChunks].join(' ') fquery = finalSql - const result = await connection.unsafe(finalSql, vars.getValues(), { - prepare: dbUnsafePrepareOptions.find - }) + + const result = await connection.unsafe(finalSql, vars.getValues(), getPrepare()) if ( options?.lookup === undefined && options?.domainLookup === undefined && @@ -697,7 +716,7 @@ abstract class PostgresAdapterBase implements DbAdapter { } })) as FindResult } catch (err) { - ctx.error('Error in findAll', { err }) + ctx.error('Error in findAll', { err, sql: vars.injectVars(fquery) }) throw err } }, @@ -1170,8 +1189,13 @@ abstract class PostgresAdapterBase implements DbAdapter { if (join.isReverse) { return `${join.toAlias}->'${tKey}'` } - const res = isDataField(domain, tKey) ? (isDataArray ? `data->'${tKey}'` : `data#>>'{${tKey}}'`) : key - return `${join.toAlias}.${res}` + if (isDataField(domain, tKey)) { + if (isDataArray) { + return `${join.toAlias}."data"->'${tKey}'` + } + return `${join.toAlias}."data"#>>'{${tKey}}'` + } + return `${join.toAlias}."${tKey}"` } private transformKey( @@ -1505,9 +1529,7 @@ abstract class PostgresAdapterBase implements DbAdapter { `SELECT * FROM ${translateDomain(domain)} WHERE "workspaceId" = $1::uuid AND _id = ANY($2::text[])`, [this.workspaceId.name, docs], - { - prepare: dbUnsafePrepareOptions.find - } + getPrepare() ) return res.map((p) => parseDocWithProjection(p as any, domain)) }) @@ -1562,9 +1584,7 @@ abstract class PostgresAdapterBase implements DbAdapter { `INSERT INTO ${tdomain} ("workspaceId", ${insertStr}) VALUES ${vals} ON CONFLICT ("workspaceId", _id) DO UPDATE SET ${onConflictStr};`, values.getValues(), - { - prepare: dbUnsafePrepareOptions.upload - } + getPrepare() ) ) } else { @@ -1574,9 +1594,7 @@ abstract class PostgresAdapterBase implements DbAdapter { await client.unsafe( `INSERT INTO ${tdomain} ("workspaceId", ${insertStr}) VALUES ${vals};`, values.getValues(), - { - prepare: dbUnsafePrepareOptions.upload - } + getPrepare() ) ) } @@ -1598,9 +1616,7 @@ abstract class PostgresAdapterBase implements DbAdapter { client.unsafe( `DELETE FROM ${tdomain} WHERE "workspaceId" = $1 AND _id = ANY($2::text[])`, [this.workspaceId.name, part], - { - prepare: dbUnsafePrepareOptions.upload - } + getPrepare() ) ) }) @@ -1619,9 +1635,7 @@ abstract class PostgresAdapterBase implements DbAdapter { const vars = new ValuesVariables() const finalSql = `SELECT DISTINCT ${key} as ${field}, Count(*) AS count FROM ${translateDomain(domain)} WHERE ${this.buildRawQuery(vars, domain, query ?? 
{})} GROUP BY ${key}` return await this.mgr.retry(ctx.id, async (connection) => { - const result = await connection.unsafe(finalSql, vars.getValues(), { - prepare: dbUnsafePrepareOptions.find - }) + const result = await connection.unsafe(finalSql, vars.getValues(), getPrepare()) return new Map(result.map((r) => [r[field.toLocaleLowerCase()], parseInt(r.count)])) }) } catch (err) { @@ -1722,9 +1736,7 @@ class PostgresAdapter extends PostgresAdapterBase { SET ${updates.join(', ')} WHERE "workspaceId" = ${wsId} AND _id = ${oId}`, params.getValues(), - { - prepare: dbUnsafePrepareOptions.update - } + getPrepare() ) }) }) @@ -1837,9 +1849,7 @@ class PostgresAdapter extends PostgresAdapterBase { WHERE "workspaceId" = ${wsId} AND _id = ${oId}`, params.getValues(), - { - prepare: dbUnsafePrepareOptions.update - } + getPrepare() ) }) if (tx.retrieve === true && doc !== undefined) { @@ -1928,11 +1938,7 @@ class PostgresAdapter extends PostgresAdapterBase { WHERE "workspaceId" = $1::uuid AND "_id" = update_data.__id` await this.mgr.retry(ctx.id, (client) => - ctx.with('bulk-update', {}, () => - client.unsafe(op, data, { - prepare: dbUnsafePrepareOptions.update - }) - ) + ctx.with('bulk-update', {}, () => client.unsafe(op, data, getPrepare())) ) } } @@ -1966,9 +1972,7 @@ class PostgresAdapter extends PostgresAdapterBase { forUpdate ? ' FOR UPDATE' : '' }`, [this.workspaceId.name, _id], - { - prepare: dbUnsafePrepareOptions.find - } + getPrepare() ) const dbDoc = res[0] as any return dbDoc !== undefined ? parseDoc(dbDoc, getSchema(domain)) : undefined @@ -2015,9 +2019,7 @@ class PostgresTxAdapter extends PostgresAdapterBase implements TxAdapter { async getModel (ctx: MeasureContext): Promise { const res: DBDoc[] = await this.mgr.retry(undefined, (client) => { return client.unsafe( - `SELECT * FROM "${translateDomain(DOMAIN_MODEL_TX)}" WHERE "workspaceId" = $1::uuid ORDER BY _id::text ASC, "modifiedOn"::bigint ASC`, - [this.workspaceId.name], - { prepare: dbUnsafePrepareOptions.model } + `SELECT * FROM "${translateDomain(DOMAIN_MODEL_TX)}" WHERE "workspaceId" = '${this.workspaceId.name}'::uuid ORDER BY _id::text ASC, "modifiedOn"::bigint ASC` ) }) diff --git a/server/postgres/src/utils.ts b/server/postgres/src/utils.ts index 0b24125c060..2938070b22f 100644 --- a/server/postgres/src/utils.ts +++ b/server/postgres/src/utils.ts @@ -55,6 +55,8 @@ process.on('exit', () => { const clientRefs = new Map() const loadedDomains = new Set() +let loadedTables = new Set() + export async function retryTxn ( pool: postgres.Sql, operation: (client: postgres.TransactionSql) => Promise @@ -83,26 +85,30 @@ export async function createTables ( return } const mapped = filtered.map((p) => translateDomain(p)) - const tables = await ctx.with('load-table', {}, () => - client.unsafe( - ` + const t = Date.now() + loadedTables = + loadedTables.size === 0 + ? 
new Set( + ( + await ctx.with('load-table', {}, () => + client.unsafe(` SELECT table_name FROM information_schema.tables - WHERE table_name = ANY( $1::text[] ) - `, - [mapped] - ) - ) - - const exists = new Set(tables.map((it) => it.table_name)) - - const domainsToLoad = mapped.filter((it) => exists.has(it)) + WHERE table_schema NOT IN ('pg_catalog', 'information_schema') + AND table_name NOT LIKE 'pg_%'`) + ) + ).map((it) => it.table_name) + ) + : loadedTables + console.log('load-table', Date.now() - t) + + const domainsToLoad = mapped.filter((it) => loadedTables.has(it)) if (domainsToLoad.length > 0) { await ctx.with('load-schemas', {}, () => getTableSchema(client, domainsToLoad)) } const domainsToCreate: string[] = [] for (const domain of mapped) { - if (!exists.has(domain)) { + if (!loadedTables.has(domain)) { domainsToCreate.push(domain) } else { loadedDomains.add(url + domain) @@ -120,13 +126,10 @@ export async function createTables ( } async function getTableSchema (client: postgres.Sql, domains: string[]): Promise { - const res = await client.unsafe( - `SELECT column_name::name, data_type::text, is_nullable::text, table_name::name + const res = await client.unsafe(`SELECT column_name::name, data_type::text, is_nullable::text, table_name::name FROM information_schema.columns - WHERE table_name = ANY($1::text[]) and table_schema = 'public'::name - ORDER BY table_name::name, ordinal_position::int ASC;`, - [domains] - ) + WHERE table_name IN (${domains.map((it) => `'${it}'`).join(', ')}) and table_schema = 'public'::name + ORDER BY table_name::name, ordinal_position::int ASC;`) const schemas: Record = {} for (const column of res) { @@ -277,27 +280,25 @@ export class ClientRef implements PostgresClientReference { } } -let dbExtraOptions: Partial> = {} +export let dbExtraOptions: Partial> = {} export function setDBExtraOptions (options: Partial>): void { dbExtraOptions = options } -export interface DbUnsafePrepareOptions { - upload: boolean - find: boolean - update: boolean - model: boolean +export function getPrepare (): { prepare: boolean } { + return { prepare: dbExtraOptions.prepare ?? 
false } +} + +export interface DBExtraOptions { + useCF: boolean } -export let dbUnsafePrepareOptions: DbUnsafePrepareOptions = { - upload: true, - find: true, - update: true, - model: true +export let dbExtra: DBExtraOptions = { + useCF: false } -export function setDbUnsafePrepareOptions (options: DbUnsafePrepareOptions): void { - dbUnsafePrepareOptions = options +export function setExtraOptions (options: DBExtraOptions): void { + dbExtra = options } /** diff --git a/workers/transactor/package.json b/workers/transactor/package.json index 74c45a6b9aa..2d4fc98185c 100644 --- a/workers/transactor/package.json +++ b/workers/transactor/package.json @@ -5,7 +5,7 @@ "template": "cloud", "scripts": { "deploy": "wrangler deploy", - "dev": "wrangler dev --port 3335", + "dev": "wrangler dev --port 3335 --remote", "dev-local": "wrangler dev --port 3335 --local --upstream-protocol=http", "start": "wrangler dev --port 3335", "logs": "npx wrangler tail --format pretty", diff --git a/workers/transactor/src/transactor.ts b/workers/transactor/src/transactor.ts index 5b0a30ece7d..a1f64bd565a 100644 --- a/workers/transactor/src/transactor.ts +++ b/workers/transactor/src/transactor.ts @@ -38,8 +38,9 @@ import { createPostgreeDestroyAdapter, createPostgresAdapter, createPostgresTxAdapter, + getDBClient, setDBExtraOptions, - setDbUnsafePrepareOptions + setExtraOptions } from '@hcengineering/postgres' import { createServerPipeline, @@ -75,13 +76,11 @@ export class Transactor extends DurableObject { ssl: false, connection: { application_name: 'cloud-transactor' - } + }, + prepare: false }) - setDbUnsafePrepareOptions({ - upload: false, - find: false, - update: false, - model: false + setExtraOptions({ + useCF: true }) registerTxAdapterFactory('postgresql', createPostgresTxAdapter, true) registerAdapterFactory('postgresql', createPostgresAdapter, true) @@ -105,23 +104,28 @@ export class Transactor extends DurableObject { console.log({ message: 'use stats', url: this.env.STATS_URL }) console.log({ message: 'use fulltext', url: this.env.FULLTEXT_URL }) + const dbUrl = env.DB_MODE === 'direct' ? env.DB_URL ?? '' : env.HYPERDRIVE.connectionString + // TODO: const storage = createDummyStorageAdapter() this.pipelineFactory = async (ctx, ws, upgrade, broadcast, branding) => { - const pipeline = createServerPipeline( - this.measureCtx, - env.DB_MODE === 'direct' ? env.DB_URL ?? '' : env.HYPERDRIVE.connectionString, - model, - { - externalStorage: storage, - adapterSecurity: false, - disableTriggers: false, - fulltextUrl: env.FULLTEXT_URL, - extraLogging: true - } - ) - return await pipeline(ctx, ws, upgrade, broadcast, branding) + const pipeline = createServerPipeline(this.measureCtx, dbUrl, model, { + externalStorage: storage, + adapterSecurity: false, + disableTriggers: false, + fulltextUrl: env.FULLTEXT_URL, + extraLogging: true + }) + const result = await pipeline(ctx, ws, upgrade, broadcast, branding) + + const client = getDBClient(dbUrl) + const connection = await client.getClient() + const t1 = Date.now() + await connection`select now()` + console.log('DB query time', Date.now() - t1) + client.close() + return result } void this.ctx diff --git a/workers/transactor/wrangler.toml b/workers/transactor/wrangler.toml index fc918979880..92cad4fa3d0 100644 --- a/workers/transactor/wrangler.toml +++ b/workers/transactor/wrangler.toml @@ -13,8 +13,8 @@ head_sampling_rate = 1 # optional. default = 1. 
# If you are running back-end logic in a Worker, running it closer to your back-end infrastructure # rather than the end user may result in better performance. # Docs: https://developers.cloudflare.com/workers/configuration/smart-placement/#smart-placement -# [placement] -# mode = "smart" +[placement] +mode = "smart" # Variable bindings. These are arbitrary, plaintext strings (similar to environment variables) # Docs:
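
Usage sketch (not part of the patch; a minimal illustration of the option plumbing the hunks above introduce, with assumed wiring). The single prepare switch set via setDBExtraOptions replaces the old per-operation DbUnsafePrepareOptions, and getPrepare() (internal to server/postgres/src/utils.ts, not re-exported from the package index) feeds it to every client.unsafe call:

    import { setDBExtraOptions } from '@hcengineering/postgres'

    // Server startup (pods/server): one flag instead of { find, update, upload, model }.
    const usePrepare = process.env.DB_PREPARE === 'true'
    setDBExtraOptions({ prepare: usePrepare })

    // Inside the postgres adapter every call site now reads the same option object:
    //   await client.unsafe(sql, vars.getValues(), getPrepare())
    // where getPrepare() returns { prepare: dbExtraOptions.prepare ?? false },
    // so prepared statements stay disabled (e.g. for the Cloudflare transactor,
    // which sets prepare: false) unless explicitly enabled.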