Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove the awaiting_network states from dochandle #363

Merged
merged 3 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 6 additions & 38 deletions packages/automerge-repo/src/DocHandle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,24 +91,16 @@ export class DocHandle<T> extends EventEmitter<DocHandleEvents<T>> {
states: {
idle: {
on: {
CREATE: "ready",
FIND: "loading",
BEGIN: "loading",
},
},
loading: {
on: {
REQUEST: "requesting",
DOC_READY: "ready",
AWAIT_NETWORK: "awaitingNetwork",
},
after: { [delay]: "unavailable" },
},
awaitingNetwork: {
on: {
DOC_READY: "ready",
NETWORK_READY: "requesting"
},
},
requesting: {
on: {
DOC_UNAVAILABLE: "unavailable",
Expand Down Expand Up @@ -139,7 +131,7 @@ export class DocHandle<T> extends EventEmitter<DocHandleEvents<T>> {

// Start the machine, and send a create or find event to get things going
this.#machine.start()
this.#machine.send({ type: FIND })
this.#machine.send({ type: BEGIN })
}

// PRIVATE
Expand Down Expand Up @@ -429,17 +421,6 @@ export class DocHandle<T> extends EventEmitter<DocHandleEvents<T>> {
if (this.#state === "loading") this.#machine.send({ type: REQUEST })
}

/** @hidden */
awaitNetwork() {
if (this.#state === "loading") this.#machine.send({ type: AWAIT_NETWORK })
}

/** @hidden */
networkReady() {
if (this.#state === "awaitingNetwork")
this.#machine.send({ type: NETWORK_READY })
}

/** Called by the repo when the document is deleted. */
delete() {
this.#machine.send({ type: DELETE })
Expand Down Expand Up @@ -554,8 +535,6 @@ export const HandleState = {
IDLE: "idle",
/** We are waiting for storage to finish loading */
LOADING: "loading",
/** We are waiting for the network to be come ready */
AWAITING_NETWORK: "awaitingNetwork",
/** We are waiting for someone in the network to respond to a sync request */
REQUESTING: "requesting",
/** The document is available */
Expand All @@ -567,15 +546,8 @@ export const HandleState = {
} as const
export type HandleState = (typeof HandleState)[keyof typeof HandleState]

export const {
IDLE,
LOADING,
AWAITING_NETWORK,
REQUESTING,
READY,
DELETED,
UNAVAILABLE,
} = HandleState
export const { IDLE, LOADING, REQUESTING, READY, DELETED, UNAVAILABLE } =
HandleState

// context

Expand All @@ -588,7 +560,7 @@ interface DocHandleContext<T> {

/** These are the (internal) events that can be sent to the state machine */
type DocHandleEvent<T> =
| { type: typeof FIND }
| { type: typeof BEGIN }
| { type: typeof REQUEST }
| { type: typeof DOC_READY }
| {
Expand All @@ -598,14 +570,10 @@ type DocHandleEvent<T> =
| { type: typeof TIMEOUT }
| { type: typeof DELETE }
| { type: typeof DOC_UNAVAILABLE }
| { type: typeof AWAIT_NETWORK }
| { type: typeof NETWORK_READY }

const FIND = "FIND"
const BEGIN = "BEGIN"
const REQUEST = "REQUEST"
const DOC_READY = "DOC_READY"
const AWAIT_NETWORK = "AWAIT_NETWORK"
const NETWORK_READY = "NETWORK_READY"
const UPDATE = "UPDATE"
const DELETE = "DELETE"
const TIMEOUT = "TIMEOUT"
Expand Down
42 changes: 17 additions & 25 deletions packages/automerge-repo/src/Repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import { SyncStatePayload } from "./synchronizer/Synchronizer.js"
import type { AnyDocumentId, DocumentId, PeerId } from "./types.js"

function randomPeerId() {
return "peer-" + Math.random().toString(36).slice(4) as PeerId
return ("peer-" + Math.random().toString(36).slice(4)) as PeerId
}

/** A Repo is a collection of documents with networking, syncing, and storage capabilities. */
Expand Down Expand Up @@ -88,9 +88,7 @@ export class Repo extends EventEmitter<RepoEvents> {
}: DocHandleEncodedChangePayload<any>) => {
void storageSubsystem.saveDoc(handle.documentId, doc)
}
handle.on("heads-changed",
throttle(saveFn, this.saveDebounceRate)
)
handle.on("heads-changed", throttle(saveFn, this.saveDebounceRate))
}

handle.on("unavailable", () => {
Expand All @@ -100,18 +98,6 @@ export class Repo extends EventEmitter<RepoEvents> {
})
})

if (!this.networkSubsystem.isReady()) {
handle.awaitNetwork()
this.networkSubsystem
.whenReady()
.then(() => {
handle.networkReady()
})
.catch(err => {
this.#log("error waiting for network", { err })
})
}

// Register the document with the synchronizer. This advertises our interest in the document.
this.#synchronizer.addDocument(handle.documentId)
})
Expand Down Expand Up @@ -316,7 +302,7 @@ export class Repo extends EventEmitter<RepoEvents> {

/** Returns an existing handle if we have it; creates one otherwise. */
#getHandle<T>({
documentId
documentId,
}: {
/** The documentId of the handle to look up or create */
documentId: DocumentId /** If we know we're creating a new document, specify this so we can have access to it immediately */
Expand Down Expand Up @@ -354,7 +340,7 @@ export class Repo extends EventEmitter<RepoEvents> {
// Generate a new UUID and store it in the buffer
const { documentId } = parseAutomergeUrl(generateAutomergeUrl())
const handle = this.#getHandle<T>({
documentId
documentId,
}) as DocHandle<T>

this.emit("document", { handle })
Expand All @@ -365,10 +351,10 @@ export class Repo extends EventEmitter<RepoEvents> {
nextDoc = Automerge.from(initialValue)
} else {
nextDoc = Automerge.emptyChange(Automerge.init())
}
}
return nextDoc
})

handle.doneLoading()
return handle
}
Expand Down Expand Up @@ -438,7 +424,7 @@ export class Repo extends EventEmitter<RepoEvents> {
const handle = this.#getHandle<T>({
documentId,
}) as DocHandle<T>

// Try to load from disk
if (this.storageSubsystem) {
void this.storageSubsystem.loadDoc(handle.documentId).then(loadedDoc => {
Expand All @@ -447,15 +433,21 @@ export class Repo extends EventEmitter<RepoEvents> {
handle.update(() => loadedDoc as Automerge.Doc<T>)
handle.doneLoading()
} else {
handle.request()
this.networkSubsystem
.whenReady()
.then(() => {
handle.request()
})
.catch(err => {
this.#log("error waiting for network", { err })
})
this.emit("document", { handle })
}
})

} else {
handle.request()
this.emit("document", { handle })
}

this.emit("document", { handle })
return handle
}

