Skip to content

Commit

Permalink
Merge branch 'develop' into staging
Browse files Browse the repository at this point in the history
Signed-off-by: Andrey Sobolev <[email protected]>
  • Loading branch information
haiodo committed Jan 22, 2025
2 parents 72bbad7 + 63c6a30 commit 1a3ee1d
Show file tree
Hide file tree
Showing 11 changed files with 176 additions and 75 deletions.
33 changes: 33 additions & 0 deletions dev/tool/src/fulltext.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
//
// Copyright © 2025 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//

import { type MeasureContext } from '@hcengineering/core'

export async function reindexWorkspace (ctx: MeasureContext, fulltextUrl: string, token: string): Promise<void> {
try {
const res = await fetch(fulltextUrl + '/api/v1/reindex', {
method: 'PUT',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({ token })
})
if (!res.ok) {
throw new Error(`HTTP Error ${res.status} ${res.statusText}`)
}
} catch (err: any) {
ctx.error('failed to reset index', { err })
}
}
37 changes: 27 additions & 10 deletions dev/tool/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ import { buildStorageFromConfig, createStorageFromConfig, storageConfigFromEnv }
import { program, type Command } from 'commander'
import { addControlledDocumentRank } from './qms'
import { clearTelegramHistory } from './telegram'
import { diffWorkspace, recreateElastic, updateField } from './workspace'
import { diffWorkspace, updateField } from './workspace'

