Skip to content

Commit

Permalink
fix shared sync credentials refresh
Browse files Browse the repository at this point in the history
  • Loading branch information
stevensJourney committed Aug 8, 2024
1 parent 2203b90 commit ad4d20b
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 87 deletions.
19 changes: 11 additions & 8 deletions packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import { PowerSyncConnectionOptions, PowerSyncCredentials, SyncStatus, SyncStatusOptions } from '@powersync/common';
import * as Comlink from 'comlink';
import {
WebStreamingSyncImplementation,
WebStreamingSyncImplementationOptions
} from './WebStreamingSyncImplementation';
import { openWorkerDatabasePort } from '../../worker/db/open-worker-database';
import { AbstractSharedSyncClientProvider } from '../../worker/sync/AbstractSharedSyncClientProvider';
import {
ManualSharedSyncPayload,
SharedSyncClientEvent,
SharedSyncImplementation
} from '../../worker/sync/SharedSyncImplementation';
import { AbstractSharedSyncClientProvider } from '../../worker/sync/AbstractSharedSyncClientProvider';
import { PowerSyncConnectionOptions, PowerSyncCredentials, SyncStatus, SyncStatusOptions } from '@powersync/common';
import { openWorkerDatabasePort } from '../../worker/db/open-worker-database';
import {
WebStreamingSyncImplementation,
WebStreamingSyncImplementationOptions
} from './WebStreamingSyncImplementation';

/**
* The shared worker will trigger methods on this side of the message port
Expand Down Expand Up @@ -144,6 +144,9 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem
*/
async connect(options?: PowerSyncConnectionOptions): Promise<void> {
await this.waitForReady();
// This is needed since a new tab won't have any reference to the
// shared worker sync implementation since that is only created on the first call to `connect`.
await this.disconnect();
return this.syncManager.connect(options);
}

Expand All @@ -170,9 +173,9 @@ export class SharedWebStreamingSyncImplementation extends WebStreamingSyncImplem
};

this.messagePort.postMessage(closeMessagePayload);

// Release the proxy
this.syncManager[Comlink.releaseProxy]();
this.messagePort.close();
}

