Skip to content

Commit

Permalink
wip: multiple connections
Browse files Browse the repository at this point in the history
  • Loading branch information
stevensJourney committed Aug 8, 2024
1 parent 039ae8d commit 62aa5e6
Show file tree
Hide file tree
Showing 8 changed files with 370 additions and 187 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ android {
dependencies {
// The version of react-native is set by the React Native Gradle Plugin
implementation("com.facebook.react:react-android")
implementation 'co.powersync:powersync-sqlite-core:0.1.7'

def isGifEnabled = (findProperty('expo.gif.enabled') ?: "") == "true";
def isWebpEnabled = (findProperty('expo.webp.enabled') ?: "") == "true";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,10 @@
LIBRARY_SEARCH_PATHS = "$(SDKROOT)/usr/lib/swift\"$(inherited)\"";
MTL_ENABLE_DEBUG_INFO = YES;
ONLY_ACTIVE_ARCH = YES;
OTHER_LDFLAGS = "$(inherited) ";
OTHER_LDFLAGS = (
"$(inherited)",
" ",
);
REACT_NATIVE_PATH = "${PODS_ROOT}/../../../../node_modules/react-native";
SDKROOT = iphoneos;
USE_HERMES = true;
Expand Down Expand Up @@ -552,7 +555,10 @@
);
LIBRARY_SEARCH_PATHS = "$(SDKROOT)/usr/lib/swift\"$(inherited)\"";
MTL_ENABLE_DEBUG_INFO = NO;
OTHER_LDFLAGS = "$(inherited) ";
OTHER_LDFLAGS = (
"$(inherited)",
" ",
);
REACT_NATIVE_PATH = "${PODS_ROOT}/../../../../node_modules/react-native";
SDKROOT = iphoneos;
USE_HERMES = true;
Expand Down
3 changes: 2 additions & 1 deletion packages/react-native/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
"dependencies": {
"@op-engineering/op-sqlite": "^7.1.0",
"@powersync/common": "workspace:*",
"@powersync/react": "workspace:*"
"@powersync/react": "workspace:*",
"async-mutex": "^0.5.0"
},
"devDependencies": {
"@craftzdog/react-native-buffer": "^6.0.5",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { DB, SQLBatchTuple } from '@op-engineering/op-sqlite';
import { BaseObserver, DBAdapterListener, QueryResult, RowUpdateType } from '@powersync/common';

export type OPSQLiteConnectionOptions = {
baseDB: DB;
};

export class OPSQLiteConnection extends BaseObserver<DBAdapterListener> {
protected DB: DB;
constructor(protected options: OPSQLiteConnectionOptions) {
super();
this.DB = options.baseDB;

// link table update commands
this.DB.updateHook((update) => {
this.iterateListeners((cb) => {
let opType: RowUpdateType;
switch (update.operation) {
case 'INSERT':
opType = RowUpdateType.SQLITE_INSERT;
break;
case 'DELETE':
opType = RowUpdateType.SQLITE_DELETE;
break;
case 'UPDATE':
opType = RowUpdateType.SQLITE_UPDATE;
break;
}
cb.tablesUpdated?.({ table: update.table, opType, rowId: update.rowId });
});
});
}

close() {
return this.DB.close();
}

execute(query: string, params?: any[]): Promise<QueryResult> {
return this.DB.executeAsync(query, params);
}

async executeBatch(query: string, params: any[][] = []): Promise<QueryResult> {
const tuple: SQLBatchTuple[] = [[query, params[0]]];
params.slice(1).forEach((p) => tuple.push([query, p]));

const result = await this.DB.executeBatchAsync(tuple);
return {
rowsAffected: result.rowsAffected ?? 0
};
}

async getAll<T>(sql: string, parameters?: any[]): Promise<T[]> {
const result = await this.DB.executeAsync(sql, parameters);
return result.rows?._array ?? [];
}

async getOptional<T>(sql: string, parameters?: any[]): Promise<T | null> {
const result = await this.DB.executeAsync(sql, parameters);
return result.rows?._array?.[0] ?? null;
}

async get<T>(sql: string, parameters?: any[]): Promise<T> {
const result = await this.getOptional(sql, parameters);
if (!result) {
// TODO more consistent error
throw new Error(`No row returned for [get] query`);
}
return result as T;
}
}
144 changes: 144 additions & 0 deletions packages/react-native/src/db/adapters/op-sqlite/OPSQliteAdapter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import { BaseObserver, DBAdapter, DBAdapterListener, DBLockOptions, QueryResult, Transaction } from '@powersync/common';
import Lock from 'async-lock';
import { OPSQLiteConnection } from './OPSQLiteConnection';

/**
* Adapter for React Native Quick SQLite
*/
export type OPSQliteAdapterOptions = {
writeConnection: OPSQLiteConnection;
readConnections: OPSQLiteConnection[];
name: string;
};

enum LockType {
READ = 'read',
WRITE = 'write'
}

export class OPSQliteDBAdapter extends BaseObserver<DBAdapterListener> implements DBAdapter {
name: string;
protected locks: Lock;
constructor(protected options: OPSQliteAdapterOptions) {
super();
this.name = this.options.name;
// Changes should only occur in the write connection
options.writeConnection.registerListener({
tablesUpdated: (notification) => this.iterateListeners((cb) => cb.tablesUpdated?.(notification))
});
this.locks = new Lock();
}

close() {
this.options.writeConnection.close();
this.options.readConnections.forEach((c) => c.close());
}

async readLock<T>(fn: (tx: OPSQLiteConnection) => Promise<T>, options?: DBLockOptions): Promise<T> {
// TODO better
const sortedConnections = this.options.readConnections
.map((connection, index) => ({ lockKey: `${LockType.READ}-${index}`, connection }))
.sort((a, b) => {
const aBusy = this.locks.isBusy(a.lockKey);
const bBusy = this.locks.isBusy(b.lockKey);
// Sort by ones which are not busy
return aBusy > bBusy ? 1 : 0;
});

return new Promise(async (resolve, reject) => {
try {
await this.locks.acquire(
sortedConnections[0].lockKey,
async () => {
resolve(await fn(sortedConnections[0].connection));
},
{ timeout: options?.timeoutMs }
);
} catch (ex) {
reject(ex);
}
});
}

writeLock<T>(fn: (tx: OPSQLiteConnection) => Promise<T>, options?: DBLockOptions): Promise<T> {
return new Promise(async (resolve, reject) => {
try {
await this.locks.acquire(
LockType.WRITE,
async () => {
resolve(await fn(this.options.writeConnection));
},
{ timeout: options?.timeoutMs }
);
} catch (ex) {
reject(ex);
}
});
}

readTransaction<T>(fn: (tx: Transaction) => Promise<T>, options?: DBLockOptions): Promise<T> {
return this.readLock((ctx) => this.internalTransaction(ctx, fn));
}

writeTransaction<T>(fn: (tx: Transaction) => Promise<T>, options?: DBLockOptions): Promise<T> {
return this.writeLock((ctx) => this.internalTransaction(ctx, fn));
}

getAll<T>(sql: string, parameters?: any[]): Promise<T[]> {
return this.readLock((ctx) => ctx.getAll(sql, parameters));
}

getOptional<T>(sql: string, parameters?: any[]): Promise<T | null> {
return this.readLock((ctx) => ctx.getOptional(sql, parameters));
}

get<T>(sql: string, parameters?: any[]): Promise<T> {
return this.readLock((ctx) => ctx.get(sql, parameters));
}

execute(query: string, params?: any[]) {
return this.writeLock((ctx) => ctx.execute(query, params));
}

async executeBatch(query: string, params: any[][] = []): Promise<QueryResult> {
return this.writeLock((ctx) => ctx.executeBatch(query, params));
}

protected async internalTransaction<T>(
connection: OPSQLiteConnection,
fn: (tx: Transaction) => Promise<T>
): Promise<T> {
let finalized = false;
const commit = async (): Promise<QueryResult> => {
if (finalized) {
return { rowsAffected: 0 };
}
finalized = true;
return connection.execute('COMMIT');
};
const rollback = async (): Promise<QueryResult> => {
if (finalized) {
return { rowsAffected: 0 };
}
finalized = true;
return connection.execute('ROLLBACK');
};
try {
await connection.execute('BEGIN');
const result = await fn({
execute: (query, params) => connection.execute(query, params),
get: (query, params) => connection.get(query, params),
getAll: (query, params) => connection.getAll(query, params),
getOptional: (query, params) => connection.getOptional(query, params),
commit,
rollback
});
await commit();
return result;
} catch (ex) {
await rollback();
}

return undefined as T;
}
}
Loading

0 comments on commit 62aa5e6

Please sign in to comment.