Skip to content

Commit

Permalink
expose load and receive sync message metrics
Browse files Browse the repository at this point in the history
Problem: when analyzing a sync server under heavy load it is difficult
to identify what is causing the load. There are lots of questions we
might have such as "what documents are in memory?" or "which documents are
taking longest  in `receiveSyncMessage`?". 

Solution: expose a `"doc-metrics"` event on `Repo` which is fired
whenever a documnt is loaded or receives a sync message. This can be
wired up to Prometheus or other monitoring tools to allow operators to
answer these questions.

Add num ops and changes to events
  • Loading branch information
alexjg committed Oct 3, 2024
1 parent a052bb5 commit ab7a34f
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 2 deletions.
25 changes: 24 additions & 1 deletion packages/automerge-repo/src/Repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ import { StorageAdapterInterface } from "./storage/StorageAdapterInterface.js"
import { StorageSubsystem } from "./storage/StorageSubsystem.js"
import { StorageId } from "./storage/types.js"
import { CollectionSynchronizer } from "./synchronizer/CollectionSynchronizer.js"
import { SyncStatePayload } from "./synchronizer/Synchronizer.js"
import {
DocSyncMetrics,
SyncStatePayload,
} from "./synchronizer/Synchronizer.js"
import type { AnyDocumentId, DocumentId, PeerId } from "./types.js"

function randomPeerId() {
Expand Down Expand Up @@ -104,6 +107,9 @@ export class Repo extends EventEmitter<RepoEvents> {
networkSubsystem.send(message)
})

// Forward metrics from doc synchronizers
this.synchronizer.on("metrics", event => this.emit("doc-metrics", event))

if (this.#remoteHeadsGossipingEnabled) {
this.synchronizer.on("open-doc", ({ peerId, documentId }) => {
this.#remoteHeadsSubscriptions.subscribePeerToDoc(peerId, documentId)
Expand All @@ -113,6 +119,12 @@ export class Repo extends EventEmitter<RepoEvents> {
// STORAGE
// The storage subsystem has access to some form of persistence, and deals with save and loading documents.
const storageSubsystem = storage ? new StorageSubsystem(storage) : undefined
if (storageSubsystem) {
storageSubsystem.on("document-loaded", event =>
this.emit("doc-metrics", { type: "doc-loaded", ...event })
)
}

this.storageSubsystem = storageSubsystem

// NETWORK
Expand Down Expand Up @@ -638,6 +650,7 @@ export interface RepoEvents {
"delete-document": (arg: DeleteDocumentPayload) => void
/** A document was marked as unavailable (we don't have it and none of our peers have it) */
"unavailable-document": (arg: DeleteDocumentPayload) => void
"doc-metrics": (arg: DocMetrics) => void
}

export interface DocumentPayload {
Expand All @@ -647,3 +660,13 @@ export interface DocumentPayload {
export interface DeleteDocumentPayload {
documentId: DocumentId
}

export type DocMetrics =
| DocSyncMetrics
| {
type: "doc-loaded"
documentId: DocumentId
durationMillis: number
numOps: number
numChanges: number
}
20 changes: 19 additions & 1 deletion packages/automerge-repo/src/storage/StorageSubsystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,22 @@ import { ChunkInfo, StorageKey, StorageId } from "./types.js"
import { keyHash, headsHash } from "./keyHash.js"
import { chunkTypeFromKey } from "./chunkTypeFromKey.js"
import * as Uuid from "uuid"
import { EventEmitter } from "eventemitter3"

type StorageSubsystemEvents = {
"document-loaded": (arg: {
documentId: DocumentId
durationMillis: number
numOps: number
numChanges: number
}) => void
}

/**
* The storage subsystem is responsible for saving and loading Automerge documents to and from
* storage adapter. It also provides a generic key/value storage interface for other uses.
*/
export class StorageSubsystem {
export class StorageSubsystem extends EventEmitter<StorageSubsystemEvents> {
/** The storage adapter to use for saving and loading documents */
#storageAdapter: StorageAdapterInterface

Expand All @@ -29,6 +39,7 @@ export class StorageSubsystem {
#log = debug(`automerge-repo:storage-subsystem`)

constructor(storageAdapter: StorageAdapterInterface) {
super()
this.#storageAdapter = storageAdapter
}

Expand Down Expand Up @@ -130,7 +141,14 @@ export class StorageSubsystem {
if (binary.length === 0) return null

// Load into an Automerge document
const start = performance.now()
const newDoc = A.loadIncremental(A.init(), binary) as A.Doc<T>
const end = performance.now()
this.emit("document-loaded", {
documentId,
durationMillis: end - start,
...A.stats(newDoc),
})

// Record the latest heads for the document
this.#storedHeads.set(documentId, A.getHeads(newDoc))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ export class CollectionSynchronizer extends Synchronizer {
docSynchronizer.on("message", event => this.emit("message", event))
docSynchronizer.on("open-doc", event => this.emit("open-doc", event))
docSynchronizer.on("sync-state", event => this.emit("sync-state", event))
docSynchronizer.on("metrics", event => this.emit("metrics", event))
return docSynchronizer
}

Expand Down
8 changes: 8 additions & 0 deletions packages/automerge-repo/src/synchronizer/DocSynchronizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -351,11 +351,19 @@ export class DocSynchronizer extends Synchronizer {

this.#withSyncState(message.senderId, syncState => {
this.#handle.update(doc => {
const start = performance.now()
const [newDoc, newSyncState] = A.receiveSyncMessage(
doc,
syncState,
message.data
)
const end = performance.now()
this.emit("metrics", {
type: "receive-sync-message",
documentId: this.#handle.documentId,
durationMillis: end - start,
...A.stats(doc),
})

this.#setSyncState(message.senderId, newSyncState)

Expand Down
9 changes: 9 additions & 0 deletions packages/automerge-repo/src/synchronizer/Synchronizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export interface SynchronizerEvents {
message: (payload: MessageContents) => void
"sync-state": (payload: SyncStatePayload) => void
"open-doc": (arg: OpenDocMessage) => void
metrics: (arg: DocSyncMetrics) => void
}

/** Notify the repo that the sync state has changed */
Expand All @@ -23,3 +24,11 @@ export interface SyncStatePayload {
documentId: DocumentId
syncState: SyncState
}

export type DocSyncMetrics = {
type: "receive-sync-message"
documentId: DocumentId
durationMillis: number
numOps: number
numChanges: number
}

0 comments on commit ab7a34f

Please sign in to comment.