From 37e266d74dc415676ac2a957d750fbcd1f11b81c Mon Sep 17 00:00:00 2001 From: stevensJourney <51082125+stevensJourney@users.noreply.github.com> Date: Thu, 21 Mar 2024 19:58:24 +0200 Subject: [PATCH] [Fix] concurrent connections issue (#101) --- .changeset/lemon-zoos-sort.md | 5 ++++ .changeset/lucky-planes-prove.md | 5 ++++ .changeset/modern-terms-admire.md | 5 ++++ .../AbstractStreamingSyncImplementation.ts | 28 +++++++++++++++++-- .../SharedWebStreamingSyncImplementation.ts | 2 -- .../src/worker/sync/BroadcastLogger.ts | 24 ++++++++++++++-- .../worker/sync/SharedSyncImplementation.ts | 3 +- 7 files changed, 64 insertions(+), 8 deletions(-) create mode 100644 .changeset/lemon-zoos-sort.md create mode 100644 .changeset/lucky-planes-prove.md create mode 100644 .changeset/modern-terms-admire.md diff --git a/.changeset/lemon-zoos-sort.md b/.changeset/lemon-zoos-sort.md new file mode 100644 index 00000000..b6ce1ad3 --- /dev/null +++ b/.changeset/lemon-zoos-sort.md @@ -0,0 +1,5 @@ +--- +'@journeyapps/powersync-sdk-web': patch +--- + +Added some serialization checks for broadcasted logs from shared web worker. Unserializable items will return a warning. diff --git a/.changeset/lucky-planes-prove.md b/.changeset/lucky-planes-prove.md new file mode 100644 index 00000000..2d499883 --- /dev/null +++ b/.changeset/lucky-planes-prove.md @@ -0,0 +1,5 @@ +--- +'@journeyapps/powersync-sdk-common': patch +--- + +Fixed issue where sync stream exceptions would not close previous streaming connections. diff --git a/.changeset/modern-terms-admire.md b/.changeset/modern-terms-admire.md new file mode 100644 index 00000000..f008ab65 --- /dev/null +++ b/.changeset/modern-terms-admire.md @@ -0,0 +1,5 @@ +--- +'@journeyapps/powersync-sdk-web': patch +--- + +Fixed issue where SyncBucketStorage logs would not be broadcasted from the shared sync worker to individual tabs. diff --git a/packages/powersync-sdk-common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/powersync-sdk-common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index ec681dc1..18cf1e66 100644 --- a/packages/powersync-sdk-common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/powersync-sdk-common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -220,7 +220,10 @@ export abstract class AbstractStreamingSyncImplementation } } - connect() { + async connect() { + if (this.abortController) { + await this.disconnect(); + } this.abortController = new AbortController(); this.streamingSync(this.abortController.signal); return this.waitForStatus({ connected: true }); @@ -231,6 +234,8 @@ export abstract class AbstractStreamingSyncImplementation throw new Error('Disconnect not possible'); } this.abortController.abort('Disconnected'); + this.abortController = null; + this.updateSyncStatus({ connected: false }); } /** @@ -249,7 +254,14 @@ export abstract class AbstractStreamingSyncImplementation crudUpdate: () => this.triggerCrudUpload() }); + /** + * Create a new abort controller which aborts items downstream. + * This is needed to close any previous connections on exception. + */ + let nestedAbortController = new AbortController(); + signal.addEventListener('abort', () => { + nestedAbortController.abort(); this.crudUpdateListener?.(); this.crudUpdateListener = undefined; this.updateSyncStatus({ @@ -265,7 +277,10 @@ export abstract class AbstractStreamingSyncImplementation if (signal?.aborted) { break; } - await this.streamingSyncIteration(signal); + const { retry } = await this.streamingSyncIteration(nestedAbortController.signal); + if (!retry) { + break; + } // Continue immediately } catch (ex) { this.logger.error(ex); @@ -274,8 +289,17 @@ export abstract class AbstractStreamingSyncImplementation }); // On error, wait a little before retrying await this.delayRetry(); + } finally { + // Abort any open network requests. Create a new nested controller for retry. + nestedAbortController.abort(); + nestedAbortController = new AbortController(); } } + + // Mark as disconnected if here + if (this.abortController) { + await this.disconnect(); + } } protected async streamingSyncIteration(signal: AbortSignal, progress?: () => void): Promise<{ retry?: boolean }> { diff --git a/packages/powersync-sdk-web/src/db/sync/SharedWebStreamingSyncImplementation.ts b/packages/powersync-sdk-web/src/db/sync/SharedWebStreamingSyncImplementation.ts index b53f48a2..706806cb 100644 --- a/packages/powersync-sdk-web/src/db/sync/SharedWebStreamingSyncImplementation.ts +++ b/packages/powersync-sdk-web/src/db/sync/SharedWebStreamingSyncImplementation.ts @@ -165,8 +165,6 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem data: {} }; - (localStorage as any).setItem('posting close' + Math.random(), `$}`); - this.messagePort.postMessage(closeMessagePayload); // Release the proxy diff --git a/packages/powersync-sdk-web/src/worker/sync/BroadcastLogger.ts b/packages/powersync-sdk-web/src/worker/sync/BroadcastLogger.ts index a681b25d..9ac425c9 100644 --- a/packages/powersync-sdk-web/src/worker/sync/BroadcastLogger.ts +++ b/packages/powersync-sdk-web/src/worker/sync/BroadcastLogger.ts @@ -40,17 +40,17 @@ export class BroadcastLogger implements ILogger { log(...x: any[]): void { console.log(...x); - this.clients.forEach((p) => p.clientProvider.log(...x)); + this.sanitizeArgs(x, (params) => this.clients.forEach((p) => p.clientProvider.log(...params))); } warn(...x: any[]): void { console.warn(...x); - this.clients.forEach((p) => p.clientProvider.warn(...x)); + this.sanitizeArgs(x, (params) => this.clients.forEach((p) => p.clientProvider.warn(...params))); } error(...x: any[]): void { console.error(...x); - this.clients.forEach((p) => p.clientProvider.error(...x)); + this.sanitizeArgs(x, (params) => this.clients.forEach((p) => p.clientProvider.error(...params))); } time(label: string): void { @@ -76,4 +76,22 @@ export class BroadcastLogger implements ILogger { // Levels are not adjustable on this level. return true; } + + /** + * Guards against any logging errors. + * We don't want a logging exception to cause further issues upstream + */ + private sanitizeArgs(x: any[], handler: (...params: any[]) => void) { + const sanitizedParams = x.map((param) => { + try { + // Try and clone here first. If it fails it won't be passable over a MessagePort + return structuredClone(x); + } catch (ex) { + console.error(ex); + return 'Could not serialize log params. Check shared worker logs for more details.'; + } + }); + + return handler(...sanitizedParams); + } } diff --git a/packages/powersync-sdk-web/src/worker/sync/SharedSyncImplementation.ts b/packages/powersync-sdk-web/src/worker/sync/SharedSyncImplementation.ts index d4ac15dc..d77503bf 100644 --- a/packages/powersync-sdk-web/src/worker/sync/SharedSyncImplementation.ts +++ b/packages/powersync-sdk-web/src/worker/sync/SharedSyncImplementation.ts @@ -125,7 +125,8 @@ export class SharedSyncImplementation flags: { enableMultiTabs: true }, logger: this.broadCastLogger }), - new Mutex() + new Mutex(), + this.broadCastLogger ), remote: new WebRemote({ fetchCredentials: async () => {