Skip to content

Commit

Permalink
Merge branch 'removeIsNew'
Browse files Browse the repository at this point in the history
  • Loading branch information
pvh committed Jul 23, 2024
2 parents b820796 + b6d70fc commit 2963205
Show file tree
Hide file tree
Showing 9 changed files with 6,749 additions and 5,374 deletions.
46 changes: 24 additions & 22 deletions packages/automerge-repo/src/DocHandle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export class DocHandle<T> extends EventEmitter<DocHandleEvents<T>> {
#machine

/** The last known state of our document. */
#prevDocState: T | undefined
#prevDocState: T = A.init<T>()

/** How long to wait before giving up on a document. (Note that a document will be marked
* unavailable much sooner if all known peers respond that they don't have it.) */
Expand All @@ -49,17 +49,7 @@ export class DocHandle<T> extends EventEmitter<DocHandleEvents<T>> {
this.#timeoutDelay = options.timeoutDelay
}

let doc: T
const isNew = "isNew" in options && options.isNew
if (isNew) {
// T should really be constrained to extend `Record<string, unknown>` (an automerge doc can't be
// e.g. a primitive, an array, etc. - it must be an object). But adding that constraint creates
// a bunch of other problems elsewhere so for now we'll just cast it here to make Automerge happy.
doc = A.from(options.initialValue as Record<string, unknown>) as T
doc = A.emptyChange<T>(doc)
} else {
doc = A.init<T>()
}
const doc = A.init<T>()

this.#log = debug(`automerge-repo:dochandle:${this.documentId.slice(0, 5)}`)

Expand All @@ -80,7 +70,7 @@ export class DocHandle<T> extends EventEmitter<DocHandleEvents<T>> {
}),
onDelete: assign(() => {
this.emit("delete", { handle: this })
return { doc: undefined }
return { doc: A.init() }
}),
onUnavailable: () => {
this.emit("unavailable", { handle: this })
Expand Down Expand Up @@ -114,7 +104,10 @@ export class DocHandle<T> extends EventEmitter<DocHandleEvents<T>> {
after: { [delay]: "unavailable" },
},
awaitingNetwork: {
on: { NETWORK_READY: "requesting" },
on: {
DOC_READY: "ready",
NETWORK_READY: "requesting"
},
},
requesting: {
on: {
Expand Down Expand Up @@ -146,7 +139,7 @@ export class DocHandle<T> extends EventEmitter<DocHandleEvents<T>> {

// Start the machine, and send a create or find event to get things going
this.#machine.start()
this.#machine.send(isNew ? { type: CREATE } : { type: FIND })
this.#machine.send({ type: FIND })
}

// PRIVATE
Expand Down Expand Up @@ -178,13 +171,14 @@ export class DocHandle<T> extends EventEmitter<DocHandleEvents<T>> {
* Called after state transitions. If the document has changed, emits a change event. If we just
* received the document for the first time, signal that our request has been completed.
*/
#checkForChanges(before: T | undefined, after: T) {
const docChanged =
after && before && !headsAreSame(A.getHeads(after), A.getHeads(before))
#checkForChanges(before: A.Doc<T>, after: A.Doc<T>) {
const beforeHeads = A.getHeads(before)
const afterHeads = A.getHeads(after)
const docChanged = !headsAreSame(afterHeads, beforeHeads)
if (docChanged) {
this.emit("heads-changed", { handle: this, doc: after })

const patches = A.diff(after, A.getHeads(before), A.getHeads(after))
const patches = A.diff(after, beforeHeads, afterHeads)
if (patches.length > 0) {
this.emit("change", {
handle: this,
Expand Down Expand Up @@ -308,12 +302,22 @@ export class DocHandle<T> extends EventEmitter<DocHandleEvents<T>> {
/**
* `update` is called by the repo when we receive changes from the network
* Called by the repo when we receive changes from the network.
* Does not cause state changes.
* @hidden
*/
update(callback: (doc: A.Doc<T>) => A.Doc<T>) {
this.#machine.send({ type: UPDATE, payload: { callback } })
}

/**
* `doneLoading` is called by the repo after it decides it has all the changes
* it's going to get during setup. This might mean it was created locally,
* or that it was loaded from storage, or that it was received from a peer.
*/
doneLoading() {
this.#machine.send({ type: DOC_READY })
}

/**
* Called by the repo either when a doc handle changes or we receive new remote heads.
* @hidden
Expand Down Expand Up @@ -346,7 +350,7 @@ export class DocHandle<T> extends EventEmitter<DocHandleEvents<T>> {
change(callback: A.ChangeFn<T>, options: A.ChangeOptions<T> = {}) {
if (!this.isReady()) {
throw new Error(
`DocHandle#${this.documentId} is not ready. Check \`handle.isReady()\` before accessing the document.`
`DocHandle#${this.documentId} is in ${this.state} and not ready. Check \`handle.isReady()\` before accessing the document.`
)
}
this.#machine.send({
Expand Down Expand Up @@ -584,7 +588,6 @@ interface DocHandleContext<T> {

/** These are the (internal) events that can be sent to the state machine */
type DocHandleEvent<T> =
| { type: typeof CREATE }
| { type: typeof FIND }
| { type: typeof REQUEST }
| { type: typeof DOC_READY }
Expand All @@ -598,7 +601,6 @@ type DocHandleEvent<T> =
| { type: typeof AWAIT_NETWORK }
| { type: typeof NETWORK_READY }

const CREATE = "CREATE"
const FIND = "FIND"
const REQUEST = "REQUEST"
const DOC_READY = "DOC_READY"
Expand Down
81 changes: 49 additions & 32 deletions packages/automerge-repo/src/Repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ import { CollectionSynchronizer } from "./synchronizer/CollectionSynchronizer.js
import { SyncStatePayload } from "./synchronizer/Synchronizer.js"
import type { AnyDocumentId, DocumentId, PeerId } from "./types.js"

function randomPeerId() {
return "peer-" + Math.random().toString(36).slice(4) as PeerId
}

/** A Repo is a collection of documents with networking, syncing, and storage capabilities. */
/** The `Repo` is the main entry point of this library
*
Expand Down Expand Up @@ -61,7 +65,7 @@ export class Repo extends EventEmitter<RepoEvents> {
constructor({
storage,
network = [],
peerId,
peerId = randomPeerId(),
sharePolicy,
isEphemeral = storage === undefined,
enableRemoteHeadsGossiping = false,
Expand All @@ -75,7 +79,7 @@ export class Repo extends EventEmitter<RepoEvents> {

// The `document` event is fired by the DocCollection any time we create a new document or look
// up a document by ID. We listen for it in order to wire up storage and network synchronization.
this.on("document", async ({ handle, isNew }) => {
this.on("document", async ({ handle }) => {
if (storageSubsystem) {
// Save when the document changes, but no more often than saveDebounceRate.
const saveFn = ({
Expand All @@ -84,18 +88,9 @@ export class Repo extends EventEmitter<RepoEvents> {
}: DocHandleEncodedChangePayload<any>) => {
void storageSubsystem.saveDoc(handle.documentId, doc)
}
handle.on("heads-changed", throttle(saveFn, this.saveDebounceRate))

if (isNew) {
// this is a new document, immediately save it
await storageSubsystem.saveDoc(handle.documentId, handle.docSync()!)
} else {
// Try to load from disk
const loadedDoc = await storageSubsystem.loadDoc(handle.documentId)
if (loadedDoc) {
handle.update(() => loadedDoc)
}
}
handle.on("heads-changed",
throttle(saveFn, this.saveDebounceRate)
)
}

handle.on("unavailable", () => {
Expand All @@ -105,9 +100,7 @@ export class Repo extends EventEmitter<RepoEvents> {
})
})

if (this.networkSubsystem.isReady()) {
handle.request()
} else {
if (!this.networkSubsystem.isReady()) {
handle.awaitNetwork()
this.networkSubsystem
.whenReady()
Expand Down Expand Up @@ -323,21 +316,17 @@ export class Repo extends EventEmitter<RepoEvents> {

/** Returns an existing handle if we have it; creates one otherwise. */
#getHandle<T>({
documentId,
isNew,
initialValue,
documentId
}: {
/** The documentId of the handle to look up or create */
documentId: DocumentId /** If we know we're creating a new document, specify this so we can have access to it immediately */
isNew: boolean
initialValue?: T
}) {
// If we have the handle cached, return it
if (this.#handleCache[documentId]) return this.#handleCache[documentId]

// If not, create a new handle, cache it, and return it
if (!documentId) throw new Error(`Invalid documentId ${documentId}`)
const handle = new DocHandle<T>(documentId, { isNew, initialValue })
const handle = new DocHandle<T>(documentId)
this.#handleCache[documentId] = handle
return handle
}
Expand Down Expand Up @@ -365,11 +354,22 @@ export class Repo extends EventEmitter<RepoEvents> {
// Generate a new UUID and store it in the buffer
const { documentId } = parseAutomergeUrl(generateAutomergeUrl())
const handle = this.#getHandle<T>({
documentId,
isNew: true,
initialValue,
documentId
}) as DocHandle<T>
this.emit("document", { handle, isNew: true })

this.emit("document", { handle })

handle.update(() => {
let nextDoc: Automerge.Doc<T>
if (initialValue) {
nextDoc = Automerge.from(initialValue)
} else {
nextDoc = Automerge.emptyChange(Automerge.init())
}
return nextDoc
})

handle.doneLoading()
return handle
}

Expand Down Expand Up @@ -434,11 +434,28 @@ export class Repo extends EventEmitter<RepoEvents> {
return this.#handleCache[documentId]
}

// If we don't already have the handle, make an empty one and try loading it
const handle = this.#getHandle<T>({
documentId,
isNew: false,
}) as DocHandle<T>
this.emit("document", { handle, isNew: false })

// Try to load from disk
if (this.storageSubsystem) {
void this.storageSubsystem.loadDoc(handle.documentId).then(loadedDoc => {
if (loadedDoc) {
// uhhhh, sorry if you're reading this because we were lying to the type system
handle.update(() => loadedDoc as Automerge.Doc<T>)
handle.doneLoading()
} else {
handle.request()
}
})

} else {
handle.request()
}

this.emit("document", { handle })
return handle
}

Expand All @@ -448,7 +465,7 @@ export class Repo extends EventEmitter<RepoEvents> {
) {
const documentId = interpretAsDocumentId(id)

const handle = this.#getHandle({ documentId, isNew: false })
const handle = this.#getHandle({ documentId })
handle.delete()

delete this.#handleCache[documentId]
Expand All @@ -465,7 +482,7 @@ export class Repo extends EventEmitter<RepoEvents> {
async export(id: AnyDocumentId): Promise<Uint8Array | undefined> {
const documentId = interpretAsDocumentId(id)

const handle = this.#getHandle({ documentId, isNew: false })
const handle = this.#getHandle({ documentId })
const doc = await handle.doc()
if (!doc) return undefined
return Automerge.save(doc)
Expand Down Expand Up @@ -528,6 +545,7 @@ export class Repo extends EventEmitter<RepoEvents> {
return this.storageSubsystem!.saveDoc(handle.documentId, doc)
})
)
return
}
}

Expand Down Expand Up @@ -582,7 +600,6 @@ export interface RepoEvents {

export interface DocumentPayload {
handle: DocHandle<any>
isNew: boolean
}

export interface DeleteDocumentPayload {
Expand Down
2 changes: 1 addition & 1 deletion packages/automerge-repo/src/helpers/throttle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
*
*
* Example usage:
* const callback = debounce((ev) => { doSomethingExpensiveOrOccasional() }, 100)
* const callback = throttle((ev) => { doSomethingExpensiveOrOccasional() }, 100)
* target.addEventListener('frequent-event', callback);
*
*/
Expand Down
4 changes: 4 additions & 0 deletions packages/automerge-repo/src/network/NetworkSubsystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ export class NetworkSubsystem extends EventEmitter<NetworkSubsystemEvents> {
this.#log(`peer candidate: ${peerId} `)
// TODO: This is where authentication would happen

// TODO: on reconnection, this would create problems!
// the server would see a reconnection as a late-arriving channel
// for an existing peer and decide to ignore it until the connection
// times out: turns out my ICE/SIP emulation laziness did not pay off here
if (!this.#adaptersByPeer[peerId]) {
// TODO: handle losing a server here
this.#adaptersByPeer[peerId] = networkAdapter
Expand Down
Loading

0 comments on commit 2963205

Please sign in to comment.