Skip to content

Commit

Permalink
Move sync state back to doc synchronizer
Browse files Browse the repository at this point in the history
  • Loading branch information
paulsonnentag committed Nov 6, 2023
1 parent e66305d commit ca61f99
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 103 deletions.
46 changes: 6 additions & 40 deletions packages/automerge-repo/src/DocHandle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export class DocHandle<T> //

#machine: DocHandleXstateMachine<T>
#timeoutDelay: number
#syncStates: Record<PeerId, A.SyncState> = {}
#remoteHeads: Record<PeerId, A.Heads> = {}

/** The URL of this document
*
Expand Down Expand Up @@ -328,42 +328,16 @@ export class DocHandle<T> //
})
}

/** `getSyncState` can be used to read the sync state of a specific peer
*/
getSyncState(peerId: PeerId): A.SyncState | undefined {
return this.#syncStates[peerId]
}

/** `setSyncState` is called by the doc synchronizer when the sync state changes
/** `setRemoteHeads` is called by the doc synchronizer
* @hidden
*/
setSyncState(peerId: PeerId, syncState: A.SyncState): void {
const previousSyncState = this.#syncStates[peerId]

this.#syncStates[peerId] = syncState
const { theirHeads, sharedHeads } = syncState

if (
syncState.theirHeads &&
(!previousSyncState ||
!previousSyncState.theirHeads ||
!arraysEqual(previousSyncState.theirHeads, syncState.theirHeads))
) {
this.emit("remote-heads", { peerId, heads: syncState.theirHeads })
}

this.emit("sync-state", { peerId, syncState })
setRemoteHeads(peerId: PeerId, heads: A.Heads) {
this.#remoteHeads[peerId] = heads
this.emit("remote-heads", { peerId, heads })
}

getRemoteHeads(peerId: PeerId): A.Heads | undefined {
return this.#syncStates[peerId].theirHeads
}

/** `setSyncStates` is called by the repo when the doc is loaded initially
* @hidden
*/
initSyncStates(syncStates: Record<PeerId, A.SyncState>): void {
this.#syncStates = syncStates
return this.#remoteHeads[peerId]
}

/** `change` is called by the repo when the document is changed locally */
Expand Down Expand Up @@ -668,11 +642,3 @@ const {
AWAIT_NETWORK,
NETWORK_READY,
} = Event

function arraysEqual<T>(a: T[], b: T[]): boolean {
if (a.length !== b.length) return false
for (let i = 0; i < a.length; i++) {
if (a[i] !== b[i]) return false
}
return true
}
21 changes: 8 additions & 13 deletions packages/automerge-repo/src/Repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,23 +69,11 @@ export class Repo extends EventEmitter<RepoEvents> {
await storageSubsystem.saveDoc(handle.documentId, handle.docSync()!)
} else {
// Try to load from disk
const [loadedDoc, loadedSyncStates] = await Promise.all([
storageSubsystem.loadDoc(handle.documentId),
storageSubsystem.loadSyncStates(handle.documentId),
])

console.log("loadedSyncStates", handle.documentId, loadedSyncStates)

const loadedDoc = await storageSubsystem.loadDoc(handle.documentId)
if (loadedDoc) {
handle.update(() => loadedDoc)
handle.initSyncStates(loadedSyncStates)
}
}

// todo: debounce
handle.on("sync-state", ({ syncState, peerId }) => {
storageSubsystem.saveSyncState(handle.documentId, peerId, syncState)
})
}

