From ad4d20b2bbeae33a7b3e777a78ee721e25414096 Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Thu, 8 Aug 2024 14:34:53 +0200 Subject: [PATCH] fix shared sync credentials refresh --- .../SharedWebStreamingSyncImplementation.ts | 19 +- .../worker/sync/SharedSyncImplementation.ts | 186 ++++++++++-------- 2 files changed, 118 insertions(+), 87 deletions(-) diff --git a/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts b/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts index 5dc0a8d7..def6f503 100644 --- a/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts +++ b/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts @@ -1,16 +1,16 @@ +import { PowerSyncConnectionOptions, PowerSyncCredentials, SyncStatus, SyncStatusOptions } from '@powersync/common'; import * as Comlink from 'comlink'; -import { - WebStreamingSyncImplementation, - WebStreamingSyncImplementationOptions -} from './WebStreamingSyncImplementation'; +import { openWorkerDatabasePort } from '../../worker/db/open-worker-database'; +import { AbstractSharedSyncClientProvider } from '../../worker/sync/AbstractSharedSyncClientProvider'; import { ManualSharedSyncPayload, SharedSyncClientEvent, SharedSyncImplementation } from '../../worker/sync/SharedSyncImplementation'; -import { AbstractSharedSyncClientProvider } from '../../worker/sync/AbstractSharedSyncClientProvider'; -import { PowerSyncConnectionOptions, PowerSyncCredentials, SyncStatus, SyncStatusOptions } from '@powersync/common'; -import { openWorkerDatabasePort } from '../../worker/db/open-worker-database'; +import { + WebStreamingSyncImplementation, + WebStreamingSyncImplementationOptions +} from './WebStreamingSyncImplementation'; /** * The shared worker will trigger methods on this side of the message port @@ -144,6 +144,9 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem */ async connect(options?: PowerSyncConnectionOptions): Promise { await this.waitForReady(); + // This is needed since a new tab won't have any reference to the + // shared worker sync implementation since that is only created on the first call to `connect`. + await this.disconnect(); return this.syncManager.connect(options); } @@ -170,9 +173,9 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem }; this.messagePort.postMessage(closeMessagePayload); - // Release the proxy this.syncManager[Comlink.releaseProxy](); + this.messagePort.close(); } async waitForReady() { diff --git a/packages/web/src/worker/sync/SharedSyncImplementation.ts b/packages/web/src/worker/sync/SharedSyncImplementation.ts index a87b8214..0ab6a4d0 100644 --- a/packages/web/src/worker/sync/SharedSyncImplementation.ts +++ b/packages/web/src/worker/sync/SharedSyncImplementation.ts @@ -1,23 +1,24 @@ -import * as Comlink from 'comlink'; -import Logger, { type ILogger } from 'js-logger'; import { type AbstractStreamingSyncImplementation, - type StreamingSyncImplementation, type LockOptions, + type PowerSyncConnectionOptions, + type StreamingSyncImplementation, type StreamingSyncImplementationListener, type SyncStatusOptions, - type PowerSyncConnectionOptions, + AbortOperation, BaseObserver, + DBAdapter, SqliteBucketStorage, - SyncStatus, - AbortOperation + SyncStatus } from '@powersync/common'; +import { Mutex } from 'async-mutex'; +import * as Comlink from 'comlink'; +import Logger, { type ILogger } from 'js-logger'; +import { WebRemote } from '../../db/sync/WebRemote'; import { WebStreamingSyncImplementation, WebStreamingSyncImplementationOptions } from '../../db/sync/WebStreamingSyncImplementation'; -import { Mutex } from 'async-mutex'; -import { WebRemote } from '../../db/sync/WebRemote'; import { WASQLiteDBAdapter } from '../../db/adapters/wa-sqlite/WASQLiteDBAdapter'; import { AbstractSharedSyncClientProvider } from './AbstractSharedSyncClientProvider'; @@ -66,7 +67,7 @@ export class SharedSyncImplementation implements StreamingSyncImplementation { protected ports: WrappedSyncPort[]; - protected syncStreamClient?: AbstractStreamingSyncImplementation; + protected syncStreamClient: AbstractStreamingSyncImplementation | null; protected isInitialized: Promise; protected statusListener?: () => void; @@ -74,12 +75,20 @@ export class SharedSyncImplementation protected fetchCredentialsController?: RemoteOperationAbortController; protected uploadDataController?: RemoteOperationAbortController; + protected dbAdapter: DBAdapter | null; + protected syncParams: SharedSyncInitOptions | null; + protected logger: ILogger; + syncStatus: SyncStatus; broadCastLogger: ILogger; constructor() { super(); this.ports = []; + this.dbAdapter = null; + this.syncParams = null; + this.syncStreamClient = null; + this.logger = Logger.get('shared-sync'); this.isInitialized = new Promise((resolve) => { const callback = this.registerListener({ @@ -115,82 +124,29 @@ export class SharedSyncImplementation * Configures the DBAdapter connection and a streaming sync client. */ async init(dbWorkerPort: MessagePort, params: SharedSyncInitOptions) { - if (this.syncStreamClient) { + if (this.dbAdapter) { // Cannot modify already existing sync implementation return; } - const logger = params.streamOptions?.flags?.broadcastLogs ? this.broadCastLogger : Logger.get('shared-sync'); + this.dbAdapter = new WASQLiteDBAdapter({ + dbFilename: params.dbName, + workerPort: dbWorkerPort, + flags: { enableMultiTabs: true, useWebWorker: true }, + logger: this.logger + }); + + this.syncParams = params; + + if (params.streamOptions?.flags?.broadcastLogs) { + this.logger = this.broadCastLogger; + } self.onerror = (event) => { // Share any uncaught events on the broadcast logger - logger.error('Uncaught exception in PowerSync shared sync worker', event); + this.logger.error('Uncaught exception in PowerSync shared sync worker', event); }; - this.syncStreamClient = new WebStreamingSyncImplementation({ - adapter: new SqliteBucketStorage( - new WASQLiteDBAdapter({ - dbFilename: params.dbName, - workerPort: dbWorkerPort, - flags: { enableMultiTabs: true, useWebWorker: true }, - logger - }), - new Mutex(), - logger - ), - remote: new WebRemote({ - fetchCredentials: async () => { - const lastPort = this.ports[this.ports.length - 1]; - return new Promise(async (resolve, reject) => { - const abortController = new AbortController(); - this.fetchCredentialsController = { - controller: abortController, - activePort: lastPort - }; - - abortController.signal.onabort = reject; - try { - resolve(await lastPort.clientProvider.fetchCredentials()); - } catch (ex) { - reject(ex); - } finally { - this.fetchCredentialsController = undefined; - } - }); - } - }), - uploadCrud: async () => { - const lastPort = this.ports[this.ports.length - 1]; - - return new Promise(async (resolve, reject) => { - const abortController = new AbortController(); - this.uploadDataController = { - controller: abortController, - activePort: lastPort - }; - - // Resolving will make it retry - abortController.signal.onabort = () => resolve(); - try { - resolve(await lastPort.clientProvider.uploadCrud()); - } catch (ex) { - reject(ex); - } finally { - this.uploadDataController = undefined; - } - }); - }, - ...params.streamOptions, - // Logger cannot be transferred just yet - logger - }); - - this.syncStreamClient.registerListener({ - statusChanged: (status) => { - this.updateAllStatuses(status.toJSON()); - } - }); - this.iterateListeners((l) => l.initialized?.()); } @@ -209,13 +165,28 @@ export class SharedSyncImplementation async connect(options?: PowerSyncConnectionOptions) { await this.waitForReady(); // This effectively queues connect and disconnect calls. Ensuring multiple tabs' requests are synchronized - return navigator.locks.request('shared-sync-connect', () => this.syncStreamClient?.connect(options)); + return navigator.locks.request('shared-sync-connect', async () => { + this.syncStreamClient = this.generateStreamingImplementation(); + + this.syncStreamClient.registerListener({ + statusChanged: (status) => { + this.updateAllStatuses(status.toJSON()); + } + }); + + await this.syncStreamClient?.connect(options); + }); } async disconnect() { + this.logger.info('disconnecting'); await this.waitForReady(); // This effectively queues connect and disconnect calls. Ensuring multiple tabs' requests are synchronized - return navigator.locks.request('shared-sync-connect', () => this.syncStreamClient?.disconnect()); + return navigator.locks.request('shared-sync-connect', async () => { + await this.syncStreamClient?.disconnect(); + await this.syncStreamClient?.dispose(); + this.syncStreamClient = null; + }); } /** @@ -281,6 +252,62 @@ export class SharedSyncImplementation return this.syncStreamClient!.getWriteCheckpoint(); } + protected generateStreamingImplementation() { + // The waitForReady call ensures these should be present + const syncParams = this.syncParams!; + + // Create a new StreamingSyncImplementation for each connect call. This is usually done is all SDKs. + return new WebStreamingSyncImplementation({ + adapter: new SqliteBucketStorage(this.dbAdapter!, new Mutex(), this.logger), + remote: new WebRemote({ + fetchCredentials: async () => { + const lastPort = this.ports[this.ports.length - 1]; + return new Promise(async (resolve, reject) => { + const abortController = new AbortController(); + this.fetchCredentialsController = { + controller: abortController, + activePort: lastPort + }; + + abortController.signal.onabort = reject; + try { + console.log('calling the last port client provider for credentials'); + resolve(await lastPort.clientProvider.fetchCredentials()); + } catch (ex) { + reject(ex); + } finally { + this.fetchCredentialsController = undefined; + } + }); + } + }), + uploadCrud: async () => { + const lastPort = this.ports[this.ports.length - 1]; + + return new Promise(async (resolve, reject) => { + const abortController = new AbortController(); + this.uploadDataController = { + controller: abortController, + activePort: lastPort + }; + + // Resolving will make it retry + abortController.signal.onabort = () => resolve(); + try { + resolve(await lastPort.clientProvider.uploadCrud()); + } catch (ex) { + reject(ex); + } finally { + this.uploadDataController = undefined; + } + }); + }, + ...syncParams.streamOptions, + // Logger cannot be transferred just yet + logger: this.logger + }); + } + /** * A method to update the all shared statuses for each * client. @@ -296,7 +323,8 @@ export class SharedSyncImplementation */ private _testUpdateAllStatuses(status: SyncStatusOptions) { if (!this.syncStreamClient) { - console.warn('no stream client has been initialized yet'); + // This is just for testing purposes + this.syncStreamClient = this.generateStreamingImplementation(); } // Only assigning, don't call listeners for this test