Skip to content

Commit

Permalink
UBERF-9172: Fix $lookup order by
Browse files Browse the repository at this point in the history
Signed-off-by: Andrey Sobolev <[email protected]>
  • Loading branch information
haiodo committed Jan 20, 2025
1 parent 46084e0 commit ec8859b
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 124 deletions.
9 changes: 3 additions & 6 deletions pods/server/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import {
createPostgreeDestroyAdapter,
createPostgresAdapter,
createPostgresTxAdapter,
setDbUnsafePrepareOptions
setDBExtraOptions
} from '@hcengineering/postgres'
import { readFileSync } from 'node:fs'
const model = JSON.parse(readFileSync(process.env.MODEL_JSON ?? 'model.json').toString()) as Tx[]
Expand Down Expand Up @@ -83,11 +83,8 @@ export function start (

const usePrepare = process.env.DB_PREPARE === 'true'

setDbUnsafePrepareOptions({
find: usePrepare,
model: false,
update: usePrepare,
upload: usePrepare
setDBExtraOptions({
prepare: usePrepare // We override defaults
})

registerServerPlugins()
Expand Down
4 changes: 2 additions & 2 deletions server/core/src/dbAdapterManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ export class DbAdapterManagerImpl implements DBAdapterManager {
}
}

async initAdapters (): Promise<void> {
async initAdapters (ctx: MeasureContext): Promise<void> {
for (const [key, adapter] of this.adapters) {
// already initialized
if (key !== this.conf.domains[DOMAIN_TX] && adapter.init !== undefined) {
Expand All @@ -130,7 +130,7 @@ export class DbAdapterManagerImpl implements DBAdapterManager {
}
}
}
await adapter?.init?.(this.metrics, domains, excludeDomains)
await ctx.with(`init adapter ${key}`, {}, (ctx) => adapter?.init?.(ctx, domains, excludeDomains))
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion server/postgres/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import { getDBClient, retryTxn } from './utils'

export { getDocFieldsByDomains, translateDomain } from './schemas'
export * from './storage'
export { convertDoc, createTables, getDBClient, retryTxn, setDBExtraOptions, setDbUnsafePrepareOptions } from './utils'
export { convertDoc, createTables, getDBClient, retryTxn, setDBExtraOptions, setExtraOptions } from './utils'

export function createPostgreeDestroyAdapter (url: string): WorkspaceDestroyAdapter {
return {
Expand Down
122 changes: 62 additions & 60 deletions server/postgres/src/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ import {
createTables,
DBCollectionHelper,
type DBDoc,
dbUnsafePrepareOptions,
getDBClient,
getPrepare,
inferType,
isDataField,
isOwner,
Expand Down Expand Up @@ -325,14 +325,15 @@ class ValuesVariables {

add (value: any, type: string = ''): string {
// Compact value if string and same
if (typeof value === 'string') {
const v = this.valueHashes.get(value + ':' + type)
if (typeof value === 'string' || typeof value === 'number' || typeof value === 'boolean') {
const vkey = `${value}:${type}`
const v = this.valueHashes.get(vkey)
if (v !== undefined) {
return v
}
this.values.push(value)
const idx = type !== '' ? `$${this.index++}${type}` : `$${this.index++}`
this.valueHashes.set(value + ':' + type, idx)
this.valueHashes.set(vkey, idx)
return idx
} else {
this.values.push(value)
Expand All @@ -359,6 +360,33 @@ class ValuesVariables {
})
return vv
}

injectVars (sql: string): string {
const escQuote = (d: any | any[]): string => {
if (d == null) {
return 'NULL'
}
if (Array.isArray(d)) {
return 'ARRAY[' + d.map(escQuote).join(',') + ']'
}
switch (typeof d) {
case 'number':
if (isNaN(d) || !isFinite(d)) {
throw new Error('Invalid number value')
}
return d.toString()
case 'boolean':
return d ? 'TRUE' : 'FALSE'
case 'string':
return `'${d.replace(/'/g, "''")}'`
default:
throw new Error(`Unsupported value type: ${typeof d}`)
}
}
return sql.replaceAll(/(\$\d+)/g, (_, v) => {
return escQuote(this.getValues()[parseInt(v.substring(1)) - 1] ?? v)
})
}
}

abstract class PostgresAdapterBase implements DbAdapter {
Expand Down Expand Up @@ -457,9 +485,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
}
const finalSql: string = [select, ...sqlChunks].join(' ')
const result: DBDoc[] = await this.mgr.retry(undefined, (client) =>
client.unsafe(finalSql, vars.getValues(), {
prepare: dbUnsafePrepareOptions.find
})
client.unsafe(finalSql, vars.getValues(), getPrepare())
)
return result.map((p) => parseDocWithProjection(p, domain, options?.projection))
}
Expand Down Expand Up @@ -519,9 +545,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
const res = await client.unsafe(
`SELECT * FROM ${translateDomain(domain)} WHERE ${translatedQuery} FOR UPDATE`,
vars.getValues(),
{
prepare: dbUnsafePrepareOptions.find
}
getPrepare()
)
const docs = res.map((p) => parseDoc(p as any, schemaFields.schema))
for (const doc of docs) {
Expand Down Expand Up @@ -553,9 +577,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
WHERE "workspaceId" = ${params.add(this.workspaceId.name, '::uuid')}
AND _id = ${params.add(doc._id, '::text')}`,
params.getValues(),
{
prepare: dbUnsafePrepareOptions.update
}
getPrepare()
)
}
})
Expand Down Expand Up @@ -593,9 +615,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
await client.unsafe(
`UPDATE ${translateDomain(domain)} SET ${updates.join(', ')} WHERE ${translatedQuery};`,
vars.getValues(),
{
prepare: dbUnsafePrepareOptions.find
}
getPrepare()
)
})
}
Expand All @@ -604,9 +624,11 @@ abstract class PostgresAdapterBase implements DbAdapter {
const vars = new ValuesVariables()
const translatedQuery = this.buildRawQuery(vars, domain, query)
await this.mgr.retry(undefined, async (client) => {
await client.unsafe(`DELETE FROM ${translateDomain(domain)} WHERE ${translatedQuery}`, vars.getValues(), {
prepare: dbUnsafePrepareOptions.update
})
await client.unsafe(
`DELETE FROM ${translateDomain(domain)} WHERE ${translatedQuery}`,
vars.getValues(),
getPrepare()
)
})
}

Expand Down Expand Up @@ -670,18 +692,15 @@ abstract class PostgresAdapterBase implements DbAdapter {
if (options?.total === true) {
const totalReq = `SELECT COUNT(${domain}._id) as count FROM ${domain}`
const totalSql = [totalReq, ...totalSqlChunks].join(' ')
const totalResult = await connection.unsafe(totalSql, vars.getValues(), {
prepare: dbUnsafePrepareOptions.find
})
const totalResult = await connection.unsafe(totalSql, vars.getValues(), getPrepare())
const parsed = Number.parseInt(totalResult[0].count)
total = Number.isNaN(parsed) ? 0 : parsed
}

const finalSql: string = [select, ...sqlChunks].join(' ')
fquery = finalSql
const result = await connection.unsafe(finalSql, vars.getValues(), {
prepare: dbUnsafePrepareOptions.find
})

const result = await connection.unsafe(finalSql, vars.getValues(), getPrepare())
if (
options?.lookup === undefined &&
options?.domainLookup === undefined &&
Expand All @@ -697,7 +716,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
}
})) as FindResult<T>
} catch (err) {
ctx.error('Error in findAll', { err })
ctx.error('Error in findAll', { err, sql: vars.injectVars(fquery) })
throw err
}
},
Expand Down Expand Up @@ -1170,8 +1189,13 @@ abstract class PostgresAdapterBase implements DbAdapter {
if (join.isReverse) {
return `${join.toAlias}->'${tKey}'`
}
const res = isDataField(domain, tKey) ? (isDataArray ? `data->'${tKey}'` : `data#>>'{${tKey}}'`) : key
return `${join.toAlias}.${res}`
if (isDataField(domain, tKey)) {
if (isDataArray) {
return `${join.toAlias}."data"->'${tKey}'`
}
return `${join.toAlias}."data"#>>'{${tKey}}'`
}
return `${join.toAlias}."${tKey}"`
}

private transformKey<T extends Doc>(
Expand Down Expand Up @@ -1505,9 +1529,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
`SELECT * FROM ${translateDomain(domain)}
WHERE "workspaceId" = $1::uuid AND _id = ANY($2::text[])`,
[this.workspaceId.name, docs],
{
prepare: dbUnsafePrepareOptions.find
}
getPrepare()
)
return res.map((p) => parseDocWithProjection(p as any, domain))
})
Expand Down Expand Up @@ -1562,9 +1584,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
`INSERT INTO ${tdomain} ("workspaceId", ${insertStr}) VALUES ${vals}
ON CONFLICT ("workspaceId", _id) DO UPDATE SET ${onConflictStr};`,
values.getValues(),
{
prepare: dbUnsafePrepareOptions.upload
}
getPrepare()
)
)
} else {
Expand All @@ -1574,9 +1594,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
await client.unsafe(
`INSERT INTO ${tdomain} ("workspaceId", ${insertStr}) VALUES ${vals};`,
values.getValues(),
{
prepare: dbUnsafePrepareOptions.upload
}
getPrepare()
)
)
}
Expand All @@ -1598,9 +1616,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
client.unsafe(
`DELETE FROM ${tdomain} WHERE "workspaceId" = $1 AND _id = ANY($2::text[])`,
[this.workspaceId.name, part],
{
prepare: dbUnsafePrepareOptions.upload
}
getPrepare()
)
)
})
Expand All @@ -1619,9 +1635,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
const vars = new ValuesVariables()
const finalSql = `SELECT DISTINCT ${key} as ${field}, Count(*) AS count FROM ${translateDomain(domain)} WHERE ${this.buildRawQuery(vars, domain, query ?? {})} GROUP BY ${key}`
return await this.mgr.retry(ctx.id, async (connection) => {
const result = await connection.unsafe(finalSql, vars.getValues(), {
prepare: dbUnsafePrepareOptions.find
})
const result = await connection.unsafe(finalSql, vars.getValues(), getPrepare())
return new Map(result.map((r) => [r[field.toLocaleLowerCase()], parseInt(r.count)]))
})
} catch (err) {
Expand Down Expand Up @@ -1722,9 +1736,7 @@ class PostgresAdapter extends PostgresAdapterBase {
SET ${updates.join(', ')}
WHERE "workspaceId" = ${wsId} AND _id = ${oId}`,
params.getValues(),
{
prepare: dbUnsafePrepareOptions.update
}
getPrepare()
)
})
})
Expand Down Expand Up @@ -1837,9 +1849,7 @@ class PostgresAdapter extends PostgresAdapterBase {
WHERE "workspaceId" = ${wsId}
AND _id = ${oId}`,
params.getValues(),
{
prepare: dbUnsafePrepareOptions.update
}
getPrepare()
)
})
if (tx.retrieve === true && doc !== undefined) {
Expand Down Expand Up @@ -1928,11 +1938,7 @@ class PostgresAdapter extends PostgresAdapterBase {
WHERE "workspaceId" = $1::uuid AND "_id" = update_data.__id`

await this.mgr.retry(ctx.id, (client) =>
ctx.with('bulk-update', {}, () =>
client.unsafe(op, data, {
prepare: dbUnsafePrepareOptions.update
})
)
ctx.with('bulk-update', {}, () => client.unsafe(op, data, getPrepare()))
)
}
}
Expand Down Expand Up @@ -1966,9 +1972,7 @@ class PostgresAdapter extends PostgresAdapterBase {
forUpdate ? ' FOR UPDATE' : ''
}`,
[this.workspaceId.name, _id],
{
prepare: dbUnsafePrepareOptions.find
}
getPrepare()
)
const dbDoc = res[0] as any
return dbDoc !== undefined ? parseDoc(dbDoc, getSchema(domain)) : undefined
Expand Down Expand Up @@ -2015,9 +2019,7 @@ class PostgresTxAdapter extends PostgresAdapterBase implements TxAdapter {
async getModel (ctx: MeasureContext): Promise<Tx[]> {
const res: DBDoc[] = await this.mgr.retry(undefined, (client) => {
return client.unsafe(
`SELECT * FROM "${translateDomain(DOMAIN_MODEL_TX)}" WHERE "workspaceId" = $1::uuid ORDER BY _id::text ASC, "modifiedOn"::bigint ASC`,
[this.workspaceId.name],
{ prepare: dbUnsafePrepareOptions.model }
`SELECT * FROM "${translateDomain(DOMAIN_MODEL_TX)}" WHERE "workspaceId" = '${this.workspaceId.name}'::uuid ORDER BY _id::text ASC, "modifiedOn"::bigint ASC`
)
})

Expand Down
Loading

0 comments on commit ec8859b

Please sign in to comment.