Skip to content

Commit

Permalink
feat: allow for web sdk to be used without web worker (#178)
Browse files Browse the repository at this point in the history
Co-authored-by: DominicGBauer <[email protected]>
  • Loading branch information
DominicGBauer and DominicGBauer authored May 22, 2024
1 parent 3a584a0 commit 7943626
Show file tree
Hide file tree
Showing 21 changed files with 276 additions and 203 deletions.
5 changes: 5 additions & 0 deletions .changeset/rotten-cherries-reflect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/web': minor
---

Allow package to be used without web workers
22 changes: 11 additions & 11 deletions packages/web/src/db/PowerSyncDatabase.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
import {
type AbstractStreamingSyncImplementation,
type PowerSyncBackendConnector,
type BucketStorageAdapter,
type PowerSyncDatabaseOptions,
type PowerSyncCloseOptions,
type PowerSyncConnectionOptions,
AbstractPowerSyncDatabase,
AbstractStreamingSyncImplementation,
PowerSyncBackendConnector,
SqliteBucketStorage,
BucketStorageAdapter,
PowerSyncDatabaseOptions,
PowerSyncCloseOptions,
DEFAULT_POWERSYNC_CLOSE_OPTIONS,
PowerSyncConnectionOptions
DEFAULT_POWERSYNC_CLOSE_OPTIONS
} from '@powersync/common';

import { Mutex } from 'async-mutex';
import { WebRemote } from './sync/WebRemote';
import { SharedWebStreamingSyncImplementation } from './sync/SharedWebStreamingSyncImplementation';
import { SSRStreamingSyncImplementation } from './sync/SSRWebStreamingSyncImplementation';
import {
WebStreamingSyncImplementation,
WebStreamingSyncImplementationOptions
} from './sync/WebStreamingSyncImplementation';
import { Mutex } from 'async-mutex';

export interface WebPowerSyncFlags {
/**
* Enables multi tab support
*/
enableMultiTabs?: boolean;
useWebWorker?: boolean;
/**
* Open in SSR placeholder mode. DB operations and Sync operations will be a No-op
*/
Expand Down Expand Up @@ -126,8 +126,8 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
case flags?.enableMultiTabs:
if (!flags?.broadcastLogs) {
const warning = `
Multiple tabs are enabled, but broadcasting of logs is disabled.
Logs for shared sync worker will only be available in the shared worker context
Multiple tabs are enabled, but broadcasting of logs is disabled.
Logs for shared sync worker will only be available in the shared worker context
`;
const logger = this.options.logger;
logger ? logger.warn(warning) : console.warn(warning);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export interface WebPowerSyncOpenFactoryOptions extends PowerSyncOpenFactoryOpti
}

export const DEFAULT_POWERSYNC_FLAGS: WebPowerSyncOpenFlags = {
useWebWorker: true,
/**
* Multiple tabs are by default not supported on Android, iOS and Safari.
* Other platforms will have multiple tabs enabled by default.
Expand Down Expand Up @@ -80,6 +81,9 @@ export abstract class AbstractWebPowerSyncDatabaseOpenFactory extends AbstractPo
if (typeof this.options.flags?.enableMultiTabs != 'undefined') {
flags.enableMultiTabs = this.options.flags.enableMultiTabs;
}
if (flags.useWebWorker === false) {
flags.enableMultiTabs = false;
}
return flags;
}

Expand Down
63 changes: 37 additions & 26 deletions packages/web/src/db/adapters/wa-sqlite/WASQLiteDBAdapter.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
import {
BaseObserver,
DBAdapter,
DBAdapterListener,
DBGetUtils,
DBLockOptions,
LockContext,
PowerSyncOpenFactoryOptions,
QueryResult,
Transaction
type DBAdapter,
type DBAdapterListener,
type DBGetUtils,
type DBLockOptions,
type LockContext,
type PowerSyncOpenFactoryOptions,
type QueryResult,
type Transaction,
BaseObserver
} from '@powersync/common';
import * as Comlink from 'comlink';
import Logger, { ILogger } from 'js-logger';
import type { DBWorkerInterface, OpenDB } from '../../../worker/db/open-db';
import Logger, { type ILogger } from 'js-logger';
import type { DBFunctionsInterface, OpenDB } from '../../../shared/types';
import { _openDB } from '../../../shared/open-db';
import { getWorkerDatabaseOpener } from '../../../worker/db/open-worker-database';

export type WASQLiteFlags = {
enableMultiTabs?: boolean;
useWebWorker?: boolean;
};

export interface WASQLiteDBAdapterOptions extends Omit<PowerSyncOpenFactoryOptions, 'schema'> {
Expand All @@ -34,13 +36,13 @@ export class WASQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
private initialized: Promise<void>;
private logger: ILogger;
private dbGetHelpers: DBGetUtils | null;
private workerMethods: DBWorkerInterface | null;
private methods: DBFunctionsInterface | null;

constructor(protected options: WASQLiteDBAdapterOptions) {
super();
this.logger = Logger.get('WASQLite');
this.dbGetHelpers = null;
this.workerMethods = null;
this.methods = null;
this.initialized = this.init();
this.dbGetHelpers = this.generateDBHelpers({ execute: this._execute.bind(this) });
}
Expand All @@ -56,22 +58,31 @@ export class WASQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
getWorker() {}

protected async init() {
const { enableMultiTabs } = this.flags;
const { enableMultiTabs, useWebWorker } = this.flags;
if (!enableMultiTabs) {
this.logger.warn('Multiple tabs are not enabled in this browser');
}

const dbOpener = this.options.workerPort
? Comlink.wrap<OpenDB>(this.options.workerPort)
: getWorkerDatabaseOpener(this.options.dbFilename, enableMultiTabs);
if (useWebWorker) {
const dbOpener = this.options.workerPort
? Comlink.wrap<OpenDB>(this.options.workerPort)
: getWorkerDatabaseOpener(this.options.dbFilename, enableMultiTabs);

this.workerMethods = await dbOpener(this.options.dbFilename);
this.methods = await dbOpener(this.options.dbFilename);

this.workerMethods.registerOnTableChange(
Comlink.proxy((opType: number, tableName: string, rowId: number) => {
this.iterateListeners((cb) => cb.tablesUpdated?.({ opType, table: tableName, rowId }));
})
);
this.methods!.registerOnTableChange(
Comlink.proxy((opType: number, tableName: string, rowId: number) => {
this.iterateListeners((cb) => cb.tablesUpdated?.({ opType, table: tableName, rowId }));
})
);

return;
}
this.methods = await _openDB(this.options.dbFilename, { useWebWorker: false });

this.methods.registerOnTableChange((opType: number, tableName: string, rowId: number) => {
this.iterateListeners((cb) => cb.tablesUpdated?.({ opType, table: tableName, rowId }));
});
}

async execute(query: string, params?: any[] | undefined): Promise<QueryResult> {
Expand All @@ -87,7 +98,7 @@ export class WASQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
*/
private _execute = async (sql: string, bindings?: any[]): Promise<QueryResult> => {
await this.initialized;
const result = await this.workerMethods!.execute!(sql, bindings);
const result = await this.methods!.execute!(sql, bindings);
return {
...result,
rows: {
Expand All @@ -102,7 +113,7 @@ export class WASQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
*/
private _executeBatch = async (query: string, params?: any[]): Promise<QueryResult> => {
await this.initialized;
const result = await this.workerMethods!.executeBatch!(query, params);
const result = await this.methods!.executeBatch!(query, params);
return {
...result,
rows: undefined
Expand All @@ -115,7 +126,7 @@ export class WASQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
* tabs are still using it.
*/
close() {
this.workerMethods?.close?.();
this.methods?.close?.();
}

async getAll<T>(sql: string, parameters?: any[] | undefined): Promise<T[]> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,33 +1,14 @@
import * as SQLite from '@journeyapps/wa-sqlite';
import '@journeyapps/wa-sqlite';
import * as Comlink from 'comlink';
import { QueryResult } from '@powersync/common';

export type WASQLExecuteResult = Omit<QueryResult, 'rows'> & {
rows: {
_array: any[];
length: number;
};
};

export type DBWorkerInterface = {
// Close is only exposed when used in a single non shared webworker
close?: () => void;
execute: WASQLiteExecuteMethod;
executeBatch: WASQLiteExecuteBatchMethod;
registerOnTableChange: (callback: OnTableChangeCallback) => void;
};

export type WASQLiteExecuteMethod = (sql: string, params?: any[]) => Promise<WASQLExecuteResult>;
export type WASQLiteExecuteBatchMethod = (sql: string, params?: any[]) => Promise<WASQLExecuteResult>;
export type OnTableChangeCallback = (opType: number, tableName: string, rowId: number) => void;
export type OpenDB = (dbFileName: string) => DBWorkerInterface;

export type SQLBatchTuple = [string] | [string, Array<any> | Array<Array<any>>];
import type { DBFunctionsInterface, OnTableChangeCallback, WASQLExecuteResult } from './types';

let nextId = 1;

export async function _openDB(dbFileName: string): Promise<DBWorkerInterface> {
export async function _openDB(
dbFileName: string,
options: { useWebWorker: boolean } = { useWebWorker: true }
): Promise<DBFunctionsInterface> {
const { default: moduleFactory } = await import('@journeyapps/wa-sqlite/dist/wa-sqlite-async.mjs');
const module = await moduleFactory();
const sqlite3 = SQLite.Factory(module);
Expand All @@ -47,14 +28,6 @@ export async function _openDB(dbFileName: string): Promise<DBWorkerInterface> {
Array.from(listeners.values()).forEach((l) => l(opType, tableName, rowId));
});

const registerOnTableChange = (callback: OnTableChangeCallback) => {
const id = nextId++;
listeners.set(id, callback);
return Comlink.proxy(() => {
listeners.delete(id);
});
};

/**
* This executes single SQL statements inside a requested lock.
*/
Expand Down Expand Up @@ -198,12 +171,37 @@ export async function _openDB(dbFileName: string): Promise<DBWorkerInterface> {
});
};

if (options.useWebWorker) {
const registerOnTableChange = (callback: OnTableChangeCallback) => {
const id = nextId++;
listeners.set(id, callback);
return Comlink.proxy(() => {
listeners.delete(id);
});
};

return {
execute: Comlink.proxy(execute),
executeBatch: Comlink.proxy(executeBatch),
registerOnTableChange: Comlink.proxy(registerOnTableChange),
close: Comlink.proxy(() => {
sqlite3.close(db);
})
};
}

const registerOnTableChange = (callback: OnTableChangeCallback) => {
const id = nextId++;
listeners.set(id, callback);
return () => {
listeners.delete(id);
};
};

return {
execute: Comlink.proxy(execute),
executeBatch: Comlink.proxy(executeBatch),
registerOnTableChange: Comlink.proxy(registerOnTableChange),
close: Comlink.proxy(() => {
sqlite3.close(db);
})
execute: execute,
executeBatch: executeBatch,
registerOnTableChange: registerOnTableChange,
close: () => sqlite3.close(db)
};
}
28 changes: 28 additions & 0 deletions packages/web/src/shared/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import type { QueryResult } from '@powersync/common';

export type WASQLExecuteResult = Omit<QueryResult, 'rows'> & {
rows: {
_array: any[];
length: number;
};
};

export type DBFunctionsInterface = {
// Close is only exposed when used in a single non shared webworker
close?: () => void;
execute: WASQLiteExecuteMethod;
executeBatch: WASQLiteExecuteBatchMethod;
registerOnTableChange: (callback: OnTableChangeCallback) => void;
};

/**
* @deprecated use [DBFunctionsInterface instead]
*/
export type DBWorkerInterface = DBFunctionsInterface;

export type WASQLiteExecuteMethod = (sql: string, params?: any[]) => Promise<WASQLExecuteResult>;
export type WASQLiteExecuteBatchMethod = (sql: string, params?: any[]) => Promise<WASQLExecuteResult>;
export type OnTableChangeCallback = (opType: number, tableName: string, rowId: number) => void;
export type OpenDB = (dbFileName: string) => DBWorkerInterface;

export type SQLBatchTuple = [string] | [string, Array<any> | Array<Array<any>>];
9 changes: 4 additions & 5 deletions packages/web/src/worker/db/SharedWASQLiteDB.worker.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
import '@journeyapps/wa-sqlite';

import * as Comlink from 'comlink';

import { DBWorkerInterface, _openDB } from './open-db';
import { _openDB } from '../../shared/open-db';
import type { DBFunctionsInterface } from '../../shared/types';

/**
* Keeps track of open DB connections and the clients which
* are using it.
*/
type SharedDBWorkerConnection = {
clientIds: Set<number>;
db: DBWorkerInterface;
db: DBFunctionsInterface;
};

const _self: SharedWorkerGlobalScope = self as any;
Expand All @@ -20,7 +19,7 @@ const OPEN_DB_LOCK = 'open-wasqlite-db';

let nextClientId = 1;

const openDB = async (dbFileName: string): Promise<DBWorkerInterface> => {
const openDB = async (dbFileName: string): Promise<DBFunctionsInterface> => {
// Prevent multiple simultaneous opens from causing race conditions
return navigator.locks.request(OPEN_DB_LOCK, async () => {
const clientId = nextClientId++;
Expand Down
2 changes: 1 addition & 1 deletion packages/web/src/worker/db/WASQLiteDB.worker.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import * as Comlink from 'comlink';
import { _openDB } from './open-db';
import { _openDB } from '../../shared/open-db';

Comlink.expose(async (dbFileName: string) => Comlink.proxy(await _openDB(dbFileName)));
2 changes: 1 addition & 1 deletion packages/web/src/worker/db/open-worker-database.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as Comlink from 'comlink';
import { OpenDB } from './open-db';
import type { OpenDB } from '../../shared/types';

/**
* Opens a shared or dedicated worker which exposes opening of database connections
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { PowerSyncCredentials, SyncStatusOptions } from '@powersync/common';
import type { PowerSyncCredentials, SyncStatusOptions } from '@powersync/common';

/**
* The client side port should provide these methods.
Expand Down
4 changes: 2 additions & 2 deletions packages/web/src/worker/sync/BroadcastLogger.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import Logger, { ILogLevel, ILogger } from 'js-logger';
import Logger, { type ILogLevel, type ILogger } from 'js-logger';
import { type WrappedSyncPort } from './SharedSyncImplementation';

/**
Expand Down Expand Up @@ -88,7 +88,7 @@ export class BroadcastLogger implements ILogger {
* and proceeds to execute for all clients.
*/
protected async iterateClients(callback: (client: WrappedSyncPort) => Promise<void>) {
for (let client of this.clients) {
for (const client of this.clients) {
try {
await callback(client);
} catch (ex) {
Expand Down
Loading

0 comments on commit 7943626

Please sign in to comment.