Skip to content

Commit

Permalink
tests all passing
Browse files Browse the repository at this point in the history
  • Loading branch information
pvh committed Jan 3, 2025
1 parent b9f8f9b commit 2b37ea5
Show file tree
Hide file tree
Showing 10 changed files with 237 additions and 59 deletions.
32 changes: 24 additions & 8 deletions packages/automerge-repo/src/AutomergeUrl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,30 @@ import type {
DocumentId,
AnyDocumentId,
} from "./types.js"
import type { Heads } from "@automerge/automerge/slim"

import * as Uuid from "uuid"
import bs58check from "bs58check"
import {
uint8ArrayFromHexString,
uint8ArrayToHexString,
} from "./helpers/bufferFromHex.js"

import type { Heads as AutomergeHeads } from "@automerge/automerge/slim"

export const urlPrefix = "automerge:"

// We need to define our own version of heads because the AutomergeHeads type is not bs58check encoded
export type UrlHeads = string[] & { __automergeUrlHeadsBrand: unknown }

interface ParsedAutomergeUrl {
/** unencoded DocumentId */
binaryDocumentId: BinaryDocumentId
/** bs58 encoded DocumentId */
documentId: DocumentId
/** Optional array of heads, if specified in URL */
heads?: Heads
heads?: UrlHeads
/** Optional hex array of heads, in Automerge core format */
hexHeads?: string[] // AKA: heads
}

