Skip to content

Commit

Permalink
Merge pull request #227 from powersync-ja/improve-diagnostics-app
Browse files Browse the repository at this point in the history
Diagnostics app improvements
  • Loading branch information
rkistner authored Jul 16, 2024
2 parents 9f35b78 + 1b2b207 commit c4a31ca
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 62 deletions.
5 changes: 5 additions & 0 deletions .changeset/happy-gifts-sort.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'diagnostics-app': minor
---

Faster initial sync and other fixes
159 changes: 100 additions & 59 deletions tools/diagnostics-app/src/app/views/sync-diagnostics.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { NavigationPage } from '@/components/navigation/NavigationPage';
import { clearData, syncErrorTracker } from '@/library/powersync/ConnectionManager';
import { clearData, db, syncErrorTracker } from '@/library/powersync/ConnectionManager';
import {
Box,
Button,
Expand All @@ -15,7 +15,6 @@ import {
styled
} from '@mui/material';
import { DataGrid, GridColDef } from '@mui/x-data-grid';
import { useQuery } from '@powersync/react';
import React from 'react';

const BUCKETS_QUERY = `
Expand All @@ -24,9 +23,9 @@ WITH
(SELECT
bucket,
row_type,
sum(length(data)) as data_size,
sum(case when op = 3 and superseded = 0 then length(data) else 0 end) as data_size,
sum(length(row_type) + length(row_id) + length(bucket) + length(key) + 40) as metadata_size,
count() as row_count
sum(case when op = 3 and superseded = 0 then 1 else 0 end) as row_count
FROM ps_oplog GROUP BY bucket, row_type),
oplog_stats AS
Expand All @@ -51,23 +50,65 @@ FROM local_bucket_data local
LEFT JOIN oplog_stats stats ON stats.name = local.id`;

const TABLES_QUERY = `
SELECT row_type as name, count() as count, sum(length(data)) as size FROM ps_oplog GROUP BY row_type
SELECT row_type as name, count() as count, sum(length(data)) as size FROM ps_oplog WHERE superseded = 0 and op = 3 GROUP BY row_type
`;

export default function SyncDiagnosticsPage() {
const { data: bucketRows, isLoading: bucketRowsLoading } = useQuery(BUCKETS_QUERY, undefined, {
rawTableNames: true,
tables: ['ps_oplog', 'ps_data_local__local_bucket_data'],
throttleMs: 500
});
const { data: tableRows, isLoading: tableRowsLoading } = useQuery(TABLES_QUERY, undefined, {
rawTableNames: true,
tables: ['ps_oplog', 'ps_data_local__local_bucket_data'],
throttleMs: 500
});
const BUCKETS_QUERY_FAST = `
SELECT
local.id as name,
'[]' as tables,
0 as data_size,
0 as metadata_size,
0 as row_count,
local.download_size,
local.total_operations,
local.downloading
FROM local_bucket_data local`;

export default function SyncDiagnosticsPage() {
const [bucketRows, setBucketRows] = React.useState<null | any[]>(null);
const [tableRows, setTableRows] = React.useState<null | any[]>(null);
const [syncError, setSyncError] = React.useState<Error | null>(syncErrorTracker.lastSyncError);

const bucketRowsLoading = bucketRows == null;
const tableRowsLoading = tableRows == null;

const refreshStats = async () => {
// Similar to db.currentState.hasSynced, but synchronized to the onChange events
const hasSynced = await db.getOptional('SELECT 1 FROM ps_buckets WHERE last_applied_op > 0 LIMIT 1');
if (hasSynced != null) {
// These are potentially expensive queries - do not run during initial sync
const bucketRows = await db.getAll(BUCKETS_QUERY);
const tableRows = await db.getAll(TABLES_QUERY);
setBucketRows(bucketRows);
setTableRows(tableRows);
} else {
// Fast query to show progress during initial sync
const bucketRows = await db.getAll(BUCKETS_QUERY_FAST);
setBucketRows(bucketRows);
setTableRows(null);
}
};

React.useEffect(() => {
const controller = new AbortController();

db.onChangeWithCallback(
{
async onChange(event) {
await refreshStats();
}
},
{ rawTableNames: true, tables: ['ps_oplog', 'ps_buckets', 'ps_data_local__local_bucket_data'], throttleMs: 500 }
);

refreshStats();

return () => {
controller.abort();
};
}, []);

React.useEffect(() => {
const l = syncErrorTracker.registerListener({
lastErrorUpdated(error) {
Expand Down Expand Up @@ -111,7 +152,7 @@ export default function SyncDiagnosticsPage() {
}
];

const rows = bucketRows.map((r) => {
const rows = (bucketRows ?? []).map((r) => {
return {
id: r.name,
name: r.name,
Expand Down Expand Up @@ -146,7 +187,7 @@ export default function SyncDiagnosticsPage() {
}
];

const tablesRows = tableRows.map((r) => {
const tablesRows = (tableRows ?? []).map((r) => {
return {
id: r.name,
...r
Expand Down Expand Up @@ -181,50 +222,40 @@ export default function SyncDiagnosticsPage() {
);

const tablesTable = (
<S.QueryResultContainer>
<Typography variant="h4" gutterBottom>
Tables
</Typography>
<DataGrid
autoHeight={true}
rows={tablesRows}
columns={tablesColumns}
initialState={{
pagination: {
paginationModel: {
pageSize: 10
}
<DataGrid
autoHeight={true}
rows={tablesRows}
columns={tablesColumns}
initialState={{
pagination: {
paginationModel: {
pageSize: 10
}
}}
pageSizeOptions={[10, 50, 100]}
disableRowSelectionOnClick
/>
</S.QueryResultContainer>
}
}}
pageSizeOptions={[10, 50, 100]}
disableRowSelectionOnClick
/>
);

const bucketsTable = (
<S.QueryResultContainer>
<Typography variant="h4" gutterBottom>
Buckets
</Typography>
<DataGrid
autoHeight={true}
rows={rows}
columns={columns}
initialState={{
pagination: {
paginationModel: {
pageSize: 50
}
},
sorting: {
sortModel: [{ field: 'total_operations', sort: 'desc' }]
<DataGrid
autoHeight={true}
rows={rows}
columns={columns}
initialState={{
pagination: {
paginationModel: {
pageSize: 50
}
}}
pageSizeOptions={[10, 50, 100]}
disableRowSelectionOnClick
/>
</S.QueryResultContainer>
},
sorting: {
sortModel: [{ field: 'total_operations', sort: 'desc' }]
}
}}
pageSizeOptions={[10, 50, 100]}
disableRowSelectionOnClick
/>
);

return (
Expand All @@ -239,8 +270,18 @@ export default function SyncDiagnosticsPage() {
}}>
Clear & Redownload
</Button>
{tableRowsLoading ? <CircularProgress /> : tablesTable}
{bucketRowsLoading ? <CircularProgress /> : bucketsTable}
<S.QueryResultContainer>
<Typography variant="h4" gutterBottom>
Tables
</Typography>
{tableRowsLoading ? <CircularProgress /> : tablesTable}
</S.QueryResultContainer>
<S.QueryResultContainer>
<Typography variant="h4" gutterBottom>
Buckets
</Typography>
{bucketRowsLoading ? <CircularProgress /> : bucketsTable}
</S.QueryResultContainer>
</S.MainContainer>
</NavigationPage>
);
Expand Down
13 changes: 11 additions & 2 deletions tools/diagnostics-app/src/library/powersync/ConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {
BaseListener,
BaseObserver,
PowerSyncDatabase,
SyncStreamConnectionMethod,
WebRemote,
WebStreamingSyncImplementation,
WebStreamingSyncImplementationOptions
Expand All @@ -11,6 +12,12 @@ import { DynamicSchemaManager } from './DynamicSchemaManager';
import { RecordingStorageAdapter } from './RecordingStorageAdapter';
import { TokenConnector } from './TokenConnector';

import { Buffer } from 'buffer';

if (typeof self.Buffer == 'undefined') {
self.Buffer = Buffer;
}

Logger.useDefaults();
Logger.setLevel(Logger.DEBUG);

Expand All @@ -22,6 +29,8 @@ export const db = new PowerSyncDatabase({
},
schema: schemaManager.buildSchema()
});
db.execute('PRAGMA cache_size=-50000');

export const connector = new TokenConnector();

const remote = new WebRemote(connector);
Expand Down Expand Up @@ -71,7 +80,7 @@ if (connector.hasCredentials()) {
}

export async function connect() {
await sync.connect();
await sync.connect({ connectionMethod: SyncStreamConnectionMethod.WEB_SOCKET });
if (!sync.syncStatus.connected) {
// Disconnect but don't wait for it
sync.disconnect();
Expand All @@ -87,7 +96,7 @@ export async function clearData() {
await schemaManager.clear();
await schemaManager.refreshSchema(db.database);
if (connector.hasCredentials()) {
await sync.connect();
await sync.connect({ connectionMethod: SyncStreamConnectionMethod.WEB_SOCKET });
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ export class RecordingStorageAdapter extends SqliteBucketStorage {

async syncLocalDatabase(checkpoint: Checkpoint) {
const r = await super.syncLocalDatabase(checkpoint);
await this.schemaManager.refreshSchema(this.rdb);
// Refresh schema asynchronously, to allow us to better measure
// performance of initial sync.
setTimeout(() => {
this.schemaManager.refreshSchema(this.rdb);
}, 60);
if (r.checkpointValid) {
await this.rdb.execute('UPDATE local_bucket_data SET downloading = FALSE');
}
Expand Down

0 comments on commit c4a31ca

Please sign in to comment.