Skip to content

Commit

Permalink
Add network and storage adapter interfaces (#291)
Browse files Browse the repository at this point in the history
  • Loading branch information
acurrieclark authored Feb 6, 2024
1 parent 7a341b1 commit 6b19210
Show file tree
Hide file tree
Showing 11 changed files with 557 additions and 348 deletions.
8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@
"@automerge/automerge": "^2.1.10",
"@manypkg/cli": "^0.21.2",
"@types/debug": "^4.1.12",
"@types/node": "^20.11.15",
"@types/react": "^18.2.50",
"@types/node": "^20.11.16",
"@types/react": "^18.2.55",
"@types/react-dom": "^18.2.18",
"@types/uuid": "^8.3.4",
"@types/ws": "^8.5.10",
"@typescript-eslint/eslint-plugin": "^6.20.0",
"@typescript-eslint/parser": "^6.20.0",
"@typescript-eslint/eslint-plugin": "^6.21.0",
"@typescript-eslint/parser": "^6.21.0",
"@vitejs/plugin-react": "^3.1.0",
"@vitest/coverage-v8": "^1.2.2",
"@vitest/ui": "^1.2.2",
Expand Down
14 changes: 7 additions & 7 deletions packages/automerge-repo/src/Repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@ import { next as Automerge } from "@automerge/automerge"
import debug from "debug"
import { EventEmitter } from "eventemitter3"
import {
generateAutomergeUrl,
interpretAsDocumentId,
parseAutomergeUrl,
generateAutomergeUrl,
interpretAsDocumentId,
parseAutomergeUrl,
} from "./AutomergeUrl.js"
import { DocHandle, DocHandleEncodedChangePayload } from "./DocHandle.js"
import { RemoteHeadsSubscriptions } from "./RemoteHeadsSubscriptions.js"
import { headsAreSame } from "./helpers/headsAreSame.js"
import { throttle } from "./helpers/throttle.js"
import { NetworkAdapter, type PeerMetadata } from "./network/NetworkAdapter.js"
import { NetworkAdapterInterface, type PeerMetadata } from "./network/NetworkAdapterInterface.js"
import { NetworkSubsystem } from "./network/NetworkSubsystem.js"
import { RepoMessage } from "./network/messages.js"
import { StorageAdapter } from "./storage/StorageAdapter.js"
import { StorageAdapterInterface } from "./storage/StorageAdapterInterface.js"
import { StorageSubsystem } from "./storage/StorageSubsystem.js"
import { StorageId } from "./storage/types.js"
import { CollectionSynchronizer } from "./synchronizer/CollectionSynchronizer.js"
Expand Down Expand Up @@ -513,10 +513,10 @@ export interface RepoConfig {
isEphemeral?: boolean

/** A storage adapter can be provided, or not */
storage?: StorageAdapter
storage?: StorageAdapterInterface

/** One or more network adapters must be provided */
network: NetworkAdapter[]
network: NetworkAdapterInterface[]

/**
* Normal peers typically share generously with everyone (meaning we sync all our documents with
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
import assert from "assert"
import { describe, expect, it } from "vitest"
import {
PeerId,
Repo,
type NetworkAdapter,
StorageId,
PeerMetadata,
} from "../../index.js"
import { PeerId, PeerMetadata, Repo, StorageId } from "../../index.js"
import type { NetworkAdapterInterface } from "../../network/NetworkAdapterInterface.js"
import { eventPromise, eventPromises } from "../eventPromise.js"
import { pause } from "../pause.js"

Expand Down Expand Up @@ -35,7 +30,10 @@ export function runAdapterTests(_setup: SetupFn, title?: string): void {

describe(`Adapter acceptance tests ${title ? `(${title})` : ""}`, () => {
it("can sync 2 repos", async () => {
const doTest = async (a: NetworkAdapter[], b: NetworkAdapter[]) => {
const doTest = async (
a: NetworkAdapterInterface[],
b: NetworkAdapterInterface[]
) => {
const aliceRepo = new Repo({ network: a, peerId: alice })
const bobRepo = new Repo({ network: b, peerId: bob })

Expand Down Expand Up @@ -174,7 +172,7 @@ export function runAdapterTests(_setup: SetupFn, title?: string): void {

const NO_OP = () => {}

type Network = NetworkAdapter | NetworkAdapter[]
type Network = NetworkAdapterInterface | NetworkAdapterInterface[]

export type SetupFn = () => Promise<{
adapters: [Network, Network, Network]
Expand Down
4 changes: 3 additions & 1 deletion packages/automerge-repo/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ export {
} from "./AutomergeUrl.js"
export { Repo } from "./Repo.js"
export { NetworkAdapter } from "./network/NetworkAdapter.js"
export type { NetworkAdapterInterface } from "./network/NetworkAdapterInterface.js"
export { isRepoMessage } from "./network/messages.js"
export { StorageAdapter } from "./storage/StorageAdapter.js"
export type { StorageAdapterInterface } from "./storage/StorageAdapterInterface.js"

/** @hidden **/
export * as cbor from "./helpers/cbor.js"
Expand Down Expand Up @@ -68,7 +70,7 @@ export type {
PeerCandidatePayload,
PeerDisconnectedPayload,
PeerMetadata,
} from "./network/NetworkAdapter.js"
} from "./network/NetworkAdapterInterface.js"

export type {
DocumentUnavailableMessage,
Expand Down
52 changes: 7 additions & 45 deletions packages/automerge-repo/src/network/NetworkAdapter.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,23 @@
/* c8 ignore start */

import { EventEmitter } from "eventemitter3"
import { NetworkAdapterEvents, PeerMetadata } from "../index.js"
import { PeerId } from "../types.js"
import { Message } from "./messages.js"
import { StorageId } from "../storage/types.js"

/**
* Describes a peer intent to the system
* storageId: the key for syncState to decide what the other peer already has
* isEphemeral: to decide if we bother recording this peer's sync state
*
*/
export interface PeerMetadata {
storageId?: StorageId
isEphemeral?: boolean
}
import { NetworkAdapterInterface } from "./NetworkAdapterInterface.js"

/** An interface representing some way to connect to other peers
* @deprecated use {@link NetworkAdapterInterface}
*
* @remarks
* The {@link Repo} uses one or more `NetworkAdapter`s to connect to other peers.
* Because the network may take some time to be ready the {@link Repo} will wait
* until the adapter emits a `ready` event before it starts trying to use it
*/
export abstract class NetworkAdapter extends EventEmitter<NetworkAdapterEvents> {
export abstract class NetworkAdapter
extends EventEmitter<NetworkAdapterEvents>
implements NetworkAdapterInterface
{
peerId?: PeerId
peerMetadata?: PeerMetadata

Expand All @@ -43,35 +37,3 @@ export abstract class NetworkAdapter extends EventEmitter<NetworkAdapterEvents>
/** Called by the {@link Repo} to disconnect from the network */
abstract disconnect(): void
}

// events & payloads

export interface NetworkAdapterEvents {
/** Emitted when the network is ready to be used */
ready: (payload: OpenPayload) => void

/** Emitted when the network is closed */
close: () => void

/** Emitted when the network adapter learns about a new peer */
"peer-candidate": (payload: PeerCandidatePayload) => void

/** Emitted when the network adapter learns that a peer has disconnected */
"peer-disconnected": (payload: PeerDisconnectedPayload) => void

/** Emitted when the network adapter receives a message from a peer */
message: (payload: Message) => void
}

export interface OpenPayload {
network: NetworkAdapter
}

export interface PeerCandidatePayload {
peerId: PeerId
peerMetadata: PeerMetadata
}

export interface PeerDisconnectedPayload {
peerId: PeerId
}
77 changes: 77 additions & 0 deletions packages/automerge-repo/src/network/NetworkAdapterInterface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/* c8 ignore start */

import { EventEmitter } from "eventemitter3"
import { PeerId } from "../types.js"
import { Message } from "./messages.js"
import { StorageId } from "../storage/types.js"

/**
* Describes a peer intent to the system
* storageId: the key for syncState to decide what the other peer already has
* isEphemeral: to decide if we bother recording this peer's sync state
*
*/
export interface PeerMetadata {
storageId?: StorageId
isEphemeral?: boolean
}

/** An interface representing some way to connect to other peers
*
* @remarks
* The {@link Repo} uses one or more `NetworkAdapter`s to connect to other peers.
* Because the network may take some time to be ready the {@link Repo} will wait
* until the adapter emits a `ready` event before it starts trying to use it
*/
export interface NetworkAdapterInterface extends EventEmitter<NetworkAdapterEvents> {
peerId?: PeerId
peerMetadata?: PeerMetadata

/** Called by the {@link Repo} to start the connection process
*
* @argument peerId - the peerId of this repo
* @argument peerMetadata - how this adapter should present itself to other peers
*/
connect(peerId: PeerId, peerMetadata?: PeerMetadata): void

/** Called by the {@link Repo} to send a message to a peer
*
* @argument message - the message to send
*/
send(message: Message): void

/** Called by the {@link Repo} to disconnect from the network */
disconnect(): void
}

// events & payloads

export interface NetworkAdapterEvents {
/** Emitted when the network is ready to be used */
ready: (payload: OpenPayload) => void

/** Emitted when the network is closed */
close: () => void

/** Emitted when the network adapter learns about a new peer */
"peer-candidate": (payload: PeerCandidatePayload) => void

/** Emitted when the network adapter learns that a peer has disconnected */
"peer-disconnected": (payload: PeerDisconnectedPayload) => void

/** Emitted when the network adapter receives a message from a peer */
message: (payload: Message) => void
}

export interface OpenPayload {
network: NetworkAdapterInterface
}

export interface PeerCandidatePayload {
peerId: PeerId
peerMetadata: PeerMetadata
}

export interface PeerDisconnectedPayload {
peerId: PeerId
}
24 changes: 13 additions & 11 deletions packages/automerge-repo/src/network/NetworkSubsystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ import debug from "debug"
import { EventEmitter } from "eventemitter3"
import { PeerId, SessionId } from "../types.js"
import type {
NetworkAdapter,
NetworkAdapterInterface,
PeerDisconnectedPayload,
PeerMetadata,
} from "./NetworkAdapter.js"
} from "./NetworkAdapterInterface.js"
import {
EphemeralMessage,
MessageContents,
Expand All @@ -21,16 +21,16 @@ const getEphemeralMessageSource = (message: EphemeralMessage) =>

export class NetworkSubsystem extends EventEmitter<NetworkSubsystemEvents> {
#log: debug.Debugger
#adaptersByPeer: Record<PeerId, NetworkAdapter> = {}
#adaptersByPeer: Record<PeerId, NetworkAdapterInterface> = {}

#count = 0
#sessionId: SessionId = Math.random().toString(36).slice(2) as SessionId
#ephemeralSessionCounts: Record<EphemeralMessageSource, number> = {}
#readyAdapterCount = 0
#adapters: NetworkAdapter[] = []
#adapters: NetworkAdapterInterface[] = []

constructor(
adapters: NetworkAdapter[],
adapters: NetworkAdapterInterface[],
public peerId = randomPeerId(),
private peerMetadata: Promise<PeerMetadata>
) {
Expand All @@ -39,7 +39,7 @@ export class NetworkSubsystem extends EventEmitter<NetworkSubsystemEvents> {
adapters.forEach(a => this.addNetworkAdapter(a))
}

addNetworkAdapter(networkAdapter: NetworkAdapter) {
addNetworkAdapter(networkAdapter: NetworkAdapterInterface) {
this.#adapters.push(networkAdapter)
networkAdapter.once("ready", () => {
this.#readyAdapterCount++
Expand Down Expand Up @@ -105,11 +105,13 @@ export class NetworkSubsystem extends EventEmitter<NetworkSubsystemEvents> {
})
})

this.peerMetadata.then(peerMetadata => {
networkAdapter.connect(this.peerId, peerMetadata)
}).catch(err => {
this.#log("error connecting to network", err)
})
this.peerMetadata
.then(peerMetadata => {
networkAdapter.connect(this.peerId, peerMetadata)
})
.catch(err => {
this.#log("error connecting to network", err)
})
}

send(message: MessageContents) {
Expand Down
4 changes: 3 additions & 1 deletion packages/automerge-repo/src/storage/StorageAdapter.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import { StorageAdapterInterface } from "./StorageAdapterInterface.js"
import { StorageKey, Chunk } from "./types.js"

/** A storage adapter represents some way of storing binary data for a {@link Repo}
* @deprecated use {@link StorageAdapterInterface}
*
* @remarks
* `StorageAdapter`s provide a key/value storage interface. The keys are arrays of strings
* ({@link StorageKey}) and the values are binary blobs.
*/
export abstract class StorageAdapter {
export abstract class StorageAdapter implements StorageAdapterInterface {
/** Load the single value corresponding to `key` */
abstract load(key: StorageKey): Promise<Uint8Array | undefined>

Expand Down
34 changes: 34 additions & 0 deletions packages/automerge-repo/src/storage/StorageAdapterInterface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { StorageKey, Chunk } from "./types.js"

/** A storage adapter represents some way of storing binary data for a {@link Repo}
*
* @remarks
* `StorageAdapter`s provide a key/value storage interface. The keys are arrays of strings
* ({@link StorageKey}) and the values are binary blobs.
*/
export interface StorageAdapterInterface {
/** Load the single value corresponding to `key` */
load(key: StorageKey): Promise<Uint8Array | undefined>

/** Save the value `data` to the key `key` */
save(key: StorageKey, data: Uint8Array): Promise<void>

/** Remove the value corresponding to `key` */
remove(key: StorageKey): Promise<void>

/**
* Load all values with keys that start with `keyPrefix`.
*
* @remarks
* The `keyprefix` will match any key that starts with the given array. For example:
* - `[documentId, "incremental"]` will match all incremental saves
* - `[documentId]` will match all data for a given document.
*
* Be careful! `[documentId]` would also match something like `[documentId, "syncState"]`! We
* aren't using this yet but keep it in mind.)
*/
loadRange(keyPrefix: StorageKey): Promise<Chunk[]>

/** Remove all values with keys that start with `keyPrefix` */
removeRange(keyPrefix: StorageKey): Promise<void>
}
Loading

0 comments on commit 6b19210

Please sign in to comment.