/** Given an Automerge URL, returns the DocumentId in both base58check-encoded form and binary form */
Expand All @@ -34,16 +45,15 @@ export const parseAutomergeUrl = (url: AutomergeUrl): ParsedAutomergeUrl => {
if (!binaryDocumentId) throw new Error("Invalid document URL: " + url)
if (headsSection === undefined) return { binaryDocumentId, documentId }

const encodedHeads = headsSection === "" ? [] : headsSection.split("|")
const heads = encodedHeads.map(head => {
const heads = (headsSection === "" ? [] : headsSection.split("|")) as UrlHeads
const hexHeads = heads.map(head => {
try {
bs58check.decode(head)
return head
return uint8ArrayToHexString(bs58check.decode(head))
} catch (e) {
throw new Error(`Invalid head in URL: ${head}`)
}
})
return { binaryDocumentId, documentId, heads }
return { binaryDocumentId, hexHeads, documentId, heads }
}

/**
Expand Down Expand Up @@ -157,6 +167,12 @@ export const documentIdToBinary = (docId: DocumentId) =>
export const binaryToDocumentId = (docId: BinaryDocumentId) =>
bs58check.encode(docId) as DocumentId

/**
 * Converts Automerge-native heads (hex-encoded strings) into the
 * bs58check-encoded form used inside Automerge URLs.
 */
export const encodeHeads = (heads: AutomergeHeads): UrlHeads => {
  const encoded = heads.map(hexHead =>
    bs58check.encode(uint8ArrayFromHexString(hexHead))
  )
  return encoded as UrlHeads
}

/**
 * Converts bs58check-encoded URL heads back into the hex-encoded
 * form that Automerge core expects.
 */
export const decodeHeads = (heads: UrlHeads): AutomergeHeads => {
  const decoded = heads.map(urlHead =>
    uint8ArrayToHexString(bs58check.decode(urlHead))
  )
  return decoded as AutomergeHeads
}

export const parseLegacyUUID = (str: string) => {
if (!Uuid.validate(str)) return undefined
const documentId = Uuid.parse(str) as BinaryDocumentId
Expand Down Expand Up @@ -201,5 +217,5 @@ export const interpretAsDocumentId = (id: AnyDocumentId) => {

type UrlOptions = {
documentId: DocumentId | BinaryDocumentId
heads?: Heads
heads?: UrlHeads
}
75 changes: 44 additions & 31 deletions packages/automerge-repo/src/DocHandle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@ import * as A from "@automerge/automerge/slim/next"
import debug from "debug"
import { EventEmitter } from "eventemitter3"
import { assertEvent, assign, createActor, setup, waitFor } from "xstate"
import { stringifyAutomergeUrl } from "./AutomergeUrl.js"
import {
decodeHeads,
encodeHeads,
stringifyAutomergeUrl,
UrlHeads,
} from "./AutomergeUrl.js"
import { encode } from "./helpers/cbor.js"
import { headsAreSame } from "./helpers/headsAreSame.js"
import { withTimeout } from "./helpers/withTimeout.js"
Expand All @@ -29,7 +34,7 @@ export class DocHandle<T> extends EventEmitter<DocHandleEvents<T>> {
#machine

/** If set, this handle will only show the document at these heads */
#fixedHeads?: A.Heads
#fixedHeads?: UrlHeads

/** The last known state of our document. */
#prevDocState: T = A.init<T>()
Expand All @@ -39,7 +44,7 @@ export class DocHandle<T> extends EventEmitter<DocHandleEvents<T>> {
#timeoutDelay = 60_000

/** A dictionary mapping each peer to the last heads we know they have. */
#remoteHeads: Record<StorageId, A.Heads> = {}
#remoteHeads: Record<StorageId, UrlHeads> = {}

/** @hidden */
constructor(
Expand Down Expand Up @@ -289,7 +294,7 @@ export class DocHandle<T> extends EventEmitter<DocHandleEvents<T>> {
if (this.#fixedHeads) {
const doc = this.#doc
if (!doc || this.isUnavailable()) return undefined
return A.view(doc, this.#fixedHeads)
return A.view(doc, decodeHeads(this.#fixedHeads))
}
// Return the document
return !this.isUnavailable() ? this.#doc : undefined
Expand All @@ -312,7 +317,7 @@ export class DocHandle<T> extends EventEmitter<DocHandleEvents<T>> {
if (!this.isReady()) return undefined
if (this.#fixedHeads) {
const doc = this.#doc
return doc ? A.view(doc, this.#fixedHeads) : undefined
return doc ? A.view(doc, decodeHeads(this.#fixedHeads)) : undefined
}
return this.#doc
}
Expand All @@ -322,12 +327,12 @@ export class DocHandle<T> extends EventEmitter<DocHandleEvents<T>> {
* This precisely defines the state of a document.
* @returns the current document's heads, or undefined if the document is not ready
*/
heads(): A.Heads | undefined {
heads(): UrlHeads | undefined {
if (!this.isReady()) return undefined
if (this.#fixedHeads) {
return this.#fixedHeads
}
return A.getHeads(this.#doc)
return encodeHeads(A.getHeads(this.#doc))
}

begin() {
Expand All @@ -344,15 +349,17 @@ export class DocHandle<T> extends EventEmitter<DocHandleEvents<T>> {
* history views would be quite large under concurrency (every thing in each branch against each other).
* There might be a clever way to think about this, but we haven't found it yet, so for now at least
* we present a single traversable view which excludes concurrency.
* @returns A.Heads[] - The individual heads for every change in the document. Each item is a tagged string[1].
* @returns UrlHeads[] - The individual heads for every change in the document. Each item is a tagged string[1].
*/
history(): A.Heads[] | undefined {
history(): UrlHeads[] | undefined {
if (!this.isReady()) {
return undefined
}
// This just returns all the heads as individual strings.

return A.topoHistoryTraversal(this.#doc).map(h => [h]) as A.Heads[]
return A.topoHistoryTraversal(this.#doc).map(h =>
encodeHeads([h])
) as UrlHeads[]
}

/**
Expand All @@ -368,7 +375,7 @@ export class DocHandle<T> extends EventEmitter<DocHandleEvents<T>> {
* @argument heads - The heads to view the document at. See history().
* @returns DocHandle<T> at the time of `heads`
*/
view(heads: A.Heads): DocHandle<T> {
view(heads: UrlHeads): DocHandle<T> {
if (!this.isReady()) {
throw new Error(
`DocHandle#${this.documentId} is not ready. Check \`handle.isReady()\` before calling view().`
Expand Down Expand Up @@ -398,7 +405,7 @@ export class DocHandle<T> extends EventEmitter<DocHandleEvents<T>> {
* @throws Error if the documents don't share history or if either document is not ready
* @returns Automerge patches that go from one document state to the other
*/
diff(first: A.Heads | DocHandle<T>, second?: A.Heads): A.Patch[] {
diff(first: UrlHeads | DocHandle<T>, second?: UrlHeads): A.Patch[] {
if (!this.isReady()) {
throw new Error(
`DocHandle#${this.documentId} is not ready. Check \`handle.isReady()\` before calling diff().`
Expand All @@ -417,19 +424,19 @@ export class DocHandle<T> extends EventEmitter<DocHandleEvents<T>> {
if (!otherHeads) throw new Error("Other document's heads not available")

// Create a temporary merged doc to verify shared history and compute diff
try {
const mergedDoc = A.merge(A.clone(doc), first.docSync()!)
// Use the merged doc to compute the diff
return A.diff(mergedDoc, this.heads()!, otherHeads)
} catch (e) {
throw new Error("Documents do not share history")
}
const mergedDoc = A.merge(A.clone(doc), first.docSync()!)
// Use the merged doc to compute the diff
return A.diff(
mergedDoc,
decodeHeads(this.heads()!),
decodeHeads(otherHeads)
)
}

// Otherwise treat as heads
const from = second ? first : this.heads() || []
const from = second ? first : ((this.heads() || []) as UrlHeads)
const to = second ? second : first
return A.diff(doc, from, to)
return A.diff(doc, decodeHeads(from), decodeHeads(to))
}

/**
Expand All @@ -447,11 +454,15 @@ export class DocHandle<T> extends EventEmitter<DocHandleEvents<T>> {
if (!this.isReady()) {
return undefined
}

if (!change) {
change = this.heads()![0]
}
// we return undefined instead of null by convention in this API
return A.inspectChange(this.#doc, change) || undefined
return (
A.inspectChange(this.#doc, decodeHeads([change] as UrlHeads)[0]) ||
undefined
)
}

/**
Expand All @@ -477,13 +488,13 @@ export class DocHandle<T> extends EventEmitter<DocHandleEvents<T>> {
* Called by the repo either when a doc handle changes or we receive new remote heads.
* @hidden
*/
setRemoteHeads(storageId: StorageId, heads: A.Heads) {
setRemoteHeads(storageId: StorageId, heads: UrlHeads) {
this.#remoteHeads[storageId] = heads
this.emit("remote-heads", { storageId, heads })
}

/** Returns the heads of the storageId. */
getRemoteHeads(storageId: StorageId): A.Heads | undefined {
getRemoteHeads(storageId: StorageId): UrlHeads | undefined {
return this.#remoteHeads[storageId]
}

Expand Down Expand Up @@ -526,10 +537,10 @@ export class DocHandle<T> extends EventEmitter<DocHandleEvents<T>> {
* @returns A set of heads representing the concurrent change that was made.
*/
changeAt(
heads: A.Heads,
heads: UrlHeads,
callback: A.ChangeFn<T>,
options: A.ChangeOptions<T> = {}
): A.Heads[] | undefined {
): UrlHeads[] | undefined {
if (!this.isReady()) {
throw new Error(
`DocHandle#${this.documentId} is not ready. Check \`handle.isReady()\` before accessing the document.`
Expand All @@ -540,13 +551,15 @@ export class DocHandle<T> extends EventEmitter<DocHandleEvents<T>> {
`DocHandle#${this.documentId} is in view-only mode at specific heads. Use clone() to create a new document from this state.`
)
}
let resultHeads: A.Heads | undefined = undefined
let resultHeads: UrlHeads | undefined = undefined
this.#machine.send({
type: UPDATE,
payload: {
callback: doc => {
const result = A.changeAt(doc, heads, options, callback)
resultHeads = result.newHeads || undefined
const result = A.changeAt(doc, decodeHeads(heads), options, callback)
resultHeads = result.newHeads
? encodeHeads(result.newHeads)
: undefined
return result.newDoc
},
},
Expand Down Expand Up @@ -652,7 +665,7 @@ export type DocHandleOptions<T> =
isNew?: false

// An optional point in time to lock the document to.
heads?: A.Heads
heads?: UrlHeads

/** The number of milliseconds before we mark this document as unavailable if we don't have it and nobody shares it with us. */
timeoutDelay?: number
Expand Down Expand Up @@ -717,7 +730,7 @@ export interface DocHandleOutboundEphemeralMessagePayload<T> {
/** Emitted when we have new remote heads for this document */
export interface DocHandleRemoteHeadsPayload {
storageId: StorageId
heads: A.Heads
heads: UrlHeads
}

// STATE MACHINE TYPES & CONSTANTS
Expand Down
11 changes: 6 additions & 5 deletions packages/automerge-repo/src/RemoteHeadsSubscriptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ import {
} from "./network/messages.js"
import { StorageId } from "./index.js"
import debug from "debug"
import { UrlHeads } from "./AutomergeUrl.js"

// Notify a DocHandle that remote heads have changed
export type RemoteHeadsSubscriptionEventPayload = {
documentId: DocumentId
storageId: StorageId
remoteHeads: A.Heads
remoteHeads: UrlHeads
timestamp: number
}

Expand All @@ -21,7 +22,7 @@ export type NotifyRemoteHeadsPayload = {
targetId: PeerId
documentId: DocumentId
storageId: StorageId
heads: A.Heads
heads: UrlHeads
timestamp: number
}

Expand Down Expand Up @@ -216,7 +217,7 @@ export class RemoteHeadsSubscriptions extends EventEmitter<RemoteHeadsSubscripti
handleImmediateRemoteHeadsChanged(
documentId: DocumentId,
storageId: StorageId,
heads: A.Heads
heads: UrlHeads
) {
this.#log("handleLocalHeadsChanged", documentId, storageId, heads)
const remote = this.#knownHeads.get(documentId)
Expand Down Expand Up @@ -334,7 +335,7 @@ export class RemoteHeadsSubscriptions extends EventEmitter<RemoteHeadsSubscripti
#changedHeads(msg: RemoteHeadsChanged): {
documentId: DocumentId
storageId: StorageId
remoteHeads: A.Heads
remoteHeads: UrlHeads
timestamp: number
}[] {
const changedHeads = []
Expand Down Expand Up @@ -371,5 +372,5 @@ export class RemoteHeadsSubscriptions extends EventEmitter<RemoteHeadsSubscripti

type LastHeads = {
timestamp: number
heads: A.Heads
heads: UrlHeads
}
12 changes: 8 additions & 4 deletions packages/automerge-repo/src/Repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import { next as Automerge } from "@automerge/automerge/slim"
import debug from "debug"
import { EventEmitter } from "eventemitter3"
import {
encodeHeads,
generateAutomergeUrl,
getHeadsFromUrl,
interpretAsDocumentId,
isValidAutomergeUrl,
parseAutomergeUrl,
Expand Down Expand Up @@ -195,16 +195,20 @@ export class Repo extends EventEmitter<RepoEvents> {
const heads = handle.getRemoteHeads(storageId)
const haveHeadsChanged =
message.syncState.theirHeads &&
(!heads || !headsAreSame(heads, message.syncState.theirHeads))
(!heads ||
!headsAreSame(heads, encodeHeads(message.syncState.theirHeads)))

if (haveHeadsChanged && message.syncState.theirHeads) {
handle.setRemoteHeads(storageId, message.syncState.theirHeads)
handle.setRemoteHeads(
storageId,
encodeHeads(message.syncState.theirHeads)
)

if (storageId && this.#remoteHeadsGossipingEnabled) {
this.#remoteHeadsSubscriptions.handleImmediateRemoteHeadsChanged(
message.documentId,
storageId,
message.syncState.theirHeads
encodeHeads(message.syncState.theirHeads)
)
}
}
Expand Down
14 changes: 14 additions & 0 deletions packages/automerge-repo/src/helpers/bufferFromHex.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/**
 * Decodes a hex string (e.g. "0aff") into a Uint8Array of its byte values.
 *
 * @param hexString - hex digits only, even length; case-insensitive
 * @returns the decoded bytes ("" decodes to an empty array)
 * @throws Error if the string has odd length or contains non-hex characters
 */
export const uint8ArrayFromHexString = (hexString: string): Uint8Array => {
  if (hexString.length % 2 !== 0) {
    throw new Error("Hex string must have an even length")
  }
  // parseInt(…, 16) is lenient: "0z" parses as 0 and "zz" as NaN, and a
  // Uint8Array coerces NaN to 0 — either way malformed input would silently
  // decode to wrong bytes. Validate the whole string up front instead.
  if (!/^[0-9a-fA-F]*$/.test(hexString)) {
    throw new Error("Hex string contains invalid characters")
  }
  const bytes = new Uint8Array(hexString.length / 2)
  for (let i = 0; i < hexString.length; i += 2) {
    bytes[i >> 1] = parseInt(hexString.slice(i, i + 2), 16)
  }
  return bytes
}

/**
 * Encodes a Uint8Array as a lowercase hex string, two digits per byte
 * (e.g. [10, 255] becomes "0aff").
 */
export const uint8ArrayToHexString = (data: Uint8Array): string => {
  let hex = ""
  for (const byte of data) {
    hex += byte.toString(16).padStart(2, "0")
  }
  return hex
}
Loading

0 comments on commit 2b37ea5

Please sign in to comment.