Skip to content

Commit

Permalink
put remote heads gossiping behind a feature flag
Browse files Browse the repository at this point in the history
  • Loading branch information
alexjg committed Dec 14, 2023
1 parent 5c8153b commit 33b7bd8
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 33 deletions.
82 changes: 49 additions & 33 deletions packages/automerge-repo/src/Repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,18 @@ export class Repo extends EventEmitter<RepoEvents> {
peerMetadataByPeerId: Record<PeerId, PeerMetadata> = {}

#remoteHeadsSubscriptions = new RemoteHeadsSubscriptions()
#remoteHeadsGossipingEnabled = false

constructor({
storage,
network,
peerId,
sharePolicy,
isEphemeral = storage === undefined,
enableRemoteHeadsGossiping = false,
}: RepoConfig) {
super()
this.#remoteHeadsGossipingEnabled = enableRemoteHeadsGossiping
this.#log = debug(`automerge-repo:repo`)
this.sharePolicy = sharePolicy ?? this.sharePolicy

Expand Down Expand Up @@ -140,9 +143,11 @@ export class Repo extends EventEmitter<RepoEvents> {
networkSubsystem.send(message)
})

this.#synchronizer.on("open-doc", ({ peerId, documentId }) => {
this.#remoteHeadsSubscriptions.subscribePeerToDoc(peerId, documentId)
})
if (this.#remoteHeadsGossipingEnabled) {
this.#synchronizer.on("open-doc", ({ peerId, documentId }) => {
this.#remoteHeadsSubscriptions.subscribePeerToDoc(peerId, documentId)
})
}

// STORAGE
// The storage subsystem has access to some form of persistence, and deals with save and loading documents.
Expand Down Expand Up @@ -177,7 +182,7 @@ export class Repo extends EventEmitter<RepoEvents> {

this.sharePolicy(peerId)
.then(shouldShare => {
if (shouldShare) {
if (shouldShare && this.#remoteHeadsGossipingEnabled) {
this.#remoteHeadsSubscriptions.addGenerousPeer(peerId)
}
})
Expand Down Expand Up @@ -217,7 +222,7 @@ export class Repo extends EventEmitter<RepoEvents> {
if (haveHeadsChanged) {
handle.setRemoteHeads(storageId, message.syncState.theirHeads)

if (storageId) {
if (storageId && this.#remoteHeadsGossipingEnabled) {
this.#remoteHeadsSubscriptions.handleImmediateRemoteHeadsChanged(
message.documentId,
storageId,
Expand All @@ -227,45 +232,51 @@ export class Repo extends EventEmitter<RepoEvents> {
}
})

this.#remoteHeadsSubscriptions.on("notify-remote-heads", message => {
this.networkSubsystem.send({
type: "remote-heads-changed",
targetId: message.targetId,
documentId: message.documentId,
newHeads: {
[message.storageId]: {
heads: message.heads,
timestamp: message.timestamp,
if (this.#remoteHeadsGossipingEnabled) {
this.#remoteHeadsSubscriptions.on("notify-remote-heads", message => {
this.networkSubsystem.send({
type: "remote-heads-changed",
targetId: message.targetId,
documentId: message.documentId,
newHeads: {
[message.storageId]: {
heads: message.heads,
timestamp: message.timestamp,
},
},
},
})
})
})

this.#remoteHeadsSubscriptions.on("change-remote-subs", message => {
this.#log("change-remote-subs", message)
for (const peer of message.peers) {
this.networkSubsystem.send({
type: "remote-subscription-change",
targetId: peer,
add: message.add,
remove: message.remove,
})
}
})
this.#remoteHeadsSubscriptions.on("change-remote-subs", message => {
this.#log("change-remote-subs", message)
for (const peer of message.peers) {
this.networkSubsystem.send({
type: "remote-subscription-change",
targetId: peer,
add: message.add,
remove: message.remove,
})
}
})

this.#remoteHeadsSubscriptions.on("remote-heads-changed", message => {
const handle = this.#handleCache[message.documentId]
handle.setRemoteHeads(message.storageId, message.remoteHeads)
})
this.#remoteHeadsSubscriptions.on("remote-heads-changed", message => {
const handle = this.#handleCache[message.documentId]
handle.setRemoteHeads(message.storageId, message.remoteHeads)
})
}
}

#receiveMessage(message: RepoMessage) {
switch (message.type) {
case "remote-subscription-change":
this.#remoteHeadsSubscriptions.handleControlMessage(message)
if (this.#remoteHeadsGossipingEnabled) {
this.#remoteHeadsSubscriptions.handleControlMessage(message)
}
break
case "remote-heads-changed":
this.#remoteHeadsSubscriptions.handleRemoteHeads(message)
if (this.#remoteHeadsGossipingEnabled) {
this.#remoteHeadsSubscriptions.handleRemoteHeads(message)
}
break
case "sync":
case "request":
Expand Down Expand Up @@ -483,6 +494,11 @@ export interface RepoConfig {
* all peers). A server only syncs documents that a peer explicitly requests by ID.
*/
sharePolicy?: SharePolicy

/**
* Whether to enable the experimental remote heads gossiping feature
*/
enableRemoteHeadsGossiping?: boolean
}

/** A function that determines whether we should share a document with a peer
Expand Down
7 changes: 7 additions & 0 deletions packages/automerge-repo/test/remoteHeads.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ describe("DocHandle.remoteHeads", () => {
peerId: "bob" as PeerId,
network: [],
storage: new DummyStorageAdapter(),
enableRemoteHeadsGossiping: true,
})
const bobStorageId = await bobRepo.storageId()

Expand All @@ -46,37 +47,43 @@ describe("DocHandle.remoteHeads", () => {
peerId: "left-tab-1" as PeerId,
network: [],
sharePolicy: async () => true,
enableRemoteHeadsGossiping: true,
})
const leftTab2 = new Repo({
peerId: "left-tab-2" as PeerId,
network: [],
sharePolicy: async () => true,
enableRemoteHeadsGossiping: true,
})
const leftServiceWorker = new Repo({
peerId: "left-service-worker" as PeerId,
network: [],
sharePolicy: async peer => peer === "sync-server",
storage: new DummyStorageAdapter(),
isEphemeral: false,
enableRemoteHeadsGossiping: true,
})
const syncServer = new Repo({
peerId: "sync-server" as PeerId,
network: [],
isEphemeral: false,
sharePolicy: async () => false,
storage: new DummyStorageAdapter(),
enableRemoteHeadsGossiping: true,
})
const rightServiceWorker = new Repo({
peerId: "right-service-worker" as PeerId,
network: [],
sharePolicy: async peer => peer === "sync-server",
isEphemeral: false,
storage: new DummyStorageAdapter(),
enableRemoteHeadsGossiping: true,
})
const rightTab = new Repo({
peerId: "right-tab" as PeerId,
network: [],
sharePolicy: async () => true,
enableRemoteHeadsGossiping: true,
})

// connect them all up
Expand Down

0 comments on commit 33b7bd8

Please sign in to comment.