import core, {
AccountRole,
Expand Down Expand Up @@ -149,6 +149,7 @@ import { fixMixinForeignAttributes, showMixinForeignAttributes } from './mixin'
import { fixAccountEmails, renameAccount } from './renameAccount'
import { copyToDatalake, moveFiles, showLostFiles } from './storage'
import { createPostgresTxAdapter, createPostgresAdapter, createPostgreeDestroyAdapter } from '@hcengineering/postgres'
import { reindexWorkspace } from './fulltext'

const colorConstants = {
colorRed: '\u001b[31m',
Expand Down Expand Up @@ -1925,27 +1926,43 @@ export function devTool (
)

program
.command('recreate-elastic-indexes-mongo <workspace>')
.description('reindex workspace to elastic')
.command('fulltext-reindex <workspace>')
.description('reindex workspace')
.action(async (workspace: string) => {
const mongodbUri = getMongoDBUrl()
const fulltextUrl = process.env.FULLTEXT_URL
if (fulltextUrl === undefined) {
console.error('please provide FULLTEXT_URL')
process.exit(1)
}

const wsid = getWorkspaceId(workspace)
await recreateElastic(mongodbUri, wsid)
const token = generateToken(systemAccountEmail, wsid)

console.log('reindex workspace', workspace)
await reindexWorkspace(toolCtx, fulltextUrl, token)
console.log('done', workspace)
})

program
.command('recreate-all-elastic-indexes-mongo')
.description('reindex elastic')
.command('fulltext-reindex-all')
.description('reindex workspaces')
.action(async () => {
const { dbUrl } = prepareTools()
const mongodbUri = getMongoDBUrl()
const fulltextUrl = process.env.FULLTEXT_URL
if (fulltextUrl === undefined) {
console.error('please provide FULLTEXT_URL')
process.exit(1)
}

await withAccountDatabase(async (db) => {
const workspaces = await listWorkspacesRaw(db)
workspaces.sort((a, b) => b.lastVisit - a.lastVisit)
for (const workspace of workspaces) {
const wsid = getWorkspaceId(workspace.workspace)
await recreateElastic(mongodbUri ?? dbUrl, wsid)
const token = generateToken(systemAccountEmail, wsid)

console.log('reindex workspace', workspace)
await reindexWorkspace(toolCtx, fulltextUrl, token)
console.log('done', workspace)
}
})
})
Expand Down
14 changes: 0 additions & 14 deletions dev/tool/src/workspace.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import core, {
type Class,
type Client as CoreClient,
type Doc,
DOMAIN_DOC_INDEX_STATE,
DOMAIN_TX,
type Ref,
type Tx,
Expand Down Expand Up @@ -96,16 +95,3 @@ export async function updateField (
await connection.close()
}
}

export async function recreateElastic (mongoUrl: string, workspaceId: WorkspaceId): Promise<void> {
const client = getMongoClient(mongoUrl)
const _client = await client.getClient()
try {
const db = getWorkspaceMongoDB(_client, workspaceId)
await db
.collection(DOMAIN_DOC_INDEX_STATE)
.updateMany({ _class: core.class.DocIndexState }, { $set: { needIndex: true } })
} finally {
client.close()
}
}
8 changes: 4 additions & 4 deletions packages/core/src/__tests__/limits.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ describe('TimeRateLimiter', () => {
const operations = Promise.all([limiter.exec(mockFn), limiter.exec(mockFn), limiter.exec(mockFn)])

expect(mockFn).toHaveBeenCalledTimes(2)
expect(limiter.processingQueue.size).toBe(2)
expect(limiter.active).toBe(2)

jest.advanceTimersByTime(500)
await Promise.resolve()
Expand All @@ -84,7 +84,7 @@ describe('TimeRateLimiter', () => {
await Promise.resolve()
await Promise.resolve()

expect(limiter.processingQueue.size).toBe(0)
expect(limiter.active).toBe(0)

expect(mockFn).toHaveBeenCalledTimes(3)

Expand All @@ -104,7 +104,7 @@ describe('TimeRateLimiter', () => {
console.log('wait complete')
})

expect(limiter.processingQueue.size).toBe(1)
expect(limiter.active).toBe(1)

jest.advanceTimersByTime(1001)
await Promise.resolve()
Expand All @@ -113,6 +113,6 @@ describe('TimeRateLimiter', () => {

await waitPromise
await operation
expect(limiter.processingQueue.size).toBe(0)
expect(limiter.active).toBe(0)
})
})
14 changes: 6 additions & 8 deletions packages/core/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,7 @@ export function pluginFilterTx (

export class TimeRateLimiter {
idCounter: number = 0
processingQueue = new Map<number, Promise<void>>()
active: number = 0
last: number = 0
rate: number
period: number
Expand All @@ -866,9 +866,7 @@ export class TimeRateLimiter {
}

async exec<T, B extends Record<string, any> = any>(op: (args?: B) => Promise<T>, args?: B): Promise<T> {
const processingId = this.idCounter++

while (this.processingQueue.size >= this.rate || this.executions.length >= this.rate) {
while (this.active >= this.rate || this.executions.length >= this.rate) {
this.cleanupExecutions()
if (this.executions.length < this.rate) {
break
Expand All @@ -882,11 +880,11 @@ export class TimeRateLimiter {
try {
this.executions.push(v)
const p = op(args)
this.processingQueue.set(processingId, p as Promise<void>)
this.active++
return await p
} finally {
v.running = false
this.processingQueue.delete(processingId)
this.active--
this.cleanupExecutions()
const n = this.notify.shift()
if (n !== undefined) {
Expand All @@ -896,8 +894,8 @@ export class TimeRateLimiter {
}

async waitProcessing (): Promise<void> {
while (this.processingQueue.size > 0) {
console.log('wait', this.processingQueue.size)
while (this.active > 0) {
console.log('wait', this.active)
await new Promise<void>((resolve) => {
this.notify.push(resolve)
})
Expand Down
32 changes: 32 additions & 0 deletions pods/fulltext/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,14 @@ class WorkspaceIndexer {
return result
}

async reindex (): Promise<void> {
await this.fulltext.cancel()
await this.fulltext.clearIndex()
await this.fulltext.startIndexing(() => {
this.lastUpdate = Date.now()
})
}

async close (): Promise<void> {
await this.fulltext.cancel()
await this.pipeline.close()
Expand Down Expand Up @@ -188,6 +196,10 @@ interface Search {
fullTextLimit: number
}

interface Reindex {
token: string
}

export async function startIndexer (
ctx: MeasureContext,
opt: {
Expand Down Expand Up @@ -391,6 +403,26 @@ export async function startIndexer (
}
})

router.put('/api/v1/reindex', async (req, res) => {
try {
const request = req.request.body as Reindex
const decoded = decodeToken(request.token) // Just to be safe
req.body = {}

ctx.info('reindex', { workspace: decoded.workspace })
const indexer = await getIndexer(ctx, decoded.workspace, request.token, true)
if (indexer !== undefined) {
indexer.lastUpdate = Date.now()
await indexer.reindex()
}
} catch (err: any) {
Analytics.handleError(err)
console.error(err)
req.res.writeHead(404, {})
req.res.end()
}
})

app.use(router.routes()).use(router.allowedMethods())

const server = app.listen(opt.port, () => {
Expand Down
29 changes: 12 additions & 17 deletions server-plugins/notification-resources/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
//

import activity, { ActivityMessage, DocUpdateMessage } from '@hcengineering/activity'
import { Analytics } from '@hcengineering/analytics'
import chunter, { ChatMessage } from '@hcengineering/chunter'
import contact, {
Employee,
Expand All @@ -39,7 +40,6 @@ import core, {
generateId,
MeasureContext,
MixinUpdate,
RateLimiter,
Ref,
RefTo,
SortingOrder,
Expand Down Expand Up @@ -82,7 +82,6 @@ import serverView from '@hcengineering/server-view'
import { markupToText, stripTags } from '@hcengineering/text-core'
import { encodeObjectURI } from '@hcengineering/view'
import { workbenchId } from '@hcengineering/workbench'
import { Analytics } from '@hcengineering/analytics'

import { Content, ContextsCache, ContextsCacheKey, NotifyParams, NotifyResult } from './types'
import {
Expand All @@ -92,6 +91,7 @@ import {
getNotificationContent,
getNotificationLink,
getNotificationProviderControl,
getObjectSpace,
getTextPresenter,
getUsersInfo,
isAllowed,
Expand All @@ -103,8 +103,7 @@ import {
replaceAll,
toReceiverInfo,
updateNotifyContextsSpace,
type NotificationProviderControl,
getObjectSpace
type NotificationProviderControl
} from './utils'

export function getPushCollaboratorTx (
Expand Down Expand Up @@ -602,25 +601,19 @@ export async function createPushNotification (
}
}

const limiter = new RateLimiter(5)
for (const subscription of userSubscriptions) {
await limiter.add(async () => {
await sendPushToSubscription(sesURL, sesAuth, control, target, subscription, data)
})
}
await limiter.waitProcessing()
void sendPushToSubscription(sesURL, sesAuth, control, target, userSubscriptions, data)
}

async function sendPushToSubscription (
sesURL: string,
sesAuth: string | undefined,
control: TriggerControl,
targetUser: Ref<Account>,
subscription: PushSubscription,
subscriptions: PushSubscription[],
data: PushData
): Promise<void> {
try {
const result: 'ok' | 'clear-push' = (
const result: Ref<PushSubscription>[] = (
await (
await fetch(concatLink(sesURL, '/web-push'), {
method: 'post',
Expand All @@ -629,15 +622,17 @@ async function sendPushToSubscription (
...(sesAuth != null ? { Authorization: `Bearer ${sesAuth}` } : {})
},
body: JSON.stringify({
subscription,
subscriptions,
data
})
})
).json()
).result
if (result === 'clear-push') {
const tx = control.txFactory.createTxRemoveDoc(subscription._class, subscription.space, subscription._id)
await control.apply(control.ctx, [tx])
if (result.length > 0) {
const domain = control.hierarchy.findDomain(notification.class.PushSubscription)
if (domain !== undefined) {
await control.lowLevel.clean(control.ctx, domain, result)
}
}
} catch (err) {
control.ctx.info('Cannot send push notification to', { user: targetUser, err })
Expand Down
14 changes: 14 additions & 0 deletions server/indexer/src/indexer/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
triggerIndexing = (): void => {}

async startIndexing (indexing: () => void): Promise<void> {
this.cancelling = false
this.verify = this.verifyWorkspace(this.metrics, indexing)
void this.verify.then(() => {
this.indexing = this.doIndexing(indexing)
Expand Down Expand Up @@ -282,6 +283,19 @@ export class FullTextIndexPipeline implements FullTextPipeline {
}
}

async clearIndex (): Promise<void> {
const ctx = this.metrics
const migrations = await this.storage.findAll<MigrationState>(ctx, core.class.MigrationState, {
plugin: coreId,
state: {
$in: ['verify-indexes-v2', 'full-text-indexer-v4', 'full-text-structure-v4']
}
})

const refs = migrations.map((it) => it._id)
await this.storage.clean(ctx, DOMAIN_MIGRATION, refs)
}

broadcastClasses = new Set<Ref<Class<Doc>>>()
broadcasts: number = 0

Expand Down
Loading

0 comments on commit 1a3ee1d

Please sign in to comment.