Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix] CRUD upload retries #205

Merged
merged 1 commit into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/famous-teachers-design.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/common': patch
---

Fixed CRUD uploads which would not retry after failing until the connection status was updated. A failed CRUD operation should not change the status to `connected: false`.
Original file line number Diff line number Diff line change
Expand Up @@ -221,13 +221,18 @@ export abstract class AbstractStreamingSyncImplementation
}
} catch (ex) {
this.updateSyncStatus({
connected: false,
dataFlow: {
uploading: false
}
});
await this.delayRetry();
break;
if (!this.isConnected) {
// Exit the upload loop if the sync stream is no longer connected
break;
}
this.logger.debug(
`Caught exception when uploading. Upload will retry after a delay. Exception: ${ex.message}`
);
} finally {
this.updateSyncStatus({
dataFlow: {
Expand Down
195 changes: 162 additions & 33 deletions packages/web/tests/stream.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import _ from 'lodash';
import Logger from 'js-logger';
import { beforeAll, describe, expect, it } from 'vitest';
import { beforeAll, describe, expect, it, vi } from 'vitest';
import { v4 as uuid } from 'uuid';
import { AbstractPowerSyncDatabase, Schema, SyncStatusOptions, TableV2, column } from '@powersync/common';
import { MockRemote, MockStreamOpenFactory, TestConnector } from './utils/MockStreamOpenFactory';

const UPLOAD_TIMEOUT_MS = 3000;

export async function waitForConnectionStatus(
db: AbstractPowerSyncDatabase,
statusCheck: SyncStatusOptions = { connected: true }
Expand All @@ -30,7 +32,9 @@ export async function generateConnectedDatabase({ useWebWorker } = { useWebWorke
* Required since we cannot extend multiple classes.
*/
const callbacks: Map<string, () => void> = new Map();
const remote = new MockRemote(new TestConnector(), () => callbacks.forEach((c) => c()));
const connector = new TestConnector();
const uploadSpy = vi.spyOn(connector, 'uploadData');
const remote = new MockRemote(connector, () => callbacks.forEach((c) => c()));

const users = new TableV2({
name: column.text
Expand All @@ -47,6 +51,8 @@ export async function generateConnectedDatabase({ useWebWorker } = { useWebWorke
enableMultiTabs: false,
useWebWorker
},
// Makes tests faster
crudUploadThrottleMs: 0,
schema
},
remote
Expand All @@ -62,69 +68,192 @@ export async function generateConnectedDatabase({ useWebWorker } = { useWebWorke
});
});

const streamOpened = waitForStream();
const connect = async () => {
const streamOpened = waitForStream();

const connectedPromise = powersync.connect(connector);

const connectedPromise = powersync.connect(new TestConnector());
await streamOpened;

await streamOpened;
remote.streamController?.enqueue(new TextEncoder().encode('{"token_expires_in":3426}\n'));

remote.streamController?.enqueue(new TextEncoder().encode('{"token_expires_in":3426}\n'));
// Wait for connected to be true
await connectedPromise;
};

// Wait for connected to be true
await connectedPromise;
await connect();

return {
connector,
connect,
factory,
powersync,
remote,
uploadSpy,
waitForStream
};
}

describe('Stream test', () => {
describe('Streaming', () => {
/**
* Declares a test to be executed with different generated db functions
*/
const itWithGenerators = async (name: string, test: (func: () => any) => Promise<void>) => {
const itWithGenerators = async (
name: string,
test: (createConnectedDatabase: () => ReturnType<typeof generateConnectedDatabase>) => Promise<void>
) => {
const funcWithWebWorker = generateConnectedDatabase;
const funcWithoutWebWorker = () => generateConnectedDatabase({ useWebWorker: false });

it(`${name} - with web worker`, () => test(funcWithWebWorker));
it(`${name} - without web worker`, () => test(funcWithoutWebWorker));
};

describe('With Web Worker', () => {
beforeAll(() => Logger.useDefaults());
beforeAll(() => Logger.useDefaults());

itWithGenerators('PowerSync reconnect on closed stream', async (createConnectedDatabase) => {
const { powersync, waitForStream, remote } = await createConnectedDatabase();
expect(powersync.connected).toBe(true);

// Close the stream
const newStream = waitForStream();
remote.streamController?.close();

// A new stream should be requested
await newStream;

await powersync.disconnectAndClear();
await powersync.close();
});

itWithGenerators('PowerSync reconnect multiple connect calls', async (createConnectedDatabase) => {
// This initially performs a connect call
const { powersync, waitForStream } = await createConnectedDatabase();
expect(powersync.connected).toBe(true);

// Call connect again, a new stream should be requested
const newStream = waitForStream();
powersync.connect(new TestConnector());

itWithGenerators('PowerSync reconnect on closed stream', async (func) => {
const { powersync, waitForStream, remote } = await func();
expect(powersync.connected).toBe(true);
// A new stream should be requested
await newStream;

// Close the stream
const newStream = waitForStream();
remote.streamController?.close();
await powersync.disconnectAndClear();
await powersync.close();
});

itWithGenerators('Should trigger upload connector when connected', async (createConnectedDatabase) => {
const { powersync, uploadSpy } = await createConnectedDatabase();
expect(powersync.connected).toBe(true);

// do something which should trigger an upload
await powersync.execute('INSERT INTO users (id, name) VALUES (uuid(), ?)', ['name']);
// It should try and upload
await vi.waitFor(
() => {
// to-have-been-called seems to not work after failing the first check
expect(uploadSpy.mock.calls.length).equals(1);
},
{
timeout: UPLOAD_TIMEOUT_MS
}
);

// A new stream should be requested
await newStream;
await powersync.disconnectAndClear();
await powersync.close();
});

itWithGenerators('Should retry failed uploads when connected', async (createConnectedDatabase) => {
const { powersync, uploadSpy } = await createConnectedDatabase();
expect(powersync.connected).toBe(true);

await powersync.disconnectAndClear();
await powersync.close();
let uploadCounter = 0;
// This test will throw an exception a few times before uploading
const throwCounter = 2;
uploadSpy.mockImplementation(async (db) => {
if (uploadCounter++ < throwCounter) {
throw new Error('No uploads yet');
}
// Now actually do the upload
const tx = await db.getNextCrudTransaction();
await tx?.complete();
});

itWithGenerators('PowerSync reconnect multiple connect calls', async (func) => {
// This initially performs a connect call
const { powersync, waitForStream } = await func();
expect(powersync.connected).toBe(true);
// do something which should trigger an upload
await powersync.execute('INSERT INTO users (id, name) VALUES (uuid(), ?)', ['name']);

// Call connect again, a new stream should be requested
const newStream = waitForStream();
powersync.connect(new TestConnector());
// It should try and upload
await vi.waitFor(
() => {
// to-have-been-called seems to not work after failing a check
expect(uploadSpy.mock.calls.length).equals(throwCounter + 1);
},
{
timeout: UPLOAD_TIMEOUT_MS
}
);

// A new stream should be requested
await newStream;
await powersync.disconnectAndClear();
await powersync.close();
});

itWithGenerators('Should upload after reconnecting', async (createConnectedDatabase) => {
const { connect, powersync, uploadSpy } = await createConnectedDatabase();
expect(powersync.connected).toBe(true);

await powersync.disconnect();

// do something (offline) which should trigger an upload
await powersync.execute('INSERT INTO users (id, name) VALUES (uuid(), ?)', ['name']);

await connect();

// It should try and upload
await vi.waitFor(
() => {
// to-have-been-called seems to not work after failing a check
expect(uploadSpy.mock.calls.length).equals(1);
},
{
timeout: UPLOAD_TIMEOUT_MS
}
);

await powersync.disconnectAndClear();
await powersync.close();
});

await powersync.disconnectAndClear();
await powersync.close();
itWithGenerators('Should update status when uploading', async (createConnectedDatabase) => {
const { powersync, uploadSpy } = await createConnectedDatabase();
expect(powersync.connected).toBe(true);

let uploadStartedPromise = new Promise<void>((resolve) => {
uploadSpy.mockImplementation(async (db) => {
resolve();
// Now actually do the upload
const tx = await db.getNextCrudTransaction();
await tx?.complete();
});
});

// do something which should trigger an upload
await powersync.execute('INSERT INTO users (id, name) VALUES (uuid(), ?)', ['name']);

await uploadStartedPromise;
expect(powersync.currentStatus.dataFlowStatus.uploading).true;

// Status should update after uploads are completed
await vi.waitFor(
() => {
// to-have-been-called seems to not work after failing a check
expect(powersync.currentStatus.dataFlowStatus.uploading).false;
},
{
timeout: UPLOAD_TIMEOUT_MS
}
);

await powersync.disconnectAndClear();
await powersync.close();
});
});
15 changes: 12 additions & 3 deletions packages/web/tests/utils/MockStreamOpenFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ export class TestConnector implements PowerSyncBackendConnector {
};
}
async uploadData(database: AbstractPowerSyncDatabase): Promise<void> {
return;
const tx = await database.getNextCrudTransaction();
await tx?.complete();
}
}

Expand All @@ -49,8 +50,16 @@ export class MockRemote extends AbstractRemote {
post(path: string, data: any, headers?: Record<string, string> | undefined): Promise<any> {
throw new Error('Method not implemented.');
}
get(path: string, headers?: Record<string, string> | undefined): Promise<any> {
throw new Error('Method not implemented.');
async get(path: string, headers?: Record<string, string> | undefined): Promise<any> {
// mock a response for write checkpoint API
if (path.includes('checkpoint')) {
return {
data: {
write_checkpoint: '1000'
}
};
}
throw new Error('Not implemented');
}
async postStreaming(
path: string,
Expand Down
Loading