diff --git a/packages/automerge-repo-network-websocket/src/BrowserWebSocketClientAdapter.ts b/packages/automerge-repo-network-websocket/src/BrowserWebSocketClientAdapter.ts index aae61a966..4264128ea 100644 --- a/packages/automerge-repo-network-websocket/src/BrowserWebSocketClientAdapter.ts +++ b/packages/automerge-repo-network-websocket/src/BrowserWebSocketClientAdapter.ts @@ -31,7 +31,6 @@ export class BrowserWebSocketClientAdapter extends WebSocketNetworkAdapter { } connect(peerId: PeerId) { - const onOpen = () => { log(`@ ${this.url}: open`) clearInterval(this.timerId) diff --git a/packages/automerge-repo/src/Repo.ts b/packages/automerge-repo/src/Repo.ts index a9b1417fc..ede0bdd65 100644 --- a/packages/automerge-repo/src/Repo.ts +++ b/packages/automerge-repo/src/Repo.ts @@ -1,13 +1,14 @@ import { next as Automerge } from "@automerge/automerge" import debug from "debug" import { EventEmitter } from "eventemitter3" -import { DocHandle } from "./DocHandle.js" +import { DocHandle, DocHandleEncodedChangePayload } from "./DocHandle.js" import { generateAutomergeUrl, isValidAutomergeUrl, parseAutomergeUrl, parseLegacyUUID, } from "./DocUrl.js" +import { throttle } from "./helpers/throttle.js" import { NetworkAdapter } from "./network/NetworkAdapter.js" import { NetworkSubsystem } from "./network/NetworkSubsystem.js" import { StorageAdapter } from "./storage/StorageAdapter.js" @@ -30,6 +31,11 @@ export class Repo extends EventEmitter { networkSubsystem: NetworkSubsystem /** @hidden */ storageSubsystem?: StorageSubsystem + + /** The debounce rate is adjustable on the repo. */ + /** @hidden */ + saveDebounceRate = 100 + #handleCache: Record> = {} /** By default, we share generously with all peers. */ @@ -47,10 +53,17 @@ export class Repo extends EventEmitter { // up a document by ID. We listen for it in order to wire up storage and network synchronization. this.on("document", async ({ handle, isNew }) => { if (storageSubsystem) { - // Save when the document changes - handle.on("heads-changed", async ({ handle, doc }) => { - await storageSubsystem.saveDoc(handle.documentId, doc) - }) + // Save when the document changes, but no more often than saveDebounceRate. + const saveFn = ({ + handle, + doc, + }: DocHandleEncodedChangePayload) => { + void storageSubsystem.saveDoc(handle.documentId, doc) + } + const debouncedSaveFn = handle.on( + "heads-changed", + throttle(saveFn, this.saveDebounceRate) + ) if (isNew) { // this is a new document, immediately save it diff --git a/packages/automerge-repo/src/helpers/debounce.ts b/packages/automerge-repo/src/helpers/debounce.ts new file mode 100644 index 000000000..70614c7a4 --- /dev/null +++ b/packages/automerge-repo/src/helpers/debounce.ts @@ -0,0 +1,25 @@ +/** throttle( callback, rate ) + * Returns a throttle function with a build in debounce timer that runs after `wait` ms. + * + * Note that the args go inside the parameter and you should be careful not to + * recreate the function on each usage. (In React, see useMemo().) + * + * + * Example usage: + * const callback = throttle((ev) => { doSomethingExpensiveOrOccasional() }, 100) + * target.addEventListener('frequent-event', callback); + * + */ + +export const throttle = ) => ReturnType>( + fn: F, + rate: number +) => { + let timeout: ReturnType + return function (...args: Parameters) { + clearTimeout(timeout) + timeout = setTimeout(() => { + fn.apply(null, args) + }, rate) + } +} diff --git a/packages/automerge-repo/src/helpers/throttle.ts b/packages/automerge-repo/src/helpers/throttle.ts new file mode 100644 index 000000000..7d09a0767 --- /dev/null +++ b/packages/automerge-repo/src/helpers/throttle.ts @@ -0,0 +1,43 @@ +/** Throttle + * Returns a function with a built in throttle timer that runs after `delay` ms. + * + * This function differs from a conventional `throttle` in that it ensures the final + * call will also execute and delays sending the first one until `delay` ms to allow + * additional work to accumulate. + * + * Here's a diagram: + * + * calls +----++++++-----++---- + * dlay ^--v ^--v^--v ^--v + * execs ---+----+---+------+-- + * + * The goal in this design is to create batches of changes without flooding + * communication or storage systems while still feeling responsive. + * (By default we communicate at 10hz / every 100ms.) + * + * Note that the args go inside the parameter and you should be careful not to + * recreate the function on each usage. (In React, see useMemo().) + * + * + * Example usage: + * const callback = debounce((ev) => { doSomethingExpensiveOrOccasional() }, 100) + * target.addEventListener('frequent-event', callback); + * + */ + +export const throttle = ) => ReturnType>( + fn: F, + delay: number +) => { + let lastCall = Date.now() + let wait + let timeout: ReturnType + return function (...args: Parameters) { + wait = lastCall + delay - Date.now() + clearTimeout(timeout) + timeout = setTimeout(() => { + fn.apply(null, args) + lastCall = Date.now() + }, wait) + } +} diff --git a/packages/automerge-repo/src/synchronizer/DocSynchronizer.ts b/packages/automerge-repo/src/synchronizer/DocSynchronizer.ts index 615111c1a..b801bbd80 100644 --- a/packages/automerge-repo/src/synchronizer/DocSynchronizer.ts +++ b/packages/automerge-repo/src/synchronizer/DocSynchronizer.ts @@ -19,6 +19,7 @@ import { } from "../network/messages.js" import { PeerId } from "../types.js" import { Synchronizer } from "./Synchronizer.js" +import { throttle } from "../helpers/throttle.js" type PeerDocumentStatus = "unknown" | "has" | "unavailable" | "wants" @@ -28,6 +29,7 @@ type PeerDocumentStatus = "unknown" | "has" | "unavailable" | "wants" */ export class DocSynchronizer extends Synchronizer { #log: debug.Debugger + syncDebounceRate = 100 /** Active peers */ #peers: PeerId[] = [] @@ -46,7 +48,10 @@ export class DocSynchronizer extends Synchronizer { const docId = handle.documentId.slice(0, 5) this.#log = debug(`automerge-repo:docsync:${docId}`) - handle.on("change", () => this.#syncWithPeers()) + handle.on( + "change", + throttle(() => this.#syncWithPeers(), this.syncDebounceRate) + ) handle.on("ephemeral-message-outbound", payload => this.#broadcastToPeers(payload) @@ -164,7 +169,9 @@ export class DocSynchronizer extends Synchronizer { } beginSync(peerIds: PeerId[]) { - const newPeers = new Set(peerIds.filter(peerId => !this.#peers.includes(peerId))) + const newPeers = new Set( + peerIds.filter(peerId => !this.#peers.includes(peerId)) + ) this.#log(`beginSync: ${peerIds.join(", ")}`) // HACK: if we have a sync state already, we round-trip it through the encoding system to make diff --git a/packages/automerge-repo/test/Repo.test.ts b/packages/automerge-repo/test/Repo.test.ts index 2ec2332b0..f325002fc 100644 --- a/packages/automerge-repo/test/Repo.test.ts +++ b/packages/automerge-repo/test/Repo.test.ts @@ -238,7 +238,7 @@ describe("Repo", () => { assert.equal(handle.isReady(), true) - await pause() + await pause(150) const repo2 = new Repo({ storage: storageAdapter, @@ -884,7 +884,7 @@ describe("Repo", () => { handle.merge(handle2) // wait for the network to do it's thang - await pause(50) + await pause(350) await charlieHandle.doc() assert.deepStrictEqual(charlieHandle.docSync(), { foo: "baz" })