diff --git a/dev/prod/src/platform.ts b/dev/prod/src/platform.ts index 54141344df..8ab1bd2927 100644 --- a/dev/prod/src/platform.ts +++ b/dev/prod/src/platform.ts @@ -420,7 +420,9 @@ export async function configurePlatform() { setMetadata(login.metadata.TransactorOverride, config.TRANSACTOR_OVERRIDE) // Use binary response transfer for faster performance and small transfer sizes. - setMetadata(client.metadata.UseBinaryProtocol, config.USE_BINARY_PROTOCOL ?? true) + const binaryOverride = localStorage.getItem(client.metadata.UseBinaryProtocol) + setMetadata(client.metadata.UseBinaryProtocol, binaryOverride != null ? binaryOverride === 'true' : (config.USE_BINARY_PROTOCOL ?? true)) + // Disable for now, since it causes performance issues on linux/docker/kubernetes boxes for now. setMetadata(client.metadata.UseProtocolCompression, true) diff --git a/dev/tool/package.json b/dev/tool/package.json index 0e1da1edcc..440618bb89 100644 --- a/dev/tool/package.json +++ b/dev/tool/package.json @@ -160,7 +160,6 @@ "csv-parse": "~5.1.0", "email-addresses": "^5.0.0", "fast-equals": "^5.0.1", - "got": "^11.8.3", "libphonenumber-js": "^1.9.46", "mime-types": "~2.1.34", "mongodb": "^6.12.0", diff --git a/packages/core/src/classes.ts b/packages/core/src/classes.ts index 94dd4b02a3..86bc962760 100644 --- a/packages/core/src/classes.ts +++ b/packages/core/src/classes.ts @@ -661,6 +661,7 @@ export type WorkspaceMode = | 'pending-deletion' // -> 'deleting' | 'deleting' // -> "deleted" | 'active' + | 'deleted' | 'archiving-pending-backup' // -> 'cleaning' | 'archiving-backup' // -> 'archiving-pending-clean' | 'archiving-pending-clean' // -> 'archiving-clean' diff --git a/plugins/client-resources/src/connection.ts b/plugins/client-resources/src/connection.ts index 6c74eedcbc..37241ce9d8 100644 --- a/plugins/client-resources/src/connection.ts +++ b/plugins/client-resources/src/connection.ts @@ -549,6 +549,7 @@ class Connection implements ClientConnection { once?: boolean // Require handleResult to retrieve result measure?: (time: number, result: any, serverTime: number, queue: number, toRecieve: number) => void allowReconnect?: boolean + overrideId?: number }): Promise { return this.ctx.newChild('send-request', {}).with(data.method, {}, async (ctx) => { if (this.closed) { @@ -566,7 +567,7 @@ class Connection implements ClientConnection { } } - const id = this.lastId++ + const id = data.overrideId ?? this.lastId++ const promise = new RequestPromise(data.method, data.params, data.handleResult) promise.handleTime = data.measure @@ -725,7 +726,7 @@ class Connection implements ClientConnection { } sendForceClose (): Promise { - return this.sendRequest({ method: 'forceClose', params: [], allowReconnect: false }) + return this.sendRequest({ method: 'forceClose', params: [], allowReconnect: false, overrideId: -2, once: true }) } } diff --git a/pods/server/src/server.ts b/pods/server/src/server.ts index dbe06c3791..5e024b9ce2 100644 --- a/pods/server/src/server.ts +++ b/pods/server/src/server.ts @@ -14,22 +14,16 @@ // limitations under the License. // -import { - type Branding, - type BrandingMap, - type MeasureContext, - type Tx, - type WorkspaceIdWithUrl -} from '@hcengineering/core' +import { type BrandingMap, type MeasureContext, type Tx } from '@hcengineering/core' import { buildStorageFromConfig } from '@hcengineering/server-storage' import { ClientSession, startSessionManager } from '@hcengineering/server' import { - type Pipeline, type ServerFactory, type Session, type SessionManager, - type StorageConfiguration + type StorageConfiguration, + type Workspace } from '@hcengineering/server-core' import { type Token } from '@hcengineering/server-token' @@ -42,9 +36,9 @@ import { registerTxAdapterFactory } from '@hcengineering/server-pipeline' -import { readFileSync } from 'node:fs' import { createMongoAdapter, createMongoDestroyAdapter, createMongoTxAdapter } from '@hcengineering/mongo' import { createPostgreeDestroyAdapter, createPostgresAdapter, createPostgresTxAdapter } from '@hcengineering/postgres' +import { readFileSync } from 'node:fs' const model = JSON.parse(readFileSync(process.env.MODEL_JSON ?? 'model.json').toString()) as Tx[] registerStringLoaders() @@ -93,13 +87,8 @@ export function start ( { ...opt, externalStorage, adapterSecurity: dbUrl.startsWith('postgresql') }, {} ) - const sessionFactory = ( - token: Token, - pipeline: Pipeline, - workspaceId: WorkspaceIdWithUrl, - branding: Branding | null - ): Session => { - return new ClientSession(token, pipeline, workspaceId, branding, token.extra?.mode === 'backup') + const sessionFactory = (token: Token, workspace: Workspace): Session => { + return new ClientSession(token, workspace, token.extra?.mode === 'backup') } const { shutdown: onClose, sessionManager } = startSessionManager(metrics, { diff --git a/server/account/src/operations.ts b/server/account/src/operations.ts index 88b76e7685..c9d768620e 100644 --- a/server/account/src/operations.ts +++ b/server/account/src/operations.ts @@ -1712,7 +1712,7 @@ export async function getAllWorkspaces ( } return (await db.workspace.find({})).map((it) => { - it.accounts = (it.accounts ?? []).map((it) => it.toString()) + ;(it as any).accounts = (it.accounts ?? []).length return it }) } diff --git a/server/core/src/types.ts b/server/core/src/types.ts index 2a45c7869d..189caca7dc 100644 --- a/server/core/src/types.ts +++ b/server/core/src/types.ts @@ -43,7 +43,7 @@ import { } from '@hcengineering/core' import type { Asset, Resource } from '@hcengineering/platform' import type { LiveQuery } from '@hcengineering/query' -import type { Request, Response } from '@hcengineering/rpc' +import type { ReqId, Request, Response } from '@hcengineering/rpc' import type { Token } from '@hcengineering/server-token' import { type Readable } from 'stream' import type { DbAdapter, DomainHelper } from './adapter' @@ -496,32 +496,21 @@ export interface SessionRequest { export interface ClientSessionCtx { ctx: MeasureContext - sendResponse: (msg: any) => Promise + + pipeline: Pipeline + + requestId: ReqId | undefined + sendResponse: (id: ReqId | undefined, msg: any) => Promise sendPong: () => void - sendError: (msg: any, error: any) => Promise + sendError: (id: ReqId | undefined, msg: any, error: any) => Promise } /** * @public */ export interface Session { + workspace: Workspace createTime: number - getUser: () => string - pipeline: () => Pipeline - ping: (ctx: ClientSessionCtx) => Promise - findAll: ( - ctx: ClientSessionCtx, - _class: Ref>, - query: DocumentQuery, - options?: FindOptions - ) => Promise - findAllRaw: ( - ctx: MeasureContext, - _class: Ref>, - query: DocumentQuery, - options?: FindOptions - ) => Promise> - tx: (ctx: ClientSessionCtx, tx: Tx) => Promise // Session restore information sessionId: string @@ -544,6 +533,28 @@ export interface Session { getMode: () => string broadcast: (ctx: MeasureContext, socket: ConnectionSocket, tx: Tx[]) => void + + // Client methods + ping: (ctx: ClientSessionCtx) => Promise + getUser: () => string + + loadModel: (ctx: ClientSessionCtx, lastModelTx: Timestamp, hash?: string) => Promise + getAccount: (ctx: ClientSessionCtx) => Promise + + getRawAccount: (pipeline: Pipeline) => Account + findAll: ( + ctx: ClientSessionCtx, + _class: Ref>, + query: DocumentQuery, + options?: FindOptions + ) => Promise + searchFulltext: (ctx: ClientSessionCtx, query: SearchQuery, options: SearchOptions) => Promise + tx: (ctx: ClientSessionCtx, tx: Tx) => Promise + loadChunk: (ctx: ClientSessionCtx, domain: Domain, idx?: number) => Promise + closeChunk: (ctx: ClientSessionCtx, idx: number) => Promise + loadDocs: (ctx: ClientSessionCtx, domain: Domain, docs: Ref[]) => Promise + upload: (ctx: ClientSessionCtx, domain: Domain, docs: Doc[]) => Promise + clean: (ctx: ClientSessionCtx, domain: Domain, docs: Ref[]) => Promise } /** @@ -587,7 +598,7 @@ export interface Workspace { context: MeasureContext id: string token: string // Account workspace update token. - pipeline: Promise + pipeline: Promise | Pipeline tickHash: number tickHandlers: Map @@ -599,7 +610,7 @@ export interface Workspace { softShutdown: number workspaceInitCompleted: boolean - workspaceId: WorkspaceId + workspaceId: WorkspaceIdWithUrl workspaceName: string workspaceUuid?: string branding: Branding | null @@ -622,12 +633,7 @@ export interface SessionManager { workspaces: Map sessions: Map - createSession: ( - token: Token, - pipeline: Pipeline, - workspaceId: WorkspaceIdWithUrl, - branding: Branding | null - ) => Session + createSession: (token: Token, workspace: Workspace) => Session addSession: ( ctx: MeasureContext, diff --git a/server/core/src/utils.ts b/server/core/src/utils.ts index 287b09fe4e..5fb08a4021 100644 --- a/server/core/src/utils.ts +++ b/server/core/src/utils.ts @@ -167,20 +167,51 @@ export function getUser (modelDb: ModelDb, userEmail: string | undefined, admin? export class SessionDataImpl implements SessionData { _account: Account | undefined + _removedMap: Map, Doc> | undefined + _contextCache: Map | undefined + _broadcast: SessionData['broadcast'] | undefined constructor ( readonly userEmail: string, readonly sessionId: string, readonly admin: boolean | undefined, - readonly broadcast: SessionData['broadcast'], + _broadcast: SessionData['broadcast'] | undefined, readonly workspace: WorkspaceIdWithUrl, readonly branding: Branding | null, readonly isAsyncContext: boolean, - readonly removedMap: Map, Doc>, - readonly contextCache: Map, + _removedMap: Map, Doc> | undefined, + _contextCache: Map | undefined, readonly modelDb: ModelDb, readonly rawAccount?: Account - ) {} + ) { + this._removedMap = _removedMap + this._contextCache = _contextCache + this._broadcast = _broadcast + } + + get broadcast (): SessionData['broadcast'] { + if (this._broadcast === undefined) { + this._broadcast = { + targets: {}, + txes: [] + } + } + return this._broadcast + } + + get removedMap (): Map, Doc> { + if (this._removedMap === undefined) { + this._removedMap = new Map() + } + return this._removedMap + } + + get contextCache (): Map { + if (this._contextCache === undefined) { + this._contextCache = new Map() + } + return this._contextCache + } get account (): Account { this._account = this.rawAccount ?? this._account ?? getUser(this.modelDb, this.userEmail, this.admin) @@ -234,8 +265,8 @@ export function wrapPipeline ( wsUrl, null, true, - new Map(), - new Map(), + undefined, + undefined, pipeline.context.modelDb ) ctx.contextData = contextData diff --git a/server/indexer/src/indexer/indexer.ts b/server/indexer/src/indexer/indexer.ts index 2f70cf00f6..0777e7445c 100644 --- a/server/indexer/src/indexer/indexer.ts +++ b/server/indexer/src/indexer/indexer.ts @@ -759,12 +759,12 @@ export class FullTextIndexPipeline implements FullTextPipeline { systemAccountEmail, '', true, - { targets: {}, txes: [] }, + undefined, this.workspace, null, false, - new Map(), - new Map(), + undefined, + undefined, this.model ) } diff --git a/server/server/src/client.ts b/server/server/src/client.ts index 0c8b7ed32b..6d1c825e62 100644 --- a/server/server/src/client.ts +++ b/server/server/src/client.ts @@ -13,13 +13,10 @@ // limitations under the License. // -import core, { - AccountRole, +import { generateId, - TxFactory, TxProcessor, type Account, - type Branding, type Class, type Doc, type DocumentQuery, @@ -33,20 +30,21 @@ import core, { type SessionData, type Timestamp, type Tx, - type TxCUD, - type WorkspaceIdWithUrl + type TxCUD } from '@hcengineering/core' import { PlatformError, unknownError } from '@hcengineering/platform' import { BackupClientOps, createBroadcastEvent, + getUser, SessionDataImpl, type ClientSessionCtx, type ConnectionSocket, type Pipeline, type Session, type SessionRequest, - type StatisticsElement + type StatisticsElement, + type Workspace } from '@hcengineering/server-core' import { type Token } from '@hcengineering/server-token' import { handleSend } from './utils' @@ -72,14 +70,18 @@ export class ClientSession implements Session { measures: { id: string, message: string, time: 0 }[] = [] ops: BackupClientOps | undefined + opsPipeline: Pipeline | undefined + + account?: Account + isAdmin: boolean constructor ( protected readonly token: Token, - protected readonly _pipeline: Pipeline, - readonly workspaceId: WorkspaceIdWithUrl, - readonly branding: Branding | null, + readonly workspace: Workspace, readonly allowUpload: boolean - ) {} + ) { + this.isAdmin = this.token.extra?.admin === 'true' + } getUser (): string { return this.token.email @@ -93,76 +95,48 @@ export class ClientSession implements Session { return this.token.extra?.mode ?? 'normal' } - pipeline (): Pipeline { - return this._pipeline - } - async ping (ctx: ClientSessionCtx): Promise { this.lastRequest = Date.now() ctx.sendPong() } async loadModel (ctx: ClientSessionCtx, lastModelTx: Timestamp, hash?: string): Promise { - this.includeSessionContext(ctx.ctx) - const result = await ctx.ctx.with('load-model', {}, () => this._pipeline.loadModel(ctx.ctx, lastModelTx, hash)) - await ctx.sendResponse(result) + this.includeSessionContext(ctx.ctx, ctx.pipeline) + const result = await ctx.ctx.with('load-model', {}, () => ctx.pipeline.loadModel(ctx.ctx, lastModelTx, hash)) + await ctx.sendResponse(ctx.requestId, result) } async getAccount (ctx: ClientSessionCtx): Promise { - const account = this._pipeline.context.modelDb.getAccountByEmail(this.token.email) - if (account === undefined && this.token.extra?.admin === 'true') { - await ctx.sendResponse(this.getSystemAccount()) - return - } - await ctx.sendResponse(account) + await ctx.sendResponse(ctx.requestId, this.getRawAccount(ctx.pipeline)) } - private getSystemAccount (): Account { - // Generate account for admin user - const factory = new TxFactory(core.account.System) - const email = `system:${this.token.email}` - const createTx = factory.createTxCreateDoc( - core.class.Account, - core.space.Model, - { - role: AccountRole.Owner, - email - }, - email as Ref - ) - return TxProcessor.createDoc2Doc(createTx) - } - - includeSessionContext (ctx: MeasureContext): void { - let account: Account | undefined - if (this.token.extra?.admin === 'true') { - account = this._pipeline.context.modelDb.getAccountByEmail(this.token.email) - if (account === undefined) { - account = this.getSystemAccount() - } + getRawAccount (pipeline: Pipeline): Account { + if (this.account === undefined) { + this.account = getUser(pipeline.context.modelDb, this.token.email, this.isAdmin) } + return this.account + } + includeSessionContext (ctx: MeasureContext, pipeline: Pipeline): void { const contextData = new SessionDataImpl( this.token.email, this.sessionId, - this.token.extra?.admin === 'true', - { - txes: [], - targets: {} - }, - this.workspaceId, - this.branding, + this.isAdmin, + undefined, + this.workspace.workspaceId, + this.workspace.branding, false, - new Map(), - new Map(), - this._pipeline.context.modelDb, - account + undefined, + undefined, + pipeline.context.modelDb, + this.getRawAccount(pipeline) ) ctx.contextData = contextData } findAllRaw( ctx: MeasureContext, + pipeline: Pipeline, _class: Ref>, query: DocumentQuery, options?: FindOptions @@ -170,8 +144,8 @@ export class ClientSession implements Session { this.lastRequest = Date.now() this.total.find++ this.current.find++ - this.includeSessionContext(ctx) - return this._pipeline.findAll(ctx, _class, query, options) + this.includeSessionContext(ctx, pipeline) + return pipeline.findAll(ctx, _class, query, options) } async findAll( @@ -180,32 +154,32 @@ export class ClientSession implements Session { query: DocumentQuery, options?: FindOptions ): Promise { - await ctx.sendResponse(await this.findAllRaw(ctx.ctx, _class, query, options)) + await ctx.sendResponse(ctx.requestId, await this.findAllRaw(ctx.ctx, ctx.pipeline, _class, query, options)) } async searchFulltext (ctx: ClientSessionCtx, query: SearchQuery, options: SearchOptions): Promise { this.lastRequest = Date.now() - this.includeSessionContext(ctx.ctx) - await ctx.sendResponse(await this._pipeline.searchFulltext(ctx.ctx, query, options)) + this.includeSessionContext(ctx.ctx, ctx.pipeline) + await ctx.sendResponse(ctx.requestId, await ctx.pipeline.searchFulltext(ctx.ctx, query, options)) } async tx (ctx: ClientSessionCtx, tx: Tx): Promise { this.lastRequest = Date.now() this.total.tx++ this.current.tx++ - this.includeSessionContext(ctx.ctx) + this.includeSessionContext(ctx.ctx, ctx.pipeline) let cid = 'client_' + generateId() ctx.ctx.id = cid - let onEnd = useReserveContext ? this._pipeline.context.adapterManager?.reserveContext?.(cid) : undefined + let onEnd = useReserveContext ? ctx.pipeline.context.adapterManager?.reserveContext?.(cid) : undefined try { - const result = await this._pipeline.tx(ctx.ctx, [tx]) + const result = await ctx.pipeline.tx(ctx.ctx, [tx]) // Send result immideately - await ctx.sendResponse(result) + await ctx.sendResponse(ctx.requestId, result) // We need to broadcast all collected transactions - await this._pipeline.handleBroadcast(ctx.ctx) + await ctx.pipeline.handleBroadcast(ctx.ctx) } finally { onEnd?.() } @@ -215,7 +189,7 @@ export class ClientSession implements Session { if (asyncs.length > 0) { cid = 'client_async_' + generateId() ctx.ctx.id = cid - onEnd = useReserveContext ? this._pipeline.context.adapterManager?.reserveContext?.(cid) : undefined + onEnd = useReserveContext ? ctx.pipeline.context.adapterManager?.reserveContext?.(cid) : undefined try { for (const r of (ctx.ctx.contextData as SessionData).asyncRequests ?? []) { await r() @@ -252,12 +226,13 @@ export class ClientSession implements Session { } } - getOps (): BackupClientOps { - if (this.ops === undefined) { - if (this._pipeline.context.lowLevelStorage === undefined) { + getOps (pipeline: Pipeline): BackupClientOps { + if (this.ops === undefined || this.opsPipeline !== pipeline) { + if (pipeline.context.lowLevelStorage === undefined) { throw new PlatformError(unknownError('Low level storage is not available')) } - this.ops = new BackupClientOps(this._pipeline.context.lowLevelStorage) + this.ops = new BackupClientOps(pipeline.context.lowLevelStorage) + this.opsPipeline = pipeline } return this.ops } @@ -265,67 +240,58 @@ export class ClientSession implements Session { async loadChunk (ctx: ClientSessionCtx, domain: Domain, idx?: number): Promise { this.lastRequest = Date.now() try { - const result = await this.getOps().loadChunk(ctx.ctx, domain, idx) - await ctx.sendResponse(result) + const result = await this.getOps(ctx.pipeline).loadChunk(ctx.ctx, domain, idx) + await ctx.sendResponse(ctx.requestId, result) } catch (err: any) { - await ctx.sendError('Failed to upload', unknownError(err)) + await ctx.sendError(ctx.requestId, 'Failed to upload', unknownError(err)) ctx.ctx.error('failed to loadChunk', { domain, err }) } } async closeChunk (ctx: ClientSessionCtx, idx: number): Promise { this.lastRequest = Date.now() - await this.getOps().closeChunk(ctx.ctx, idx) - await ctx.sendResponse({}) + await this.getOps(ctx.pipeline).closeChunk(ctx.ctx, idx) + await ctx.sendResponse(ctx.requestId, {}) } async loadDocs (ctx: ClientSessionCtx, domain: Domain, docs: Ref[]): Promise { this.lastRequest = Date.now() try { - const result = await this.getOps().loadDocs(ctx.ctx, domain, docs) - await ctx.sendResponse(result) + const result = await this.getOps(ctx.pipeline).loadDocs(ctx.ctx, domain, docs) + await ctx.sendResponse(ctx.requestId, result) } catch (err: any) { - await ctx.sendError('Failed to loadDocs', unknownError(err)) + await ctx.sendError(ctx.requestId, 'Failed to loadDocs', unknownError(err)) ctx.ctx.error('failed to loadDocs', { domain, err }) } } async upload (ctx: ClientSessionCtx, domain: Domain, docs: Doc[]): Promise { if (!this.allowUpload) { - await ctx.sendResponse({ error: 'Upload not allowed' }) + await ctx.sendResponse(ctx.requestId, { error: 'Upload not allowed' }) } this.lastRequest = Date.now() try { - await this.getOps().upload(ctx.ctx, domain, docs) + await this.getOps(ctx.pipeline).upload(ctx.ctx, domain, docs) } catch (err: any) { - await ctx.sendError('Failed to upload', unknownError(err)) + await ctx.sendError(ctx.requestId, 'Failed to upload', unknownError(err)) ctx.ctx.error('failed to loadDocs', { domain, err }) return } - await ctx.sendResponse({}) + await ctx.sendResponse(ctx.requestId, {}) } async clean (ctx: ClientSessionCtx, domain: Domain, docs: Ref[]): Promise { if (!this.allowUpload) { - await ctx.sendResponse({ error: 'Clean not allowed' }) + await ctx.sendResponse(ctx.requestId, { error: 'Clean not allowed' }) } this.lastRequest = Date.now() try { - await this.getOps().clean(ctx.ctx, domain, docs) + await this.getOps(ctx.pipeline).clean(ctx.ctx, domain, docs) } catch (err: any) { - await ctx.sendError('Failed to clean', unknownError(err)) + await ctx.sendError(ctx.requestId, 'Failed to clean', unknownError(err)) ctx.ctx.error('failed to clean', { domain, err }) return } - await ctx.sendResponse({}) + await ctx.sendResponse(ctx.requestId, {}) } } - -/** - * @public - */ -export interface BackupSession extends Session { - loadChunk: (ctx: ClientSessionCtx, domain: Domain, idx?: number) => Promise - closeChunk: (ctx: ClientSessionCtx, idx: number) => Promise - loadDocs: (ctx: ClientSessionCtx, domain: Domain, docs: Ref[]) => Promise -} diff --git a/server/server/src/sessionManager.ts b/server/server/src/sessionManager.ts index d93c572e75..aeb5b3a904 100644 --- a/server/server/src/sessionManager.ts +++ b/server/server/src/sessionManager.ts @@ -33,8 +33,7 @@ import core, { type MeasureContext, type Tx, type TxWorkspaceEvent, - type WorkspaceId, - type WorkspaceIdWithUrl + type WorkspaceId } from '@hcengineering/core' import { unknownError, type Status } from '@hcengineering/platform' import { type HelloRequest, type HelloResponse, type Request, type Response } from '@hcengineering/rpc' @@ -109,12 +108,7 @@ class TSessionManager implements SessionManager { constructor ( readonly ctx: MeasureContext, - readonly sessionFactory: ( - token: Token, - pipeline: Pipeline, - workspaceId: WorkspaceIdWithUrl, - branding: Branding | null - ) => Session, + readonly sessionFactory: (token: Token, workspace: Workspace) => Session, readonly timeouts: Timeouts, readonly brandingMap: BrandingMap, readonly profiling: @@ -288,8 +282,8 @@ class TSessionManager implements SessionManager { this.ticks++ } - createSession (token: Token, pipeline: Pipeline, workspaceId: WorkspaceIdWithUrl, branding: Branding | null): Session { - return this.sessionFactory(token, pipeline, workspaceId, branding) + createSession (token: Token, workspace: Workspace): Session { + return this.sessionFactory(token, workspace) } async getWorkspaceInfo (ctx: MeasureContext, token: string): Promise { @@ -473,7 +467,12 @@ class TSessionManager implements SessionManager { return { upgrade: true } } try { - pipeline = await ctx.with('💤 wait', { workspaceName }, () => (workspace as Workspace).pipeline) + if (workspace.pipeline instanceof Promise) { + pipeline = await workspace.pipeline + workspace.pipeline = pipeline + } else { + pipeline = workspace.pipeline + } } catch (err: any) { // Failed to create pipeline, etc Analytics.handleError(err) @@ -482,16 +481,7 @@ class TSessionManager implements SessionManager { } } - const session = this.createSession( - token, - pipeline, - { - ...workspace.workspaceId, - workspaceName: workspaceInfo.workspaceName ?? '', - workspaceUrl: workspaceInfo.workspaceUrl ?? '' - }, - branding - ) + const session = this.createSession(token, workspace) session.sessionId = sessionId !== undefined && (sessionId ?? '').trim().length > 0 ? sessionId : generateId() session.sessionInstanceId = generateId() @@ -508,7 +498,7 @@ class TSessionManager implements SessionManager { // We do not need to wait for set-status, just return session to client const _workspace = workspace void ctx - .with('set-status', {}, (ctx) => this.trySetStatus(ctx, session, true, _workspace.workspaceId)) + .with('set-status', {}, (ctx) => this.trySetStatus(ctx, pipeline, session, true, _workspace.workspaceId)) .catch(() => {}) if (this.timeMinutes > 0) { @@ -661,6 +651,7 @@ class TSessionManager implements SessionManager { workspaceUuid: string | undefined, branding: Branding | null ): Workspace { + const wsId = toWorkspaceString(token.workspace) const upgrade = token.extra?.model === 'upgrade' const context = ctx.newChild('🧲 session', {}) const pipelineCtx = context.newChild('🧲 pipeline-factory', {}) @@ -679,7 +670,11 @@ class TSessionManager implements SessionManager { sessions: new Map(), softShutdown: workspaceSoftShutdownTicks, upgrade, - workspaceId: token.workspace, + workspaceId: { + ...token.workspace, + workspaceName, + workspaceUrl + }, workspaceName, workspaceUuid, branding, @@ -688,12 +683,13 @@ class TSessionManager implements SessionManager { tickHandlers: new Map(), token: generateToken(systemAccountEmail, token.workspace) } - this.workspaces.set(toWorkspaceString(token.workspace), workspace) + this.workspaces.set(wsId, workspace) return workspace } private async trySetStatus ( ctx: MeasureContext, + pipeline: Pipeline, session: Session, online: boolean, workspaceId: WorkspaceId @@ -702,7 +698,7 @@ class TSessionManager implements SessionManager { if (current !== undefined) { await current } - const promise = this.setStatus(ctx, session, online, workspaceId) + const promise = this.setStatus(ctx, pipeline, session, online, workspaceId) this.statusPromises.set(session.getUser(), promise) await promise this.statusPromises.delete(session.getUser()) @@ -710,38 +706,28 @@ class TSessionManager implements SessionManager { private async setStatus ( ctx: MeasureContext, + pipeline: Pipeline, session: Session, online: boolean, workspaceId: WorkspaceId ): Promise { try { - const user = session.pipeline().context.modelDb.getAccountByEmail(session.getUser()) + const user = pipeline.context.modelDb.getAccountByEmail(session.getUser()) if (user === undefined) return - const clientCtx: ClientSessionCtx = { - sendResponse: async (msg) => { - // No response - }, - ctx, - sendError: async (msg, error: Status) => { - // Assume no error send - }, - sendPong: () => {} - } - - const status = (await session.findAllRaw(ctx, core.class.UserStatus, { user: user._id }, { limit: 1 }))[0] + const status = (await pipeline.findAll(ctx, core.class.UserStatus, { user: user._id }, { limit: 1 }))[0] const txFactory = new TxFactory(user._id, true) if (status === undefined) { const tx = txFactory.createTxCreateDoc(core.class.UserStatus, core.space.Space, { online, user: user._id }) - await session.tx(clientCtx, tx) + await pipeline.tx(ctx, [tx]) } else if (status.online !== online) { const tx = txFactory.createTxUpdateDoc(status._class, status.space, status._id, { online }) - await session.tx(clientCtx, tx) + await pipeline.tx(ctx, [tx]) } } catch {} } @@ -763,6 +749,7 @@ class TSessionManager implements SessionManager { this.sessions.delete(ws.id) if (workspace !== undefined) { workspace.sessions.delete(sessionRef.session.sessionId) + const pipeline = workspace.pipeline instanceof Promise ? await workspace.pipeline : workspace.pipeline workspace.tickHandlers.set(sessionRef.session.sessionId, { ticks: this.timeouts.reconnectTimeout * ticksPerSecond, @@ -773,9 +760,13 @@ class TSessionManager implements SessionManager { if (workspace !== undefined) { const another = Array.from(workspace.sessions.values()).findIndex((p) => p.session.getUser() === user) if (another === -1 && !workspace.upgrade) { - void this.trySetStatus(workspace.context, sessionRef.session, false, workspace.workspaceId).catch( - () => {} - ) + void this.trySetStatus( + workspace.context, + pipeline, + sessionRef.session, + false, + workspace.workspaceId + ).catch(() => {}) } } } @@ -929,6 +920,41 @@ class TSessionManager implements SessionManager { } } + createOpContext ( + ctx: MeasureContext, + pipeline: Pipeline, + request: Request, + service: Session, + ws: ConnectionSocket + ): ClientSessionCtx { + const st = Date.now() + return { + ctx, + pipeline, + requestId: request.id, + sendResponse: (reqId, msg) => + sendResponse(ctx, service, ws, { + id: reqId, + result: msg, + time: Date.now() - st, + bfst: Date.now(), + queue: service.requests.size + }), + sendPong: () => { + ws.sendPong() + }, + sendError: (reqId, msg, error: Status) => + sendResponse(ctx, service, ws, { + id: reqId, + result: msg, + error, + time: Date.now() - st, + bfst: Date.now(), + queue: service.requests.size + }) + } + } + handleRequest( requestCtx: MeasureContext, service: S, @@ -936,43 +962,38 @@ class TSessionManager implements SessionManager { request: Request, workspace: string // wsId, toWorkspaceString() ): Promise { - const backupMode = service.getMode() === 'backup' - - const userCtx = requestCtx.newChild( - '📞 client', - !backupMode - ? { - workspace: '🧲 ' + workspace - } - : {} - ) + const userCtx = requestCtx.newChild('📞 client', {}) // Calculate total number of clients const reqId = generateId() const st = Date.now() return userCtx - .with(`🧭 ${backupMode ? 'handleBackup' : 'handleRequest'}`, {}, async (ctx) => { + .with('🧭 handleRequest', {}, async (ctx) => { if (request.time != null) { const delta = Date.now() - request.time requestCtx.measure('msg-receive-delta', delta) } - const wsRef = this.workspaces.get(workspace) - if (wsRef === undefined) { + if (service.workspace.closing !== undefined) { ws.send( ctx, { id: request.id, - error: unknownError('No workspace') + error: unknownError('Workspace is closing') }, service.binaryMode, service.useCompression ) return } - if (request.method === 'forceClose') { + if (request.id === -1 && request.method === 'hello') { + this.handleHello(request, service, ctx, workspace, ws, requestCtx) + return + } + if (request.id === -2 && request.method === 'forceClose') { let done = false - if (wsRef.upgrade) { + const wsRef = this.workspaces.get(workspace) + if (wsRef?.upgrade ?? false) { done = true this.ctx.warn('FORCE CLOSE', { workspace }) // In case of upgrade, we need to force close workspace not in interval handler @@ -985,62 +1006,6 @@ class TSessionManager implements SessionManager { ws.send(ctx, forceCloseResponse, service.binaryMode, service.useCompression) return } - if (request.id === -1 && request.method === 'hello') { - const hello = request as HelloRequest - service.binaryMode = hello.binary ?? false - service.useCompression = hello.compression ?? false - - if (LOGGING_ENABLED) { - ctx.info('hello happen', { - workspace, - user: service.getUser(), - binary: service.binaryMode, - compression: service.useCompression, - timeToHello: Date.now() - service.createTime, - workspaceUsers: this.workspaces.get(workspace)?.sessions?.size, - totalUsers: this.sessions.size - }) - } - const reconnect = this.reconnectIds.has(service.sessionId) - if (reconnect) { - this.reconnectIds.delete(service.sessionId) - } - const helloResponse: HelloResponse = { - id: -1, - result: 'hello', - binary: service.binaryMode, - reconnect, - serverVersion: this.serverVersion - } - ws.send(requestCtx, helloResponse, false, false) - return - } - const opContext = (ctx: MeasureContext): ClientSessionCtx => ({ - sendResponse: async (msg) => { - await sendResponse(requestCtx, service, ws, { - id: request.id, - result: msg, - time: Date.now() - st, - bfst: Date.now(), - queue: service.requests.size - }) - userCtx.end() - }, - sendPong: () => { - ws.sendPong() - }, - ctx, - sendError: async (msg, error: Status) => { - await sendResponse(ctx, service, ws, { - id: request.id, - result: msg, - error, - time: Date.now() - st, - bfst: Date.now(), - queue: service.requests.size - }) - } - }) service.requests.set(reqId, { id: reqId, @@ -1052,11 +1017,16 @@ class TSessionManager implements SessionManager { return } + const pipeline = + service.workspace.pipeline instanceof Promise ? await service.workspace.pipeline : service.workspace.pipeline + const f = (service as any)[request.method] try { const params = [...request.params] - await ctx.with('🧨 process', {}, (callTx) => f.apply(service, [opContext(callTx), ...params])) + await ctx.with('🧨 process', {}, (callTx) => + f.apply(service, [this.createOpContext(callTx, pipeline, request, service, ws), ...params]) + ) } catch (err: any) { Analytics.handleError(err) if (LOGGING_ENABLED) { @@ -1079,16 +1049,48 @@ class TSessionManager implements SessionManager { service.requests.delete(reqId) }) } + + private handleHello( + request: Request, + service: S, + ctx: MeasureContext, + workspace: string, + ws: ConnectionSocket, + requestCtx: MeasureContext + ): void { + const hello = request as HelloRequest + service.binaryMode = hello.binary ?? false + service.useCompression = hello.compression ?? false + + if (LOGGING_ENABLED) { + ctx.info('hello happen', { + workspace, + user: service.getUser(), + binary: service.binaryMode, + compression: service.useCompression, + timeToHello: Date.now() - service.createTime, + workspaceUsers: this.workspaces.get(workspace)?.sessions?.size, + totalUsers: this.sessions.size + }) + } + const reconnect = this.reconnectIds.has(service.sessionId) + if (reconnect) { + this.reconnectIds.delete(service.sessionId) + } + const helloResponse: HelloResponse = { + id: -1, + result: 'hello', + binary: service.binaryMode, + reconnect, + serverVersion: this.serverVersion + } + ws.send(requestCtx, helloResponse, false, false) + } } export function createSessionManager ( ctx: MeasureContext, - sessionFactory: ( - token: Token, - pipeline: Pipeline, - workspaceId: WorkspaceIdWithUrl, - branding: Branding | null - ) => Session, + sessionFactory: (token: Token, workspace: Workspace) => Session, brandingMap: BrandingMap, timeouts: Timeouts, profiling: @@ -1110,12 +1112,7 @@ export function startSessionManager ( opt: { port: number pipelineFactory: PipelineFactory - sessionFactory: ( - token: Token, - pipeline: Pipeline, - workspaceId: WorkspaceIdWithUrl, - branding: Branding | null - ) => Session + sessionFactory: (token: Token, workspace: Workspace) => Session brandingMap: BrandingMap serverFactory: ServerFactory enableCompression?: boolean diff --git a/server/workspace-service/src/ws-operations.ts b/server/workspace-service/src/ws-operations.ts index e5587cb15f..602408aa56 100644 --- a/server/workspace-service/src/ws-operations.ts +++ b/server/workspace-service/src/ws-operations.ts @@ -258,12 +258,12 @@ export async function upgradeWorkspaceWith ( systemAccountEmail, 'backup', true, - { targets: {}, txes: [] }, + undefined, wsUrl, null, true, - new Map(), - new Map(), + undefined, + undefined, pipeline.context.modelDb ) ctx.contextData = contextData diff --git a/server/ws/src/__tests__/server.test.ts b/server/ws/src/__tests__/server.test.ts index ce07c04c7a..e8524542e2 100644 --- a/server/ws/src/__tests__/server.test.ts +++ b/server/ws/src/__tests__/server.test.ts @@ -39,8 +39,8 @@ import { type Tx, type TxResult } from '@hcengineering/core' -import { createDummyStorageAdapter } from '@hcengineering/server-core' import { ClientSession, startSessionManager } from '@hcengineering/server' +import { createDummyStorageAdapter } from '@hcengineering/server-core' import { startHttpServer } from '../server_http' import { genMinModel } from './minmodel' @@ -94,8 +94,7 @@ describe('server', () => { loadModel: async (ctx, lastModelTx, hash) => [] } }, - sessionFactory: (token, pipeline, workspaceId, branding) => - new ClientSession(token, pipeline, workspaceId, branding, true), + sessionFactory: (token, workspace) => new ClientSession(token, workspace, true), port: 3335, brandingMap: {}, serverFactory: startHttpServer, @@ -206,8 +205,7 @@ describe('server', () => { loadModel: async (ctx, lastModelTx, hash) => [] } }, - sessionFactory: (token, pipeline, workspaceId, branding) => - new ClientSession(token, pipeline, workspaceId, branding, true), + sessionFactory: (token, workspace) => new ClientSession(token, workspace, true), port: 3336, brandingMap: {}, serverFactory: startHttpServer, diff --git a/services/ses/pod-ses/package.json b/services/ses/pod-ses/package.json index deaaea8b9b..87d233158d 100644 --- a/services/ses/pod-ses/package.json +++ b/services/ses/pod-ses/package.json @@ -61,7 +61,6 @@ "aws-sdk": "^2.1423.0", "cors": "^2.8.5", "dotenv": "~16.0.0", - "express": "^4.21.2", - "got": "^11.8.3" + "express": "^4.21.2" } } diff --git a/workers/transactor/src/transactor.ts b/workers/transactor/src/transactor.ts index decbcaa149..d474090da5 100644 --- a/workers/transactor/src/transactor.ts +++ b/workers/transactor/src/transactor.ts @@ -1,7 +1,6 @@ // Copyright © 2024 Huly Labs. import { - Branding, generateId, type Class, type Doc, @@ -9,20 +8,17 @@ import { type FindOptions, type MeasureContext, type Ref, - type Tx, - type WorkspaceIdWithUrl + type Tx } from '@hcengineering/core' import { setMetadata } from '@hcengineering/platform' import { RPCHandler } from '@hcengineering/rpc' import { ClientSession, createSessionManager, doSessionOp, type WebsocketData } from '@hcengineering/server' import serverClient from '@hcengineering/server-client' import { - ClientSessionCtx, createDummyStorageAdapter, initStatisticsContext, loadBrandingMap, pingConst, - Pipeline, pongConst, Session, type ConnectionSocket, @@ -108,8 +104,7 @@ export class Transactor extends DurableObject { this.sessionManager = createSessionManager( this.measureCtx, - (token: Token, pipeline: Pipeline, workspaceId: WorkspaceIdWithUrl, branding: Branding | null) => - new ClientSession(token, pipeline, workspaceId, branding, false), + (token: Token, workspace) => new ClientSession(token, workspace, false), loadBrandingMap(), // TODO: Support branding map { pingTimeout: 10000, @@ -383,7 +378,9 @@ export class Transactor extends DurableObject { const cs = this.createDummyClientSocket() try { const session = await this.makeRpcSession(rawToken, cs) - result = await session.findAllRaw(this.measureCtx, _class, query ?? {}, options ?? {}) + const pipeline = + session.workspace.pipeline instanceof Promise ? await session.workspace.pipeline : session.workspace.pipeline + result = await pipeline.findAll(this.measureCtx, _class, query ?? {}, options ?? {}) } catch (error: any) { result = { error: `${error}` } } finally { @@ -397,21 +394,9 @@ export class Transactor extends DurableObject { const cs = this.createDummyClientSocket() try { const session = await this.makeRpcSession(rawToken, cs) - const sessionCtx: ClientSessionCtx = { - ctx: this.measureCtx, - sendResponse: async (msg) => { - result = msg - }, - // TODO: Inedeed, the pipeline doesn't return errors, - // it just logs them to console and return an empty result - sendError: async (msg, error) => { - result = { error: `${msg}`, status: `${error}` } - }, - sendPong: () => { - cs.sendPong() - } - } - await session.tx(sessionCtx, tx) + const pipeline = + session.workspace.pipeline instanceof Promise ? await session.workspace.pipeline : session.workspace.pipeline + await pipeline.tx(this.measureCtx, [tx]) } catch (error: any) { result = { error: `${error}` } } finally { @@ -420,35 +405,43 @@ export class Transactor extends DurableObject { return result } - async getModel (): Promise { + async getModel (rawToken: string): Promise { + let result: Tx[] = [] + const cs = this.createDummyClientSocket() + try { + const session = await this.makeRpcSession(rawToken, cs) + const pipeline = + session.workspace.pipeline instanceof Promise ? await session.workspace.pipeline : session.workspace.pipeline + const ret = await pipeline.loadModel(this.measureCtx, 0) + if (Array.isArray(ret)) { + result = ret + } else { + result = ret.transactions + } + } catch (error: any) { + result = [] + } finally { + await this.sessionManager.close(this.measureCtx, cs, this.workspace) + } + const encoder = new TextEncoder() - const buffer = encoder.encode(JSON.stringify(model)) + const buffer = encoder.encode(JSON.stringify(result)) const gzipAsync = promisify(gzip) const compressed = await gzipAsync(buffer) return compressed } - async getAccount (rawToken: string, workspaceId: string, tx: Tx): Promise { - let result + async getAccount (rawToken: string): Promise { const cs = this.createDummyClientSocket() try { const session = await this.makeRpcSession(rawToken, cs) - const sessionCtx: ClientSessionCtx = { - ctx: this.measureCtx, - sendResponse: async (msg) => { - result = msg - }, - sendError: async (msg, error) => { - result = { error: `${msg}`, status: `${error}` } - }, - sendPong: () => {} - } - await (session as any).getAccount(sessionCtx) + const pipeline = + session.workspace.pipeline instanceof Promise ? await session.workspace.pipeline : session.workspace.pipeline + return session.getRawAccount(pipeline) } catch (error: any) { - result = { error: `${error}` } + return { error: `${error}` } } finally { await this.sessionManager.close(this.measureCtx, cs, this.workspace) } - return result } }