Skip to content

Commit

Permalink
Add list of active peers to repo
Browse files Browse the repository at this point in the history
  • Loading branch information
paulsonnentag committed Nov 10, 2023
1 parent 3e95b85 commit 20c4750
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 9 deletions.
28 changes: 19 additions & 9 deletions packages/automerge-repo/src/Repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ export class Repo extends EventEmitter<RepoEvents> {

#handleCache: Record<DocumentId, DocHandle<any>> = {}

#synchronizer: CollectionSynchronizer

/** By default, we share generously with all peers. */
/** @hidden */
sharePolicy: SharePolicy = async () => true
Expand Down Expand Up @@ -98,7 +100,7 @@ export class Repo extends EventEmitter<RepoEvents> {
}

// Register the document with the synchronizer. This advertises our interest in the document.
synchronizer.addDocument(handle.documentId)
this.#synchronizer.addDocument(handle.documentId)
})

this.on("delete-document", ({ documentId }) => {
Expand All @@ -114,10 +116,10 @@ export class Repo extends EventEmitter<RepoEvents> {

// SYNCHRONIZER
// The synchronizer uses the network subsystem to keep documents in sync with peers.
const synchronizer = new CollectionSynchronizer(this)
this.#synchronizer = new CollectionSynchronizer(this)

// When the synchronizer emits messages, send them to peers
synchronizer.on("message", message => {
this.#synchronizer.on("message", message => {
this.#log(`sending ${message.type} message to ${message.targetId}`)
networkSubsystem.send(message)
})
Expand All @@ -135,24 +137,27 @@ export class Repo extends EventEmitter<RepoEvents> {
// When we get a new peer, register it with the synchronizer
networkSubsystem.on("peer", async ({ peerId }) => {
this.#log("peer connected", { peerId })
synchronizer.addPeer(peerId)
this.#synchronizer.addPeer(peerId)
})

// When a peer disconnects, remove it from the synchronizer
networkSubsystem.on("peer-disconnected", ({ peerId }) => {
synchronizer.removePeer(peerId)
this.#synchronizer.removePeer(peerId)
})

// Handle incoming messages
networkSubsystem.on("message", async msg => {
await synchronizer.receiveMessage(msg)
await this.#synchronizer.receiveMessage(msg)
})

if (storageSubsystem) {
// todo: debounce
synchronizer.on("sync-state", ({ documentId, peerId, syncState }) => {
storageSubsystem.saveSyncState(documentId, peerId, syncState)
})
this.#synchronizer.on(
"sync-state",
({ documentId, peerId, syncState }) => {
storageSubsystem.saveSyncState(documentId, peerId, syncState)
}
)
}
}

Expand All @@ -179,6 +184,11 @@ export class Repo extends EventEmitter<RepoEvents> {
return this.#handleCache
}

/** Returns a list of all connected peer ids */
get peers(): PeerId[] {
return this.#synchronizer.peers
}

/**
* Creates a new document and returns a handle to it. The initial value of the document is
* an empty object `{}`. Its documentId is generated by the system. we emit a `document` event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,4 +140,9 @@ export class CollectionSynchronizer extends Synchronizer {
docSynchronizer.endSync(peerId)
}
}

/** Returns a list of all connected peer ids */
get peers(): PeerId[] {
return Array.from(this.#peers)
}
}
9 changes: 9 additions & 0 deletions packages/automerge-repo/test/Repo.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,15 @@ describe("Repo", () => {
A.getHeads(charlieHandle.docSync())
)
})

it("can report the connected peers", async () => {
const { bobRepo, charlieRepo, teardown } = await setup()

assert.deepStrictEqual(bobRepo.peers, ["alice", "charlie"])
assert.deepStrictEqual(charlieRepo.peers, ["bob"])

teardown()
})
})

describe("with peers (mesh network)", () => {
Expand Down

0 comments on commit 20c4750

Please sign in to comment.