From 042589c3eb643b8ef9a8903f9ddd9d54ac2f3260 Mon Sep 17 00:00:00 2001 From: stevensJourney <51082125+stevensJourney@users.noreply.github.com> Date: Tue, 6 Aug 2024 13:10:44 +0200 Subject: [PATCH] [Feature] Add warning if crud transactions are not completed (#254) --- .changeset/mean-carrots-relax.md | 7 +++ .../ios/Podfile.lock | 12 ++-- .../sync/bucket/BucketStorageAdapter.ts | 5 +- .../client/sync/bucket/SqliteBucketStorage.ts | 43 +++++++------ .../AbstractStreamingSyncImplementation.ts | 60 +++++++++++-------- packages/web/tests/multiple_instances.test.ts | 37 ++++++++---- 6 files changed, 103 insertions(+), 61 deletions(-) create mode 100644 .changeset/mean-carrots-relax.md diff --git a/.changeset/mean-carrots-relax.md b/.changeset/mean-carrots-relax.md new file mode 100644 index 00000000..db4203dc --- /dev/null +++ b/.changeset/mean-carrots-relax.md @@ -0,0 +1,7 @@ +--- +'@powersync/common': minor +'@powersync/web': minor +'@powersync/react-native': minor +--- + +Added a warning if connector `uploadData` functions don't process CRUD items completely. diff --git a/demos/react-native-supabase-todolist/ios/Podfile.lock b/demos/react-native-supabase-todolist/ios/Podfile.lock index 3c3959db..6948bd50 100644 --- a/demos/react-native-supabase-todolist/ios/Podfile.lock +++ b/demos/react-native-supabase-todolist/ios/Podfile.lock @@ -21,7 +21,7 @@ PODS: - ExpoModulesCore - ExpoKeepAwake (13.0.2): - ExpoModulesCore - - ExpoModulesCore (1.12.13): + - ExpoModulesCore (1.12.19): - DoubleConversion - glog - hermes-engine @@ -1340,7 +1340,7 @@ DEPENDENCIES: - boost (from `../../../node_modules/react-native/third-party-podspecs/boost.podspec`) - DoubleConversion (from `../../../node_modules/react-native/third-party-podspecs/DoubleConversion.podspec`) - EXConstants (from `../../../node_modules/expo-constants/ios`) - - Expo (from `../node_modules/expo`) + - Expo (from `../../../node_modules/expo`) - ExpoAsset (from `../../../node_modules/expo-asset/ios`) - ExpoCamera (from `../../../node_modules/expo-camera/ios`) - ExpoCrypto (from `../../../node_modules/expo-crypto/ios`) @@ -1348,7 +1348,7 @@ DEPENDENCIES: - ExpoFont (from `../../../node_modules/expo-font/ios`) - ExpoHead (from `../../../node_modules/expo-router/ios`) - ExpoKeepAwake (from `../../../node_modules/expo-keep-awake/ios`) - - ExpoModulesCore (from `../node_modules/expo-modules-core`) + - ExpoModulesCore (from `../../../node_modules/expo-modules-core`) - ExpoSecureStore (from `../../../node_modules/expo-secure-store/ios`) - EXSplashScreen (from `../../../node_modules/expo-splash-screen/ios`) - FBLazyVector (from `../../../node_modules/react-native/Libraries/FBLazyVector`) @@ -1428,7 +1428,7 @@ EXTERNAL SOURCES: EXConstants: :path: "../../../node_modules/expo-constants/ios" Expo: - :path: "../node_modules/expo" + :path: "../../../node_modules/expo" ExpoAsset: :path: "../../../node_modules/expo-asset/ios" ExpoCamera: @@ -1444,7 +1444,7 @@ EXTERNAL SOURCES: ExpoKeepAwake: :path: "../../../node_modules/expo-keep-awake/ios" ExpoModulesCore: - :path: "../node_modules/expo-modules-core" + :path: "../../../node_modules/expo-modules-core" ExpoSecureStore: :path: "../../../node_modules/expo-secure-store/ios" EXSplashScreen: @@ -1583,7 +1583,7 @@ SPEC CHECKSUMS: ExpoFont: e7f2275c10ca8573c991e007329ad6bf98086485 ExpoHead: 8eb4deb289c2fdd8bb624f996cd31414cd07f38a ExpoKeepAwake: 3b8815d9dd1d419ee474df004021c69fdd316d08 - ExpoModulesCore: a4b45b5f081f5fe9b8e87667906d180cd52f32d7 + ExpoModulesCore: 734c1802786b23c9598f4d15273753a779969368 ExpoSecureStore: 060cebcb956b80ddae09821610ac1aa9e1ac74cd EXSplashScreen: fbf0ec78e9cee911df188bf17b4fe51d15a84b87 FBLazyVector: 898d14d17bf19e2435cafd9ea2a1033efe445709 diff --git a/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts b/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts index a61a1d14..5cea03a2 100644 --- a/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts +++ b/packages/common/src/client/sync/bucket/BucketStorageAdapter.ts @@ -1,7 +1,7 @@ -import { OpId } from './CrudEntry'; +import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver'; import { CrudBatch } from './CrudBatch'; +import { CrudEntry, OpId } from './CrudEntry'; import { SyncDataBatch } from './SyncDataBatch'; -import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver'; export interface Checkpoint { last_op_id: OpId; @@ -62,6 +62,7 @@ export interface BucketStorageAdapter extends BaseObserver; + nextCrudItem(): Promise; hasCrud(): Promise; getCrudBatch(limit?: number): Promise; diff --git a/packages/common/src/client/sync/bucket/SqliteBucketStorage.ts b/packages/common/src/client/sync/bucket/SqliteBucketStorage.ts index 6763b761..2f52a0de 100644 --- a/packages/common/src/client/sync/bucket/SqliteBucketStorage.ts +++ b/packages/common/src/client/sync/bucket/SqliteBucketStorage.ts @@ -1,5 +1,7 @@ import { Mutex } from 'async-mutex'; +import Logger, { ILogger } from 'js-logger'; import { DBAdapter, Transaction, extractTableUpdates } from '../../../db/DBAdapter'; +import { BaseObserver } from '../../../utils/BaseObserver'; import { BucketState, BucketStorageAdapter, @@ -8,12 +10,10 @@ import { PSInternalTable, SyncLocalDatabaseResult } from './BucketStorageAdapter'; -import { OpTypeEnum } from './OpType'; import { CrudBatch } from './CrudBatch'; -import { CrudEntry } from './CrudEntry'; +import { CrudEntry, CrudEntryJSON } from './CrudEntry'; +import { OpTypeEnum } from './OpType'; import { SyncDataBatch } from './SyncDataBatch'; -import Logger, { ILogger } from 'js-logger'; -import { BaseObserver } from '../../../utils/BaseObserver'; const COMPACT_OPERATION_INTERVAL = 1_000; @@ -51,10 +51,10 @@ export class SqliteBucketStorage extends BaseObserver imp async init() { this._hasCompletedSync = false; - const existingTableRows = await this.db.execute( + const existingTableRows = await this.db.getAll<{ name: string }>( `SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'` ); - for (const row of existingTableRows.rows?._array ?? []) { + for (const row of existingTableRows ?? []) { this.tableNames.add(row.name); } } @@ -72,10 +72,10 @@ export class SqliteBucketStorage extends BaseObserver imp startSession(): void {} async getBucketStates(): Promise { - const result = await this.db.execute( + const result = await this.db.getAll( 'SELECT name as bucket, cast(last_op as TEXT) as op_id FROM ps_buckets WHERE pending_delete = 0' ); - return result.rows?._array ?? []; + return result; } async saveSyncData(batch: SyncDataBatch) { @@ -258,19 +258,20 @@ export class SqliteBucketStorage extends BaseObserver imp } async updateLocalTarget(cb: () => Promise): Promise { - const rs1 = await this.db.execute("SELECT target_op FROM ps_buckets WHERE name = '$local' AND target_op = ?", [ + const rs1 = await this.db.getAll("SELECT target_op FROM ps_buckets WHERE name = '$local' AND target_op = ?", [ SqliteBucketStorage.MAX_OP_ID ]); - if (!rs1.rows?.length) { + if (!rs1.length) { // Nothing to update return false; } - const rs = await this.db.execute("SELECT seq FROM sqlite_sequence WHERE name = 'ps_crud'"); - if (!rs.rows?.length) { + const rs = await this.db.getAll<{ seq: number }>("SELECT seq FROM sqlite_sequence WHERE name = 'ps_crud'"); + if (!rs.length) { // Nothing to update return false; } - const seqBefore: number = rs.rows?.item(0)['seq']; + + const seqBefore: number = rs[0]['seq']; const opId = await cb(); @@ -304,9 +305,17 @@ export class SqliteBucketStorage extends BaseObserver imp }); } + async nextCrudItem(): Promise { + const next = await this.db.getOptional('SELECT * FROM ps_crud ORDER BY id ASC LIMIT 1'); + if (!next) { + return; + } + return CrudEntry.fromRow(next); + } + async hasCrud(): Promise { - const anyData = await this.db.execute('SELECT 1 FROM ps_crud LIMIT 1'); - return !!anyData.rows?.length; + const anyData = await this.db.getOptional('SELECT 1 FROM ps_crud LIMIT 1'); + return !!anyData; } /** @@ -318,10 +327,10 @@ export class SqliteBucketStorage extends BaseObserver imp return null; } - const crudResult = await this.db.execute('SELECT * FROM ps_crud ORDER BY id ASC LIMIT ?', [limit]); + const crudResult = await this.db.getAll('SELECT * FROM ps_crud ORDER BY id ASC LIMIT ?', [limit]); const all: CrudEntry[] = []; - for (const row of crudResult.rows?._array ?? []) { + for (const row of crudResult) { all.push(CrudEntry.fromRow(row)); } diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index f44c8ba7..0807c41e 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -2,6 +2,13 @@ import throttle from 'lodash/throttle'; import Logger, { ILogger } from 'js-logger'; +import { SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus'; +import { AbortOperation } from '../../../utils/AbortOperation'; +import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver'; +import { BucketChecksum, BucketStorageAdapter, Checkpoint } from '../bucket/BucketStorageAdapter'; +import { CrudEntry } from '../bucket/CrudEntry'; +import { SyncDataBucket } from '../bucket/SyncDataBucket'; +import { AbstractRemote, SyncStreamOptions } from './AbstractRemote'; import { BucketRequest, StreamingSyncRequestParameterType, @@ -11,12 +18,6 @@ import { isStreamingSyncCheckpointDiff, isStreamingSyncData } from './streaming-sync-types'; -import { AbstractRemote, SyncStreamOptions } from './AbstractRemote'; -import { BucketChecksum, BucketStorageAdapter, Checkpoint } from '../bucket/BucketStorageAdapter'; -import { SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus'; -import { SyncDataBucket } from '../bucket/SyncDataBucket'; -import { BaseObserver, BaseListener, Disposable } from '../../../utils/BaseObserver'; -import { AbortOperation } from '../../../utils/AbortOperation'; export enum LockType { CRUD = 'crud', @@ -215,18 +216,40 @@ export abstract class AbstractStreamingSyncImplementation return this.obtainLock({ type: LockType.CRUD, callback: async () => { - this.updateSyncStatus({ - dataFlow: { - uploading: true - } - }); + /** + * Keep track of the first item in the CRUD queue for the last `uploadCrud` iteration. + */ + let checkedCrudItem: CrudEntry | undefined; + while (true) { + this.updateSyncStatus({ + dataFlow: { + uploading: true + } + }); try { - const done = await this.uploadCrudBatch(); - if (done) { + /** + * This is the first item in the FIFO CRUD queue. + */ + const nextCrudItem = await this.options.adapter.nextCrudItem(); + if (nextCrudItem) { + if (nextCrudItem.id == checkedCrudItem?.id) { + // This will force a higher log level than exceptions which are caught here. + this.logger.warn(`Potentially previously uploaded CRUD entries are still present in the upload queue. +Make sure to handle uploads and complete CRUD transactions or batches by calling and awaiting their [.complete()] method. +The next upload iteration will be delayed.`); + throw new Error('Delaying due to previously encountered CRUD item.'); + } + + checkedCrudItem = nextCrudItem; + await this.options.uploadCrud(); + } else { + // Uploading is completed + await this.options.adapter.updateLocalTarget(() => this.getWriteCheckpoint()); break; } } catch (ex) { + checkedCrudItem = undefined; this.updateSyncStatus({ dataFlow: { uploading: false @@ -252,17 +275,6 @@ export abstract class AbstractStreamingSyncImplementation }); } - protected async uploadCrudBatch(): Promise { - const hasCrud = await this.options.adapter.hasCrud(); - if (hasCrud) { - await this.options.uploadCrud(); - return false; - } else { - await this.options.adapter.updateLocalTarget(() => this.getWriteCheckpoint()); - return true; - } - } - async connect(options?: PowerSyncConnectionOptions) { if (this.abortController) { await this.disconnect(); diff --git a/packages/web/tests/multiple_instances.test.ts b/packages/web/tests/multiple_instances.test.ts index 01d602de..c0501ed4 100644 --- a/packages/web/tests/multiple_instances.test.ts +++ b/packages/web/tests/multiple_instances.test.ts @@ -1,4 +1,3 @@ -import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { AbstractPowerSyncDatabase, SqliteBucketStorage, SyncStatus } from '@powersync/common'; import { PowerSyncDatabase, @@ -6,10 +5,11 @@ import { WebRemote, WebStreamingSyncImplementationOptions } from '@powersync/web'; -import { testSchema } from './utils/testDb'; -import { TestConnector } from './utils/MockStreamOpenFactory'; import { Mutex } from 'async-mutex'; import Logger from 'js-logger'; +import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest'; +import { TestConnector } from './utils/MockStreamOpenFactory'; +import { testSchema } from './utils/testDb'; describe('Multiple Instances', () => { const dbFilename = 'test-multiple-instances.db'; @@ -23,6 +23,8 @@ describe('Multiple Instances', () => { schema: testSchema }); + beforeAll(() => Logger.useDefaults()); + beforeEach(() => { db = openDatabase(); }); @@ -184,34 +186,44 @@ describe('Multiple Instances', () => { }); // Create the first streaming client - const syncOptions1: WebStreamingSyncImplementationOptions = { + const stream1 = new SharedWebStreamingSyncImplementation({ adapter: new SqliteBucketStorage(db.database, new Mutex()), remote: new WebRemote(connector1), uploadCrud: async () => { triggerUpload1(); connector1.uploadData(db); }, - identifier - }; - const stream1 = new SharedWebStreamingSyncImplementation(syncOptions1); + identifier, + retryDelayMs: 100, + flags: { + broadcastLogs: true + } + }); // Generate the second streaming sync implementation const connector2 = new TestConnector(); - const spy2 = vi.spyOn(connector2, 'uploadData'); + // The second connector will be called first to upload, we don't want it to actually upload + // This will cause the sync uploads to be delayed as the CRUD queue did not change + const spy2 = vi.spyOn(connector2, 'uploadData').mockImplementation(async () => {}); + let triggerUpload2: () => void; const upload2TriggeredPromise = new Promise((resolve) => { triggerUpload2 = resolve; }); - const syncOptions2: WebStreamingSyncImplementationOptions = { + + const stream2 = new SharedWebStreamingSyncImplementation({ adapter: new SqliteBucketStorage(db.database, new Mutex()), remote: new WebRemote(connector1), uploadCrud: async () => { triggerUpload2(); connector2.uploadData(db); }, - identifier - }; - const stream2 = new SharedWebStreamingSyncImplementation(syncOptions2); + identifier, + retryDelayMs: 100, + flags: { + broadcastLogs: true + } + }); // Waits for the stream to be marked as connected const stream2UpdatedPromise = new Promise((resolve, reject) => { @@ -230,6 +242,7 @@ describe('Multiple Instances', () => { // The status in the second stream client should be updated await stream2UpdatedPromise; + expect(stream2.isConnected).true; // Create something with CRUD in it.