Skip to content

Commit

Permalink
update from main
Browse files Browse the repository at this point in the history
  • Loading branch information
stevensJourney committed Sep 5, 2024
2 parents 62168d2 + c600362 commit eae09fc
Show file tree
Hide file tree
Showing 28 changed files with 346 additions and 79 deletions.
5 changes: 5 additions & 0 deletions .changeset/brown-eggs-dress.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/common': minor
---

Improve performance of MOVE and REMOVE operations.
5 changes: 5 additions & 0 deletions .changeset/cuddly-paws-allow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/react-native': minor
---

Use react-native-quick-sqlite 1.3.0 / powersync-sqlite-core 0.2.1.
5 changes: 5 additions & 0 deletions .changeset/popular-phones-knock.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/common': patch
---

Always cast `target_op` (write checkpoint) to ensure it's an integer.
5 changes: 5 additions & 0 deletions .changeset/slow-lizards-sleep.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/common': minor
---

Add custom x-user-agent header and client_id parameter to requests.
5 changes: 5 additions & 0 deletions .changeset/small-pants-relax.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/web': minor
---

Use wa-sqlite 0.3.0 / powersync-sqlite-core 0.2.0.
5 changes: 5 additions & 0 deletions .changeset/soft-mice-type.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/common': minor
---

Emit update notifications on `disconnectAndClear()`.
5 changes: 5 additions & 0 deletions .changeset/tiny-worms-mate.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/common': patch
---

Validate that the powersync-sqlite-core version number is in a compatible range of ^0.2.0.
5 changes: 5 additions & 0 deletions .changeset/two-walls-nail.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/common': minor
---

Persist lastSyncedAt timestamp.
2 changes: 1 addition & 1 deletion demos/django-react-native-todolist/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"dependencies": {
"@azure/core-asynciterator-polyfill": "^1.0.2",
"@expo/vector-icons": "^14.0.0",
"@journeyapps/react-native-quick-sqlite": "^1.1.7",
"@journeyapps/react-native-quick-sqlite": "^1.3.0",
"@powersync/common": "workspace:*",
"@powersync/react": "workspace:*",
"@powersync/react-native": "workspace:*",
Expand Down
2 changes: 1 addition & 1 deletion demos/react-native-supabase-group-chat/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"dependencies": {
"@azure/core-asynciterator-polyfill": "^1.0.2",
"@faker-js/faker": "8.3.1",
"@journeyapps/react-native-quick-sqlite": "^1.1.7",
"@journeyapps/react-native-quick-sqlite": "^1.3.0",
"@powersync/common": "workspace:*",
"@powersync/react": "workspace:*",
"@powersync/react-native": "workspace:*",
Expand Down
2 changes: 1 addition & 1 deletion demos/react-native-supabase-todolist/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"dependencies": {
"@azure/core-asynciterator-polyfill": "^1.0.2",
"@expo/vector-icons": "^14.0.0",
"@journeyapps/react-native-quick-sqlite": "^1.1.7",
"@journeyapps/react-native-quick-sqlite": "^1.3.0",
"@powersync/attachments": "workspace:*",
"@powersync/common": "workspace:*",
"@powersync/react": "workspace:*",
Expand Down
67 changes: 41 additions & 26 deletions packages/common/src/client/AbstractPowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import { Schema } from '../db/schema/Schema';
import { BaseObserver } from '../utils/BaseObserver';
import { ControlledExecutor } from '../utils/ControlledExecutor';
import { mutexRunExclusive } from '../utils/mutex';
import { quoteIdentifier } from '../utils/strings';
import { SQLOpenFactory, SQLOpenOptions, isDBAdapter, isSQLOpenFactory, isSQLOpenOptions } from './SQLOpenFactory';
import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnector';
import { BucketStorageAdapter, PSInternalTable } from './sync/bucket/BucketStorageAdapter';
Expand Down Expand Up @@ -292,21 +291,47 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
protected async initialize() {
await this._initialize();
await this.bucketStorageAdapter.init();
const version = await this.database.execute('SELECT powersync_rs_version()');
this.sdkVersion = version.rows?.item(0)['powersync_rs_version()'] ?? '';
await this._loadVersion();
await this.updateSchema(this.options.schema);
await this.updateHasSynced();
await this.database.execute('PRAGMA RECURSIVE_TRIGGERS=TRUE');
this.ready = true;
this.iterateListeners((cb) => cb.initialized?.());
}

private async _loadVersion() {
try {
const { version } = await this.database.get<{ version: string }>('SELECT powersync_rs_version() as version');
this.sdkVersion = version;
} catch (e) {
throw new Error(`The powersync extension is not loaded correctly. Details: ${e.message}`);
}
let versionInts: number[];
try {
versionInts = this.sdkVersion!.split(/[.\/]/)
.slice(0, 3)
.map((n) => parseInt(n));
} catch (e) {
throw new Error(
`Unsupported powersync extension version. Need ^0.2.0, got: ${this.sdkVersion}. Details: ${e.message}`
);
}

// Validate ^0.2.0
if (versionInts[0] != 0 || versionInts[1] != 2 || versionInts[2] < 0) {
throw new Error(`Unsupported powersync extension version. Need ^0.2.0, got: ${this.sdkVersion}`);
}
}

protected async updateHasSynced() {
const result = await this.database.getOptional('SELECT 1 FROM ps_buckets WHERE last_applied_op > 0 LIMIT 1');
const hasSynced = !!result;
const result = await this.database.get<{ synced_at: string | null }>(
'SELECT powersync_last_synced_at() as synced_at'
);
const hasSynced = result.synced_at != null;
const syncedAt = result.synced_at != null ? new Date(result.synced_at! + 'Z') : undefined;

if (hasSynced != this.currentStatus.hasSynced) {
this.currentStatus = new SyncStatus({ ...this.currentStatus.toJSON(), hasSynced });
this.currentStatus = new SyncStatus({ ...this.currentStatus.toJSON(), hasSynced, lastSyncedAt: syncedAt });
this.iterateListeners((l) => l.statusChanged?.(this.currentStatus));
}
}
Expand Down Expand Up @@ -400,26 +425,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB

// TODO DB name, verify this is necessary with extension
await this.database.writeTransaction(async (tx) => {
await tx.execute(`DELETE FROM ${PSInternalTable.OPLOG}`);
await tx.execute(`DELETE FROM ${PSInternalTable.CRUD}`);
await tx.execute(`DELETE FROM ${PSInternalTable.BUCKETS}`);
await tx.execute(`DELETE FROM ${PSInternalTable.UNTYPED}`);

const tableGlob = clearLocal ? 'ps_data_*' : 'ps_data__*';

const existingTableRows = await tx.execute(
`
SELECT name FROM sqlite_master WHERE type='table' AND name GLOB ?
`,
[tableGlob]
);

if (!existingTableRows.rows?.length) {
return;
}
for (const row of existingTableRows.rows._array) {
await tx.execute(`DELETE FROM ${quoteIdentifier(row.name)} WHERE 1`);
}
await tx.execute('SELECT powersync_clear(?)', [clearLocal ? 1 : 0]);
});

// The data has been deleted - reset the sync status
Expand Down Expand Up @@ -553,6 +559,15 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
});
}

