Skip to content

Commit

Permalink
Fix ponyfills + Merge branch 'main' into poc-bundle-christiaan-steven
Browse files Browse the repository at this point in the history
  • Loading branch information
Chriztiaan committed Jul 17, 2024
2 parents d9ccb77 + c4a31ca commit 58dbde7
Show file tree
Hide file tree
Showing 12 changed files with 166 additions and 76 deletions.
5 changes: 5 additions & 0 deletions .changeset/happy-gifts-sort.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'diagnostics-app': minor
---

Faster initial sync and other fixes
5 changes: 5 additions & 0 deletions .changeset/real-bottles-mate.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/web': patch
---

Fix read statements not using the transaction lock
2 changes: 1 addition & 1 deletion packages/common/rollup.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export default (commandLineArgs) => {
commonjs({}),
inject({
Buffer: ['buffer', 'Buffer'],
ReadableStream: ['web-streams-polyfill/ponyfill/es2018', 'ReadableStream'],
ReadableStream: ['web-streams-polyfill/ponyfill', 'ReadableStream'],
// Used by can-ndjson-stream
TextDecoder: ['text-encoding', 'TextDecoder']
})
Expand Down
2 changes: 0 additions & 2 deletions packages/kysely-driver/tests/setup/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,3 @@ export const getPowerSyncDb = () => {

return database;
};

export const getKyselyDb = wrapPowerSyncWithKysely<Database>(getPowerSyncDb());
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ describe('PowerSyncConnection', () => {

it('should execute a select query using getAll from the table', async () => {
await powerSyncDb.execute('INSERT INTO users (id, name) VALUES(uuid(), ?)', ['John']);

const getAllSpy = vi.spyOn(powerSyncDb, 'getAll');

const compiledQuery: CompiledQuery = {
Expand All @@ -35,7 +34,7 @@ describe('PowerSyncConnection', () => {

expect(rows.length).toEqual(1);
expect(rows[0].name).toEqual('John');
expect(getAllSpy).toHaveBeenCalledTimes(1);
expect(getAllSpy).toHaveBeenCalledWith('SELECT * From users', []);
});

it('should insert to the table', async () => {
Expand Down
2 changes: 1 addition & 1 deletion packages/react-native/rollup.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export default (commandLineArgs) => {
commonjs({}),
inject({
Buffer: ['@craftzdog/react-native-buffer', 'Buffer'],
ReadableStream: ['web-streams-polyfill/ponyfill/es2018', 'ReadableStream'],
ReadableStream: ['web-streams-polyfill/ponyfill', 'ReadableStream'],
TextEncoder: ['text-encoding', 'TextEncoder'],
TextDecoder: ['text-encoding', 'TextDecoder']
}),
Expand Down
4 changes: 3 additions & 1 deletion packages/web/src/db/adapters/wa-sqlite/WASQLiteDBAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ export class WASQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
this.dbGetHelpers = null;
this.methods = null;
this.initialized = this.init();
this.dbGetHelpers = this.generateDBHelpers({ execute: this._execute.bind(this) });
this.dbGetHelpers = this.generateDBHelpers({
execute: (query, params) => this.acquireLock(() => this._execute(query, params))
});
}

get name() {
Expand Down
19 changes: 12 additions & 7 deletions packages/web/src/shared/open-db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import * as SQLite from '@journeyapps/wa-sqlite';
import '@journeyapps/wa-sqlite';
import * as Comlink from 'comlink';
import type { DBFunctionsInterface, OnTableChangeCallback, WASQLExecuteResult } from './types';
import { Mutex } from 'async-mutex';

let nextId = 1;

Expand All @@ -18,6 +19,7 @@ export async function _openDB(
sqlite3.vfs_register(vfs, true);

const db = await sqlite3.open_v2(dbFileName);
const statementMutex = new Mutex();

/**
* Listeners are exclusive to the DB connection.
Expand All @@ -40,10 +42,10 @@ export async function _openDB(

/**
* This requests a lock for executing statements.
* Should only be used interanlly.
* Should only be used internally.
*/
const _acquireExecuteLock = (callback: () => Promise<any>): Promise<any> => {
return navigator.locks.request(`db-execute-${dbFileName}`, callback);
const _acquireExecuteLock = <T>(callback: () => Promise<T>): Promise<T> => {
return statementMutex.runExclusive(callback);
};

/**
Expand Down Expand Up @@ -115,7 +117,7 @@ export async function _openDB(
* This executes SQL statements in a batch.
*/
const executeBatch = async (sql: string, bindings?: any[][]): Promise<WASQLExecuteResult> => {
return _acquireExecuteLock(async () => {
return _acquireExecuteLock(async (): Promise<WASQLExecuteResult> => {
let affectedRows = 0;

const str = sqlite3.str_new(db, sql);
Expand All @@ -127,7 +129,8 @@ export async function _openDB(
const prepared = await sqlite3.prepare_v2(db, query);
if (prepared === null) {
return {
rowsAffected: 0
rowsAffected: 0,
rows: { _array: [], length: 0 }
};
}
const wrappedBindings = bindings ? bindings : [];
Expand Down Expand Up @@ -158,13 +161,15 @@ export async function _openDB(
} catch (err) {
await executeSingleStatement('ROLLBACK');
return {
rowsAffected: 0
rowsAffected: 0,
rows: { _array: [], length: 0 }
};
} finally {
sqlite3.str_finish(str);
}
const result = {
rowsAffected: affectedRows
rowsAffected: affectedRows,
rows: { _array: [], length: 0 }
};

return result;
Expand Down
22 changes: 22 additions & 0 deletions packages/web/tests/crud.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { AbstractPowerSyncDatabase, Column, ColumnType, CrudEntry, Schema, Table
import { PowerSyncDatabase } from '@powersync/web';
import { v4 as uuid } from 'uuid';
import { generateTestDb } from './utils/testDb';
import pDefer from 'p-defer';

const testId = '2290de4f-0488-4e50-abed-f8e8eb1d0b42';

Expand Down Expand Up @@ -289,4 +290,25 @@ describe('CRUD Tests', () => {
await tx2.complete();
expect(await powersync.getNextCrudTransaction()).equals(null);
});

it('Transaction exclusivity', async () => {
const outside = pDefer();
const inTx = pDefer();

const txPromise = powersync.writeTransaction(async (tx) => {
await tx.execute('INSERT INTO assets(id, description) VALUES(?, ?)', [testId, 'test1']);
inTx.resolve();
await outside.promise;
await tx.rollback();
});

await inTx.promise;

const r = powersync.getOptional<any>('SELECT * FROM assets WHERE id = ?', [testId]);
await new Promise((resolve) => setTimeout(resolve, 10));
outside.resolve();

await txPromise;
expect(await r).toEqual(null);
});
});
159 changes: 100 additions & 59 deletions tools/diagnostics-app/src/app/views/sync-diagnostics.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { NavigationPage } from '@/components/navigation/NavigationPage';
import { clearData, syncErrorTracker } from '@/library/powersync/ConnectionManager';
import { clearData, db, syncErrorTracker } from '@/library/powersync/ConnectionManager';
import {
Box,
Button,
Expand All @@ -15,7 +15,6 @@ import {
styled
} from '@mui/material';
import { DataGrid, GridColDef } from '@mui/x-data-grid';
import { useQuery } from '@powersync/react';
import React from 'react';

const BUCKETS_QUERY = `
Expand All @@ -24,9 +23,9 @@ WITH
(SELECT
bucket,
row_type,
sum(length(data)) as data_size,
sum(case when op = 3 and superseded = 0 then length(data) else 0 end) as data_size,
sum(length(row_type) + length(row_id) + length(bucket) + length(key) + 40) as metadata_size,
count() as row_count
sum(case when op = 3 and superseded = 0 then 1 else 0 end) as row_count
FROM ps_oplog GROUP BY bucket, row_type),
oplog_stats AS
Expand All @@ -51,23 +50,65 @@ FROM local_bucket_data local
LEFT JOIN oplog_stats stats ON stats.name = local.id`;

const TABLES_QUERY = `
SELECT row_type as name, count() as count, sum(length(data)) as size FROM ps_oplog GROUP BY row_type
SELECT row_type as name, count() as count, sum(length(data)) as size FROM ps_oplog WHERE superseded = 0 and op = 3 GROUP BY row_type
`;

export default function SyncDiagnosticsPage() {
const { data: bucketRows, isLoading: bucketRowsLoading } = useQuery(BUCKETS_QUERY, undefined, {
rawTableNames: true,
tables: ['ps_oplog', 'ps_data_local__local_bucket_data'],
throttleMs: 500
});
const { data: tableRows, isLoading: tableRowsLoading } = useQuery(TABLES_QUERY, undefined, {
rawTableNames: true,
tables: ['ps_oplog', 'ps_data_local__local_bucket_data'],
throttleMs: 500
});
const BUCKETS_QUERY_FAST = `
SELECT
local.id as name,
'[]' as tables,
0 as data_size,
0 as metadata_size,
0 as row_count,
local.download_size,
local.total_operations,
local.downloading
FROM local_bucket_data local`;

export default function SyncDiagnosticsPage() {
const [bucketRows, setBucketRows] = React.useState<null | any[]>(null);
const [tableRows, setTableRows] = React.useState<null | any[]>(null);
const [syncError, setSyncError] = React.useState<Error | null>(syncErrorTracker.lastSyncError);

const bucketRowsLoading = bucketRows == null;
const tableRowsLoading = tableRows == null;

const refreshStats = async () => {
// Similar to db.currentState.hasSynced, but synchronized to the onChange events
const hasSynced = await db.getOptional('SELECT 1 FROM ps_buckets WHERE last_applied_op > 0 LIMIT 1');
if (hasSynced != null) {
// These are potentially expensive queries - do not run during initial sync
const bucketRows = await db.getAll(BUCKETS_QUERY);
const tableRows = await db.getAll(TABLES_QUERY);
setBucketRows(bucketRows);
setTableRows(tableRows);
} else {
// Fast query to show progress during initial sync
const bucketRows = await db.getAll(BUCKETS_QUERY_FAST);
setBucketRows(bucketRows);
setTableRows(null);
}
};

React.useEffect(() => {
const controller = new AbortController();

db.onChangeWithCallback(
{
async onChange(event) {
await refreshStats();
}
},
{ rawTableNames: true, tables: ['ps_oplog', 'ps_buckets', 'ps_data_local__local_bucket_data'], throttleMs: 500 }
);

refreshStats();

return () => {
controller.abort();
};
}, []);

React.useEffect(() => {
const l = syncErrorTracker.registerListener({
lastErrorUpdated(error) {
Expand Down Expand Up @@ -111,7 +152,7 @@ export default function SyncDiagnosticsPage() {
}
];

const rows = bucketRows.map((r) => {
const rows = (bucketRows ?? []).map((r) => {
return {
id: r.name,
name: r.name,
Expand Down Expand Up @@ -146,7 +187,7 @@ export default function SyncDiagnosticsPage() {
}
];

const tablesRows = tableRows.map((r) => {
const tablesRows = (tableRows ?? []).map((r) => {
return {
id: r.name,
...r
Expand Down Expand Up @@ -181,50 +222,40 @@ export default function SyncDiagnosticsPage() {
);

const tablesTable = (
<S.QueryResultContainer>
<Typography variant="h4" gutterBottom>
Tables
</Typography>
<DataGrid
autoHeight={true}
rows={tablesRows}
columns={tablesColumns}
initialState={{
pagination: {
paginationModel: {
pageSize: 10
}
<DataGrid
autoHeight={true}
rows={tablesRows}
columns={tablesColumns}
initialState={{
pagination: {
paginationModel: {
pageSize: 10
}
}}
pageSizeOptions={[10, 50, 100]}
disableRowSelectionOnClick
/>
</S.QueryResultContainer>
}
}}
pageSizeOptions={[10, 50, 100]}
disableRowSelectionOnClick
/>
);

const bucketsTable = (
<S.QueryResultContainer>
<Typography variant="h4" gutterBottom>
Buckets
</Typography>
<DataGrid
autoHeight={true}
rows={rows}
columns={columns}
initialState={{
pagination: {
paginationModel: {
pageSize: 50
}
},
sorting: {
sortModel: [{ field: 'total_operations', sort: 'desc' }]
<DataGrid
autoHeight={true}
rows={rows}
columns={columns}
initialState={{
pagination: {
paginationModel: {
pageSize: 50
}
}}
pageSizeOptions={[10, 50, 100]}
disableRowSelectionOnClick
/>
</S.QueryResultContainer>
},
sorting: {
sortModel: [{ field: 'total_operations', sort: 'desc' }]
}
}}
pageSizeOptions={[10, 50, 100]}
disableRowSelectionOnClick
/>
);

return (
Expand All @@ -239,8 +270,18 @@ export default function SyncDiagnosticsPage() {
}}>
Clear & Redownload
</Button>
{tableRowsLoading ? <CircularProgress /> : tablesTable}
{bucketRowsLoading ? <CircularProgress /> : bucketsTable}
<S.QueryResultContainer>
<Typography variant="h4" gutterBottom>
Tables
</Typography>
{tableRowsLoading ? <CircularProgress /> : tablesTable}
</S.QueryResultContainer>
<S.QueryResultContainer>
<Typography variant="h4" gutterBottom>
Buckets
</Typography>
{bucketRowsLoading ? <CircularProgress /> : bucketsTable}
</S.QueryResultContainer>
</S.MainContainer>
</NavigationPage>
);
Expand Down
Loading

0 comments on commit 58dbde7

Please sign in to comment.