diff --git a/.changeset/afraid-apples-learn.md b/.changeset/afraid-apples-learn.md new file mode 100644 index 00000000..7e318c7a --- /dev/null +++ b/.changeset/afraid-apples-learn.md @@ -0,0 +1,7 @@ +--- +'@powersync/common': patch +'@powersync/web': patch +'@powersync/react-native': patch +--- + +Fixed issue where sequentially mutating the same row multiple times could cause the CRUD upload queue monitoring to think CRUD operations have not been processed correctly by the `BackendConnector` `uploadData` method. diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index 658fc61d..40553d45 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -3,6 +3,7 @@ import Logger, { ILogger } from 'js-logger'; import { SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus.js'; import { AbortOperation } from '../../../utils/AbortOperation.js'; import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver.js'; +import { throttleLeadingTrailing } from '../../../utils/throttle.js'; import { BucketChecksum, BucketStorageAdapter, Checkpoint } from '../bucket/BucketStorageAdapter.js'; import { CrudEntry } from '../bucket/CrudEntry.js'; import { SyncDataBucket } from '../bucket/SyncDataBucket.js'; @@ -16,7 +17,6 @@ import { isStreamingSyncCheckpointDiff, isStreamingSyncData } from './streaming-sync-types.js'; -import { throttleLeadingTrailing } from '../../../utils/throttle.js'; export enum LockType { CRUD = 'crud', @@ -230,7 +230,7 @@ export abstract class AbstractStreamingSyncImplementation */ const nextCrudItem = await this.options.adapter.nextCrudItem(); if (nextCrudItem) { - if (nextCrudItem.id == checkedCrudItem?.id) { + if (nextCrudItem.clientId == checkedCrudItem?.clientId) { // This will force a higher log level than exceptions which are caught here. this.logger.warn(`Potentially previously uploaded CRUD entries are still present in the upload queue. Make sure to handle uploads and complete CRUD transactions or batches by calling and awaiting their [.complete()] method. diff --git a/packages/web/src/db/PowerSyncDatabase.ts b/packages/web/src/db/PowerSyncDatabase.ts index 243ff49f..8ac28000 100644 --- a/packages/web/src/db/PowerSyncDatabase.ts +++ b/packages/web/src/db/PowerSyncDatabase.ts @@ -16,8 +16,8 @@ import { import { Mutex } from 'async-mutex'; import { WASQLiteOpenFactory } from './adapters/wa-sqlite/WASQLiteOpenFactory'; import { - ResolvedWebSQLOpenOptions, DEFAULT_WEB_SQL_FLAGS, + ResolvedWebSQLOpenOptions, resolveWebSQLFlags, WebSQLFlags } from './adapters/web-sql-flags'; diff --git a/packages/web/tests/stream.test.ts b/packages/web/tests/stream.test.ts index f8119d41..1d195d21 100644 --- a/packages/web/tests/stream.test.ts +++ b/packages/web/tests/stream.test.ts @@ -1,12 +1,39 @@ -import { Schema, TableV2, column } from '@powersync/common'; +import { Schema, Table, column } from '@powersync/common'; +import { WebPowerSyncOpenFactoryOptions } from '@powersync/web'; import Logger from 'js-logger'; import { v4 as uuid } from 'uuid'; import { beforeAll, describe, expect, it, vi } from 'vitest'; import { MockRemote, MockStreamOpenFactory, TestConnector } from './utils/MockStreamOpenFactory'; +type UnwrapPromise = T extends Promise ? U : T; + +export type ConnectedDatabaseUtils = UnwrapPromise>; +export type GenerateConnectedDatabaseOptions = { + powerSyncOptions: Partial; +}; + const UPLOAD_TIMEOUT_MS = 3000; -export async function generateConnectedDatabase({ useWebWorker } = { useWebWorker: true }) { +export const DEFAULT_CONNECTED_POWERSYNC_OPTIONS = { + powerSyncOptions: { + dbFilename: 'test-stream-connection.db', + flags: { + enableMultiTabs: false, + useWebWorker: true + }, + // Makes tests faster + crudUploadThrottleMs: 0, + schema: new Schema({ + users: new Table({ name: column.text }) + }) + } +}; + +export async function generateConnectedDatabase( + options: GenerateConnectedDatabaseOptions = DEFAULT_CONNECTED_POWERSYNC_OPTIONS +) { + const { powerSyncOptions } = options; + const { powerSyncOptions: defaultPowerSyncOptions } = DEFAULT_CONNECTED_POWERSYNC_OPTIONS; /** * Very basic implementation of a listener pattern. * Required since we cannot extend multiple classes. @@ -16,24 +43,14 @@ export async function generateConnectedDatabase({ useWebWorker } = { useWebWorke const uploadSpy = vi.spyOn(connector, 'uploadData'); const remote = new MockRemote(connector, () => callbacks.forEach((c) => c())); - const users = new TableV2({ - name: column.text - }); - - const schema = new Schema({ - users - }); - const factory = new MockStreamOpenFactory( { - dbFilename: 'test-stream-connection.db', + ...defaultPowerSyncOptions, + ...powerSyncOptions, flags: { - enableMultiTabs: false, - useWebWorker - }, - // Makes tests faster - crudUploadThrottleMs: 0, - schema + ...(defaultPowerSyncOptions.flags ?? {}), + ...(powerSyncOptions.flags ?? {}) + } }, remote ); @@ -83,7 +100,14 @@ describe('Streaming', () => { test: (createConnectedDatabase: () => ReturnType) => Promise ) => { const funcWithWebWorker = generateConnectedDatabase; - const funcWithoutWebWorker = () => generateConnectedDatabase({ useWebWorker: false }); + const funcWithoutWebWorker = () => + generateConnectedDatabase({ + powerSyncOptions: { + flags: { + useWebWorker: false + } + } + }); it(`${name} - with web worker`, () => test(funcWithWebWorker)); it(`${name} - without web worker`, () => test(funcWithoutWebWorker)); diff --git a/packages/web/tests/uploads.test.ts b/packages/web/tests/uploads.test.ts new file mode 100644 index 00000000..c61207e2 --- /dev/null +++ b/packages/web/tests/uploads.test.ts @@ -0,0 +1,116 @@ +import Logger from 'js-logger'; +import p from 'p-defer'; +import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest'; +import { ConnectedDatabaseUtils, generateConnectedDatabase } from './stream.test'; + +describe('CRUD Uploads', () => { + let connectedUtils: ConnectedDatabaseUtils; + const logger = Logger.get('crud-logger'); + + beforeAll(() => Logger.useDefaults()); + + beforeEach(async () => { + connectedUtils = await generateConnectedDatabase({ + powerSyncOptions: { + logger, + /** + * The timeout here is set to longer than the default test timeout + * A retry wil cause tests to fail. + */ + crudUploadThrottleMs: 10_000, + flags: { + enableMultiTabs: false + } + } + }); + }); + + afterEach(async () => { + connectedUtils.remote.streamController?.close(); + await connectedUtils.powersync.disconnectAndClear(); + await connectedUtils.powersync.close(); + }); + + it('should warn for missing upload operations in uploadData', async () => { + const { powersync, uploadSpy } = connectedUtils; + const loggerSpy = vi.spyOn(logger, 'warn'); + + const deferred = p(); + + uploadSpy.mockImplementation(async (db) => { + // This upload method does not perform an upload + deferred.resolve(); + }); + + // Create something with CRUD in it. + await powersync.execute('INSERT into users (id, name) VALUES (uuid(), ?)', ['steven']); + + // The empty upload handler should have been called + // Timeouts seem to be weird in Vitest Browser mode. + // This makes the check below more stable. + await deferred.promise; + + await vi.waitFor( + () => { + expect( + loggerSpy.mock.calls.find((logArgs) => + logArgs[0].includes('Potentially previously uploaded CRUD entries are still present') + ) + ).exist; + }, + { + timeout: 500 + } + ); + }); + + it('should immediately upload sequential transactions', async () => { + const { powersync, uploadSpy } = connectedUtils; + const deferred = p(); + + uploadSpy.mockImplementation(async (db) => { + // This upload method does not perform an upload + const nextTransaction = await db.getNextCrudTransaction(); + console.log('uploading trans', nextTransaction); + if (!nextTransaction) { + return; + } + + // Mockingly delete the crud op in order to progress through the CRUD queue + for (const op of nextTransaction.crud) { + await db.execute(`DELETE FROM ps_crud WHERE id = ?`, [op.clientId]); + } + + deferred.resolve(); + }); + + // Create the first item + await powersync.execute('INSERT into users (id, name) VALUES (uuid(), ?)', ['steven']); + + // Modify the first item in a new transaction + await powersync.execute(` + UPDATE + users + SET + name = 'Mugi' + WHERE + name = 'steven'`); + + // Create a second item + await powersync.execute('INSERT into users (id, name) VALUES (uuid(), ?)', ['steven2']); + + // The empty upload handler should have been called + // Timeouts seem to be weird in Vitest Browser mode. + // This makes the check below more stable. + await deferred.promise; + + await vi.waitFor( + () => { + expect(uploadSpy.mock.calls.length).eq(3); + }, + { + timeout: 5_000 + } + ); + }); +}); diff --git a/packages/web/tests/utils/MockStreamOpenFactory.ts b/packages/web/tests/utils/MockStreamOpenFactory.ts index 71a6d026..991bee99 100644 --- a/packages/web/tests/utils/MockStreamOpenFactory.ts +++ b/packages/web/tests/utils/MockStreamOpenFactory.ts @@ -133,6 +133,7 @@ export class MockedStreamPowerSync extends PowerSyncDatabase { connector: PowerSyncBackendConnector ): AbstractStreamingSyncImplementation { return new WebStreamingSyncImplementation({ + logger: this.options.logger, adapter: this.bucketStorageAdapter, remote: this.remote, uploadCrud: async () => { @@ -140,7 +141,7 @@ export class MockedStreamPowerSync extends PowerSyncDatabase { await connector.uploadData(this); }, identifier: this.database.name, - retryDelayMs: 0 + retryDelayMs: this.options.crudUploadThrottleMs ?? 0 // The zero here makes tests faster }); } }