Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

QFIX: Few PG optimisations and fixes. #7776

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -644,22 +644,22 @@ jobs:
run: |
./desktop-package/scripts/publish-version.sh
- name: Upload MacOS
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: Huly-MacOS-x64
path: ./desktop-package/deploy/Huly-macos-*-x64.dmg
- name: Upload MacOS arm64
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: Huly-MacOS-arm64
path: ./desktop-package/deploy/Huly-macos-*-arm64.dmg
- name: Upload Windows
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: Huly-Windows
path: ./desktop-package/deploy/Huly-windows-*.zip
- name: Upload Linux
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: Huly-Linux
path: ./desktop-package/deploy/Huly-linux-*.zip
15 changes: 13 additions & 2 deletions pods/backup/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,14 @@ import {
} from '@hcengineering/server-pipeline'
import { join } from 'path'

import { createMongoAdapter, createMongoDestroyAdapter, createMongoTxAdapter } from '@hcengineering/mongo'
import {
createPostgreeDestroyAdapter,
createPostgresAdapter,
createPostgresTxAdapter,
setDBExtraOptions
} from '@hcengineering/postgres'
import { readFileSync } from 'node:fs'
import { createMongoTxAdapter, createMongoAdapter, createMongoDestroyAdapter } from '@hcengineering/mongo'
import { createPostgresTxAdapter, createPostgresAdapter, createPostgreeDestroyAdapter } from '@hcengineering/postgres'
const model = JSON.parse(readFileSync(process.env.MODEL_JSON ?? 'model.json').toString()) as Tx[]

