diff --git a/.changeset/happy-gifts-sort.md b/.changeset/happy-gifts-sort.md new file mode 100644 index 00000000..208d943b --- /dev/null +++ b/.changeset/happy-gifts-sort.md @@ -0,0 +1,5 @@ +--- +'diagnostics-app': minor +--- + +Faster initial sync and other fixes diff --git a/.changeset/real-bottles-mate.md b/.changeset/real-bottles-mate.md new file mode 100644 index 00000000..377b58c6 --- /dev/null +++ b/.changeset/real-bottles-mate.md @@ -0,0 +1,5 @@ +--- +'@powersync/web': patch +--- + +Fix read statements not using the transaction lock diff --git a/packages/common/rollup.config.mjs b/packages/common/rollup.config.mjs index 8a9bac8c..0112f24d 100644 --- a/packages/common/rollup.config.mjs +++ b/packages/common/rollup.config.mjs @@ -22,7 +22,7 @@ export default (commandLineArgs) => { commonjs({}), inject({ Buffer: ['buffer', 'Buffer'], - ReadableStream: ['web-streams-polyfill/ponyfill/es2018', 'ReadableStream'], + ReadableStream: ['web-streams-polyfill/ponyfill', 'ReadableStream'], // Used by can-ndjson-stream TextDecoder: ['text-encoding', 'TextDecoder'] }) diff --git a/packages/kysely-driver/tests/setup/db.ts b/packages/kysely-driver/tests/setup/db.ts index 1d18c54c..ae021860 100644 --- a/packages/kysely-driver/tests/setup/db.ts +++ b/packages/kysely-driver/tests/setup/db.ts @@ -18,5 +18,3 @@ export const getPowerSyncDb = () => { return database; }; - -export const getKyselyDb = wrapPowerSyncWithKysely(getPowerSyncDb()); diff --git a/packages/kysely-driver/tests/sqlite/sqlite-connection.test.ts b/packages/kysely-driver/tests/sqlite/sqlite-connection.test.ts index ffb78303..cbaed441 100644 --- a/packages/kysely-driver/tests/sqlite/sqlite-connection.test.ts +++ b/packages/kysely-driver/tests/sqlite/sqlite-connection.test.ts @@ -21,7 +21,6 @@ describe('PowerSyncConnection', () => { it('should execute a select query using getAll from the table', async () => { await powerSyncDb.execute('INSERT INTO users (id, name) VALUES(uuid(), ?)', ['John']); - const getAllSpy = vi.spyOn(powerSyncDb, 'getAll'); const compiledQuery: CompiledQuery = { @@ -35,7 +34,7 @@ describe('PowerSyncConnection', () => { expect(rows.length).toEqual(1); expect(rows[0].name).toEqual('John'); - expect(getAllSpy).toHaveBeenCalledTimes(1); + expect(getAllSpy).toHaveBeenCalledWith('SELECT * From users', []); }); it('should insert to the table', async () => { diff --git a/packages/react-native/rollup.config.mjs b/packages/react-native/rollup.config.mjs index da024598..1586d12b 100644 --- a/packages/react-native/rollup.config.mjs +++ b/packages/react-native/rollup.config.mjs @@ -28,7 +28,7 @@ export default (commandLineArgs) => { commonjs({}), inject({ Buffer: ['@craftzdog/react-native-buffer', 'Buffer'], - ReadableStream: ['web-streams-polyfill/ponyfill/es2018', 'ReadableStream'], + ReadableStream: ['web-streams-polyfill/ponyfill', 'ReadableStream'], TextEncoder: ['text-encoding', 'TextEncoder'], TextDecoder: ['text-encoding', 'TextDecoder'] }), diff --git a/packages/web/src/db/adapters/wa-sqlite/WASQLiteDBAdapter.ts b/packages/web/src/db/adapters/wa-sqlite/WASQLiteDBAdapter.ts index bb1dfb20..7449ad29 100644 --- a/packages/web/src/db/adapters/wa-sqlite/WASQLiteDBAdapter.ts +++ b/packages/web/src/db/adapters/wa-sqlite/WASQLiteDBAdapter.ts @@ -46,7 +46,9 @@ export class WASQLiteDBAdapter extends BaseObserver implement this.dbGetHelpers = null; this.methods = null; this.initialized = this.init(); - this.dbGetHelpers = this.generateDBHelpers({ execute: this._execute.bind(this) }); + this.dbGetHelpers = this.generateDBHelpers({ + execute: (query, params) => this.acquireLock(() => this._execute(query, params)) + }); } get name() { diff --git a/packages/web/src/shared/open-db.ts b/packages/web/src/shared/open-db.ts index b570d8db..bf02aa7b 100644 --- a/packages/web/src/shared/open-db.ts +++ b/packages/web/src/shared/open-db.ts @@ -2,6 +2,7 @@ import * as SQLite from '@journeyapps/wa-sqlite'; import '@journeyapps/wa-sqlite'; import * as Comlink from 'comlink'; import type { DBFunctionsInterface, OnTableChangeCallback, WASQLExecuteResult } from './types'; +import { Mutex } from 'async-mutex'; let nextId = 1; @@ -18,6 +19,7 @@ export async function _openDB( sqlite3.vfs_register(vfs, true); const db = await sqlite3.open_v2(dbFileName); + const statementMutex = new Mutex(); /** * Listeners are exclusive to the DB connection. @@ -40,10 +42,10 @@ export async function _openDB( /** * This requests a lock for executing statements. - * Should only be used interanlly. + * Should only be used internally. */ - const _acquireExecuteLock = (callback: () => Promise): Promise => { - return navigator.locks.request(`db-execute-${dbFileName}`, callback); + const _acquireExecuteLock = (callback: () => Promise): Promise => { + return statementMutex.runExclusive(callback); }; /** @@ -115,7 +117,7 @@ export async function _openDB( * This executes SQL statements in a batch. */ const executeBatch = async (sql: string, bindings?: any[][]): Promise => { - return _acquireExecuteLock(async () => { + return _acquireExecuteLock(async (): Promise => { let affectedRows = 0; const str = sqlite3.str_new(db, sql); @@ -127,7 +129,8 @@ export async function _openDB( const prepared = await sqlite3.prepare_v2(db, query); if (prepared === null) { return { - rowsAffected: 0 + rowsAffected: 0, + rows: { _array: [], length: 0 } }; } const wrappedBindings = bindings ? bindings : []; @@ -158,13 +161,15 @@ export async function _openDB( } catch (err) { await executeSingleStatement('ROLLBACK'); return { - rowsAffected: 0 + rowsAffected: 0, + rows: { _array: [], length: 0 } }; } finally { sqlite3.str_finish(str); } const result = { - rowsAffected: affectedRows + rowsAffected: affectedRows, + rows: { _array: [], length: 0 } }; return result; diff --git a/packages/web/tests/crud.test.ts b/packages/web/tests/crud.test.ts index c615d11b..714080b3 100644 --- a/packages/web/tests/crud.test.ts +++ b/packages/web/tests/crud.test.ts @@ -3,6 +3,7 @@ import { AbstractPowerSyncDatabase, Column, ColumnType, CrudEntry, Schema, Table import { PowerSyncDatabase } from '@powersync/web'; import { v4 as uuid } from 'uuid'; import { generateTestDb } from './utils/testDb'; +import pDefer from 'p-defer'; const testId = '2290de4f-0488-4e50-abed-f8e8eb1d0b42'; @@ -289,4 +290,25 @@ describe('CRUD Tests', () => { await tx2.complete(); expect(await powersync.getNextCrudTransaction()).equals(null); }); + + it('Transaction exclusivity', async () => { + const outside = pDefer(); + const inTx = pDefer(); + + const txPromise = powersync.writeTransaction(async (tx) => { + await tx.execute('INSERT INTO assets(id, description) VALUES(?, ?)', [testId, 'test1']); + inTx.resolve(); + await outside.promise; + await tx.rollback(); + }); + + await inTx.promise; + + const r = powersync.getOptional('SELECT * FROM assets WHERE id = ?', [testId]); + await new Promise((resolve) => setTimeout(resolve, 10)); + outside.resolve(); + + await txPromise; + expect(await r).toEqual(null); + }); }); diff --git a/tools/diagnostics-app/src/app/views/sync-diagnostics.tsx b/tools/diagnostics-app/src/app/views/sync-diagnostics.tsx index 44311362..b628ae8d 100644 --- a/tools/diagnostics-app/src/app/views/sync-diagnostics.tsx +++ b/tools/diagnostics-app/src/app/views/sync-diagnostics.tsx @@ -1,5 +1,5 @@ import { NavigationPage } from '@/components/navigation/NavigationPage'; -import { clearData, syncErrorTracker } from '@/library/powersync/ConnectionManager'; +import { clearData, db, syncErrorTracker } from '@/library/powersync/ConnectionManager'; import { Box, Button, @@ -15,7 +15,6 @@ import { styled } from '@mui/material'; import { DataGrid, GridColDef } from '@mui/x-data-grid'; -import { useQuery } from '@powersync/react'; import React from 'react'; const BUCKETS_QUERY = ` @@ -24,9 +23,9 @@ WITH (SELECT bucket, row_type, - sum(length(data)) as data_size, + sum(case when op = 3 and superseded = 0 then length(data) else 0 end) as data_size, sum(length(row_type) + length(row_id) + length(bucket) + length(key) + 40) as metadata_size, - count() as row_count + sum(case when op = 3 and superseded = 0 then 1 else 0 end) as row_count FROM ps_oplog GROUP BY bucket, row_type), oplog_stats AS @@ -51,23 +50,65 @@ FROM local_bucket_data local LEFT JOIN oplog_stats stats ON stats.name = local.id`; const TABLES_QUERY = ` -SELECT row_type as name, count() as count, sum(length(data)) as size FROM ps_oplog GROUP BY row_type +SELECT row_type as name, count() as count, sum(length(data)) as size FROM ps_oplog WHERE superseded = 0 and op = 3 GROUP BY row_type `; -export default function SyncDiagnosticsPage() { - const { data: bucketRows, isLoading: bucketRowsLoading } = useQuery(BUCKETS_QUERY, undefined, { - rawTableNames: true, - tables: ['ps_oplog', 'ps_data_local__local_bucket_data'], - throttleMs: 500 - }); - const { data: tableRows, isLoading: tableRowsLoading } = useQuery(TABLES_QUERY, undefined, { - rawTableNames: true, - tables: ['ps_oplog', 'ps_data_local__local_bucket_data'], - throttleMs: 500 - }); +const BUCKETS_QUERY_FAST = ` +SELECT + local.id as name, + '[]' as tables, + 0 as data_size, + 0 as metadata_size, + 0 as row_count, + local.download_size, + local.total_operations, + local.downloading +FROM local_bucket_data local`; +export default function SyncDiagnosticsPage() { + const [bucketRows, setBucketRows] = React.useState(null); + const [tableRows, setTableRows] = React.useState(null); const [syncError, setSyncError] = React.useState(syncErrorTracker.lastSyncError); + const bucketRowsLoading = bucketRows == null; + const tableRowsLoading = tableRows == null; + + const refreshStats = async () => { + // Similar to db.currentState.hasSynced, but synchronized to the onChange events + const hasSynced = await db.getOptional('SELECT 1 FROM ps_buckets WHERE last_applied_op > 0 LIMIT 1'); + if (hasSynced != null) { + // These are potentially expensive queries - do not run during initial sync + const bucketRows = await db.getAll(BUCKETS_QUERY); + const tableRows = await db.getAll(TABLES_QUERY); + setBucketRows(bucketRows); + setTableRows(tableRows); + } else { + // Fast query to show progress during initial sync + const bucketRows = await db.getAll(BUCKETS_QUERY_FAST); + setBucketRows(bucketRows); + setTableRows(null); + } + }; + + React.useEffect(() => { + const controller = new AbortController(); + + db.onChangeWithCallback( + { + async onChange(event) { + await refreshStats(); + } + }, + { rawTableNames: true, tables: ['ps_oplog', 'ps_buckets', 'ps_data_local__local_bucket_data'], throttleMs: 500 } + ); + + refreshStats(); + + return () => { + controller.abort(); + }; + }, []); + React.useEffect(() => { const l = syncErrorTracker.registerListener({ lastErrorUpdated(error) { @@ -111,7 +152,7 @@ export default function SyncDiagnosticsPage() { } ]; - const rows = bucketRows.map((r) => { + const rows = (bucketRows ?? []).map((r) => { return { id: r.name, name: r.name, @@ -146,7 +187,7 @@ export default function SyncDiagnosticsPage() { } ]; - const tablesRows = tableRows.map((r) => { + const tablesRows = (tableRows ?? []).map((r) => { return { id: r.name, ...r @@ -181,50 +222,40 @@ export default function SyncDiagnosticsPage() { ); const tablesTable = ( - - - Tables - - - + } + }} + pageSizeOptions={[10, 50, 100]} + disableRowSelectionOnClick + /> ); const bucketsTable = ( - - - Buckets - - - + }, + sorting: { + sortModel: [{ field: 'total_operations', sort: 'desc' }] + } + }} + pageSizeOptions={[10, 50, 100]} + disableRowSelectionOnClick + /> ); return ( @@ -239,8 +270,18 @@ export default function SyncDiagnosticsPage() { }}> Clear & Redownload - {tableRowsLoading ? : tablesTable} - {bucketRowsLoading ? : bucketsTable} + + + Tables + + {tableRowsLoading ? : tablesTable} + + + + Buckets + + {bucketRowsLoading ? : bucketsTable} + ); diff --git a/tools/diagnostics-app/src/library/powersync/ConnectionManager.ts b/tools/diagnostics-app/src/library/powersync/ConnectionManager.ts index ac838f2c..4bfda3bc 100644 --- a/tools/diagnostics-app/src/library/powersync/ConnectionManager.ts +++ b/tools/diagnostics-app/src/library/powersync/ConnectionManager.ts @@ -2,6 +2,7 @@ import { BaseListener, BaseObserver, PowerSyncDatabase, + SyncStreamConnectionMethod, WebRemote, WebStreamingSyncImplementation, WebStreamingSyncImplementationOptions @@ -11,6 +12,12 @@ import { DynamicSchemaManager } from './DynamicSchemaManager'; import { RecordingStorageAdapter } from './RecordingStorageAdapter'; import { TokenConnector } from './TokenConnector'; +import { Buffer } from 'buffer'; + +if (typeof self.Buffer == 'undefined') { + self.Buffer = Buffer; +} + Logger.useDefaults(); Logger.setLevel(Logger.DEBUG); @@ -22,6 +29,8 @@ export const db = new PowerSyncDatabase({ }, schema: schemaManager.buildSchema() }); +db.execute('PRAGMA cache_size=-50000'); + export const connector = new TokenConnector(); const remote = new WebRemote(connector); @@ -71,7 +80,7 @@ if (connector.hasCredentials()) { } export async function connect() { - await sync.connect(); + await sync.connect({ connectionMethod: SyncStreamConnectionMethod.WEB_SOCKET }); if (!sync.syncStatus.connected) { // Disconnect but don't wait for it sync.disconnect(); @@ -87,7 +96,7 @@ export async function clearData() { await schemaManager.clear(); await schemaManager.refreshSchema(db.database); if (connector.hasCredentials()) { - await sync.connect(); + await sync.connect({ connectionMethod: SyncStreamConnectionMethod.WEB_SOCKET }); } } diff --git a/tools/diagnostics-app/src/library/powersync/RecordingStorageAdapter.ts b/tools/diagnostics-app/src/library/powersync/RecordingStorageAdapter.ts index ef690085..6c025c05 100644 --- a/tools/diagnostics-app/src/library/powersync/RecordingStorageAdapter.ts +++ b/tools/diagnostics-app/src/library/powersync/RecordingStorageAdapter.ts @@ -42,7 +42,11 @@ export class RecordingStorageAdapter extends SqliteBucketStorage { async syncLocalDatabase(checkpoint: Checkpoint) { const r = await super.syncLocalDatabase(checkpoint); - await this.schemaManager.refreshSchema(this.rdb); + // Refresh schema asynchronously, to allow us to better measure + // performance of initial sync. + setTimeout(() => { + this.schemaManager.refreshSchema(this.rdb); + }, 60); if (r.checkpointValid) { await this.rdb.execute('UPDATE local_bucket_data SET downloading = FALSE'); }