Skip to content

Commit

Permalink
fix: crud upload queue monitoring
Browse files Browse the repository at this point in the history
  • Loading branch information
stevensJourney committed Sep 20, 2024
1 parent d5f755d commit d21f2eb
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 22 deletions.
7 changes: 7 additions & 0 deletions .changeset/afraid-apples-learn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@powersync/common': patch
'@powersync/web': patch
'@powersync/react-native': patch
---

Fixed issue where sequentially mutating the same row multiple times could cause the CRUD upload queue monitoring to think CRUD operations have not been processed correctly by the `BackendConnector` `uploadData` method.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import Logger, { ILogger } from 'js-logger';
import { SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus.js';
import { AbortOperation } from '../../../utils/AbortOperation.js';
import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver.js';
import { throttleLeadingTrailing } from '../../../utils/throttle.js';
import { BucketChecksum, BucketStorageAdapter, Checkpoint } from '../bucket/BucketStorageAdapter.js';
import { CrudEntry } from '../bucket/CrudEntry.js';
import { SyncDataBucket } from '../bucket/SyncDataBucket.js';
Expand All @@ -16,7 +17,6 @@ import {
isStreamingSyncCheckpointDiff,
isStreamingSyncData
} from './streaming-sync-types.js';
import { throttleLeadingTrailing } from '../../../utils/throttle.js';

export enum LockType {
CRUD = 'crud',
Expand Down Expand Up @@ -230,7 +230,7 @@ export abstract class AbstractStreamingSyncImplementation
*/
const nextCrudItem = await this.options.adapter.nextCrudItem();
if (nextCrudItem) {
if (nextCrudItem.id == checkedCrudItem?.id) {
if (nextCrudItem.clientId == checkedCrudItem?.clientId) {
// This will force a higher log level than exceptions which are caught here.
this.logger.warn(`Potentially previously uploaded CRUD entries are still present in the upload queue.
Make sure to handle uploads and complete CRUD transactions or batches by calling and awaiting their [.complete()] method.
Expand Down
2 changes: 1 addition & 1 deletion packages/web/src/db/PowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import {
import { Mutex } from 'async-mutex';
import { WASQLiteOpenFactory } from './adapters/wa-sqlite/WASQLiteOpenFactory';
import {
ResolvedWebSQLOpenOptions,
DEFAULT_WEB_SQL_FLAGS,
ResolvedWebSQLOpenOptions,
resolveWebSQLFlags,
WebSQLFlags
} from './adapters/web-sql-flags';
Expand Down
60 changes: 42 additions & 18 deletions packages/web/tests/stream.test.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,39 @@
import { Schema, TableV2, column } from '@powersync/common';
import { Schema, Table, column } from '@powersync/common';
import { WebPowerSyncOpenFactoryOptions } from '@powersync/web';
import Logger from 'js-logger';
import { v4 as uuid } from 'uuid';
import { beforeAll, describe, expect, it, vi } from 'vitest';
import { MockRemote, MockStreamOpenFactory, TestConnector } from './utils/MockStreamOpenFactory';

type UnwrapPromise<T> = T extends Promise<infer U> ? U : T;

export type ConnectedDatabaseUtils = UnwrapPromise<ReturnType<typeof generateConnectedDatabase>>;
export type GenerateConnectedDatabaseOptions = {
powerSyncOptions: Partial<WebPowerSyncOpenFactoryOptions>;
};

const UPLOAD_TIMEOUT_MS = 3000;

export async function generateConnectedDatabase({ useWebWorker } = { useWebWorker: true }) {
export const DEFAULT_CONNECTED_POWERSYNC_OPTIONS = {
powerSyncOptions: {
dbFilename: 'test-stream-connection.db',
flags: {
enableMultiTabs: false,
useWebWorker: true
},
// Makes tests faster
crudUploadThrottleMs: 0,
schema: new Schema({
users: new Table({ name: column.text })
})
}
};

export async function generateConnectedDatabase(
options: GenerateConnectedDatabaseOptions = DEFAULT_CONNECTED_POWERSYNC_OPTIONS
) {
const { powerSyncOptions } = options;
const { powerSyncOptions: defaultPowerSyncOptions } = DEFAULT_CONNECTED_POWERSYNC_OPTIONS;
/**
* Very basic implementation of a listener pattern.
* Required since we cannot extend multiple classes.
Expand All @@ -16,24 +43,14 @@ export async function generateConnectedDatabase({ useWebWorker } = { useWebWorke
const uploadSpy = vi.spyOn(connector, 'uploadData');
const remote = new MockRemote(connector, () => callbacks.forEach((c) => c()));

const users = new TableV2({
name: column.text
});

const schema = new Schema({
users
});

const factory = new MockStreamOpenFactory(
{
dbFilename: 'test-stream-connection.db',
...defaultPowerSyncOptions,
...powerSyncOptions,
flags: {
enableMultiTabs: false,
useWebWorker
},
// Makes tests faster
crudUploadThrottleMs: 0,
schema
...(defaultPowerSyncOptions.flags ?? {}),
...(powerSyncOptions.flags ?? {})
}
},
remote
);
Expand Down Expand Up @@ -83,7 +100,14 @@ describe('Streaming', () => {
test: (createConnectedDatabase: () => ReturnType<typeof generateConnectedDatabase>) => Promise<void>
) => {
const funcWithWebWorker = generateConnectedDatabase;
const funcWithoutWebWorker = () => generateConnectedDatabase({ useWebWorker: false });
const funcWithoutWebWorker = () =>
generateConnectedDatabase({
powerSyncOptions: {
flags: {
useWebWorker: false
}
}
});

it(`${name} - with web worker`, () => test(funcWithWebWorker));
it(`${name} - without web worker`, () => test(funcWithoutWebWorker));
Expand Down
116 changes: 116 additions & 0 deletions packages/web/tests/uploads.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import Logger from 'js-logger';
import p from 'p-defer';
import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest';
import { ConnectedDatabaseUtils, generateConnectedDatabase } from './stream.test';

describe('CRUD Uploads', () => {
let connectedUtils: ConnectedDatabaseUtils;
const logger = Logger.get('crud-logger');

beforeAll(() => Logger.useDefaults());

beforeEach(async () => {
connectedUtils = await generateConnectedDatabase({
powerSyncOptions: {
logger,
/**
* The timeout here is set to longer than the default test timeout
* A retry wil cause tests to fail.
*/
crudUploadThrottleMs: 10_000,
flags: {
enableMultiTabs: false
}
}
});
});

afterEach(async () => {
connectedUtils.remote.streamController?.close();
await connectedUtils.powersync.disconnectAndClear();
await connectedUtils.powersync.close();
});

it('should warn for missing upload operations in uploadData', async () => {
const { powersync, uploadSpy } = connectedUtils;
const loggerSpy = vi.spyOn(logger, 'warn');

const deferred = p();

uploadSpy.mockImplementation(async (db) => {
// This upload method does not perform an upload
deferred.resolve();
});

// Create something with CRUD in it.
await powersync.execute('INSERT into users (id, name) VALUES (uuid(), ?)', ['steven']);

// The empty upload handler should have been called
// Timeouts seem to be weird in Vitest Browser mode.
// This makes the check below more stable.
await deferred.promise;

await vi.waitFor(
() => {
expect(
loggerSpy.mock.calls.find((logArgs) =>
logArgs[0].includes('Potentially previously uploaded CRUD entries are still present')
)
).exist;
},
{
timeout: 500
}
);
});

it('should immediately upload sequential transactions', async () => {
const { powersync, uploadSpy } = connectedUtils;
const deferred = p();

uploadSpy.mockImplementation(async (db) => {
// This upload method does not perform an upload
const nextTransaction = await db.getNextCrudTransaction();
console.log('uploading trans', nextTransaction);
if (!nextTransaction) {
return;
}

// Mockingly delete the crud op in order to progress through the CRUD queue
for (const op of nextTransaction.crud) {
await db.execute(`DELETE FROM ps_crud WHERE id = ?`, [op.clientId]);
}

deferred.resolve();
});

// Create the first item
await powersync.execute('INSERT into users (id, name) VALUES (uuid(), ?)', ['steven']);

// Modify the first item in a new transaction
await powersync.execute(`
UPDATE
users
SET
name = 'Mugi'
WHERE
name = 'steven'`);

// Create a second item
await powersync.execute('INSERT into users (id, name) VALUES (uuid(), ?)', ['steven2']);

// The empty upload handler should have been called
// Timeouts seem to be weird in Vitest Browser mode.
// This makes the check below more stable.
await deferred.promise;

await vi.waitFor(
() => {
expect(uploadSpy.mock.calls.length).eq(3);
},
{
timeout: 5_000
}
);
});
});
3 changes: 2 additions & 1 deletion packages/web/tests/utils/MockStreamOpenFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,15 @@ export class MockedStreamPowerSync extends PowerSyncDatabase {
connector: PowerSyncBackendConnector
): AbstractStreamingSyncImplementation {
return new WebStreamingSyncImplementation({
logger: this.options.logger,
adapter: this.bucketStorageAdapter,
remote: this.remote,
uploadCrud: async () => {
await this.waitForReady();
await connector.uploadData(this);
},
identifier: this.database.name,
retryDelayMs: 0
retryDelayMs: this.options.crudUploadThrottleMs ?? 0 // The zero here makes tests faster
});
}
}
Expand Down

0 comments on commit d21f2eb

Please sign in to comment.