const metricsContext = initStatisticsContext('backup', {
Expand All @@ -51,6 +56,12 @@ const sentryDSN = process.env.SENTRY_DSN
configureAnalytics(sentryDSN, {})
Analytics.setTag('application', 'backup-service')

const usePrepare = process.env.DB_PREPARE === 'true'

setDBExtraOptions({
prepare: usePrepare // We override defaults
})

registerTxAdapterFactory('mongodb', createMongoTxAdapter)
registerAdapterFactory('mongodb', createMongoAdapter)
registerDestroyFactory('mongodb', createMongoDestroyAdapter)
Expand Down
13 changes: 12 additions & 1 deletion pods/fulltext/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@ import {
} from '@hcengineering/middleware'
import { createMongoAdapter, createMongoDestroyAdapter, createMongoTxAdapter } from '@hcengineering/mongo'
import { PlatformError, setMetadata, unknownError } from '@hcengineering/platform'
import { createPostgreeDestroyAdapter, createPostgresAdapter, createPostgresTxAdapter } from '@hcengineering/postgres'
import {
createPostgreeDestroyAdapter,
createPostgresAdapter,
createPostgresTxAdapter,
setDBExtraOptions
} from '@hcengineering/postgres'
import serverClientPlugin, { getTransactorEndpoint, getWorkspaceInfo } from '@hcengineering/server-client'
import serverCore, {
createContentAdapter,
Expand Down Expand Up @@ -215,6 +220,12 @@ export async function startIndexer (
): Promise<() => void> {
const closeTimeout = 5 * 60 * 1000

const usePrepare = process.env.DB_PREPARE === 'true'

setDBExtraOptions({
prepare: usePrepare // We override defaults
})

setMetadata(serverToken.metadata.Secret, opt.serverSecret)
setMetadata(serverCore.metadata.ElasticIndexName, opt.elasticIndexName)
setMetadata(serverClientPlugin.metadata.Endpoint, opt.accountsUrl)
Expand Down
7 changes: 7 additions & 0 deletions pods/server/src/__start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { startHttpServer } from '@hcengineering/server-ws'
import { join } from 'path'
import { start } from '.'
import { profileStart, profileStop } from './inspector'
import { setDBExtraOptions } from '@hcengineering/postgres'

configureAnalytics(process.env.SENTRY_DSN, {})
Analytics.setTag('application', 'transactor')
Expand Down Expand Up @@ -58,6 +59,12 @@ setOperationLogProfiling(process.env.OPERATION_PROFILING === 'true')
const config = serverConfigFromEnv()
const storageConfig: StorageConfiguration = storageConfigFromEnv()

const usePrepare = process.env.DB_PREPARE === 'true'

setDBExtraOptions({
prepare: usePrepare // We override defaults
})

const lastNameFirst = process.env.LAST_NAME_FIRST === 'true'
setMetadata(contactPlugin.metadata.LastNameFirst, lastNameFirst)
setMetadata(serverCore.metadata.FrontUrl, config.frontUrl)
Expand Down
121 changes: 57 additions & 64 deletions server/postgres/src/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ async function * createCursorGenerator (
}
} catch (err: any) {
console.error('failed to recieve data', { err })
throw err // Rethrow the error after logging
}
}

Expand Down Expand Up @@ -156,7 +157,11 @@ class ConnectionInfo {
throw err
} finally {
if (this.released) {
reserved?.release()
try {
reserved?.release()
} catch (err: any) {
console.error('failed to release', err)
}
} else {
// after use we put into available
if (reserved !== undefined) {
Expand All @@ -168,15 +173,19 @@ class ConnectionInfo {
const toRelease = this.available.splice(1, this.available.length - 1)

for (const r of toRelease) {
r.release()
try {
r.release()
} catch (err: any) {
console.error('failed to relase', err)
}
}
}
}
}
}

release (): void {
for (const c of this.available) {
for (const c of [...this.available]) {
c.release()
}
this.available = []
Expand Down Expand Up @@ -302,7 +311,11 @@ class ConnectionMgr {
([, it]: [string, ConnectionInfo]) => it.mgrId === this.mgrId
)) {
connections.delete(k)
conn.release()
try {
conn.release()
} catch (err: any) {
console.error('failed to release connection')
}
}
}

Expand Down Expand Up @@ -1336,7 +1349,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
case '$options':
break
case '$all':
res.push(`${tkey} @> ARRAY[${value}]`)
res.push(`${tkey} @> ${vars.addArray(value, inferType(value))}`)
break
default:
res.push(`${tkey} @> '[${JSON.stringify(value)}]'`)
Expand Down Expand Up @@ -1542,64 +1555,39 @@ abstract class PostgresAdapterBase implements DbAdapter {
return ctx.with('upload', { domain }, async (ctx) => {
const schemaFields = getSchemaAndFields(domain)
const filedsWithData = [...schemaFields.fields, 'data']
const insertFields: string[] = []
const onConflict: string[] = []
for (const field of filedsWithData) {
insertFields.push(`"${field}"`)
if (handleConflicts) {
onConflict.push(`"${field}" = EXCLUDED."${field}"`)
}
}

const insertFields = filedsWithData.map((field) => `"${field}"`)
const onConflict = handleConflicts ? filedsWithData.map((field) => `"${field}" = EXCLUDED."${field}"`) : []

const insertStr = insertFields.join(', ')
const onConflictStr = onConflict.join(', ')

try {
const toUpload = [...docs]
const tdomain = translateDomain(domain)
while (toUpload.length > 0) {
const part = toUpload.splice(0, 200)
const batchSize = 200
for (let i = 0; i < docs.length; i += batchSize) {
const part = docs.slice(i, i + batchSize)
const values = new ValuesVariables()
const vars: string[] = []
const wsId = values.add(this.workspaceId.name, '::uuid')
for (let i = 0; i < part.length; i++) {
const doc = part[i]
const variables: string[] = []

for (const doc of part) {
if (!('%hash%' in doc) || doc['%hash%'] === '' || doc['%hash%'] == null) {
;(doc as any)['%hash%'] = this.curHash() // We need to set current hash
}
const d = convertDoc(domain, doc, this.workspaceId.name, schemaFields)
variables.push(wsId)
for (const field of schemaFields.fields) {
variables.push(values.add(d[field], `::${schemaFields.schema[field].type}`))
}
variables.push(values.add(d.data, '::json'))
const variables = [
wsId,
...schemaFields.fields.map((field) => values.add(d[field], `::${schemaFields.schema[field].type}`)),
values.add(d.data, '::json')
]
vars.push(`(${variables.join(', ')})`)
}

const vals = vars.join(',')
if (handleConflicts) {
await this.mgr.retry(
ctx.id,
async (client) =>
await client.unsafe(
`INSERT INTO ${tdomain} ("workspaceId", ${insertStr}) VALUES ${vals}
ON CONFLICT ("workspaceId", _id) DO UPDATE SET ${onConflictStr};`,
values.getValues(),
getPrepare()
)
)
} else {
await this.mgr.retry(
ctx.id,
async (client) =>
await client.unsafe(
`INSERT INTO ${tdomain} ("workspaceId", ${insertStr}) VALUES ${vals};`,
values.getValues(),
getPrepare()
)
)
}
const query = `INSERT INTO ${tdomain} ("workspaceId", ${insertStr}) VALUES ${vals} ${
handleConflicts ? `ON CONFLICT ("workspaceId", _id) DO UPDATE SET ${onConflictStr}` : ''
};`
await this.mgr.retry(ctx.id, async (client) => await client.unsafe(query, values.getValues(), getPrepare()))
}
} catch (err: any) {
ctx.error('failed to upload', { err })
Expand All @@ -1610,17 +1598,14 @@ abstract class PostgresAdapterBase implements DbAdapter {

async clean (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]): Promise<void> {
const tdomain = translateDomain(domain)
const toClean = [...docs]
while (toClean.length > 0) {
const part = toClean.splice(0, 2500)
const batchSize = 2500
const query = `DELETE FROM ${tdomain} WHERE "workspaceId" = $1 AND _id = ANY($2::text[])`

for (let i = 0; i < docs.length; i += batchSize) {
const part = docs.slice(i, i + batchSize)
await ctx.with('clean', {}, () => {
return this.mgr.retry(ctx.id, (client) =>
client.unsafe(
`DELETE FROM ${tdomain} WHERE "workspaceId" = $1 AND _id = ANY($2::text[])`,
[this.workspaceId.name, part],
getPrepare()
)
)
const params = [this.workspaceId.name, part]
return this.mgr.retry(ctx.id, (client) => client.unsafe(query, params, getPrepare()))
})
}
}
Expand All @@ -1635,10 +1620,16 @@ abstract class PostgresAdapterBase implements DbAdapter {
return ctx.with('groupBy', { domain }, async (ctx) => {
try {
const vars = new ValuesVariables()
const finalSql = `SELECT DISTINCT ${key} as ${field}, Count(*) AS count FROM ${translateDomain(domain)} WHERE ${this.buildRawQuery(vars, domain, query ?? {})} GROUP BY ${key}`
const sqlChunks: string[] = [
`SELECT ${key} as ${field}, Count(*) AS count`,
`FROM ${translateDomain(domain)}`,
`WHERE ${this.buildRawQuery(vars, domain, query ?? {})}`,
`GROUP BY ${key}`
]
const finalSql = sqlChunks.join(' ')
return await this.mgr.retry(ctx.id, async (connection) => {
const result = await connection.unsafe(finalSql, vars.getValues(), getPrepare())
return new Map(result.map((r) => [r[field.toLocaleLowerCase()], parseInt(r.count)]))
return new Map(result.map((r) => [r[field.toLowerCase()], r.count]))
})
} catch (err) {
ctx.error('Error while grouping by', { domain, field })
Expand Down Expand Up @@ -1920,10 +1911,10 @@ class PostgresAdapter extends PostgresAdapterBase {
const result: TxResult[] = []
try {
const schema = getSchema(domain)
const updates = groupByArray(operations, (it) => it.fields.join(','))
for (const upds of updates.values()) {
while (upds.length > 0) {
const part = upds.splice(0, 200)
const groupedUpdates = groupByArray(operations, (it) => it.fields.join(','))
for (const groupedOps of groupedUpdates.values()) {
for (let i = 0; i < groupedOps.length; i += 200) {
const part = groupedOps.slice(i, i + 200)
let idx = 1
const indexes: string[] = []
const data: any[] = []
Expand Down Expand Up @@ -2021,7 +2012,9 @@ class PostgresTxAdapter extends PostgresAdapterBase implements TxAdapter {
async getModel (ctx: MeasureContext): Promise<Tx[]> {
const res: DBDoc[] = await this.mgr.retry(undefined, (client) => {
return client.unsafe(
`SELECT * FROM "${translateDomain(DOMAIN_MODEL_TX)}" WHERE "workspaceId" = '${this.workspaceId.name}'::uuid ORDER BY _id::text ASC, "modifiedOn"::bigint ASC`
`SELECT * FROM "${translateDomain(DOMAIN_MODEL_TX)}" WHERE "workspaceId" = '${this.workspaceId.name}'::uuid ORDER BY _id::text ASC, "modifiedOn"::bigint ASC`,
undefined,
getPrepare()
)
})

Expand Down
5 changes: 5 additions & 0 deletions server/postgres/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,11 @@ export function getDBClient (connectionString: string, database?: string): Postg
},
database,
max: 10,
min: 2,
connect_timeout: 10,
idle_timeout: 30,
max_lifetime: 300,
fetch_types: true,
transform: {
undefined: null
},
Expand Down
30 changes: 17 additions & 13 deletions server/workspace-service/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,20 +122,24 @@ export function serveWorkspaceAccount (
brandings
)

void worker.start(
measureCtx,
{
errorHandler: async (ws, err) => {
Analytics.handleError(err)
void worker
.start(
measureCtx,
{
errorHandler: async (ws, err) => {
Analytics.handleError(err)
},
force: false,
console: false,
logs: 'upgrade-logs',
waitTimeout,
backup
},
force: false,
console: false,
logs: 'upgrade-logs',
waitTimeout,
backup
},
() => canceled
)
() => canceled
)
.catch((err) => {
measureCtx.error('failed to start', { err })
})

const close = (): void => {
canceled = true
Expand Down
Loading
Loading