From ca61f99ee518b0446ed8b86ea1213dd50d840c7e Mon Sep 17 00:00:00 2001 From: Paul Sonnentag Date: Mon, 6 Nov 2023 13:33:36 +0100 Subject: [PATCH] Move sync state back to doc synchronizer --- packages/automerge-repo/src/DocHandle.ts | 46 ++--------- packages/automerge-repo/src/Repo.ts | 21 ++--- .../src/storage/StorageSubsystem.ts | 26 ++---- .../synchronizer/CollectionSynchronizer.ts | 14 +++- .../src/synchronizer/DocSynchronizer.ts | 80 ++++++++++++++----- .../test/DocSynchronizer.test.ts | 16 +++- packages/automerge-repo/test/Repo.test.ts | 6 +- 7 files changed, 106 insertions(+), 103 deletions(-) diff --git a/packages/automerge-repo/src/DocHandle.ts b/packages/automerge-repo/src/DocHandle.ts index 535ad8d6e..4b56d3727 100644 --- a/packages/automerge-repo/src/DocHandle.ts +++ b/packages/automerge-repo/src/DocHandle.ts @@ -39,7 +39,7 @@ export class DocHandle // #machine: DocHandleXstateMachine #timeoutDelay: number - #syncStates: Record = {} + #remoteHeads: Record = {} /** The URL of this document * @@ -328,42 +328,16 @@ export class DocHandle // }) } - /** `getSyncState` can be used to read the sync state of a specific peer - */ - getSyncState(peerId: PeerId): A.SyncState | undefined { - return this.#syncStates[peerId] - } - - /** `setSyncState` is called by the doc synchronizer when the sync state changes + /** `setRemoteHeads` is called by the doc synchronizer * @hidden */ - setSyncState(peerId: PeerId, syncState: A.SyncState): void { - const previousSyncState = this.#syncStates[peerId] - - this.#syncStates[peerId] = syncState - const { theirHeads, sharedHeads } = syncState - - if ( - syncState.theirHeads && - (!previousSyncState || - !previousSyncState.theirHeads || - !arraysEqual(previousSyncState.theirHeads, syncState.theirHeads)) - ) { - this.emit("remote-heads", { peerId, heads: syncState.theirHeads }) - } - - this.emit("sync-state", { peerId, syncState }) + setRemoteHeads(peerId: PeerId, heads: A.Heads) { + this.#remoteHeads[peerId] = heads + this.emit("remote-heads", { peerId, heads }) } getRemoteHeads(peerId: PeerId): A.Heads | undefined { - return this.#syncStates[peerId].theirHeads - } - - /** `setSyncStates` is called by the repo when the doc is loaded initially - * @hidden - */ - initSyncStates(syncStates: Record): void { - this.#syncStates = syncStates + return this.#remoteHeads[peerId] } /** `change` is called by the repo when the document is changed locally */ @@ -668,11 +642,3 @@ const { AWAIT_NETWORK, NETWORK_READY, } = Event - -function arraysEqual(a: T[], b: T[]): boolean { - if (a.length !== b.length) return false - for (let i = 0; i < a.length; i++) { - if (a[i] !== b[i]) return false - } - return true -} diff --git a/packages/automerge-repo/src/Repo.ts b/packages/automerge-repo/src/Repo.ts index e3ff36d5b..6700aff31 100644 --- a/packages/automerge-repo/src/Repo.ts +++ b/packages/automerge-repo/src/Repo.ts @@ -69,23 +69,11 @@ export class Repo extends EventEmitter { await storageSubsystem.saveDoc(handle.documentId, handle.docSync()!) } else { // Try to load from disk - const [loadedDoc, loadedSyncStates] = await Promise.all([ - storageSubsystem.loadDoc(handle.documentId), - storageSubsystem.loadSyncStates(handle.documentId), - ]) - - console.log("loadedSyncStates", handle.documentId, loadedSyncStates) - + const loadedDoc = await storageSubsystem.loadDoc(handle.documentId) if (loadedDoc) { handle.update(() => loadedDoc) - handle.initSyncStates(loadedSyncStates) } } - - // todo: debounce - handle.on("sync-state", ({ syncState, peerId }) => { - storageSubsystem.saveSyncState(handle.documentId, peerId, syncState) - }) } handle.on("unavailable", () => { @@ -159,6 +147,13 @@ export class Repo extends EventEmitter { networkSubsystem.on("message", async msg => { await synchronizer.receiveMessage(msg) }) + + if (storageSubsystem) { + // todo: debounce + synchronizer.on("sync-state", ({ documentId, peerId, syncState }) => { + storageSubsystem.saveSyncState(documentId, peerId, syncState) + }) + } } /** Returns an existing handle if we have it; creates one otherwise. */ diff --git a/packages/automerge-repo/src/storage/StorageSubsystem.ts b/packages/automerge-repo/src/storage/StorageSubsystem.ts index d2180f61b..0ef417d3a 100644 --- a/packages/automerge-repo/src/storage/StorageSubsystem.ts +++ b/packages/automerge-repo/src/storage/StorageSubsystem.ts @@ -208,25 +208,13 @@ export class StorageSubsystem { this.#compacting = false } - async loadSyncStates( - documentId: DocumentId - ): Promise> { - const key = [documentId, "sync-state"] - const loaded = await this.#storageAdapter.loadRange(key) - - const syncStates: Record = {} - - for (const chunk of loaded) { - const peerId = chunk.key[2] as PeerId - if (chunk.data === undefined) { - console.warn(`Failed to load a syncState for ${documentId}:${peerId}`) - continue - } - const syncState = A.decodeSyncState(chunk.data) - syncStates[peerId] = syncState - } - - return syncStates + async loadSyncState( + documentId: DocumentId, + peerId: PeerId + ): Promise { + const key = [documentId, "sync-state", peerId] + const loaded = await this.#storageAdapter.load(key) + return loaded ? A.decodeSyncState(loaded) : undefined } async saveSyncState( diff --git a/packages/automerge-repo/src/synchronizer/CollectionSynchronizer.ts b/packages/automerge-repo/src/synchronizer/CollectionSynchronizer.ts index 92e3b72a1..aaffc2c1f 100644 --- a/packages/automerge-repo/src/synchronizer/CollectionSynchronizer.ts +++ b/packages/automerge-repo/src/synchronizer/CollectionSynchronizer.ts @@ -35,7 +35,19 @@ export class CollectionSynchronizer extends Synchronizer { /** Creates a new docSynchronizer and sets it up to propagate messages */ #initDocSynchronizer(handle: DocHandle): DocSynchronizer { - const docSynchronizer = new DocSynchronizer(handle) + const docSynchronizer = new DocSynchronizer({ + handle, + onLoadSyncState: peerId => { + if (!this.repo.storageSubsystem) { + return undefined + } + + return this.repo.storageSubsystem.loadSyncState( + handle.documentId, + peerId + ) + }, + }) docSynchronizer.on("message", event => this.emit("message", event)) docSynchronizer.on("sync-state", event => this.emit("sync-state", event)) return docSynchronizer diff --git a/packages/automerge-repo/src/synchronizer/DocSynchronizer.ts b/packages/automerge-repo/src/synchronizer/DocSynchronizer.ts index a32e63a04..9b092f5ef 100644 --- a/packages/automerge-repo/src/synchronizer/DocSynchronizer.ts +++ b/packages/automerge-repo/src/synchronizer/DocSynchronizer.ts @@ -20,6 +20,7 @@ import { import { PeerId } from "../types.js" import { Synchronizer } from "./Synchronizer.js" import { throttle } from "../helpers/throttle.js" +import { headsAreSame } from "../helpers/headsAreSame.js" type PeerDocumentStatus = "unknown" | "has" | "unavailable" | "wants" @@ -34,6 +35,11 @@ type PendingMessage = { received: Date } +interface DocSynchronizerConfig { + handle: DocHandle + onLoadSyncState?: (peerId: PeerId) => A.SyncState | undefined +} + /** * DocSynchronizer takes a handle to an Automerge document, and receives & dispatches sync messages * to bring it inline with all other peers' versions. @@ -47,14 +53,23 @@ export class DocSynchronizer extends Synchronizer { #peerDocumentStatuses: Record = {} + /** Sync state for each peer we've communicated with (including inactive peers) */ + #syncStates: Record = {} + #lastSyncs: Record = {} #pendingSyncMessages: Array = [] #syncStarted = false - constructor(private handle: DocHandle) { + #handle: DocHandle + #onLoadSyncState: (peerId: PeerId) => Promise + + constructor({ handle, onLoadSyncState }: DocSynchronizerConfig) { super() + this.#handle = handle + this.#onLoadSyncState = onLoadSyncState ?? (() => undefined) + const docId = handle.documentId.slice(0, 5) this.#log = debug(`automerge-repo:docsync:${docId}`) @@ -79,14 +94,14 @@ export class DocSynchronizer extends Synchronizer { } get documentId() { - return this.handle.documentId + return this.#handle.documentId } /// PRIVATE async #syncWithPeers() { this.#log(`syncWithPeers`) - const doc = await this.handle.doc() + const doc = await this.#handle.doc() if (doc === undefined) return this.#peers.forEach(peerId => this.#sendSyncMessage(peerId, doc)) } @@ -104,7 +119,7 @@ export class DocSynchronizer extends Synchronizer { const message: MessageContents = { type: "ephemeral", targetId: peerId, - documentId: this.handle.documentId, + documentId: this.#handle.documentId, data, } this.emit("message", message) @@ -121,13 +136,36 @@ export class DocSynchronizer extends Synchronizer { this.#peerDocumentStatuses[peerId] = "unknown" } - return this.handle.getSyncState(peerId) ?? A.initSyncState() + const prevSyncState = this.#syncStates[peerId] + if (prevSyncState) { + return prevSyncState + } + + return A.initSyncState() } #setSyncState(peerId: PeerId, syncState: A.SyncState) { // TODO: we only need to do this on reconnect - this.handle.setSyncState(peerId, syncState) + const previousSyncState = this.#syncStates[peerId] + + this.#syncStates[peerId] = syncState + + const haveTheirSyncedHeadsChanged = + syncState.theirHeads && + (!previousSyncState || + !previousSyncState.theirHeads || + !headsAreSame(previousSyncState.theirHeads, syncState.theirHeads)) + + if (haveTheirSyncedHeadsChanged) { + this.#handle.setRemoteHeads(peerId, syncState.theirHeads) + } + + this.emit("sync-state", { + peerId, + syncState, + documentId: this.#handle.documentId, + }) } #sendSyncMessage(peerId: PeerId, doc: A.Doc) { @@ -140,7 +178,7 @@ export class DocSynchronizer extends Synchronizer { const isNew = A.getHeads(doc).length === 0 if ( - !this.handle.isReady() && + !this.#handle.isReady() && isNew && newSyncState.sharedHeads.length === 0 && !Object.values(this.#peerDocumentStatuses).includes("has") && @@ -150,7 +188,7 @@ export class DocSynchronizer extends Synchronizer { this.emit("message", { type: "request", targetId: peerId, - documentId: this.handle.documentId, + documentId: this.#handle.documentId, data: message, } as RequestMessage) } else { @@ -158,7 +196,7 @@ export class DocSynchronizer extends Synchronizer { type: "sync", targetId: peerId, data: message, - documentId: this.handle.documentId, + documentId: this.#handle.documentId, } as SyncMessage) } @@ -186,14 +224,14 @@ export class DocSynchronizer extends Synchronizer { // messages during disconnection. // TODO: cover that case with a test and remove this hack peerIds.forEach(peerId => { - const syncStateRaw = this.#getSyncState(peerId) - const syncState = A.decodeSyncState(A.encodeSyncState(syncStateRaw)) + const syncState = this.#getSyncState(peerId) + // const syncState = A.decodeSyncState(A.encodeSyncState(syncStateRaw)) this.#setSyncState(peerId, syncState) }) // At this point if we don't have anything in our storage, we need to use an empty doc to sync // with; but we don't want to surface that state to the front end - void this.handle.doc([READY, REQUESTING, UNAVAILABLE]).then(doc => { + void this.#handle.doc([READY, REQUESTING, UNAVAILABLE]).then(doc => { // we register out peers first, then say that sync has started this.#syncStarted = true this.#checkDocUnavailable() @@ -236,15 +274,15 @@ export class DocSynchronizer extends Synchronizer { } receiveEphemeralMessage(message: EphemeralMessage) { - if (message.documentId !== this.handle.documentId) + if (message.documentId !== this.#handle.documentId) throw new Error(`channelId doesn't match documentId`) const { senderId, data } = message const contents = decode(new Uint8Array(data)) - this.handle.emit("ephemeral-message", { - handle: this.handle, + this.#handle.emit("ephemeral-message", { + handle: this.#handle, senderId, message: contents, }) @@ -259,11 +297,11 @@ export class DocSynchronizer extends Synchronizer { } receiveSyncMessage(message: SyncMessage | RequestMessage) { - if (message.documentId !== this.handle.documentId) + if (message.documentId !== this.#handle.documentId) throw new Error(`channelId doesn't match documentId`) // We need to block receiving the syncMessages until we've checked local storage - if (!this.handle.inState([READY, REQUESTING, UNAVAILABLE])) { + if (!this.#handle.inState([READY, REQUESTING, UNAVAILABLE])) { this.#pendingSyncMessages.push({ message, received: new Date() }) return } @@ -284,7 +322,7 @@ export class DocSynchronizer extends Synchronizer { this.#peerDocumentStatuses[message.senderId] = "has" } - this.handle.update(doc => { + this.#handle.update(doc => { const [newDoc, newSyncState] = A.receiveSyncMessage( doc, this.#getSyncState(message.senderId), @@ -312,7 +350,7 @@ export class DocSynchronizer extends Synchronizer { // if we know none of the peers have the document, tell all our peers that we don't either if ( this.#syncStarted && - this.handle.inState([REQUESTING]) && + this.#handle.inState([REQUESTING]) && this.#peers.every( peerId => this.#peerDocumentStatuses[peerId] === "unavailable" || @@ -324,13 +362,13 @@ export class DocSynchronizer extends Synchronizer { .forEach(peerId => { const message: MessageContents = { type: "doc-unavailable", - documentId: this.handle.documentId, + documentId: this.#handle.documentId, targetId: peerId, } this.emit("message", message) }) - this.handle.unavailable() + this.#handle.unavailable() } } diff --git a/packages/automerge-repo/test/DocSynchronizer.test.ts b/packages/automerge-repo/test/DocSynchronizer.test.ts index 74bee7f0f..e151f02b4 100644 --- a/packages/automerge-repo/test/DocSynchronizer.test.ts +++ b/packages/automerge-repo/test/DocSynchronizer.test.ts @@ -22,7 +22,9 @@ describe("DocSynchronizer", () => { const setup = () => { const docId = parseAutomergeUrl(generateAutomergeUrl()).documentId handle = new DocHandle(docId, { isNew: true }) - docSynchronizer = new DocSynchronizer(handle) + docSynchronizer = new DocSynchronizer({ + handle: handle as DocHandle, + }) return { handle, docSynchronizer } } @@ -81,7 +83,9 @@ describe("DocSynchronizer", () => { const docId = parseAutomergeUrl(generateAutomergeUrl()).documentId const handle = new DocHandle(docId, { isNew: false }) - docSynchronizer = new DocSynchronizer(handle) + docSynchronizer = new DocSynchronizer({ + handle: handle as DocHandle, + }) docSynchronizer.beginSync([alice]) handle.request() const message = await eventPromise(docSynchronizer, "message") @@ -93,13 +97,17 @@ describe("DocSynchronizer", () => { const docId = parseAutomergeUrl(generateAutomergeUrl()).documentId const bobHandle = new DocHandle(docId, { isNew: false }) - const bobDocSynchronizer = new DocSynchronizer(bobHandle) + const bobDocSynchronizer = new DocSynchronizer({ + handle: bobHandle as DocHandle, + }) bobDocSynchronizer.beginSync([alice]) bobHandle.request() const message = await eventPromise(bobDocSynchronizer, "message") const aliceHandle = new DocHandle(docId, { isNew: false }) - const aliceDocSynchronizer = new DocSynchronizer(aliceHandle) + const aliceDocSynchronizer = new DocSynchronizer({ + handle: aliceHandle as DocHandle, + }) aliceHandle.request() aliceDocSynchronizer.receiveSyncMessage({ ...message, senderId: bob }) diff --git a/packages/automerge-repo/test/Repo.test.ts b/packages/automerge-repo/test/Repo.test.ts index bef65421c..c1e8c81ca 100644 --- a/packages/automerge-repo/test/Repo.test.ts +++ b/packages/automerge-repo/test/Repo.test.ts @@ -770,9 +770,7 @@ describe("Repo", () => { teardown() }) - it("should save & reload remote heads", async () => { - throw new Error("not implemented") - }) + it.todo("should save & reload remote heads") it("should report the remote heads when they change", async () => { const { bobRepo, charlieRepo, teardown } = await setup({ @@ -810,8 +808,6 @@ describe("Repo", () => { const charlieHeads = A.getHeads(charlieHandle.docSync()) const bobHeads = A.getHeads(handle.docSync()) - console.log("Charlie Heads", charlieHeads) - console.log("Bob Heads", bobHeads) assert.deepStrictEqual(charlieHeads, bobHeads) const nextRemoteHeads = await nextRemoteHeadsPromise