handle.on("unavailable", () => {
Expand Down Expand Up @@ -159,6 +147,13 @@ export class Repo extends EventEmitter<RepoEvents> {
networkSubsystem.on("message", async msg => {
await synchronizer.receiveMessage(msg)
})

if (storageSubsystem) {
// todo: debounce
synchronizer.on("sync-state", ({ documentId, peerId, syncState }) => {
storageSubsystem.saveSyncState(documentId, peerId, syncState)
})
}
}

/** Returns an existing handle if we have it; creates one otherwise. */
Expand Down
26 changes: 7 additions & 19 deletions packages/automerge-repo/src/storage/StorageSubsystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,25 +208,13 @@ export class StorageSubsystem {
this.#compacting = false
}

async loadSyncStates(
documentId: DocumentId
): Promise<Record<PeerId, A.SyncState>> {
const key = [documentId, "sync-state"]
const loaded = await this.#storageAdapter.loadRange(key)

const syncStates: Record<PeerId, A.SyncState> = {}

for (const chunk of loaded) {
const peerId = chunk.key[2] as PeerId
if (chunk.data === undefined) {
console.warn(`Failed to load a syncState for ${documentId}:${peerId}`)
continue
}
const syncState = A.decodeSyncState(chunk.data)
syncStates[peerId] = syncState
}

return syncStates
async loadSyncState(
documentId: DocumentId,
peerId: PeerId
): Promise<A.SyncState | undefined> {
const key = [documentId, "sync-state", peerId]
const loaded = await this.#storageAdapter.load(key)
return loaded ? A.decodeSyncState(loaded) : undefined
}

async saveSyncState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,19 @@ export class CollectionSynchronizer extends Synchronizer {

/** Creates a new docSynchronizer and sets it up to propagate messages */
#initDocSynchronizer(handle: DocHandle<unknown>): DocSynchronizer {
const docSynchronizer = new DocSynchronizer(handle)
const docSynchronizer = new DocSynchronizer({
handle,
onLoadSyncState: peerId => {
if (!this.repo.storageSubsystem) {
return undefined
}

return this.repo.storageSubsystem.loadSyncState(
handle.documentId,
peerId
)
},
})
docSynchronizer.on("message", event => this.emit("message", event))
docSynchronizer.on("sync-state", event => this.emit("sync-state", event))
return docSynchronizer
Expand Down
80 changes: 59 additions & 21 deletions packages/automerge-repo/src/synchronizer/DocSynchronizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
import { PeerId } from "../types.js"
import { Synchronizer } from "./Synchronizer.js"
import { throttle } from "../helpers/throttle.js"
import { headsAreSame } from "../helpers/headsAreSame.js"

type PeerDocumentStatus = "unknown" | "has" | "unavailable" | "wants"

Expand All @@ -34,6 +35,11 @@ type PendingMessage = {
received: Date
}

interface DocSynchronizerConfig {
handle: DocHandle<unknown>
onLoadSyncState?: (peerId: PeerId) => A.SyncState | undefined
}

/**
* DocSynchronizer takes a handle to an Automerge document, and receives & dispatches sync messages
* to bring it inline with all other peers' versions.
Expand All @@ -47,14 +53,23 @@ export class DocSynchronizer extends Synchronizer {

#peerDocumentStatuses: Record<PeerId, PeerDocumentStatus> = {}

/** Sync state for each peer we've communicated with (including inactive peers) */
#syncStates: Record<PeerId, A.SyncState> = {}

#lastSyncs: Record<PeerId, LastSync> = {}

#pendingSyncMessages: Array<PendingMessage> = []

#syncStarted = false

constructor(private handle: DocHandle<unknown>) {
#handle: DocHandle<unknown>
#onLoadSyncState: (peerId: PeerId) => Promise<A.SyncState | undefined>

constructor({ handle, onLoadSyncState }: DocSynchronizerConfig) {
super()
this.#handle = handle
this.#onLoadSyncState = onLoadSyncState ?? (() => undefined)

const docId = handle.documentId.slice(0, 5)
this.#log = debug(`automerge-repo:docsync:${docId}`)

Expand All @@ -79,14 +94,14 @@ export class DocSynchronizer extends Synchronizer {
}

get documentId() {
return this.handle.documentId
return this.#handle.documentId
}

/// PRIVATE

async #syncWithPeers() {
this.#log(`syncWithPeers`)
const doc = await this.handle.doc()
const doc = await this.#handle.doc()
if (doc === undefined) return
this.#peers.forEach(peerId => this.#sendSyncMessage(peerId, doc))
}
Expand All @@ -104,7 +119,7 @@ export class DocSynchronizer extends Synchronizer {
const message: MessageContents<EphemeralMessage> = {
type: "ephemeral",
targetId: peerId,
documentId: this.handle.documentId,
documentId: this.#handle.documentId,
data,
}
this.emit("message", message)
Expand All @@ -121,13 +136,36 @@ export class DocSynchronizer extends Synchronizer {
this.#peerDocumentStatuses[peerId] = "unknown"
}

return this.handle.getSyncState(peerId) ?? A.initSyncState()
const prevSyncState = this.#syncStates[peerId]
if (prevSyncState) {
return prevSyncState
}

return A.initSyncState()
}

#setSyncState(peerId: PeerId, syncState: A.SyncState) {
// TODO: we only need to do this on reconnect

this.handle.setSyncState(peerId, syncState)
const previousSyncState = this.#syncStates[peerId]

this.#syncStates[peerId] = syncState

const haveTheirSyncedHeadsChanged =
syncState.theirHeads &&
(!previousSyncState ||
!previousSyncState.theirHeads ||
!headsAreSame(previousSyncState.theirHeads, syncState.theirHeads))

if (haveTheirSyncedHeadsChanged) {
this.#handle.setRemoteHeads(peerId, syncState.theirHeads)
}

this.emit("sync-state", {
peerId,
syncState,
documentId: this.#handle.documentId,
})
}

#sendSyncMessage(peerId: PeerId, doc: A.Doc<unknown>) {
Expand All @@ -140,7 +178,7 @@ export class DocSynchronizer extends Synchronizer {
const isNew = A.getHeads(doc).length === 0

if (
!this.handle.isReady() &&
!this.#handle.isReady() &&
isNew &&
newSyncState.sharedHeads.length === 0 &&
!Object.values(this.#peerDocumentStatuses).includes("has") &&
Expand All @@ -150,15 +188,15 @@ export class DocSynchronizer extends Synchronizer {
this.emit("message", {
type: "request",
targetId: peerId,
documentId: this.handle.documentId,
documentId: this.#handle.documentId,
data: message,
} as RequestMessage)
} else {
this.emit("message", {
type: "sync",
targetId: peerId,
data: message,
documentId: this.handle.documentId,
documentId: this.#handle.documentId,
} as SyncMessage)
}

Expand Down Expand Up @@ -186,14 +224,14 @@ export class DocSynchronizer extends Synchronizer {
// messages during disconnection.
// TODO: cover that case with a test and remove this hack
peerIds.forEach(peerId => {
const syncStateRaw = this.#getSyncState(peerId)
const syncState = A.decodeSyncState(A.encodeSyncState(syncStateRaw))
const syncState = this.#getSyncState(peerId)
// const syncState = A.decodeSyncState(A.encodeSyncState(syncStateRaw))
this.#setSyncState(peerId, syncState)
})

// At this point if we don't have anything in our storage, we need to use an empty doc to sync
// with; but we don't want to surface that state to the front end
void this.handle.doc([READY, REQUESTING, UNAVAILABLE]).then(doc => {
void this.#handle.doc([READY, REQUESTING, UNAVAILABLE]).then(doc => {
// we register out peers first, then say that sync has started
this.#syncStarted = true
this.#checkDocUnavailable()
Expand Down Expand Up @@ -236,15 +274,15 @@ export class DocSynchronizer extends Synchronizer {
}

receiveEphemeralMessage(message: EphemeralMessage) {
if (message.documentId !== this.handle.documentId)
if (message.documentId !== this.#handle.documentId)
throw new Error(`channelId doesn't match documentId`)

const { senderId, data } = message

const contents = decode(new Uint8Array(data))

this.handle.emit("ephemeral-message", {
handle: this.handle,
this.#handle.emit("ephemeral-message", {
handle: this.#handle,
senderId,
message: contents,
})
Expand All @@ -259,11 +297,11 @@ export class DocSynchronizer extends Synchronizer {
}

receiveSyncMessage(message: SyncMessage | RequestMessage) {
if (message.documentId !== this.handle.documentId)
if (message.documentId !== this.#handle.documentId)
throw new Error(`channelId doesn't match documentId`)

// We need to block receiving the syncMessages until we've checked local storage
if (!this.handle.inState([READY, REQUESTING, UNAVAILABLE])) {
if (!this.#handle.inState([READY, REQUESTING, UNAVAILABLE])) {
this.#pendingSyncMessages.push({ message, received: new Date() })
return
}
Expand All @@ -284,7 +322,7 @@ export class DocSynchronizer extends Synchronizer {
this.#peerDocumentStatuses[message.senderId] = "has"
}

this.handle.update(doc => {
this.#handle.update(doc => {
const [newDoc, newSyncState] = A.receiveSyncMessage(
doc,
this.#getSyncState(message.senderId),
Expand Down Expand Up @@ -312,7 +350,7 @@ export class DocSynchronizer extends Synchronizer {
// if we know none of the peers have the document, tell all our peers that we don't either
if (
this.#syncStarted &&
this.handle.inState([REQUESTING]) &&
this.#handle.inState([REQUESTING]) &&
this.#peers.every(
peerId =>
this.#peerDocumentStatuses[peerId] === "unavailable" ||
Expand All @@ -324,13 +362,13 @@ export class DocSynchronizer extends Synchronizer {
.forEach(peerId => {
const message: MessageContents<DocumentUnavailableMessage> = {
type: "doc-unavailable",
documentId: this.handle.documentId,
documentId: this.#handle.documentId,
targetId: peerId,
}
this.emit("message", message)
})

this.handle.unavailable()
this.#handle.unavailable()
}
}

Expand Down
Loading

0 comments on commit ca61f99

Please sign in to comment.