async waitForReady() {
Expand Down
186 changes: 107 additions & 79 deletions packages/web/src/worker/sync/SharedSyncImplementation.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
import * as Comlink from 'comlink';
import Logger, { type ILogger } from 'js-logger';
import {
type AbstractStreamingSyncImplementation,
type StreamingSyncImplementation,
type LockOptions,
type PowerSyncConnectionOptions,
type StreamingSyncImplementation,
type StreamingSyncImplementationListener,
type SyncStatusOptions,
type PowerSyncConnectionOptions,
AbortOperation,
BaseObserver,
DBAdapter,
SqliteBucketStorage,
SyncStatus,
AbortOperation
SyncStatus
} from '@powersync/common';
import { Mutex } from 'async-mutex';
import * as Comlink from 'comlink';
import Logger, { type ILogger } from 'js-logger';
import { WebRemote } from '../../db/sync/WebRemote';
import {
WebStreamingSyncImplementation,
WebStreamingSyncImplementationOptions
} from '../../db/sync/WebStreamingSyncImplementation';
import { Mutex } from 'async-mutex';
import { WebRemote } from '../../db/sync/WebRemote';

import { WASQLiteDBAdapter } from '../../db/adapters/wa-sqlite/WASQLiteDBAdapter';
import { AbstractSharedSyncClientProvider } from './AbstractSharedSyncClientProvider';
Expand Down Expand Up @@ -66,20 +67,28 @@ export class SharedSyncImplementation
implements StreamingSyncImplementation
{
protected ports: WrappedSyncPort[];
protected syncStreamClient?: AbstractStreamingSyncImplementation;
protected syncStreamClient: AbstractStreamingSyncImplementation | null;

protected isInitialized: Promise<void>;
protected statusListener?: () => void;

protected fetchCredentialsController?: RemoteOperationAbortController;
protected uploadDataController?: RemoteOperationAbortController;

protected dbAdapter: DBAdapter | null;
protected syncParams: SharedSyncInitOptions | null;
protected logger: ILogger;

syncStatus: SyncStatus;
broadCastLogger: ILogger;

constructor() {
super();
this.ports = [];
this.dbAdapter = null;
this.syncParams = null;
this.syncStreamClient = null;
this.logger = Logger.get('shared-sync');

this.isInitialized = new Promise((resolve) => {
const callback = this.registerListener({
Expand Down Expand Up @@ -115,82 +124,29 @@ export class SharedSyncImplementation
* Configures the DBAdapter connection and a streaming sync client.
*/
async init(dbWorkerPort: MessagePort, params: SharedSyncInitOptions) {
if (this.syncStreamClient) {
if (this.dbAdapter) {
// Cannot modify already existing sync implementation
return;
}

const logger = params.streamOptions?.flags?.broadcastLogs ? this.broadCastLogger : Logger.get('shared-sync');
this.dbAdapter = new WASQLiteDBAdapter({
dbFilename: params.dbName,
workerPort: dbWorkerPort,
flags: { enableMultiTabs: true, useWebWorker: true },
logger: this.logger
});

this.syncParams = params;

if (params.streamOptions?.flags?.broadcastLogs) {
this.logger = this.broadCastLogger;
}

self.onerror = (event) => {
// Share any uncaught events on the broadcast logger
logger.error('Uncaught exception in PowerSync shared sync worker', event);
this.logger.error('Uncaught exception in PowerSync shared sync worker', event);
};

this.syncStreamClient = new WebStreamingSyncImplementation({
adapter: new SqliteBucketStorage(
new WASQLiteDBAdapter({
dbFilename: params.dbName,
workerPort: dbWorkerPort,
flags: { enableMultiTabs: true, useWebWorker: true },
logger
}),
new Mutex(),
logger
),
remote: new WebRemote({
fetchCredentials: async () => {
const lastPort = this.ports[this.ports.length - 1];
return new Promise(async (resolve, reject) => {
const abortController = new AbortController();
this.fetchCredentialsController = {
controller: abortController,
activePort: lastPort
};

abortController.signal.onabort = reject;
try {
resolve(await lastPort.clientProvider.fetchCredentials());
} catch (ex) {
reject(ex);
} finally {
this.fetchCredentialsController = undefined;
}
});
}
}),
uploadCrud: async () => {
const lastPort = this.ports[this.ports.length - 1];

return new Promise(async (resolve, reject) => {
const abortController = new AbortController();
this.uploadDataController = {
controller: abortController,
activePort: lastPort
};

// Resolving will make it retry
abortController.signal.onabort = () => resolve();
try {
resolve(await lastPort.clientProvider.uploadCrud());
} catch (ex) {
reject(ex);
} finally {
this.uploadDataController = undefined;
}
});
},
...params.streamOptions,
// Logger cannot be transferred just yet
logger
});

this.syncStreamClient.registerListener({
statusChanged: (status) => {
this.updateAllStatuses(status.toJSON());
}
});

this.iterateListeners((l) => l.initialized?.());
}

Expand All @@ -209,13 +165,28 @@ export class SharedSyncImplementation
async connect(options?: PowerSyncConnectionOptions) {
await this.waitForReady();
// This effectively queues connect and disconnect calls. Ensuring multiple tabs' requests are synchronized
return navigator.locks.request('shared-sync-connect', () => this.syncStreamClient?.connect(options));
return navigator.locks.request('shared-sync-connect', async () => {
this.syncStreamClient = this.generateStreamingImplementation();

this.syncStreamClient.registerListener({
statusChanged: (status) => {
this.updateAllStatuses(status.toJSON());
}
});

await this.syncStreamClient?.connect(options);
});
}

async disconnect() {
this.logger.info('disconnecting');
await this.waitForReady();
// This effectively queues connect and disconnect calls. Ensuring multiple tabs' requests are synchronized
return navigator.locks.request('shared-sync-connect', () => this.syncStreamClient?.disconnect());
return navigator.locks.request('shared-sync-connect', async () => {
await this.syncStreamClient?.disconnect();
await this.syncStreamClient?.dispose();
this.syncStreamClient = null;
});
}

/**
Expand Down Expand Up @@ -281,6 +252,62 @@ export class SharedSyncImplementation
return this.syncStreamClient!.getWriteCheckpoint();
}

protected generateStreamingImplementation() {
// The waitForReady call ensures these should be present
const syncParams = this.syncParams!;

// Create a new StreamingSyncImplementation for each connect call. This is usually done is all SDKs.
return new WebStreamingSyncImplementation({
adapter: new SqliteBucketStorage(this.dbAdapter!, new Mutex(), this.logger),
remote: new WebRemote({
fetchCredentials: async () => {
const lastPort = this.ports[this.ports.length - 1];
return new Promise(async (resolve, reject) => {
const abortController = new AbortController();
this.fetchCredentialsController = {
controller: abortController,
activePort: lastPort
};

abortController.signal.onabort = reject;
try {
console.log('calling the last port client provider for credentials');
resolve(await lastPort.clientProvider.fetchCredentials());
} catch (ex) {
reject(ex);
} finally {
this.fetchCredentialsController = undefined;
}
});
}
}),
uploadCrud: async () => {
const lastPort = this.ports[this.ports.length - 1];

return new Promise(async (resolve, reject) => {
const abortController = new AbortController();
this.uploadDataController = {
controller: abortController,
activePort: lastPort
};

// Resolving will make it retry
abortController.signal.onabort = () => resolve();
try {
resolve(await lastPort.clientProvider.uploadCrud());
} catch (ex) {
reject(ex);
} finally {
this.uploadDataController = undefined;
}
});
},
...syncParams.streamOptions,
// Logger cannot be transferred just yet
logger: this.logger
});
}

/**
* A method to update the all shared statuses for each
* client.
Expand All @@ -296,7 +323,8 @@ export class SharedSyncImplementation
*/
private _testUpdateAllStatuses(status: SyncStatusOptions) {
if (!this.syncStreamClient) {
console.warn('no stream client has been initialized yet');
// This is just for testing purposes
this.syncStreamClient = this.generateStreamingImplementation();
}

// Only assigning, don't call listeners for this test
Expand Down

0 comments on commit ad4d20b

Please sign in to comment.