Skip to content

Commit

Permalink
Merge branch 'debounce-sync-and-save'
Browse files Browse the repository at this point in the history
  • Loading branch information
pvh committed Oct 14, 2023
2 parents cda0087 + 4e726dd commit 704640e
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ export class BrowserWebSocketClientAdapter extends WebSocketNetworkAdapter {
}

connect(peerId: PeerId) {

const onOpen = () => {
log(`@ ${this.url}: open`)
clearInterval(this.timerId)
Expand Down
23 changes: 18 additions & 5 deletions packages/automerge-repo/src/Repo.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { next as Automerge } from "@automerge/automerge"
import debug from "debug"
import { EventEmitter } from "eventemitter3"
import { DocHandle } from "./DocHandle.js"
import { DocHandle, DocHandleEncodedChangePayload } from "./DocHandle.js"
import {
generateAutomergeUrl,
isValidAutomergeUrl,
parseAutomergeUrl,
parseLegacyUUID,
} from "./DocUrl.js"
import { throttle } from "./helpers/throttle.js"
import { NetworkAdapter } from "./network/NetworkAdapter.js"
import { NetworkSubsystem } from "./network/NetworkSubsystem.js"
import { StorageAdapter } from "./storage/StorageAdapter.js"
Expand All @@ -30,6 +31,11 @@ export class Repo extends EventEmitter<RepoEvents> {
networkSubsystem: NetworkSubsystem
/** @hidden */
storageSubsystem?: StorageSubsystem

/** The debounce rate is adjustable on the repo. */
/** @hidden */
saveDebounceRate = 100

#handleCache: Record<DocumentId, DocHandle<any>> = {}

/** By default, we share generously with all peers. */
Expand All @@ -47,10 +53,17 @@ export class Repo extends EventEmitter<RepoEvents> {
// up a document by ID. We listen for it in order to wire up storage and network synchronization.
this.on("document", async ({ handle, isNew }) => {
if (storageSubsystem) {
// Save when the document changes
handle.on("heads-changed", async ({ handle, doc }) => {
await storageSubsystem.saveDoc(handle.documentId, doc)
})
// Save when the document changes, but no more often than saveDebounceRate.
const saveFn = ({
handle,
doc,
}: DocHandleEncodedChangePayload<any>) => {
void storageSubsystem.saveDoc(handle.documentId, doc)
}
const debouncedSaveFn = handle.on(
"heads-changed",
throttle(saveFn, this.saveDebounceRate)
)

if (isNew) {
// this is a new document, immediately save it
Expand Down
25 changes: 25 additions & 0 deletions packages/automerge-repo/src/helpers/debounce.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/** throttle( callback, rate )
* Returns a throttle function with a build in debounce timer that runs after `wait` ms.
*
* Note that the args go inside the parameter and you should be careful not to
* recreate the function on each usage. (In React, see useMemo().)
*
*
* Example usage:
* const callback = throttle((ev) => { doSomethingExpensiveOrOccasional() }, 100)
* target.addEventListener('frequent-event', callback);
*
*/

export const throttle = <F extends (...args: Parameters<F>) => ReturnType<F>>(
fn: F,
rate: number
) => {
let timeout: ReturnType<typeof setTimeout>
return function (...args: Parameters<F>) {
clearTimeout(timeout)
timeout = setTimeout(() => {
fn.apply(null, args)
}, rate)
}
}
43 changes: 43 additions & 0 deletions packages/automerge-repo/src/helpers/throttle.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/** Throttle
* Returns a function with a built in throttle timer that runs after `delay` ms.
*
* This function differs from a conventional `throttle` in that it ensures the final
* call will also execute and delays sending the first one until `delay` ms to allow
* additional work to accumulate.
*
* Here's a diagram:
*
* calls +----++++++-----++----
* dlay ^--v ^--v^--v ^--v
* execs ---+----+---+------+--
*
* The goal in this design is to create batches of changes without flooding
* communication or storage systems while still feeling responsive.
* (By default we communicate at 10hz / every 100ms.)
*
* Note that the args go inside the parameter and you should be careful not to
* recreate the function on each usage. (In React, see useMemo().)
*
*
* Example usage:
* const callback = debounce((ev) => { doSomethingExpensiveOrOccasional() }, 100)
* target.addEventListener('frequent-event', callback);
*
*/

export const throttle = <F extends (...args: Parameters<F>) => ReturnType<F>>(
fn: F,
delay: number
) => {
let lastCall = Date.now()
let wait
let timeout: ReturnType<typeof setTimeout>
return function (...args: Parameters<F>) {
wait = lastCall + delay - Date.now()
clearTimeout(timeout)
timeout = setTimeout(() => {
fn.apply(null, args)
lastCall = Date.now()
}, wait)
}
}
11 changes: 9 additions & 2 deletions packages/automerge-repo/src/synchronizer/DocSynchronizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
} from "../network/messages.js"
import { PeerId } from "../types.js"
import { Synchronizer } from "./Synchronizer.js"
import { throttle } from "../helpers/throttle.js"

type PeerDocumentStatus = "unknown" | "has" | "unavailable" | "wants"

Expand All @@ -28,6 +29,7 @@ type PeerDocumentStatus = "unknown" | "has" | "unavailable" | "wants"
*/
export class DocSynchronizer extends Synchronizer {
#log: debug.Debugger
syncDebounceRate = 100

/** Active peers */
#peers: PeerId[] = []
Expand All @@ -46,7 +48,10 @@ export class DocSynchronizer extends Synchronizer {
const docId = handle.documentId.slice(0, 5)
this.#log = debug(`automerge-repo:docsync:${docId}`)

handle.on("change", () => this.#syncWithPeers())
handle.on(
"change",
throttle(() => this.#syncWithPeers(), this.syncDebounceRate)
)

handle.on("ephemeral-message-outbound", payload =>
this.#broadcastToPeers(payload)
Expand Down Expand Up @@ -164,7 +169,9 @@ export class DocSynchronizer extends Synchronizer {
}

beginSync(peerIds: PeerId[]) {
const newPeers = new Set(peerIds.filter(peerId => !this.#peers.includes(peerId)))
const newPeers = new Set(
peerIds.filter(peerId => !this.#peers.includes(peerId))
)
this.#log(`beginSync: ${peerIds.join(", ")}`)

// HACK: if we have a sync state already, we round-trip it through the encoding system to make
Expand Down
4 changes: 2 additions & 2 deletions packages/automerge-repo/test/Repo.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ describe("Repo", () => {

assert.equal(handle.isReady(), true)

await pause()
await pause(150)

const repo2 = new Repo({
storage: storageAdapter,
Expand Down Expand Up @@ -884,7 +884,7 @@ describe("Repo", () => {
handle.merge(handle2)

// wait for the network to do it's thang
await pause(50)
await pause(350)

await charlieHandle.doc()
assert.deepStrictEqual(charlieHandle.docSync(), { foo: "baz" })
Expand Down

0 comments on commit 704640e

Please sign in to comment.