Expand Down
5 changes: 4 additions & 1 deletion packages/automerge-repo/src/network/NetworkSubsystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,10 @@ export class NetworkSubsystem extends EventEmitter<NetworkSubsystemEvents> {
}

isReady = () => {
return this.#readyAdapterCount === this.#adapters.length
return (
this.#adapters.length === 0 ||
this.#readyAdapterCount === this.#adapters.length
)
}

whenReady = async () => {
Expand Down
12 changes: 6 additions & 6 deletions packages/automerge-repo/test/DocHandle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ describe("DocHandle", () => {
const TEST_ID = parseAutomergeUrl(generateAutomergeUrl()).documentId
const setup = (options?) => {
const handle = new DocHandle<TestDoc>(TEST_ID, options)
handle.update( () => A.init() )
handle.update(() => A.init())
handle.doneLoading()
return handle
}
Expand Down Expand Up @@ -58,7 +58,7 @@ describe("DocHandle", () => {

it("should return the heads when requested", async () => {
const handle = setup()
handle.change( d => d.foo = "bar")
handle.change(d => (d.foo = "bar"))
assert.equal(handle.isReady(), true)

const heads = A.getHeads(handle.docSync())
Expand Down Expand Up @@ -143,7 +143,7 @@ describe("DocHandle", () => {

it("should emit a change message when changes happen", async () => {
const handle = setup()

const p = new Promise<DocHandleChangePayload<TestDoc>>(resolve =>
handle.once("change", d => resolve(d))
)
Expand Down Expand Up @@ -174,7 +174,7 @@ describe("DocHandle", () => {

it("should update the internal doc prior to emitting the change message", async () => {
const handle = setup()

const p = new Promise<void>(resolve =>
handle.once("change", ({ handle, doc }) => {
assert.equal(handle.docSync()?.foo, doc.foo)
Expand All @@ -192,7 +192,7 @@ describe("DocHandle", () => {

it("should emit distinct change messages when consecutive changes happen", async () => {
const handle = setup()

let calls = 0
const p = new Promise(resolve =>
handle.on("change", async ({ doc: d }) => {
Expand Down Expand Up @@ -284,7 +284,7 @@ describe("DocHandle", () => {

it("should not time out if the document is updated in time", async () => {
// set docHandle time out after 5 ms
const handle = setup({timeoutDelay:1})
const handle = setup({ timeoutDelay: 1 })

// simulate requesting from the network
handle.request()
Expand Down
4 changes: 2 additions & 2 deletions packages/automerge-repo/test/DocSynchronizer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ describe("DocSynchronizer", () => {
const docId = parseAutomergeUrl(generateAutomergeUrl()).documentId
handle = new DocHandle<TestDoc>(docId)
handle.doneLoading()

docSynchronizer = new DocSynchronizer({
handle: handle as DocHandle<unknown>,
})

return { handle, docSynchronizer }
}

Expand Down
18 changes: 12 additions & 6 deletions packages/automerge-repo/test/Repo.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,11 @@ describe("Repo", () => {
it("doesn't find a document that doesn't exist", async () => {
const { repo } = setup()
const handle = repo.find<TestDoc>(generateAutomergeUrl())
assert.equal(handle.isReady(), false)

await handle.whenReady(["ready", "unavailable"])

assert.equal(handle.isReady(), false)
assert.equal(handle.state, "unavailable")
const doc = await handle.doc()
assert.equal(doc, undefined)
})
Expand All @@ -221,6 +224,7 @@ describe("Repo", () => {
handle.on("unavailable", () => {
wasUnavailable = true
})

await pause(50)
assert.equal(wasUnavailable, false)

Expand Down Expand Up @@ -400,6 +404,8 @@ describe("Repo", () => {
d.count = 1
})

await repo.flush()

for (let i = 0; i < 3; i++) {
const repo2 = new Repo({
storage,
Expand Down Expand Up @@ -484,8 +490,8 @@ describe("Repo", () => {
let resume = (documentIds?: DocumentId[]) => {
const savesToUnblock = documentIds
? Array.from(blockedSaves).filter(({ path }) =>
documentIds.some(documentId => path.includes(documentId))
)
documentIds.some(documentId => path.includes(documentId))
)
: Array.from(blockedSaves)
savesToUnblock.forEach(({ resolve }) => resolve())
}
Expand Down Expand Up @@ -1023,9 +1029,9 @@ describe("Repo", () => {
const doc =
Math.random() < 0.5
? // heads, create a new doc
repo.create<TestDoc>()
repo.create<TestDoc>()
: // tails, pick a random doc
(getRandomItem(docs) as DocHandle<TestDoc>)
(getRandomItem(docs) as DocHandle<TestDoc>)

// make sure the doc is ready
if (!doc.isReady()) {
Expand Down Expand Up @@ -1408,7 +1414,7 @@ describe("Repo", () => {
})

const warn = console.warn
const NO_OP = () => { }
const NO_OP = () => {}

const disableConsoleWarn = () => {
console.warn = NO_OP
Expand Down
Loading