Skip to content

Commit

Permalink
the patch is in a messy state but tests are passing
Browse files Browse the repository at this point in the history
  • Loading branch information
pvh committed Jan 8, 2025
1 parent d568155 commit 5015c44
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 22 deletions.
51 changes: 32 additions & 19 deletions packages/automerge-repo/src/Repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ import type {
PeerId,
} from "./types.js"
import { abortable, AbortOptions } from "./helpers/abortable.js"
import { FindProgress } from "./FindProgress.js"
import { skip } from "node:test"
import { FindProgress, FindProgressWithMethods } from "./FindProgress.js"
import { pause } from "./helpers/pause.js"

function randomPeerId() {
return ("peer-" + Math.random().toString(36).slice(4)) as PeerId
Expand Down Expand Up @@ -418,7 +418,7 @@ export class Repo extends EventEmitter<RepoEvents> {
findWithProgress<T>(
id: AnyDocumentId,
options: AbortOptions = {}
): FindProgress<T> {
): FindProgressWithMethods<T> | FindProgress<T> {
const { signal } = options
const abortPromise = abortable(signal)
const documentId = interpretAsDocumentId(id)
Expand All @@ -427,15 +427,18 @@ export class Repo extends EventEmitter<RepoEvents> {
if (this.#handleCache[documentId]) {
const handle = this.#handleCache[documentId]
if (handle.state === UNAVAILABLE) {
return {
state: "failed",
const result = {
state: "unavailable" as const,
error: new Error(`Document ${id} is unavailable`),
handle,
}
return result
}
if (handle.state === DELETED) {
return {
state: "failed",
error: new Error(`Document ${id} was deleted`),
handle,
}
}
if (handle.state === READY) {
Expand All @@ -446,6 +449,8 @@ export class Repo extends EventEmitter<RepoEvents> {
}
}

// the generator takes over `this`, so we need an alias to the repo this
// eslint-disable-next-line @typescript-eslint/no-this-alias
const that = this
async function* progressGenerator(): AsyncGenerator<FindProgress<T>> {
try {
Expand All @@ -469,13 +474,14 @@ export class Repo extends EventEmitter<RepoEvents> {
}

that.#registerHandleWithSubsystems(handle)

await Promise.race([
handle.whenReady([READY, UNAVAILABLE, DELETED]),
handle.whenReady([READY, UNAVAILABLE]),
abortPromise,
])

if (handle.state === UNAVAILABLE) {
throw new Error(`Document ${id} is unavailable`)
yield { state: "unavailable", handle }
}
if (handle.state === DELETED) {
throw new Error(`Document ${id} was deleted`)
Expand All @@ -486,6 +492,7 @@ export class Repo extends EventEmitter<RepoEvents> {
yield {
state: "failed",
error: error instanceof Error ? error : new Error(String(error)),
handle,
}
}
}
Expand All @@ -497,11 +504,14 @@ export class Repo extends EventEmitter<RepoEvents> {
return { ...result.value, next }
}

const untilReady = async (skipReady: boolean) => {
const untilReady = async (allowableStates: string[]) => {
for await (const state of iterator) {
if (skipReady && state.handle.state === "requesting") {
if (allowableStates.includes(state.handle.state)) {
return state.handle
}
if (state.state === "unavailable") {
throw new Error(`Document ${id} is unavailable`)
}
if (state.state === "ready") return state.handle
if (state.state === "failed") throw state.error
}
Expand All @@ -517,18 +527,21 @@ export class Repo extends EventEmitter<RepoEvents> {
id: AnyDocumentId,
options: RepoFindOptions & AbortOptions = {}
): Promise<DocHandle<T>> {
const { skipReady, signal } = options
const { allowableStates = ["ready"], signal } = options
const progress = this.findWithProgress<T>(id, { signal })

if (progress.state === "ready") {
/*if (allowableStates.includes(progress.state)) {
console.log("returning early")
return progress.handle
}
}*/

if (progress.state === "failed") {
throw progress.error
// @ts-expect-error -- my initial result is a FindProgressWithMethods which has untilReady
if (progress.untilReady) {
this.#registerHandleWithSubsystems(progress.handle)
return progress.untilReady(allowableStates)
} else {
return progress.handle
}

return progress.untilReady(skipReady)
}

/**
Expand Down Expand Up @@ -573,12 +586,12 @@ export class Repo extends EventEmitter<RepoEvents> {
options: RepoFindOptions & AbortOptions = {}
): Promise<DocHandle<T>> {
const documentId = interpretAsDocumentId(id)
const { skipReady, signal } = options
const { allowableStates, signal } = options

return Promise.race([
(async () => {
const handle = await this.#loadDocument<T>(documentId)
if (!skipReady) {
if (!allowableStates) {
await handle.whenReady([READY, UNAVAILABLE])
if (handle.state === UNAVAILABLE && !signal?.aborted) {
throw new Error(`Document ${id} is unavailable`)
Expand Down Expand Up @@ -781,7 +794,7 @@ export interface RepoEvents {
}

export interface RepoFindOptions {
skipReady?: boolean
allowableStates?: string[]
}

export interface DocumentPayload {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ export class CollectionSynchronizer extends Synchronizer {

this.#docSetUp[documentId] = true

const handle = await this.repo.find(documentId, { skipReady: true })
const handle = await this.repo.find(documentId, {
allowableStates: ["ready", "unavailable", "requesting"],
})
const docSynchronizer = this.#fetchDocSynchronizer(handle)

docSynchronizer.receiveMessage(message)
Expand Down
4 changes: 2 additions & 2 deletions packages/automerge-repo/test/Repo.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1471,13 +1471,13 @@ describe("Repo.find() abort behavior", () => {
await expect(findPromise).rejects.not.toThrow("unavailable")
})

it("returns handle immediately when skipReady is true, even with abort signal", async () => {
it("returns handle immediately when allow unavailable is true, even with abort signal", async () => {
const repo = new Repo()
const controller = new AbortController()
const url = generateAutomergeUrl()

const handle = await repo.find(url, {
skipReady: true,
allowableStates: ["unavailable"],
signal: controller.signal,
})

Expand Down

0 comments on commit 5015c44

Please sign in to comment.