diff --git a/.gitignore b/.gitignore index f015d4135..2d5f8d25e 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,5 @@ coverage docs stats.html yarn-error.log +.DS_Store +wallaby.conf.cjs diff --git a/examples/react-counter/package.json b/examples/react-counter/package.json index cbcb65cc1..db7627039 100644 --- a/examples/react-counter/package.json +++ b/examples/react-counter/package.json @@ -2,7 +2,7 @@ "name": "@automerge/automerge-repo-demo-counter", "repository": "https://github.com/automerge/automerge-repo/tree/master/examples/react-counter", "private": true, - "version": "1.1.0-alpha.2", + "version": "1.1.0-alpha.4", "type": "module", "scripts": { "dev": "vite --open", @@ -10,11 +10,11 @@ }, "dependencies": { "@automerge/automerge": "^2.1.0", - "@automerge/automerge-repo": "^1.1.0-alpha.2", - "@automerge/automerge-repo-network-messagechannel": "^1.1.0-alpha.2", - "@automerge/automerge-repo-network-websocket": "^1.1.0-alpha.2", - "@automerge/automerge-repo-react-hooks": "^1.1.0-alpha.2", - "@automerge/automerge-repo-storage-indexeddb": "^1.1.0-alpha.2", + "@automerge/automerge-repo": "^1.1.0-alpha.4", + "@automerge/automerge-repo-network-messagechannel": "^1.1.0-alpha.4", + "@automerge/automerge-repo-network-websocket": "^1.1.0-alpha.4", + "@automerge/automerge-repo-react-hooks": "^1.1.0-alpha.4", + "@automerge/automerge-repo-storage-indexeddb": "^1.1.0-alpha.4", "react": "^18.2.0", "react-dom": "^18.2.0" } diff --git a/examples/react-todo/package.json b/examples/react-todo/package.json index fae7e8654..a6e05d409 100644 --- a/examples/react-todo/package.json +++ b/examples/react-todo/package.json @@ -2,7 +2,7 @@ "name": "@automerge/automerge-repo-demo-todo", "repository": "https://github.com/automerge/automerge-repo/tree/master/examples/react-todo", "private": true, - "version": "1.1.0-alpha.2", + "version": "1.1.0-alpha.4", "type": "module", "scripts": { "dev": "vite --open", @@ -10,11 +10,11 @@ }, "dependencies": { "@automerge/automerge": "^2.1.0", - "@automerge/automerge-repo": "^1.1.0-alpha.2", - "@automerge/automerge-repo-network-broadcastchannel": "^1.1.0-alpha.2", - "@automerge/automerge-repo-network-websocket": "^1.1.0-alpha.2", - "@automerge/automerge-repo-react-hooks": "^1.1.0-alpha.2", - "@automerge/automerge-repo-storage-indexeddb": "^1.1.0-alpha.2", + "@automerge/automerge-repo": "^1.1.0-alpha.4", + "@automerge/automerge-repo-network-broadcastchannel": "^1.1.0-alpha.4", + "@automerge/automerge-repo-network-websocket": "^1.1.0-alpha.4", + "@automerge/automerge-repo-react-hooks": "^1.1.0-alpha.4", + "@automerge/automerge-repo-storage-indexeddb": "^1.1.0-alpha.4", "@ibm/plex": "^6.1.1", "autoprefixer": "^10.4.13", "classnames": "^2.3.2", diff --git a/examples/react-use-awareness/package.json b/examples/react-use-awareness/package.json index f53d44b62..6dcd46906 100644 --- a/examples/react-use-awareness/package.json +++ b/examples/react-use-awareness/package.json @@ -2,16 +2,16 @@ "name": "automerge-use-awareness-example-project", "repository": "https://github.com/automerge/automerge-repo/tree/master/examples/react-use-awareness", "private": true, - "version": "1.1.0-alpha.2", + "version": "1.1.0-alpha.4", "type": "module", "scripts": { "dev": "vite", "preview": "vite preview" }, "dependencies": { - "@automerge/automerge-repo": "^1.1.0-alpha.2", - "@automerge/automerge-repo-network-broadcastchannel": "^1.1.0-alpha.2", - "@automerge/automerge-repo-react-hooks": "^1.1.0-alpha.2", + "@automerge/automerge-repo": "^1.1.0-alpha.4", + "@automerge/automerge-repo-network-broadcastchannel": "^1.1.0-alpha.4", + "@automerge/automerge-repo-react-hooks": "^1.1.0-alpha.4", "eventemitter3": "^5.0.1", "react": "^18.2.0", "react-dom": "^18.2.0", diff --git a/examples/svelte-counter/package.json b/examples/svelte-counter/package.json index 62b04f596..b6ab25f55 100644 --- a/examples/svelte-counter/package.json +++ b/examples/svelte-counter/package.json @@ -2,7 +2,7 @@ "name": "@automerge/automerge-repo-demo-counter-svelte", "repository": "https://github.com/automerge/automerge-repo/tree/master/examples/svelte-counter", "private": true, - "version": "1.1.0-alpha.2", + "version": "1.1.0-alpha.4", "type": "module", "scripts": { "dev": "vite", @@ -11,11 +11,11 @@ }, "dependencies": { "@automerge/automerge": "^2.1.0", - "@automerge/automerge-repo": "^1.1.0-alpha.2", - "@automerge/automerge-repo-network-broadcastchannel": "^1.1.0-alpha.2", - "@automerge/automerge-repo-network-websocket": "^1.1.0-alpha.2", - "@automerge/automerge-repo-storage-indexeddb": "^1.1.0-alpha.2", - "@automerge/automerge-repo-svelte-store": "^1.1.0-alpha.2", + "@automerge/automerge-repo": "^1.1.0-alpha.4", + "@automerge/automerge-repo-network-broadcastchannel": "^1.1.0-alpha.4", + "@automerge/automerge-repo-network-websocket": "^1.1.0-alpha.4", + "@automerge/automerge-repo-storage-indexeddb": "^1.1.0-alpha.4", + "@automerge/automerge-repo-svelte-store": "^1.1.0-alpha.4", "svelte": "^3.0.0" }, "devDependencies": { diff --git a/examples/sync-server/package.json b/examples/sync-server/package.json index ac85ff265..5b2465693 100644 --- a/examples/sync-server/package.json +++ b/examples/sync-server/package.json @@ -2,7 +2,7 @@ "name": "@automerge/example-sync-server", "repository": "https://github.com/automerge/automerge-repo/tree/master/examples/sync-server", "private": true, - "version": "1.1.0-alpha.2", + "version": "1.1.0-alpha.4", "main": "index.js", "license": "MIT", "type": "module", @@ -11,9 +11,9 @@ }, "bin": "./src/index.js", "dependencies": { - "@automerge/automerge-repo": "^1.1.0-alpha.2", - "@automerge/automerge-repo-network-websocket": "^1.1.0-alpha.2", - "@automerge/automerge-repo-storage-nodefs": "^1.1.0-alpha.2", + "@automerge/automerge-repo": "^1.1.0-alpha.4", + "@automerge/automerge-repo-network-websocket": "^1.1.0-alpha.4", + "@automerge/automerge-repo-storage-nodefs": "^1.1.0-alpha.4", "express": "^4.18.1", "ws": "^8.7.0" }, diff --git a/lerna.json b/lerna.json index 8151298ee..01e6887b3 100644 --- a/lerna.json +++ b/lerna.json @@ -3,5 +3,5 @@ "useNx": true, "npmClient": "yarn", "useWorkspaces": true, - "version": "1.1.0-alpha.2" + "version": "1.1.0-alpha.4" } diff --git a/package.json b/package.json index d1bf35f8e..4bb47d44b 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@automerge/automerge-repo-monorepo", - "version": "1.1.0-alpha.2", + "version": "1.1.0-alpha.4", "description": "Automerge Repo monorepo", "main": "packages/automerge-repo/dist/index.js", "repository": "https://github.com/automerge/automerge-repo", @@ -17,6 +17,7 @@ "start:syncserver": "cross-env DEBUG='WebsocketServer' yarn workspace @automerge/example-sync-server start", "repocheck": "manypkg check", "test": "vitest", + "test:log": "cross-env DEBUG='automerge-repo:*' vitest", "test:coverage": "vitest --coverage", "test:ui": "vitest --coverage --ui", "watch": "lerna run watch --parallel --stream" @@ -36,7 +37,7 @@ "arrowParens": "avoid" }, "devDependencies": { - "@automerge/automerge": "2.1.8-alpha.1", + "@automerge/automerge": "^2.1.8-alpha.2", "@manypkg/cli": "^0.21.0", "@types/debug": "^4.1.7", "@types/node": "^20.4.8", diff --git a/packages/automerge-repo-network-broadcastchannel/package.json b/packages/automerge-repo-network-broadcastchannel/package.json index 0dc80addd..1525adace 100644 --- a/packages/automerge-repo-network-broadcastchannel/package.json +++ b/packages/automerge-repo-network-broadcastchannel/package.json @@ -1,6 +1,6 @@ { "name": "@automerge/automerge-repo-network-broadcastchannel", - "version": "1.1.0-alpha.2", + "version": "1.1.0-alpha.4", "description": "BroadcastChannel network adapter for Automerge Repo", "repository": "https://github.com/automerge/automerge-repo/tree/master/packages/automerge-repo-network-broadcastchannel", "author": "Peter van Hardenberg ", @@ -13,7 +13,7 @@ "test": "vitest" }, "dependencies": { - "@automerge/automerge-repo": "^1.1.0-alpha.2" + "@automerge/automerge-repo": "^1.1.0-alpha.4" }, "watch": { "build": { diff --git a/packages/automerge-repo-network-messagechannel/package.json b/packages/automerge-repo-network-messagechannel/package.json index 3ef9a9a4a..c9f03cb0e 100644 --- a/packages/automerge-repo-network-messagechannel/package.json +++ b/packages/automerge-repo-network-messagechannel/package.json @@ -1,6 +1,6 @@ { "name": "@automerge/automerge-repo-network-messagechannel", - "version": "1.1.0-alpha.2", + "version": "1.1.0-alpha.4", "description": "MessageChannel network adapter for Automerge Repo", "repository": "https://github.com/automerge/automerge-repo/tree/master/packages/automerge-repo-network-messagechannel", "author": "Peter van Hardenberg ", @@ -13,7 +13,7 @@ "test": "vitest" }, "dependencies": { - "@automerge/automerge-repo": "^1.1.0-alpha.2" + "@automerge/automerge-repo": "^1.1.0-alpha.4" }, "watch": { "build": { diff --git a/packages/automerge-repo-network-websocket/package.json b/packages/automerge-repo-network-websocket/package.json index e3c1b1684..e719b4660 100644 --- a/packages/automerge-repo-network-websocket/package.json +++ b/packages/automerge-repo-network-websocket/package.json @@ -1,6 +1,6 @@ { "name": "@automerge/automerge-repo-network-websocket", - "version": "1.1.0-alpha.2", + "version": "1.1.0-alpha.4", "description": "isomorphic node/browser Websocket network adapter for Automerge Repo", "repository": "https://github.com/automerge/automerge-repo/tree/master/packages/automerge-repo-network-websocket", "author": "Peter van Hardenberg ", @@ -13,7 +13,7 @@ "test": "vitest" }, "dependencies": { - "@automerge/automerge-repo": "^1.1.0-alpha.2", + "@automerge/automerge-repo": "^1.1.0-alpha.4", "cbor-x": "^1.3.0", "eventemitter3": "^5.0.1", "isomorphic-ws": "^5.0.0", diff --git a/packages/automerge-repo-react-hooks/package.json b/packages/automerge-repo-react-hooks/package.json index 778e8dcbe..b28bf8b5b 100644 --- a/packages/automerge-repo-react-hooks/package.json +++ b/packages/automerge-repo-react-hooks/package.json @@ -1,6 +1,6 @@ { "name": "@automerge/automerge-repo-react-hooks", - "version": "1.1.0-alpha.2", + "version": "1.1.0-alpha.4", "description": "Hooks to access an Automerge Repo from your react app.", "repository": "https://github.com/automerge/automerge-repo/tree/master/packages/automerge-repo-react-hooks", "author": "Peter van Hardenberg ", @@ -15,7 +15,7 @@ }, "dependencies": { "@automerge/automerge": "^2.1.8-alpha.1", - "@automerge/automerge-repo": "^1.1.0-alpha.2", + "@automerge/automerge-repo": "^1.1.0-alpha.4", "eventemitter3": "^5.0.1", "react": "^18.2.0", "react-dom": "^18.2.0", diff --git a/packages/automerge-repo-storage-indexeddb/package.json b/packages/automerge-repo-storage-indexeddb/package.json index e9f8562ad..a7ef8e87b 100644 --- a/packages/automerge-repo-storage-indexeddb/package.json +++ b/packages/automerge-repo-storage-indexeddb/package.json @@ -1,6 +1,6 @@ { "name": "@automerge/automerge-repo-storage-indexeddb", - "version": "1.1.0-alpha.2", + "version": "1.1.0-alpha.4", "description": "IndexedDB storage adapter for Automerge Repo", "repository": "https://github.com/automerge/automerge-repo/tree/master/packages/automerge-repo-storage-indexeddb", "author": "Peter van Hardenberg ", @@ -12,7 +12,7 @@ "watch": "npm-watch" }, "dependencies": { - "@automerge/automerge-repo": "^1.1.0-alpha.2" + "@automerge/automerge-repo": "^1.1.0-alpha.4" }, "watch": { "build": { diff --git a/packages/automerge-repo-storage-nodefs/package.json b/packages/automerge-repo-storage-nodefs/package.json index 6c841b0bf..3a5338668 100644 --- a/packages/automerge-repo-storage-nodefs/package.json +++ b/packages/automerge-repo-storage-nodefs/package.json @@ -1,6 +1,6 @@ { "name": "@automerge/automerge-repo-storage-nodefs", - "version": "1.1.0-alpha.2", + "version": "1.1.0-alpha.4", "description": "Simple Node filesystem storage adapter for Automerge Repo", "repository": "https://github.com/automerge/automerge-repo/tree/master/packages/automerge-repo-storage-nodefs", "author": "Peter van Hardenberg ", @@ -12,7 +12,7 @@ "watch": "npm-watch" }, "dependencies": { - "@automerge/automerge-repo": "^1.1.0-alpha.2", + "@automerge/automerge-repo": "^1.1.0-alpha.4", "rimraf": "^5.0.1" }, "watch": { diff --git a/packages/automerge-repo-svelte-store/package.json b/packages/automerge-repo-svelte-store/package.json index 90e5bc4f3..83de3c818 100644 --- a/packages/automerge-repo-svelte-store/package.json +++ b/packages/automerge-repo-svelte-store/package.json @@ -1,6 +1,6 @@ { "name": "@automerge/automerge-repo-svelte-store", - "version": "1.1.0-alpha.2", + "version": "1.1.0-alpha.4", "description": "A Svelte store containing your automerge documentsj", "repository": "https://github.com/automerge/automerge-repo/tree/master/packages/automerge-repo-svelte-store", "license": "MIT", @@ -15,7 +15,7 @@ "svelte": "^3.0.0" }, "dependencies": { - "@automerge/automerge-repo": "^1.1.0-alpha.2" + "@automerge/automerge-repo": "^1.1.0-alpha.4" }, "watch": { "build": { diff --git a/packages/automerge-repo/package.json b/packages/automerge-repo/package.json index f6159e74d..e1fd90b25 100644 --- a/packages/automerge-repo/package.json +++ b/packages/automerge-repo/package.json @@ -1,6 +1,6 @@ { "name": "@automerge/automerge-repo", - "version": "1.1.0-alpha.2", + "version": "1.1.0-alpha.4", "description": "A repository object to manage a collection of automerge documents", "repository": "https://github.com/automerge/automerge-repo/tree/master/packages/automerge-repo", "author": "Peter van Hardenberg ", @@ -13,7 +13,6 @@ "test:coverage": "c8 --reporter=lcov --reporter=html --reporter=text yarn test", "test": "vitest", "test:watch": "npm-watch test", - "test:log": "cross-env DEBUG='automerge-repo:*' yarn test", "fuzz": "ts-node --esm --experimentalSpecifierResolution=node fuzz/fuzz.ts" }, "browser": { @@ -24,7 +23,7 @@ "vite": "^4.4.11" }, "dependencies": { - "@automerge/automerge": "^2.1.8-alpha.1", + "@automerge/automerge": "^2.1.8-alpha.2", "bs58check": "^3.0.1", "cbor-x": "^1.3.0", "debug": "^4.3.4", diff --git a/packages/automerge-repo/src/RemoteHeadsSubscriptions.ts b/packages/automerge-repo/src/RemoteHeadsSubscriptions.ts index 581056191..045565196 100644 --- a/packages/automerge-repo/src/RemoteHeadsSubscriptions.ts +++ b/packages/automerge-repo/src/RemoteHeadsSubscriptions.ts @@ -44,6 +44,9 @@ export class RemoteHeadsSubscriptions extends EventEmitter> = new Map() // Peers we will always share remote heads with even if they are not subscribed #generousPeers: Set = new Set() + // Documents each peer has open, we need this information so we only send remote heads of documents that the peer knows + #subscribedDocsByPeer: Map> = new Map() + #log = debug("automerge-repo:remote-heads-subscriptions") subscribeToRemotes(remotes: StorageId[]) { @@ -89,11 +92,17 @@ export class RemoteHeadsSubscriptions extends EventEmitter { networkSubsystem.send(message) }) + this.#synchronizer.on("open-doc", ({ peerId, documentId }) => { + this.#remoteHeadsSubscriptions.subscribePeerToDoc(peerId, documentId) + }) + // STORAGE // The storage subsystem has access to some form of persistence, and deals with save and loading documents. const storageSubsystem = storage ? new StorageSubsystem(storage) : undefined @@ -332,6 +336,10 @@ export class Repo extends EventEmitter { return this.#synchronizer.peers } + getStorageIdOfPeer(peerId: PeerId): StorageId | undefined { + return this.peerMetadataByPeerId[peerId]?.storageId + } + /** * Creates a new document and returns a handle to it. The initial value of the document is * an empty object `{}`. Its documentId is generated by the system. we emit a `document` event diff --git a/packages/automerge-repo/src/network/NetworkAdapter.ts b/packages/automerge-repo/src/network/NetworkAdapter.ts index 80908f89c..e95a08656 100644 --- a/packages/automerge-repo/src/network/NetworkAdapter.ts +++ b/packages/automerge-repo/src/network/NetworkAdapter.ts @@ -30,10 +30,7 @@ export abstract class NetworkAdapter extends EventEmitter * @argument peerId - the peerId of this repo * @argument peerMetadata - how this adapter should present itself to other peers */ - abstract connect( - peerId: PeerId, - peerMetadata?: PeerMetadata - ): void + abstract connect(peerId: PeerId, peerMetadata?: PeerMetadata): void /** Called by the {@link Repo} to send a message to a peer * diff --git a/packages/automerge-repo/src/network/messages.ts b/packages/automerge-repo/src/network/messages.ts index 4f2bf5c4e..2e3df7c47 100644 --- a/packages/automerge-repo/src/network/messages.ts +++ b/packages/automerge-repo/src/network/messages.ts @@ -104,19 +104,19 @@ export type AuthMessage = { } export type RemoteSubscriptionControlMessage = { - type: "remote-subscription-change", - senderId: PeerId, - targetId: PeerId, - add?: StorageId[], - remove?: StorageId[], + type: "remote-subscription-change" + senderId: PeerId + targetId: PeerId + add?: StorageId[] + remove?: StorageId[] } export type RemoteHeadsChanged = { - type: "remote-heads-changed", - senderId: PeerId, - targetId: PeerId, - documentId: DocumentId, - newHeads: {[key: StorageId]: {heads: string[], timestamp: number}}, + type: "remote-heads-changed" + senderId: PeerId + targetId: PeerId + documentId: DocumentId + newHeads: { [key: StorageId]: { heads: string[]; timestamp: number } } } /** These are message types that a {@link NetworkAdapter} surfaces to a {@link Repo}. */ @@ -128,7 +128,11 @@ export type RepoMessage = | RemoteSubscriptionControlMessage | RemoteHeadsChanged -export type DocMessage = SyncMessage | EphemeralMessage | RequestMessage | DocumentUnavailableMessage +export type DocMessage = + | SyncMessage + | EphemeralMessage + | RequestMessage + | DocumentUnavailableMessage /** These are all the message types that a {@link NetworkAdapter} might see. */ export type Message = RepoMessage | AuthMessage @@ -148,6 +152,12 @@ export interface SyncStateMessage { syncState: SyncState } +/** Notify the repo that a peer started syncing with a doc */ +export interface OpenDocMessage { + peerId: PeerId + documentId: DocumentId +} + // TYPE GUARDS export const isValidRepoMessage = (message: Message): message is RepoMessage => @@ -174,7 +184,9 @@ export const isSyncMessage = (msg: Message): msg is SyncMessage => export const isEphemeralMessage = (msg: Message): msg is EphemeralMessage => msg.type === "ephemeral" -export const isRemoteSubscriptionControlMessage = (msg: Message): msg is RemoteSubscriptionControlMessage => +export const isRemoteSubscriptionControlMessage = ( + msg: Message +): msg is RemoteSubscriptionControlMessage => msg.type === "remote-subscription-change" export const isRemoteHeadsChanged = (msg: Message): msg is RemoteHeadsChanged => diff --git a/packages/automerge-repo/src/storage/keyHash.ts b/packages/automerge-repo/src/storage/keyHash.ts index b35ec5ac1..fa8cdf618 100644 --- a/packages/automerge-repo/src/storage/keyHash.ts +++ b/packages/automerge-repo/src/storage/keyHash.ts @@ -7,11 +7,13 @@ export function keyHash(binary: Uint8Array) { const hash = sha256.hash(binary) return bufferToHexString(hash) } + export function headsHash(heads: A.Heads): string { const encoder = new TextEncoder() const headsbinary = mergeArrays(heads.map((h: string) => encoder.encode(h))) return keyHash(headsbinary) } + function bufferToHexString(data: Uint8Array) { return Array.from(data, byte => byte.toString(16).padStart(2, "0")).join("") } diff --git a/packages/automerge-repo/src/synchronizer/CollectionSynchronizer.ts b/packages/automerge-repo/src/synchronizer/CollectionSynchronizer.ts index d5b82b601..91a35994a 100644 --- a/packages/automerge-repo/src/synchronizer/CollectionSynchronizer.ts +++ b/packages/automerge-repo/src/synchronizer/CollectionSynchronizer.ts @@ -55,6 +55,7 @@ export class CollectionSynchronizer extends Synchronizer { }, }) docSynchronizer.on("message", event => this.emit("message", event)) + docSynchronizer.on("open-doc", event => this.emit("open-doc", event)) docSynchronizer.on("sync-state", event => this.emit("sync-state", event)) return docSynchronizer } diff --git a/packages/automerge-repo/src/synchronizer/DocSynchronizer.ts b/packages/automerge-repo/src/synchronizer/DocSynchronizer.ts index 3f78341b1..0858d31bf 100644 --- a/packages/automerge-repo/src/synchronizer/DocSynchronizer.ts +++ b/packages/automerge-repo/src/synchronizer/DocSynchronizer.ts @@ -20,7 +20,6 @@ import { import { PeerId } from "../types.js" import { Synchronizer } from "./Synchronizer.js" import { throttle } from "../helpers/throttle.js" -import { headsAreSame } from "../helpers/headsAreSame.js" type PeerDocumentStatus = "unknown" | "has" | "unavailable" | "wants" @@ -124,9 +123,7 @@ export class DocSynchronizer extends Synchronizer { } #withSyncState(peerId: PeerId, callback: (syncState: A.SyncState) => void) { - if (!this.#peers.includes(peerId)) { - this.#peers.push(peerId) - } + this.#addPeer(peerId) if (!(peerId in this.#peerDocumentStatuses)) { this.#peerDocumentStatuses[peerId] = "unknown" @@ -149,6 +146,13 @@ export class DocSynchronizer extends Synchronizer { pendingCallbacks.push(callback) } + #addPeer(peerId: PeerId) { + if (!this.#peers.includes(peerId)) { + this.#peers.push(peerId) + this.emit("open-doc", { documentId: this.documentId, peerId }) + } + } + #initSyncState(peerId: PeerId, syncState: A.SyncState) { const pendingCallbacks = this.#pendingSyncStateCallbacks[peerId] if (pendingCallbacks) { diff --git a/packages/automerge-repo/src/synchronizer/Synchronizer.ts b/packages/automerge-repo/src/synchronizer/Synchronizer.ts index 2a76a26ab..1b0744ead 100644 --- a/packages/automerge-repo/src/synchronizer/Synchronizer.ts +++ b/packages/automerge-repo/src/synchronizer/Synchronizer.ts @@ -1,6 +1,7 @@ import { EventEmitter } from "eventemitter3" import { MessageContents, + OpenDocMessage, RepoMessage, SyncStateMessage, } from "../network/messages.js" @@ -12,4 +13,5 @@ export abstract class Synchronizer extends EventEmitter { export interface SynchronizerEvents { message: (arg: MessageContents) => void "sync-state": (arg: SyncStateMessage) => void + "open-doc": (arg: OpenDocMessage) => void } diff --git a/packages/automerge-repo/test/RemoteHeadsSubscriptions.test.ts b/packages/automerge-repo/test/RemoteHeadsSubscriptions.test.ts index 775895a2c..3a1238b01 100644 --- a/packages/automerge-repo/test/RemoteHeadsSubscriptions.test.ts +++ b/packages/automerge-repo/test/RemoteHeadsSubscriptions.test.ts @@ -1,15 +1,14 @@ import * as A from "@automerge/automerge" import assert from "assert" import { describe, it } from "vitest" +import { generateAutomergeUrl, parseAutomergeUrl } from "../src/AutomergeUrl.js" import { RemoteHeadsSubscriptions } from "../src/RemoteHeadsSubscriptions.js" import { PeerId, StorageId } from "../src/index.js" -import { generateAutomergeUrl, parseAutomergeUrl } from "../src/AutomergeUrl.js" -import { pause } from "../src/helpers/pause.js" -import { EventEmitter } from "eventemitter3" import { RemoteHeadsChanged, RemoteSubscriptionControlMessage, } from "../src/network/messages.js" +import { waitForMessages } from "./helpers/waitForMessages.js" describe("RepoHeadsSubscriptions", () => { const storageA = "remote-a" as StorageId @@ -224,6 +223,8 @@ describe("RepoHeadsSubscriptions", () => { remoteHeadsSubscriptions, "notify-remote-heads" ) + remoteHeadsSubscriptions.subscribePeerToDoc(peerC, docA) + remoteHeadsSubscriptions.subscribePeerToDoc(peerC, docC) // change message for docA in storageB remoteHeadsSubscriptions.handleRemoteHeads(docAHeadsChangedForStorageB) @@ -260,6 +261,32 @@ describe("RepoHeadsSubscriptions", () => { assert.strictEqual(messages.length, 0) }) + it("should not send remote heads for docs that the peer is not subscribed to", async () => { + const remoteHeadsSubscriptions = new RemoteHeadsSubscriptions() + remoteHeadsSubscriptions.subscribeToRemotes([storageB]) + + // subscribe peer c to storage b + remoteHeadsSubscriptions.handleControlMessage(subscribePeerCToStorageB) + const messagesAfterSubscribePromise = waitForMessages( + remoteHeadsSubscriptions, + "notify-remote-heads" + ) + + // change message for docA in storageB + remoteHeadsSubscriptions.handleRemoteHeads(docAHeadsChangedForStorageB) + + // change heads directly + remoteHeadsSubscriptions.handleImmediateRemoteHeadsChanged( + docC, + storageB, + [] + ) + + // expect peer c to be notified both changes + let messages = await messagesAfterSubscribePromise + assert.strictEqual(messages.length, 0) + }) + it("should ignore sync states with an older timestamp", async () => { const remoteHeadsSubscription = new RemoteHeadsSubscriptions() @@ -321,23 +348,3 @@ describe("RepoHeadsSubscriptions", () => { assert.deepStrictEqual(messages[2].peers, []) }) }) - -async function waitForMessages( - emitter: EventEmitter, - event: string, - timeout: number = 100 -): Promise { - const messages = [] - - const onEvent = message => { - messages.push(message) - } - - emitter.on(event, onEvent) - - await pause(timeout) - - emitter.off(event) - - return messages -} diff --git a/packages/automerge-repo/test/helpers/waitForMessages.ts b/packages/automerge-repo/test/helpers/waitForMessages.ts new file mode 100644 index 000000000..d5b8e196c --- /dev/null +++ b/packages/automerge-repo/test/helpers/waitForMessages.ts @@ -0,0 +1,22 @@ +import { EventEmitter } from "eventemitter3" +import { pause } from "../../src/helpers/pause.js" + +export async function waitForMessages( + emitter: EventEmitter, + event: string, + timeout: number = 100 +): Promise { + const messages = [] + + const onEvent = message => { + messages.push(message) + } + + emitter.on(event, onEvent) + + await pause(timeout) + + emitter.off(event) + + return messages +} diff --git a/packages/automerge-repo/test/remoteHeads.test.ts b/packages/automerge-repo/test/remoteHeads.test.ts index 299af721c..3c20ea2dd 100644 --- a/packages/automerge-repo/test/remoteHeads.test.ts +++ b/packages/automerge-repo/test/remoteHeads.test.ts @@ -1,20 +1,19 @@ +import { MessageChannelNetworkAdapter } from "@automerge/automerge-repo-network-messagechannel" import * as A from "@automerge/automerge/next" import assert from "assert" -import { decode } from "cbor-x" +import { setTimeout } from "timers/promises" import { describe, it } from "vitest" import { generateAutomergeUrl, parseAutomergeUrl } from "../src/AutomergeUrl.js" import { eventPromise } from "../src/helpers/eventPromise.js" -import { pause } from "../src/helpers/pause.js" import { DocHandle, DocHandleRemoteHeadsPayload, PeerId, Repo, } from "../src/index.js" -import { TestDoc } from "./types.js" -import { MessageChannelNetworkAdapter } from "@automerge/automerge-repo-network-messagechannel" -import { setTimeout } from "timers/promises" import { DummyStorageAdapter } from "./helpers/DummyStorageAdapter.js" +import { waitForMessages } from "./helpers/waitForMessages.js" +import { TestDoc } from "./types.js" describe("DocHandle.remoteHeads", () => { const TEST_ID = parseAutomergeUrl(generateAutomergeUrl()).documentId @@ -40,86 +39,207 @@ describe("DocHandle.remoteHeads", () => { assert.deepStrictEqual(handle.getRemoteHeads(bobStorageId), []) }) - it("should report remoteHeads for peers who are several hops away", async () => { - // replicates a tab -> service worker -> sync server <- service worker <- tab scenario - const leftTab = new Repo({ - peerId: "left-tab" as PeerId, - network: [], - sharePolicy: async () => true, - }) - const leftServiceWorker = new Repo({ - peerId: "left-service-worker" as PeerId, - network: [], - sharePolicy: async peer => peer === "sync-server", - storage: new DummyStorageAdapter(), - isEphemeral: false, - }) - const syncServer = new Repo({ - peerId: "sync-server" as PeerId, - network: [], - isEphemeral: false, - sharePolicy: async () => false, - storage: new DummyStorageAdapter(), - }) - const rightServiceWorker = new Repo({ - peerId: "right-service-worker" as PeerId, - network: [], - sharePolicy: async peer => peer === "sync-server", - isEphemeral: false, - storage: new DummyStorageAdapter(), + describe("multi hop sync", () => { + async function setup() { + // setup topology: tab -> service worker -> sync server <- service worker <- tab + const leftTab1 = new Repo({ + peerId: "left-tab-1" as PeerId, + network: [], + sharePolicy: async () => true, + }) + const leftTab2 = new Repo({ + peerId: "left-tab-2" as PeerId, + network: [], + sharePolicy: async () => true, + }) + const leftServiceWorker = new Repo({ + peerId: "left-service-worker" as PeerId, + network: [], + sharePolicy: async peer => peer === "sync-server", + storage: new DummyStorageAdapter(), + isEphemeral: false, + }) + const syncServer = new Repo({ + peerId: "sync-server" as PeerId, + network: [], + isEphemeral: false, + sharePolicy: async () => false, + storage: new DummyStorageAdapter(), + }) + const rightServiceWorker = new Repo({ + peerId: "right-service-worker" as PeerId, + network: [], + sharePolicy: async peer => peer === "sync-server", + isEphemeral: false, + storage: new DummyStorageAdapter(), + }) + const rightTab = new Repo({ + peerId: "right-tab" as PeerId, + network: [], + sharePolicy: async () => true, + }) + + // connect them all up + connectRepos(leftTab1, leftServiceWorker) + connectRepos(leftTab2, leftServiceWorker) + connectRepos(leftServiceWorker, syncServer) + connectRepos(syncServer, rightServiceWorker) + connectRepos(rightServiceWorker, rightTab) + + await setTimeout(100) + + return { + leftTab1, + leftTab2, + leftServiceWorker, + syncServer, + rightServiceWorker, + rightTab, + } + } + + it("should report remoteHeads for peers", async () => { + const { rightTab, rightServiceWorker, leftServiceWorker, leftTab1 } = + await setup() + + // subscribe to the left service worker storage ID on the right tab + rightTab.subscribeToRemotes([await leftServiceWorker.storageId()!]) + + await setTimeout(100) + + // create a doc in the left tab + const leftTabDoc = leftTab1.create() + leftTabDoc.change(d => (d.foo = "bar")) + + // wait for the document to arrive on the right tab + const rightTabDoc = rightTab.find(leftTabDoc.url) + await rightTabDoc.whenReady() + + // wait for the document to arrive in the left service worker + const leftServiceWorkerDoc = leftServiceWorker.find(leftTabDoc.documentId) + await leftServiceWorkerDoc.whenReady() + + const leftServiceWorkerStorageId = await leftServiceWorker.storageId() + let leftSeenByRightPromise = new Promise( + resolve => { + rightTabDoc.on("remote-heads", message => { + if (message.storageId === leftServiceWorkerStorageId) { + resolve(message) + } + }) + } + ) + + // make a change on the right + rightTabDoc.change(d => (d.foo = "baz")) + + // wait for the change to be acknolwedged by the left + const leftSeenByRight = await leftSeenByRightPromise + + assert.deepStrictEqual( + leftSeenByRight.heads, + A.getHeads(leftServiceWorkerDoc.docSync()) + ) }) - const rightTab = new Repo({ - peerId: "right-tab" as PeerId, - network: [], - sharePolicy: async () => true, + + it("should report remoteHeads only for documents the subscriber has open", async () => { + const { leftTab1, rightTab, rightServiceWorker } = await setup() + + // subscribe leftTab to storageId of rightServiceWorker + leftTab1.subscribeToRemotes([await rightServiceWorker.storageId()!]) + + await setTimeout(100) + + // create 2 docs in right tab + const rightTabDocA = rightTab.create() + rightTabDocA.change(d => (d.foo = "A")) + + const rightTabDocB = rightTab.create() + rightTabDocB.change(d => (d.foo = "B")) + + // open doc b in left tab 1 + const leftTabDocA = leftTab1.find(rightTabDocA.url) + + const remoteHeadsChangedMessages = ( + await waitForMessages(leftTab1.networkSubsystem, "message") + ).filter(({ type }) => type === "remote-heads-changed") + + // we should only be notified of the head changes of doc A + assert.strictEqual(remoteHeadsChangedMessages.length, 1) + assert.strictEqual( + remoteHeadsChangedMessages[0].documentId, + leftTabDocA.documentId + ) }) - // connect them all up - connectRepos(leftTab, leftServiceWorker) - connectRepos(leftServiceWorker, syncServer) - connectRepos(syncServer, rightServiceWorker) - connectRepos(rightServiceWorker, rightTab) + it("should report remote heads for doc on subscribe if peer already knows them", async () => { + const { leftTab1, leftTab2, rightTab, rightServiceWorker } = await setup() - await setTimeout(100) + // create 2 docs in right tab + const rightTabDocA = rightTab.create() + rightTabDocA.change(d => (d.foo = "A")) - // subscribe to the left service worker storage ID on the right tab - rightTab.subscribeToRemotes([await leftServiceWorker.storageId()!]) + const rightTabDocB = rightTab.create() + rightTabDocB.change(d => (d.foo = "B")) - await setTimeout(100) + // open docs in left tab 1 + const leftTab1DocA = leftTab1.find(rightTabDocA.url) + const leftTab1DocB = leftTab1.find(rightTabDocB.url) - // create a doc in the left tab - const leftTabDoc = leftTab.create() - leftTabDoc.change(d => (d.foo = "bar")) + // subscribe leftTab 1 to storageId of rightServiceWorker + leftTab1.subscribeToRemotes([await rightServiceWorker.storageId()!]) - // wait for the document to arrive on the right tab - const rightTabDoc = rightTab.find(leftTabDoc.url) - await rightTabDoc.whenReady() + await setTimeout(200) - // wait for the document to arrive in the left service worker - const leftServiceWorkerDoc = leftServiceWorker.find(leftTabDoc.documentId) - await leftServiceWorkerDoc.whenReady() + // now the left service worker has the remote heads of the right service worker for both doc A and doc B + // if we subscribe from left tab 1 the left service workers should send it's stored remote heads immediately - const leftServiceWorkerStorageId = await leftServiceWorker.storageId() - let leftSeenByRightPromise = new Promise( - resolve => { - rightTabDoc.on("remote-heads", message => { - if (message.storageId === leftServiceWorkerStorageId) { - resolve(message) - } - }) - } - ) + // open doc and subscribe leftTab 2 to storageId of rightServiceWorker + const leftTab2DocA = leftTab2.find(rightTabDocA.url) + leftTab2.subscribeToRemotes([await rightServiceWorker.storageId()!]) + + const remoteHeadsChangedMessages = ( + await waitForMessages(leftTab2.networkSubsystem, "message") + ).filter(({ type }) => type === "remote-heads-changed") - // make a change on the right - rightTabDoc.change(d => (d.foo = "baz")) + // we should only be notified of the head changes of doc A + assert.strictEqual(remoteHeadsChangedMessages.length, 1) + assert.strictEqual( + remoteHeadsChangedMessages[0].documentId, + leftTab1DocA.documentId + ) + }) + + it("should report remote heads for subscribed storage id once we open a new doc", async () => { + const { leftTab1, leftTab2, rightTab, rightServiceWorker } = await setup() + + // create 2 docs in right tab + const rightTabDocA = rightTab.create() + rightTabDocA.change(d => (d.foo = "A")) + + const rightTabDocB = rightTab.create() + rightTabDocB.change(d => (d.foo = "B")) + + await setTimeout(200) - // wait for the change to be acknolwedged by the left - const leftSeenByRight = await leftSeenByRightPromise + // subscribe leftTab 1 to storageId of rightServiceWorker + leftTab1.subscribeToRemotes([await rightServiceWorker.storageId()!]) - assert.deepStrictEqual( - leftSeenByRight.heads, - A.getHeads(leftServiceWorkerDoc.docSync()) - ) + // in leftTab 1 open doc A + const leftTab1DocA = leftTab1.find(rightTabDocA.url) + + const remoteHeadsChangedMessages = ( + await waitForMessages(leftTab1.networkSubsystem, "message") + ).filter(({ type }) => type === "remote-heads-changed") + + console.log(JSON.stringify(remoteHeadsChangedMessages, null, 2)) + + assert.strictEqual(remoteHeadsChangedMessages.length, 1) + assert.strictEqual( + remoteHeadsChangedMessages[0].documentId, + leftTab1DocA.documentId + ) + }) }) }) diff --git a/packages/create-repo-node-app/package.json b/packages/create-repo-node-app/package.json index 1c0024f25..a59901904 100644 --- a/packages/create-repo-node-app/package.json +++ b/packages/create-repo-node-app/package.json @@ -1,6 +1,6 @@ { "name": "@automerge/create-repo-node-app", - "version": "1.1.0-alpha.2", + "version": "1.1.0-alpha.4", "description": "Create an automerge-repo app for node", "repository": "https://github.com/automerge/automerge-repo/tree/master/packages/create-repo-node-app", "author": "Alex Good ", diff --git a/yarn.lock b/yarn.lock index de9fb5bf6..4db5be0e5 100644 --- a/yarn.lock +++ b/yarn.lock @@ -30,7 +30,15 @@ resolved "https://registry.yarnpkg.com/@automerge/automerge-wasm/-/automerge-wasm-0.6.0-alpha.1.tgz#4505b32b9ca6255a2acb866f955ab540241b0f89" integrity sha512-tXVFRO0dee6IaRbcv3IlXEpFtkvGrWEpodl2chvAeERTM68xn5QZHvWm3LzeVcgGl09e3Ck2p3I+yBP/covjtQ== -"@automerge/automerge@2.1.8-alpha.1", "@automerge/automerge@^2.1.8-alpha.1": +"@automerge/automerge@^2.1.0": + version "2.1.7" + resolved "https://registry.yarnpkg.com/@automerge/automerge/-/automerge-2.1.7.tgz#c62d4aec3cce6b53f766a38a9ab74ab69898bbaf" + integrity sha512-/EO7OJyQ+5PXjQShfUn9yrwKSNy23XQujGQC1azL9vgRSwjf2jNc0eZZwxK5gWpjKeMsz3TB4vqoNVMtjKmrxA== + dependencies: + "@automerge/automerge-wasm" "0.5.0" + uuid "^9.0.0" + +"@automerge/automerge@^2.1.8-alpha.1": version "2.1.8-alpha.1" resolved "https://registry.yarnpkg.com/@automerge/automerge/-/automerge-2.1.8-alpha.1.tgz#8ae477cfeb24eb6ab8dee13c07009eaa2507538b" integrity sha512-+nU03L2E6E2VKwquDl6ifqufAVYKHd2jwUrkHHgKTgRMgqza3kY3YvbnfkXJE/Cc1fV37TC+wUhKiO0er/oWzQ== @@ -38,12 +46,12 @@ "@automerge/automerge-wasm" "0.6.0-alpha.1" uuid "^9.0.0" -"@automerge/automerge@^2.1.0": - version "2.1.7" - resolved "https://registry.yarnpkg.com/@automerge/automerge/-/automerge-2.1.7.tgz#c62d4aec3cce6b53f766a38a9ab74ab69898bbaf" - integrity sha512-/EO7OJyQ+5PXjQShfUn9yrwKSNy23XQujGQC1azL9vgRSwjf2jNc0eZZwxK5gWpjKeMsz3TB4vqoNVMtjKmrxA== +"@automerge/automerge@^2.1.8-alpha.2": + version "2.1.8-alpha.2" + resolved "https://registry.yarnpkg.com/@automerge/automerge/-/automerge-2.1.8-alpha.2.tgz#73d01e834e5b399eb1ac0091c841e11eda48a8b0" + integrity sha512-MGR/d5KNp3abM0c2vnZwIIYVSzG9nhx8NfBhMjGhcJstnuKR/OaDIPNWE3bTbBik8vO9YOKBjxgdNZr+28Us0A== dependencies: - "@automerge/automerge-wasm" "0.5.0" + "@automerge/automerge-wasm" "0.6.0-alpha.1" uuid "^9.0.0" "@babel/code-frame@^7.0.0", "@babel/code-frame@^7.10.4", "@babel/code-frame@^7.22.13":