From 5ace5b744b04ac76db81812e4545b15ace92f9a8 Mon Sep 17 00:00:00 2001 From: Alexander Onnikov Date: Wed, 22 Jan 2025 18:30:53 +0700 Subject: [PATCH 1/4] UBERF-9226 Workspace reindex tool (#7756) Signed-off-by: Alexander Onnikov --- dev/tool/src/fulltext.ts | 33 ++++++++++++++++++++++++ dev/tool/src/index.ts | 37 +++++++++++++++++++-------- dev/tool/src/workspace.ts | 14 ---------- pods/fulltext/src/server.ts | 32 +++++++++++++++++++++++ server/indexer/src/indexer/indexer.ts | 14 ++++++++++ 5 files changed, 106 insertions(+), 24 deletions(-) create mode 100644 dev/tool/src/fulltext.ts diff --git a/dev/tool/src/fulltext.ts b/dev/tool/src/fulltext.ts new file mode 100644 index 00000000000..fc316739161 --- /dev/null +++ b/dev/tool/src/fulltext.ts @@ -0,0 +1,33 @@ +// +// Copyright © 2025 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import { type MeasureContext } from '@hcengineering/core' + +export async function reindexWorkspace (ctx: MeasureContext, fulltextUrl: string, token: string): Promise { + try { + const res = await fetch(fulltextUrl + '/api/v1/reindex', { + method: 'PUT', + headers: { + 'Content-Type': 'application/json' + }, + body: JSON.stringify({ token }) + }) + if (!res.ok) { + throw new Error(`HTTP Error ${res.status} ${res.statusText}`) + } + } catch (err: any) { + ctx.error('failed to reset index', { err }) + } +} diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts index 4095fac8cd8..a6dac6c52b5 100644 --- a/dev/tool/src/index.ts +++ b/dev/tool/src/index.ts @@ -77,7 +77,7 @@ import { buildStorageFromConfig, createStorageFromConfig, storageConfigFromEnv } import { program, type Command } from 'commander' import { addControlledDocumentRank } from './qms' import { clearTelegramHistory } from './telegram' -import { diffWorkspace, recreateElastic, updateField } from './workspace' +import { diffWorkspace, updateField } from './workspace' import core, { AccountRole, @@ -149,6 +149,7 @@ import { fixMixinForeignAttributes, showMixinForeignAttributes } from './mixin' import { fixAccountEmails, renameAccount } from './renameAccount' import { copyToDatalake, moveFiles, showLostFiles } from './storage' import { createPostgresTxAdapter, createPostgresAdapter, createPostgreeDestroyAdapter } from '@hcengineering/postgres' +import { reindexWorkspace } from './fulltext' const colorConstants = { colorRed: '\u001b[31m', @@ -1925,27 +1926,43 @@ export function devTool ( ) program - .command('recreate-elastic-indexes-mongo ') - .description('reindex workspace to elastic') + .command('fulltext-reindex ') + .description('reindex workspace') .action(async (workspace: string) => { - const mongodbUri = getMongoDBUrl() + const fulltextUrl = process.env.FULLTEXT_URL + if (fulltextUrl === undefined) { + console.error('please provide FULLTEXT_URL') + process.exit(1) + } + const wsid = getWorkspaceId(workspace) - await recreateElastic(mongodbUri, wsid) + const token = generateToken(systemAccountEmail, wsid) + + console.log('reindex workspace', workspace) + await reindexWorkspace(toolCtx, fulltextUrl, token) + console.log('done', workspace) }) program - .command('recreate-all-elastic-indexes-mongo') - .description('reindex elastic') + .command('fulltext-reindex-all') + .description('reindex workspaces') .action(async () => { - const { dbUrl } = prepareTools() - const mongodbUri = getMongoDBUrl() + const fulltextUrl = process.env.FULLTEXT_URL + if (fulltextUrl === undefined) { + console.error('please provide FULLTEXT_URL') + process.exit(1) + } await withAccountDatabase(async (db) => { const workspaces = await listWorkspacesRaw(db) workspaces.sort((a, b) => b.lastVisit - a.lastVisit) for (const workspace of workspaces) { const wsid = getWorkspaceId(workspace.workspace) - await recreateElastic(mongodbUri ?? dbUrl, wsid) + const token = generateToken(systemAccountEmail, wsid) + + console.log('reindex workspace', workspace) + await reindexWorkspace(toolCtx, fulltextUrl, token) + console.log('done', workspace) } }) }) diff --git a/dev/tool/src/workspace.ts b/dev/tool/src/workspace.ts index 98523116682..d704a8d3092 100644 --- a/dev/tool/src/workspace.ts +++ b/dev/tool/src/workspace.ts @@ -20,7 +20,6 @@ import core, { type Class, type Client as CoreClient, type Doc, - DOMAIN_DOC_INDEX_STATE, DOMAIN_TX, type Ref, type Tx, @@ -96,16 +95,3 @@ export async function updateField ( await connection.close() } } - -export async function recreateElastic (mongoUrl: string, workspaceId: WorkspaceId): Promise { - const client = getMongoClient(mongoUrl) - const _client = await client.getClient() - try { - const db = getWorkspaceMongoDB(_client, workspaceId) - await db - .collection(DOMAIN_DOC_INDEX_STATE) - .updateMany({ _class: core.class.DocIndexState }, { $set: { needIndex: true } }) - } finally { - client.close() - } -} diff --git a/pods/fulltext/src/server.ts b/pods/fulltext/src/server.ts index 93fc5e6d31b..605a28c37ed 100644 --- a/pods/fulltext/src/server.ts +++ b/pods/fulltext/src/server.ts @@ -161,6 +161,14 @@ class WorkspaceIndexer { return result } + async reindex (): Promise { + await this.fulltext.cancel() + await this.fulltext.clearIndex() + await this.fulltext.startIndexing(() => { + this.lastUpdate = Date.now() + }) + } + async close (): Promise { await this.fulltext.cancel() await this.pipeline.close() @@ -188,6 +196,10 @@ interface Search { fullTextLimit: number } +interface Reindex { + token: string +} + export async function startIndexer ( ctx: MeasureContext, opt: { @@ -391,6 +403,26 @@ export async function startIndexer ( } }) + router.put('/api/v1/reindex', async (req, res) => { + try { + const request = req.request.body as Reindex + const decoded = decodeToken(request.token) // Just to be safe + req.body = {} + + ctx.info('reindex', { workspace: decoded.workspace }) + const indexer = await getIndexer(ctx, decoded.workspace, request.token, true) + if (indexer !== undefined) { + indexer.lastUpdate = Date.now() + await indexer.reindex() + } + } catch (err: any) { + Analytics.handleError(err) + console.error(err) + req.res.writeHead(404, {}) + req.res.end() + } + }) + app.use(router.routes()).use(router.allowedMethods()) const server = app.listen(opt.port, () => { diff --git a/server/indexer/src/indexer/indexer.ts b/server/indexer/src/indexer/indexer.ts index 0777e7445cc..60fca5e5d78 100644 --- a/server/indexer/src/indexer/indexer.ts +++ b/server/indexer/src/indexer/indexer.ts @@ -165,6 +165,7 @@ export class FullTextIndexPipeline implements FullTextPipeline { triggerIndexing = (): void => {} async startIndexing (indexing: () => void): Promise { + this.cancelling = false this.verify = this.verifyWorkspace(this.metrics, indexing) void this.verify.then(() => { this.indexing = this.doIndexing(indexing) @@ -282,6 +283,19 @@ export class FullTextIndexPipeline implements FullTextPipeline { } } + async clearIndex (): Promise { + const ctx = this.metrics + const migrations = await this.storage.findAll(ctx, core.class.MigrationState, { + plugin: coreId, + state: { + $in: ['verify-indexes-v2', 'full-text-indexer-v4', 'full-text-structure-v4'] + } + }) + + const refs = migrations.map((it) => it._id) + await this.storage.clean(ctx, DOMAIN_MIGRATION, refs) + } + broadcastClasses = new Set>>() broadcasts: number = 0 From 82a92040763120ff021662e197c39706418b0bdb Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Wed, 22 Jan 2025 19:22:50 +0700 Subject: [PATCH 2/4] QFIX: PG query (#7758) Signed-off-by: Andrey Sobolev --- server/postgres/src/storage.ts | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/server/postgres/src/storage.ts b/server/postgres/src/storage.ts index 09f3915608e..e8b3c4fe793 100644 --- a/server/postgres/src/storage.ts +++ b/server/postgres/src/storage.ts @@ -80,6 +80,7 @@ import { createTables, DBCollectionHelper, type DBDoc, + dbExtra, getDBClient, getPrepare, inferType, @@ -673,7 +674,7 @@ abstract class PostgresAdapterBase implements DbAdapter { sqlChunks.push(secJoin) } if (joins.length > 0) { - sqlChunks.push(this.buildJoinString(joins)) + sqlChunks.push(this.buildJoinString(vars, joins)) } sqlChunks.push(`WHERE ${this.buildQuery(vars, _class, domain, query, joins, options)}`) @@ -699,7 +700,9 @@ abstract class PostgresAdapterBase implements DbAdapter { const finalSql: string = [select, ...sqlChunks].join(' ') fquery = finalSql - const result = await connection.unsafe(finalSql, vars.getValues(), getPrepare()) + const result = dbExtra?.useCF + ? await connection.unsafe(vars.injectVars(finalSql), undefined, { prepare: false }) + : await connection.unsafe(finalSql, vars.getValues(), getPrepare()) if ( options?.lookup === undefined && options?.domainLookup === undefined && @@ -934,19 +937,19 @@ abstract class PostgresAdapterBase implements DbAdapter { } } - private buildJoinString (value: JoinProps[]): string { + private buildJoinString (vars: ValuesVariables, value: JoinProps[]): string { const res: string[] = [] for (const val of value) { if (val.isReverse) continue if (val.table === DOMAIN_MODEL) continue res.push( - `LEFT JOIN ${val.table} AS ${val.toAlias} ON ${val.fromAlias}.${val.fromField} = ${val.toAlias}."${val.toField}" AND ${val.toAlias}."workspaceId" = '${this.workspaceId.name}'` + `LEFT JOIN ${val.table} AS ${val.toAlias} ON ${val.fromAlias}.${val.fromField} = ${val.toAlias}."${val.toField}" AND ${val.toAlias}."workspaceId" = ${vars.add(this.workspaceId.name, '::uuid')}` ) if (val.classes !== undefined) { if (val.classes.length === 1) { - res.push(`AND ${val.toAlias}._class = '${val.classes[0]}'`) + res.push(`AND ${val.toAlias}._class = ${vars.add(val.classes[0], '::text')}`) } else { - res.push(`AND ${val.toAlias}._class IN (${val.classes.map((c) => `'${c}'`).join(', ')})`) + res.push(`AND ${val.toAlias}._class = ANY (${vars.addArray(val.classes, '::text[]')})`) } } } @@ -1251,7 +1254,7 @@ abstract class PostgresAdapterBase implements DbAdapter { } private translateQueryValue (vars: ValuesVariables, tkey: string, value: any, type: ValueType): string | undefined { - const tkeyData = tkey.includes('data->') || tkey.includes('data#>>') + const tkeyData = tkey.includes('data') && (tkey.includes('->') || tkey.includes('#>>')) if (tkeyData && (Array.isArray(value) || (typeof value !== 'object' && typeof value !== 'string'))) { value = Array.isArray(value) ? value.map((it) => (it == null ? null : `${it}`)) : `${value}` } From 90e8ca4e976730694b3d1b299ebc831e28eafa47 Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Wed, 22 Jan 2025 19:25:49 +0700 Subject: [PATCH 3/4] UBERF-9230: Fix ses webpush (#7760) Signed-off-by: Andrey Sobolev --- .../notification-resources/src/index.ts | 29 +++++------- services/ses/pod-ses/src/index.ts | 6 ++- services/ses/pod-ses/src/main.ts | 47 +++++++++++++------ 3 files changed, 50 insertions(+), 32 deletions(-) diff --git a/server-plugins/notification-resources/src/index.ts b/server-plugins/notification-resources/src/index.ts index 622ee07482a..9f5994e4e21 100644 --- a/server-plugins/notification-resources/src/index.ts +++ b/server-plugins/notification-resources/src/index.ts @@ -15,6 +15,7 @@ // import activity, { ActivityMessage, DocUpdateMessage } from '@hcengineering/activity' +import { Analytics } from '@hcengineering/analytics' import chunter, { ChatMessage } from '@hcengineering/chunter' import contact, { Employee, @@ -39,7 +40,6 @@ import core, { generateId, MeasureContext, MixinUpdate, - RateLimiter, Ref, RefTo, SortingOrder, @@ -82,7 +82,6 @@ import serverView from '@hcengineering/server-view' import { markupToText, stripTags } from '@hcengineering/text-core' import { encodeObjectURI } from '@hcengineering/view' import { workbenchId } from '@hcengineering/workbench' -import { Analytics } from '@hcengineering/analytics' import { Content, ContextsCache, ContextsCacheKey, NotifyParams, NotifyResult } from './types' import { @@ -92,6 +91,7 @@ import { getNotificationContent, getNotificationLink, getNotificationProviderControl, + getObjectSpace, getTextPresenter, getUsersInfo, isAllowed, @@ -103,8 +103,7 @@ import { replaceAll, toReceiverInfo, updateNotifyContextsSpace, - type NotificationProviderControl, - getObjectSpace + type NotificationProviderControl } from './utils' export function getPushCollaboratorTx ( @@ -602,13 +601,7 @@ export async function createPushNotification ( } } - const limiter = new RateLimiter(5) - for (const subscription of userSubscriptions) { - await limiter.add(async () => { - await sendPushToSubscription(sesURL, sesAuth, control, target, subscription, data) - }) - } - await limiter.waitProcessing() + void sendPushToSubscription(sesURL, sesAuth, control, target, userSubscriptions, data) } async function sendPushToSubscription ( @@ -616,11 +609,11 @@ async function sendPushToSubscription ( sesAuth: string | undefined, control: TriggerControl, targetUser: Ref, - subscription: PushSubscription, + subscriptions: PushSubscription[], data: PushData ): Promise { try { - const result: 'ok' | 'clear-push' = ( + const result: Ref[] = ( await ( await fetch(concatLink(sesURL, '/web-push'), { method: 'post', @@ -629,15 +622,17 @@ async function sendPushToSubscription ( ...(sesAuth != null ? { Authorization: `Bearer ${sesAuth}` } : {}) }, body: JSON.stringify({ - subscription, + subscriptions, data }) }) ).json() ).result - if (result === 'clear-push') { - const tx = control.txFactory.createTxRemoveDoc(subscription._class, subscription.space, subscription._id) - await control.apply(control.ctx, [tx]) + if (result.length > 0) { + const domain = control.hierarchy.findDomain(notification.class.PushSubscription) + if (domain !== undefined) { + await control.lowLevel.clean(control.ctx, domain, result) + } } } catch (err) { control.ctx.info('Cannot send push notification to', { user: targetUser, err }) diff --git a/services/ses/pod-ses/src/index.ts b/services/ses/pod-ses/src/index.ts index 22b45ddd139..c225f89a0f3 100644 --- a/services/ses/pod-ses/src/index.ts +++ b/services/ses/pod-ses/src/index.ts @@ -15,4 +15,8 @@ import { main } from './main' -void main() +void main().catch((err) => { + if (err != null) { + console.error(err) + } +}) diff --git a/services/ses/pod-ses/src/main.ts b/services/ses/pod-ses/src/main.ts index 6e91af0cc21..3dd3ae0227b 100644 --- a/services/ses/pod-ses/src/main.ts +++ b/services/ses/pod-ses/src/main.ts @@ -13,6 +13,7 @@ // limitations under the License. // +import type { Ref } from '@hcengineering/core' import { PushSubscription, type PushData } from '@hcengineering/notification' import type { Request, Response } from 'express' import webpush, { WebPushError } from 'web-push' @@ -22,25 +23,39 @@ import { SES } from './ses' import { Endpoint } from './types' const errorMessages = ['expired', 'Unregistered', 'No such subscription'] -async function sendPushToSubscription (subscription: PushSubscription, data: PushData): Promise<'ok' | 'clear-push'> { - try { - await webpush.sendNotification(subscription, JSON.stringify(data)) - } catch (err: any) { - if (err instanceof WebPushError) { - if (errorMessages.some((p) => JSON.stringify((err as WebPushError).body).includes(p))) { - return 'clear-push' +async function sendPushToSubscription ( + subscriptions: PushSubscription[], + data: PushData +): Promise[]> { + const result: Ref[] = [] + for (const subscription of subscriptions) { + try { + await webpush.sendNotification(subscription, JSON.stringify(data)) + } catch (err: any) { + if (err instanceof WebPushError) { + if (errorMessages.some((p) => JSON.stringify((err as WebPushError).body).includes(p))) { + result.push(subscription._id) + } } } } - return 'ok' + return result } export const main = async (): Promise => { const ses = new SES() console.log('SES service has been started') + let webpushInitDone = false if (config.PushPublicKey !== undefined && config.PushPrivateKey !== undefined) { - webpush.setVapidDetails(config.PushSubject ?? 'mailto:hey@huly.io', config.PushPublicKey, config.PushPublicKey) + try { + const subj = config.PushSubject ?? 'mailto:hey@huly.io' + console.log('Setting VAPID details', subj, config.PushPublicKey.length, config.PushPrivateKey.length) + webpush.setVapidDetails(config.PushSubject ?? 'mailto:hey@huly.io', config.PushPublicKey, config.PushPrivateKey) + webpushInitDone = true + } catch (err: any) { + console.error(err) + } } const checkAuth = (req: Request, res: Response): boolean => { @@ -104,14 +119,18 @@ export const main = async (): Promise => { res.status(400).send({ err: "'data' is missing" }) return } - const subscription: PushSubscription | undefined = req.body?.subscription - if (subscription === undefined) { - res.status(400).send({ err: "'subscription' is missing" }) + const subscriptions: PushSubscription[] | undefined = req.body?.subscriptions + if (subscriptions === undefined) { + res.status(400).send({ err: "'subscriptions' is missing" }) + return + } + if (!webpushInitDone) { + res.json({ result: [] }).end() return } - const result = await sendPushToSubscription(subscription, data) - res.json({ result }) + const result = await sendPushToSubscription(subscriptions, data) + res.json({ result }).end() } } ] From 63c6a30ef70892e72f0e6b3c10a2632fa30fadb8 Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Wed, 22 Jan 2025 19:37:00 +0700 Subject: [PATCH 4/4] Remove redundant map (#7763) Signed-off-by: Andrey Sobolev --- packages/core/src/__tests__/limits.test.ts | 8 ++++---- packages/core/src/utils.ts | 14 ++++++-------- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/packages/core/src/__tests__/limits.test.ts b/packages/core/src/__tests__/limits.test.ts index 5a7c15d63eb..2fc323d9f28 100644 --- a/packages/core/src/__tests__/limits.test.ts +++ b/packages/core/src/__tests__/limits.test.ts @@ -72,7 +72,7 @@ describe('TimeRateLimiter', () => { const operations = Promise.all([limiter.exec(mockFn), limiter.exec(mockFn), limiter.exec(mockFn)]) expect(mockFn).toHaveBeenCalledTimes(2) - expect(limiter.processingQueue.size).toBe(2) + expect(limiter.active).toBe(2) jest.advanceTimersByTime(500) await Promise.resolve() @@ -84,7 +84,7 @@ describe('TimeRateLimiter', () => { await Promise.resolve() await Promise.resolve() - expect(limiter.processingQueue.size).toBe(0) + expect(limiter.active).toBe(0) expect(mockFn).toHaveBeenCalledTimes(3) @@ -104,7 +104,7 @@ describe('TimeRateLimiter', () => { console.log('wait complete') }) - expect(limiter.processingQueue.size).toBe(1) + expect(limiter.active).toBe(1) jest.advanceTimersByTime(1001) await Promise.resolve() @@ -113,6 +113,6 @@ describe('TimeRateLimiter', () => { await waitPromise await operation - expect(limiter.processingQueue.size).toBe(0) + expect(limiter.active).toBe(0) }) }) diff --git a/packages/core/src/utils.ts b/packages/core/src/utils.ts index 45e5a8b943a..022aefd8fb3 100644 --- a/packages/core/src/utils.ts +++ b/packages/core/src/utils.ts @@ -846,7 +846,7 @@ export function pluginFilterTx ( export class TimeRateLimiter { idCounter: number = 0 - processingQueue = new Map>() + active: number = 0 last: number = 0 rate: number period: number @@ -866,9 +866,7 @@ export class TimeRateLimiter { } async exec = any>(op: (args?: B) => Promise, args?: B): Promise { - const processingId = this.idCounter++ - - while (this.processingQueue.size >= this.rate || this.executions.length >= this.rate) { + while (this.active >= this.rate || this.executions.length >= this.rate) { this.cleanupExecutions() if (this.executions.length < this.rate) { break @@ -882,11 +880,11 @@ export class TimeRateLimiter { try { this.executions.push(v) const p = op(args) - this.processingQueue.set(processingId, p as Promise) + this.active++ return await p } finally { v.running = false - this.processingQueue.delete(processingId) + this.active-- this.cleanupExecutions() const n = this.notify.shift() if (n !== undefined) { @@ -896,8 +894,8 @@ export class TimeRateLimiter { } async waitProcessing (): Promise { - while (this.processingQueue.size > 0) { - console.log('wait', this.processingQueue.size) + while (this.active > 0) { + console.log('wait', this.active) await new Promise((resolve) => { this.notify.push(resolve) })