/**
* Get an unique client id for this database.
*
* The id is not reset when the database is cleared, only when the database is deleted.
*/
async getClientId(): Promise<string> {
return this.bucketStorageAdapter.getClientId();
}

private async handleCrudCheckpoint(lastClientId: number, writeCheckpoint?: string) {
return this.writeTransaction(async (tx) => {
await tx.execute(`DELETE FROM ${PSInternalTable.CRUD} WHERE id <= ?`, [lastClientId]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,9 @@ export interface BucketStorageAdapter extends BaseObserver<BucketStorageListener
forceCompact(): Promise<void>;

getMaxOpId(): string;

/**
* Get an unique client id.
*/
getClientId(): Promise<string>;
}
41 changes: 27 additions & 14 deletions packages/common/src/client/sync/bucket/SqliteBucketStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
private pendingBucketDeletes: boolean;
private _hasCompletedSync: boolean;
private updateListener: () => void;
private _clientId?: Promise<string>;

/**
* Count up, and do a compact on startup.
Expand Down Expand Up @@ -62,9 +63,22 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
this.updateListener?.();
}

async _getClientId() {
const row = await this.db.get<{ client_id: string }>('SELECT powersync_client_id() as client_id');
return row['client_id'];
}

getClientId() {
if (this._clientId == null) {
this._clientId = this._getClientId();
}
return this._clientId!;
}

getMaxOpId() {
return MAX_OP_ID;
}

/**
* Reset any caches.
*/
Expand Down Expand Up @@ -103,9 +117,7 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
*/
private async deleteBucket(bucket: string) {
await this.writeTransaction(async (tx) => {
await tx.execute(
'INSERT INTO powersync_operations(op, data) VALUES(?, ?)',
['delete_bucket', bucket]);
await tx.execute('INSERT INTO powersync_operations(op, data) VALUES(?, ?)', ['delete_bucket', bucket]);
});

this.logger.debug('done deleting bucket');
Expand All @@ -116,8 +128,8 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
if (this._hasCompletedSync) {
return true;
}
const r = await this.db.execute(`SELECT name, last_applied_op FROM ps_buckets WHERE last_applied_op > 0 LIMIT 1`);
const completed = !!r.rows?.length;
const r = await this.db.get<{ synced_at: string | null }>(`SELECT powersync_last_synced_at() as synced_at`);
const completed = r.synced_at != null;
if (completed) {
this._hasCompletedSync = true;
}
Expand Down Expand Up @@ -219,12 +231,7 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
private async deletePendingBuckets() {
if (this.pendingBucketDeletes !== false) {
await this.writeTransaction(async (tx) => {
await tx.execute(
'DELETE FROM ps_oplog WHERE bucket IN (SELECT name FROM ps_buckets WHERE pending_delete = 1 AND last_applied_op = last_op AND last_op >= target_op)'
);
await tx.execute(
'DELETE FROM ps_buckets WHERE pending_delete = 1 AND last_applied_op = last_op AND last_op >= target_op'
);
await tx.execute('INSERT INTO powersync_operations(op, data) VALUES (?, ?)', ['delete_pending_buckets', '']);
});
// Executed once after start-up, and again when there are pending deletes.
this.pendingBucketDeletes = false;
Expand Down Expand Up @@ -284,7 +291,9 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
return false;
}

const response = await tx.execute("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [opId]);
const response = await tx.execute("UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'", [
opId
]);
this.logger.debug(['[updateLocalTarget] Response from updating target_op ', JSON.stringify(response)]);
return true;
});
Expand Down Expand Up @@ -333,10 +342,14 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
if (writeCheckpoint) {
const crudResult = await tx.execute('SELECT 1 FROM ps_crud LIMIT 1');
if (crudResult.rows?.length) {
await tx.execute("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [writeCheckpoint]);
await tx.execute("UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'", [
writeCheckpoint
]);
}
} else {
await tx.execute("UPDATE ps_buckets SET target_op = ? WHERE name='$local'", [this.getMaxOpId()]);
await tx.execute("UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'", [
this.getMaxOpId()
]);
}
});
}
Expand Down
19 changes: 17 additions & 2 deletions packages/common/src/client/sync/stream/AbstractRemote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import type { BSON } from 'bson';
import { AbortOperation } from '../../../utils/AbortOperation';
import { Buffer } from 'buffer';

import { version as POWERSYNC_JS_VERSION } from '../../../../package.json';

export type BSONImplementation = typeof BSON;

export type RemoteConnector = {
Expand Down Expand Up @@ -109,6 +111,10 @@ export abstract class AbstractRemote {
return this.credentials;
}

getUserAgent() {
return `powersync-js/${POWERSYNC_JS_VERSION}`;
}

protected async buildRequest(path: string) {
const credentials = await this.getCredentials();
if (credentials != null && (credentials.endpoint == null || credentials.endpoint == '')) {
Expand All @@ -119,11 +125,14 @@ export abstract class AbstractRemote {
throw error;
}

const userAgent = this.getUserAgent();

return {
url: credentials.endpoint + path,
headers: {
'content-type': 'application/json',
Authorization: `Token ${credentials.token}`
Authorization: `Token ${credentials.token}`,
'x-user-agent': userAgent
}
};
}
Expand Down Expand Up @@ -207,6 +216,11 @@ export abstract class AbstractRemote {

const bson = await this.getBSON();

// Add the user agent in the setup payload - we can't set custom
// headers with websockets on web. The browser userAgent is however added
// automatically as a header.
const userAgent = this.getUserAgent();

const connector = new RSocketConnector({
transport: new WebsocketClientTransport({
url: this.options.socketUrlTransformer(request.url)
Expand All @@ -220,7 +234,8 @@ export abstract class AbstractRemote {
data: null,
metadata: Buffer.from(
bson.serialize({
token: request.headers.Authorization
token: request.headers.Authorization,
user_agent: userAgent
})
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,9 @@ export abstract class AbstractStreamingSyncImplementation
}

async getWriteCheckpoint(): Promise<string> {
const response = await this.options.remote.get('/write-checkpoint2.json');
const clientId = await this.options.adapter.getClientId();
let path = `/write-checkpoint2.json?client_id=${clientId}`;
const response = await this.options.remote.get(path);
return response['data']['write_checkpoint'] as string;
}

Expand Down Expand Up @@ -456,6 +458,8 @@ The next upload iteration will be delayed.`);

let bucketSet = new Set<string>(initialBuckets.keys());

const clientId = await this.options.adapter.getClientId();

this.logger.debug('Requesting stream from server');

const syncOptions: SyncStreamOptions = {
Expand All @@ -465,7 +469,8 @@ The next upload iteration will be delayed.`);
buckets: req,
include_checksum: true,
raw_data: true,
parameters: resolvedOptions.params
parameters: resolvedOptions.params,
client_id: clientId
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ export interface StreamingSyncRequest {
* Client parameters to be passed to the sync rules.
*/
parameters?: Record<string, StreamingSyncRequestParameterType>;

client_id?: string;
}

export interface StreamingSyncCheckpoint {
Expand Down
3 changes: 1 addition & 2 deletions packages/common/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export * from './client/sync/bucket/OplogEntry';
export * from './client/sync/stream/AbstractRemote';
export * from './client/sync/stream/AbstractStreamingSyncImplementation';
export * from './client/sync/stream/streaming-sync-types';
export { MAX_OP_ID } from './client/constants'
export { MAX_OP_ID } from './client/constants';

export * from './db/crud/SyncStatus';
export * from './db/crud/UploadQueueStatus';
Expand All @@ -31,7 +31,6 @@ export * from './db/schema/TableV2';

export * from './utils/AbortOperation';
export * from './utils/BaseObserver';
export * from './utils/strings';
export * from './utils/DataStream';
export * from './utils/parseQuery';

Expand Down
Loading

0 comments on commit eae09fc

Please sign in to comment.