diff --git a/packages/capabilities/package.json b/packages/capabilities/package.json index 495b71146..401094816 100644 --- a/packages/capabilities/package.json +++ b/packages/capabilities/package.json @@ -88,7 +88,8 @@ "@ucanto/principal": "^9.0.0", "@ucanto/transport": "^9.1.0", "@ucanto/validator": "^9.0.1", - "@web3-storage/data-segment": "^3.2.0" + "@web3-storage/data-segment": "^3.2.0", + "uint8arrays": "^5.0.3" }, "devDependencies": { "@web3-storage/eslint-config-w3up": "workspace:^", diff --git a/packages/capabilities/src/blob.js b/packages/capabilities/src/blob.js new file mode 100644 index 000000000..cbe71aa88 --- /dev/null +++ b/packages/capabilities/src/blob.js @@ -0,0 +1,212 @@ +/** + * Blob Capabilities. + * + * Blob is a fixed size byte array addressed by the multihash. + * Usually blobs are used to represent set of IPLD blocks at different byte ranges. + * + * These can be imported directly with: + * ```js + * import * as Blob from '@web3-storage/capabilities/blob' + * ``` + * + * @module + */ +import { capability, Link, Schema, ok, fail } from '@ucanto/validator' +import { equal, equalBlob, equalContent, equalWith, checkLink, SpaceDID, and } from './utils.js' + +/** + * Agent capabilities for Blob protocol + */ + +/** + * Capability can only be delegated (but not invoked) allowing audience to + * derived any `blob/` prefixed capability for the (memory) space identified + * by DID in the `with` field. + */ +export const blob = capability({ + can: 'blob/*', + /** + * DID of the (memory) space where Blob is intended to + * be stored. + */ + with: SpaceDID, + derives: equalWith, +}) + +/** + * Blob description for being ingested by the service. + */ +export const blobStruct = Schema.struct({ + /** + * A multihash digest of the blob payload bytes, uniquely identifying blob. + */ + content: Schema.bytes(), + /** + * Size of the Blob file to be stored. Service will provision write target + * for this exact size. Attempt to write a larger Blob file will fail. + */ + size: Schema.integer(), +}) + +/** + * `blob/add` capability allows agent to store a Blob into a (memory) space + * identified by did:key in the `with` field. Agent must precompute Blob locally + * and provide it's multihash and size using `nb.content` and `nb.size` fields, allowing + * a service to provision a write location for the agent to PUT or POST desired + * Blob into. + */ +export const add = capability({ + can: 'blob/add', + /** + * DID of the (memory) space where Blob is intended to + * be stored. + */ + with: SpaceDID, + nb: Schema.struct({ + /** + * Blob to allocate on the space. + */ + blob: blobStruct + }), + derives: equalBlob, +}) + +/** + * `blob/remove` capability can be used to remove the stored Blob from the (memory) + * space identified by `with` field. + */ +export const remove = capability({ + can: 'blob/remove', + /** + * DID of the (memory) space where Blob is intended to + * be stored. + */ + with: SpaceDID, + nb: Schema.struct({ + /** + * A multihash digest of the blob payload bytes, uniquely identifying blob. + */ + content: Schema.bytes(), + }), + derives: equalContent, +}) + +/** + * `blob/list` capability can be invoked to request a list of stored Blobs in the + * (memory) space identified by `with` field. + */ +export const list = capability({ + can: 'blob/list', + /** + * DID of the (memory) space where Blob is intended to + * be stored. + */ + with: SpaceDID, + nb: Schema.struct({ + /** + * A pointer that can be moved back and forth on the list. + * It can be used to paginate a list for instance. + */ + cursor: Schema.string().optional(), + /** + * Maximum number of items per page. + */ + size: Schema.integer().optional(), + /** + * If true, return page of results preceding cursor. Defaults to false. + */ + pre: Schema.boolean().optional(), + }), + derives: (claimed, delegated) => { + if (claimed.with !== delegated.with) { + return fail( + `Expected 'with: "${delegated.with}"' instead got '${claimed.with}'` + ) + } + return ok({}) + }, +}) + +/** + * Service capabilities for Blob protocol + */ + +// TODO: should we preffix these with some tmp service namespace that eases delegation of blob? +// OR +// export const blob = add.or(remove).or(list) + +/** + * `blob/allocate` capability can be invoked to create a memory + * address where blob content can be written via HTTP PUT request. + */ +export const allocate = capability({ + can: 'blob/allocate', + /** + * DID of the (memory) space where Blob is intended to + * be stored. + */ + with: SpaceDID, + nb: Schema.struct({ + /** + * Blob to allocate on the space. + */ + blob: blobStruct, + /** + * The Link for an Add Blob task, that caused an allocation + */ + cause: Link, + /** + * DID of the user space where allocation takes place + */ + space: SpaceDID + }), + derives: (claim, from) => { + return ( + and(equalWith(claim, from)) || + and(equalBlob(claim, from)) || + and(checkLink(claim.nb.cause, from.nb.cause, 'cause')) || + and(equal(claim.nb.space, from.nb.space, 'space')) || + ok({}) + ) + }, +}) + +/** + * `blob/accept` capability invocation should either succeed when content is + * delivered on allocated address or fail if no content is allocation expires + * without content being delivered. + */ +export const accept = capability({ + can: 'blob/accept', + /** + * DID of the (memory) space where Blob is intended to + * be stored. + */ + with: SpaceDID, + nb: Schema.struct({ + /** + * Blob to accept. + */ + blob: blobStruct, + /** + * Expiration.. + */ + exp: Schema.integer(), + }), + derives: (claim, from) => { + const result = equalBlob(claim, from) + if (result.error) { + return result + } else if (claim.nb.exp !== undefined && from.nb.exp !== undefined) { + return claim.nb.exp > from.nb.exp + ? fail(`exp constraint violation: ${claim.nb.exp} > ${from.nb.exp}`) + : ok({}) + } else { + return ok({}) + } + }, +}) + +// ⚠️ We export imports here so they are not omitted in generated typedes +// @see https://github.com/microsoft/TypeScript/issues/51548 +export { Schema, Link } diff --git a/packages/capabilities/src/index.js b/packages/capabilities/src/index.js index d80fbff46..4aaaa8ebd 100644 --- a/packages/capabilities/src/index.js +++ b/packages/capabilities/src/index.js @@ -19,6 +19,7 @@ import * as DealTracker from './filecoin/deal-tracker.js' import * as UCAN from './ucan.js' import * as Plan from './plan.js' import * as Usage from './usage.js' +import * as Blob from './blob.js' export { Access, @@ -86,4 +87,10 @@ export const abilitiesAsStrings = [ Plan.get.can, Usage.usage.can, Usage.report.can, + Blob.blob.can, + Blob.add.can, + Blob.remove.can, + Blob.list.can, + Blob.allocate.can, + Blob.accept.can, ] diff --git a/packages/capabilities/src/types.ts b/packages/capabilities/src/types.ts index 9848a42ca..36c65759e 100644 --- a/packages/capabilities/src/types.ts +++ b/packages/capabilities/src/types.ts @@ -21,6 +21,7 @@ import { import { space, info } from './space.js' import * as provider from './provider.js' import { top } from './top.js' +import * as BlobCaps from './blob.js' import * as StoreCaps from './store.js' import * as UploadCaps from './upload.js' import * as AccessCaps from './access.js' @@ -439,6 +440,72 @@ export interface UploadNotFound extends Ucanto.Failure { export type UploadGetFailure = UploadNotFound | Ucanto.Failure +// Blob +export type Blob = InferInvokedCapability +export type BlobAdd = InferInvokedCapability +export type BlobRemove = InferInvokedCapability +export type BlobList = InferInvokedCapability +export type BlobAllocate = InferInvokedCapability +export type BlobAccept = InferInvokedCapability + +export type BlobMultihash = Uint8Array + +// Blob add +export interface BlobAddSuccess { + claim: { + 'await/ok': Link + } +} +export type BlobAddFailure = Ucanto.Failure + +// Blob remove +export interface BlobRemoveSuccess { + size: number +} + +export interface BlobItemNotFound extends Ucanto.Failure { + name: 'BlobItemNotFound' +} + +export type BlobRemoveFailure = BlobItemNotFound | Ucanto.Failure + +// Blob list +export interface BlobListSuccess extends ListResponse {} +export interface BlobListItem { + blob: { content: Uint8Array, size: Number } + insertedAt: ISO8601Date +} + +export type BlobListFailure = Ucanto.Failure + +// Blob allocate +export interface BlobAllocateSuccess { + size: Number + address?: BlobAddress +} + +export interface BlobAddress { + url: ToString + headers: Record +} + +export interface BlobItemNotFound extends Ucanto.Failure { + name: 'BlobItemNotFound' +} + +export interface BlobNotAllocableToSpace extends Ucanto.Failure { + name: 'BlobNotAllocableToSpace' +} + +export type BlobAllocateFailure = BlobItemNotFound | BlobNotAllocableToSpace | Ucanto.Failure + +// Blob accept +export interface BlobAcceptSuccess { + claim: Link +} + +export type BlobAcceptFailure = BlobItemNotFound | Ucanto.Failure + // Store export type Store = InferInvokedCapability export type StoreAdd = InferInvokedCapability @@ -708,7 +775,13 @@ export type ServiceAbilityArray = [ AdminStoreInspect['can'], PlanGet['can'], Usage['can'], - UsageReport['can'] + UsageReport['can'], + Blob['can'], + BlobAdd['can'], + BlobRemove['can'], + BlobList['can'], + BlobAllocate['can'], + BlobAccept['can'], ] /** diff --git a/packages/capabilities/src/utils.js b/packages/capabilities/src/utils.js index ac1e7e317..37e9dca5a 100644 --- a/packages/capabilities/src/utils.js +++ b/packages/capabilities/src/utils.js @@ -2,6 +2,8 @@ import { DID, fail, ok } from '@ucanto/validator' // eslint-disable-next-line no-unused-vars import * as Types from '@ucanto/interface' +import { equals } from 'uint8arrays/equals' + // e.g. did:web:web3.storage or did:web:staging.web3.storage export const ProviderDID = DID.match({ method: 'web' }) @@ -85,6 +87,60 @@ export const equalLink = (claimed, delegated) => { } } +/** + * @template {Types.ParsedCapability<"blob/add"|"blob/remove"|"blob/allocate"|"blob/accept", Types.URI<'did:'>, {blob: { content: Uint8Array, size: number }}>} T + * @param {T} claimed + * @param {T} delegated + * @returns {Types.Result<{}, Types.Failure>} + */ +export const equalBlob = (claimed, delegated) => { + if (claimed.with !== delegated.with) { + return fail( + `Expected 'with: "${delegated.with}"' instead got '${claimed.with}'` + ) + } else if ( + delegated.nb.blob.content && + !equals(delegated.nb.blob.content, claimed.nb.blob.content) + ) { + return fail( + `Link ${claimed.nb.blob.content ? `${claimed.nb.blob.content}` : ''} violates imposed ${ + delegated.nb.blob.content + } constraint.` + ) + } else if (claimed.nb.blob.size !== undefined && delegated.nb.blob.size !== undefined) { + return claimed.nb.blob.size > delegated.nb.blob.size + ? fail(`Size constraint violation: ${claimed.nb.blob.size} > ${delegated.nb.blob.size}`) + : ok({}) + } else { + return ok({}) + } +} + +/** + * @template {Types.ParsedCapability<"blob/add"|"blob/remove"|"blob/allocate"|"blob/accept", Types.URI<'did:'>, {content: Uint8Array}>} T + * @param {T} claimed + * @param {T} delegated + * @returns {Types.Result<{}, Types.Failure>} + */ +export const equalContent = (claimed, delegated) => { + if (claimed.with !== delegated.with) { + return fail( + `Expected 'with: "${delegated.with}"' instead got '${claimed.with}'` + ) + } else if ( + delegated.nb.content && + !equals(delegated.nb.content, claimed.nb.content) + ) { + return fail( + `Link ${claimed.nb.content ? `${claimed.nb.content}` : ''} violates imposed ${ + delegated.nb.content + } constraint.` + ) + } else { + return ok({}) + } +} + /** * Checks that `claimed` {@link Types.Link} meets an `imposed` constraint. * diff --git a/packages/upload-api/package.json b/packages/upload-api/package.json index f7cb1db1a..6ce509814 100644 --- a/packages/upload-api/package.json +++ b/packages/upload-api/package.json @@ -182,6 +182,7 @@ "@web3-storage/did-mailto": "workspace:^", "@web3-storage/filecoin-api": "workspace:^", "multiformats": "^12.1.2", + "uint8arrays": "^5.0.3", "p-retry": "^5.1.2" }, "devDependencies": { diff --git a/packages/upload-api/src/blob.js b/packages/upload-api/src/blob.js new file mode 100644 index 000000000..c9506da20 --- /dev/null +++ b/packages/upload-api/src/blob.js @@ -0,0 +1,19 @@ +import { blobAddProvider } from './blob/add.js' +import { blobAllocateProvider } from './blob/allocate.js' +import { blobAcceptProvider } from './blob/accept.js' +import { blobListProvider } from './blob/list.js' +import { blobRemoveProvider } from './blob/remove.js' +import * as API from './types.js' + +/** + * @param {API.BlobServiceContext} context + */ +export function createService(context) { + return { + add: blobAddProvider(context), + allocate: blobAllocateProvider(context), + accept: blobAcceptProvider(context), + list: blobListProvider(context), + remove: blobRemoveProvider(context), + } +} diff --git a/packages/upload-api/src/blob/accept.js b/packages/upload-api/src/blob/accept.js new file mode 100644 index 000000000..008f491d9 --- /dev/null +++ b/packages/upload-api/src/blob/accept.js @@ -0,0 +1,19 @@ +import * as Server from '@ucanto/server' +import * as Blob from '@web3-storage/capabilities/blob' +import * as API from '../types.js' +import { BlobItemNotFound } from './lib.js' + +/** + * @param {API.BlobServiceContext} context + * @returns {API.ServiceMethod} + */ +export function blobAcceptProvider(context) { + return Server.provide(Blob.accept, async ({ capability }) => { + const space = /** @type {import('@ucanto/interface').DIDKey} */ ( + Server.DID.parse(capability.with).did() + ) + return { + error: new BlobItemNotFound(space) + } + }) +} diff --git a/packages/upload-api/src/blob/add.js b/packages/upload-api/src/blob/add.js new file mode 100644 index 000000000..540163f03 --- /dev/null +++ b/packages/upload-api/src/blob/add.js @@ -0,0 +1,71 @@ +import * as Server from '@ucanto/server' +import * as Blob from '@web3-storage/capabilities/blob' +import * as API from '../types.js' + +/** + * @param {API.BlobServiceContext} context + * @returns {API.ServiceMethod} + */ +export function blobAddProvider(context) { + return Server.provideAdvanced({ + capability: Blob.add, + handler: async ({ capability, invocation }) => { + const { id, allocationStorage, maxUploadSize } = context + const { blob } = capability.nb + const space = /** @type {import('@ucanto/interface').DIDKey} */ ( + Server.DID.parse(capability.with).did() + ) + + if (blob.size > maxUploadSize) { + return { + error: new Server.Failure( + `Maximum size exceeded: ${maxUploadSize}, split DAG into smaller shards.` + ), + } + } + + // Create effects for receipt + const [allocatefx, acceptfx] = await Promise.all([ + Blob.allocate + .invoke({ + issuer: id, + audience: id, + with: id.toDIDKey(), + nb: { + blob, + cause: invocation.link(), + space + }, + expiration: Infinity, + }) + .delegate(), + Blob.accept + .invoke({ + issuer: id, + audience: id, + with: id.toDIDKey(), + nb: { + blob, + exp: Number.POSITIVE_INFINITY, + }, + expiration: Infinity, + }) + .delegate(), + ]) + + // Queue for allocation if not allocated + const allocated = await allocationStorage.exists(space, blob.content) + if (!allocated.ok) { + // TODO + } + + /** @type {API.OkBuilder} */ + const result = Server.ok({ + claim: { + 'await/ok': acceptfx.link() + } + }) + return result.fork(allocatefx.link()).join(acceptfx.link()) + } + }) +} diff --git a/packages/upload-api/src/blob/allocate.js b/packages/upload-api/src/blob/allocate.js new file mode 100644 index 000000000..e66ebc497 --- /dev/null +++ b/packages/upload-api/src/blob/allocate.js @@ -0,0 +1,60 @@ +import * as Server from '@ucanto/server' +import * as Blob from '@web3-storage/capabilities/blob' +import * as API from '../types.js' +import { BlobItemNotFound } from './lib.js' + +/** + * @param {API.BlobServiceContext} context + * @returns {API.ServiceMethod} + */ +export function blobAllocateProvider(context) { + return Server.provide(Blob.allocate, async ({ capability, invocation }) => { + const { blob, cause, space } = capability.nb + + // TODO: Read original invocation? + + // If blob is stored, we can just allocate it to the space + const hasBlob = await context.blobStorage.has(blob.content) + if (hasBlob.error) { + return { + error: new BlobItemNotFound(space) + } + } + // Get presigned URL for the write target + const createUploadUrl = await context.blobStorage.createUploadUrl(blob.content, blob.size) + if (createUploadUrl.error) { + return { + error: new Server.Failure('failed to provide presigned url') + } + } + + // Allocate in space, ignoring if already allocated + const allocationInsert = await context.allocationStorage.insert({ + space, + blob, + invocation: cause, + // TODO: add write target + }) + if (allocationInsert.error) { + return { + error: new Server.Failure('failed to allocate blob bytes') + } + } + + if (hasBlob) { + return { + ok: { size: blob.size } + } + } + + return { + ok: { + size: blob.size, + address: { + url: createUploadUrl.ok.url.toString(), + headers: createUploadUrl.ok.headers + } + } + } + }) +} diff --git a/packages/upload-api/src/blob/lib.js b/packages/upload-api/src/blob/lib.js new file mode 100644 index 000000000..bc16a242f --- /dev/null +++ b/packages/upload-api/src/blob/lib.js @@ -0,0 +1,26 @@ +import { Failure } from '@ucanto/server' + +export class BlobItemNotFound extends Failure { + /** + * @param {import('@ucanto/interface').DID} space + */ + constructor(space) { + super() + this.space = space + } + + get name() { + return 'BlobItemNotFound' + } + + describe() { + return `Blob not found in ${this.space}` + } + + toJSON() { + return { + ...super.toJSON(), + space: this.space, + } + } +} diff --git a/packages/upload-api/src/blob/list.js b/packages/upload-api/src/blob/list.js new file mode 100644 index 000000000..6694804fd --- /dev/null +++ b/packages/upload-api/src/blob/list.js @@ -0,0 +1,15 @@ +import * as Server from '@ucanto/server' +import * as Blob from '@web3-storage/capabilities/blob' +import * as API from '../types.js' + +/** + * @param {API.BlobServiceContext} context + * @returns {API.ServiceMethod} + */ +export function blobListProvider(context) { + return Server.provide(Blob.list, async ({ capability }) => { + const { cursor, size, pre } = capability.nb + const space = Server.DID.parse(capability.with).did() + return await context.allocationStorage.list(space, { size, cursor, pre }) + }) +} diff --git a/packages/upload-api/src/blob/remove.js b/packages/upload-api/src/blob/remove.js new file mode 100644 index 000000000..546f4935f --- /dev/null +++ b/packages/upload-api/src/blob/remove.js @@ -0,0 +1,22 @@ +import * as Server from '@ucanto/server' +import * as Blob from '@web3-storage/capabilities/blob' +import * as API from '../types.js' +import { BlobItemNotFound } from './lib.js' + +/** + * @param {API.BlobServiceContext} context + * @returns {API.ServiceMethod} + */ +export function blobRemoveProvider(context) { + return Server.provide(Blob.remove, async ({ capability }) => { + const { content } = capability.nb + const space = Server.DID.parse(capability.with).did() + + const res = await context.allocationStorage.remove(space, content) + if (res.error && res.error.name === 'RecordNotFound') { + return Server.error(new BlobItemNotFound(space)) + } + + return res + }) +} diff --git a/packages/upload-api/src/lib.js b/packages/upload-api/src/lib.js index 62c5e817f..a74daf637 100644 --- a/packages/upload-api/src/lib.js +++ b/packages/upload-api/src/lib.js @@ -4,6 +4,7 @@ import * as Types from './types.js' import * as Legacy from '@ucanto/transport/legacy' import * as CAR from '@ucanto/transport/car' import { create as createRevocationChecker } from './utils/revocation.js' +import { createService as createBlobService } from './blob.js' import { createService as createStoreService } from './store.js' import { createService as createUploadService } from './upload.js' import { createService as createConsoleService } from './console.js' @@ -43,6 +44,7 @@ export const createServer = ({ id, codec = Legacy.inbound, ...context }) => */ export const createService = (context) => ({ access: createAccessService(context), + blob: createBlobService(context), console: createConsoleService(context), consumer: createConsumerService(context), customer: createCustomerService(context), diff --git a/packages/upload-api/src/types.ts b/packages/upload-api/src/types.ts index 789be64e1..8e643bfa9 100644 --- a/packages/upload-api/src/types.ts +++ b/packages/upload-api/src/types.ts @@ -54,6 +54,23 @@ export interface DebugEmail extends Email { } import { + BlobMultihash, + BlobAdd, + BlobAddSuccess, + BlobAddFailure, + BlobRemove, + BlobRemoveSuccess, + BlobRemoveFailure, + BlobList, + BlobListItem, + BlobListSuccess, + BlobListFailure, + BlobAllocate, + BlobAllocateSuccess, + BlobAllocateFailure, + BlobAccept, + BlobAcceptSuccess, + BlobAcceptFailure, StoreAdd, StoreGet, StoreAddSuccess, @@ -163,6 +180,13 @@ import { UsageStorage } from './types/usage.js' export type { UsageStorage } export interface Service extends StorefrontService { + blob: { + add: ServiceMethod + remove: ServiceMethod + list: ServiceMethod + allocate: ServiceMethod + accept: ServiceMethod + } store: { add: ServiceMethod get: ServiceMethod @@ -273,9 +297,18 @@ export interface Service extends StorefrontService { } } -export type StoreServiceContext = SpaceServiceContext & { +export type BlobServiceContext = { + /** + * Service signer + */ + id: Signer maxUploadSize: number + allocationStorage: AllocationStorage + blobStorage: BlobStorage +} +export type StoreServiceContext = SpaceServiceContext & { + maxUploadSize: number storeTable: StoreTable carStoreBucket: CarStoreBucket } @@ -362,6 +395,7 @@ export interface ServiceContext ProviderServiceContext, SpaceServiceContext, StoreServiceContext, + BlobServiceContext, SubscriptionServiceContext, RateLimitServiceContext, RevocationServiceContext, @@ -396,6 +430,20 @@ export interface ErrorReporter { catch: (error: HandlerExecutionError) => void } +export interface BlobStorage { + has: (content: BlobMultihash) => Promise> + createUploadUrl: ( + content: BlobMultihash, + size: number + ) => Promise + }, Failure>> +} + export interface CarStoreBucket { has: (link: UnknownLink) => Promise createUploadUrl: ( @@ -442,6 +490,23 @@ export interface RecordKeyConflict extends Failure { name: 'RecordKeyConflict' } +export interface AllocationStorage { + exists: (space: DID, blobMultihash: BlobMultihash) => Promise> + /** Inserts an item in the table if it does not already exist. */ + insert: ( + item: BlobAddInput + ) => Promise> + /** Removes an item from the table but fails if the item does not exist. */ + remove: ( + space: DID, + blobMultihash: BlobMultihash + ) => Promise> + list: ( + space: DID, + options?: ListOptions + ) => Promise, Failure>> +} + export interface StoreTable { inspect: (link: UnknownLink) => Promise> exists: (space: DID, link: UnknownLink) => Promise> @@ -510,6 +575,20 @@ export type AdminUploadInspectResult = Result< AdminUploadInspectFailure > +export interface Blob { + content: BlobMultihash + size: number +} + +export interface BlobAddInput { + space: DID + invocation: UnknownLink + blob: Blob +} + +export interface BlobAddOutput + extends Omit {} + export interface StoreAddInput { space: DID link: UnknownLink diff --git a/packages/upload-api/test/helpers/context.js b/packages/upload-api/test/helpers/context.js index 0c126d122..c0f9e948d 100644 --- a/packages/upload-api/test/helpers/context.js +++ b/packages/upload-api/test/helpers/context.js @@ -5,6 +5,8 @@ import { getStoreImplementations, getQueueImplementations, } from '@web3-storage/filecoin-api/test/context/service' +import { AllocationStorage } from '../storage/allocation-storage.js' +import { BlobStorage } from '../storage/blob-storage.js' import { CarStoreBucket } from '../storage/car-store-bucket.js' import { StoreTable } from '../storage/store-table.js' import { UploadTable } from '../storage/upload-table.js' @@ -36,7 +38,9 @@ export const createContext = async ( ) => { const requirePaymentPlan = options.requirePaymentPlan const storeTable = new StoreTable() + const allocationStorage = new AllocationStorage() const uploadTable = new UploadTable() + const blobStorage = await BlobStorage.activate(options) const carStoreBucket = await CarStoreBucket.activate(options) const dudewhereBucket = new DudewhereBucket() const revocationsStorage = new RevocationsStorage() @@ -90,8 +94,10 @@ export const createContext = async ( }, maxUploadSize: 5_000_000_000, storeTable, + allocationStorage, uploadTable, carStoreBucket, + blobStorage, dudewhereBucket, filecoinSubmitQueue, pieceOfferQueue, diff --git a/packages/upload-api/test/storage/allocation-storage.js b/packages/upload-api/test/storage/allocation-storage.js new file mode 100644 index 000000000..d59274b99 --- /dev/null +++ b/packages/upload-api/test/storage/allocation-storage.js @@ -0,0 +1,107 @@ +import * as Types from '../../src/types.js' +import { equals } from 'uint8arrays/equals' + +/** + * @implements {Types.AllocationStorage} + */ +export class AllocationStorage { + constructor() { + /** @type {(Types.BlobAddInput & Types.BlobListItem)[]} */ + this.items = [] + } + + /** + * @param {Types.BlobAddInput} input + * @returns {ReturnType} + */ + async insert({ space, invocation, ...output }) { + if ( + this.items.some((i) => i.space === space && equals(i.blob.content, output.blob.content)) + ) { + return { + error: { name: 'RecordKeyConflict', message: 'record key conflict' }, + } + } + this.items.unshift({ + space, + invocation, + ...output, + insertedAt: new Date().toISOString(), + }) + return { ok: output } + } + + /** + * @param {Types.DID} space + * @param {Uint8Array} blobMultihash + * @returns {ReturnType} + */ + async exists(space, blobMultihash) { + const item = this.items.find( + (i) => i.space === space && equals(i.blob.content, blobMultihash) + ) + return { ok: !!item } + } + + /** + * @param {Types.DID} space + * @param {Uint8Array} blobMultihash + * @returns {ReturnType} + */ + async remove(space, blobMultihash) { + const item = this.items.find( + (i) => i.space === space && equals(i.blob.content, blobMultihash) + ) + if (!item) { + return { error: { name: 'RecordNotFound', message: 'record not found' } } + } + this.items = this.items.filter((i) => i !== item) + return { + ok: { + size: item.blob.size + } + } + } + + /** + * @param {Types.DID} space + * @param {Types.ListOptions} options + * @returns {ReturnType} + */ + async list( + space, + { cursor = '0', pre = false, size = this.items.length } = {} + ) { + const offset = parseInt(cursor, 10) + const items = pre ? this.items.slice(0, offset) : this.items.slice(offset) + + const matches = [...items.entries()] + .filter(([n, item]) => item.space === space) + .slice(0, size) + + if (matches.length === 0) { + return { ok: { size: 0, results: [] } } + } + + const first = matches[0] + const last = matches[matches.length - 1] + + const start = first[0] || 0 + const end = last[0] || 0 + const values = matches.map(([_, item]) => item) + + const [before, after, results] = pre + ? [`${start}`, `${end + 1}`, values] + : [`${start + offset}`, `${end + 1 + offset}`, values] + + return { + ok: { + size: values.length, + before, + after, + cursor: after, + results, + }, + } + } +} diff --git a/packages/upload-api/test/storage/blob-storage.js b/packages/upload-api/test/storage/blob-storage.js new file mode 100644 index 000000000..3e3eba687 --- /dev/null +++ b/packages/upload-api/test/storage/blob-storage.js @@ -0,0 +1,143 @@ +import * as Types from '../../src/types.js' + +import { base64pad } from 'multiformats/bases/base64' +import { SigV4 } from '@web3-storage/sigv4' +import { base32 } from 'multiformats/bases/base32' +import { sha256 } from 'multiformats/hashes/sha2' + +/** + * @implements {Types.BlobStorage} + */ +export class BlobStorage { + /** + * @param {Types.CarStoreBucketOptions & {http?: import('http')}} options + */ + static async activate({ http, ...options } = {}) { + const content = new Map() + if (http) { + const server = http.createServer(async (request, response) => { + if (request.method === 'PUT') { + const buffer = new Uint8Array( + parseInt(request.headers['content-length'] || '0') + ) + let offset = 0 + + for await (const chunk of request) { + buffer.set(chunk, offset) + offset += chunk.length + } + + const hash = await sha256.digest(buffer) + const checksum = base64pad.baseEncode(hash.digest) + + if (checksum !== request.headers['x-amz-checksum-sha256']) { + response.writeHead(400, `checksum mismatch`) + } else { + const { pathname } = new URL(request.url || '/', url) + content.set(pathname, buffer) + response.writeHead(200) + } + } else { + response.writeHead(405) + } + + response.end() + // otherwise it keep connection lingering + response.destroy() + }) + await new Promise((resolve) => server.listen(resolve)) + + // @ts-ignore - this is actually what it returns on http + const port = server.address().port + const url = new URL(`http://localhost:${port}`) + + return new BlobStorage({ + ...options, + content, + url, + server, + }) + } else { + return new BlobStorage({ + ...options, + content, + url: new URL(`http://localhost:8989`), + }) + } + } + + /** + * @param {Types.CarStoreBucketOptions & { server?: import('http').Server, url: URL, content: Map }} options + */ + constructor({ + content, + url, + server, + accessKeyId = 'id', + secretAccessKey = 'secret', + bucket = 'my-bucket', + region = 'eu-central-1', + expires, + }) { + this.server = server + this.baseURL = url + this.accessKeyId = accessKeyId + this.secretAccessKey = secretAccessKey + this.bucket = bucket + this.region = region + this.expires = expires + this.content = content + } + + /** + * @param {Uint8Array} multihash + */ + async has(multihash) { + const encodedMultihash = base32.encode(multihash) + return { + ok: this.content.has(`/${this.bucket}/${encodedMultihash}/${encodedMultihash}.blob`) + } + } + + /** + * @param {Uint8Array} multihash + * @param {number} size + */ + async createUploadUrl(multihash, size) { + const { bucket, expires, accessKeyId, secretAccessKey, region, baseURL } = this + const encodedMultihash = base32.encode(multihash) + // sigv4 + const sig = new SigV4({ + accessKeyId, + secretAccessKey, + region, + }) + + const checksum = base64pad.baseEncode(multihash) + const { pathname, search, hash } = sig.sign({ + key: `${encodedMultihash}/${encodedMultihash}.blob`, + checksum, + bucket, + expires, + }) + + const url = new URL(baseURL) + url.search = search + url.pathname = `/${bucket}${pathname}` + url.hash = hash + url.searchParams.set( + 'X-Amz-SignedHeaders', + ['content-length', 'host', 'x-amz-checksum-sha256'].join(';') + ) + + return { + ok: { + url, + headers: { + 'x-amz-checksum-sha256': checksum, + 'content-length': String(size), + }, + } + } + } +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 3ed85d28f..eb0e0b7bd 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -169,6 +169,9 @@ importers: '@web3-storage/data-segment': specifier: ^3.2.0 version: 3.2.0 + uint8arrays: + specifier: ^5.0.3 + version: 5.0.3 devDependencies: '@types/assert': specifier: ^1.5.6 @@ -406,6 +409,9 @@ importers: p-retry: specifier: ^5.1.2 version: 5.1.2 + uint8arrays: + specifier: ^5.0.3 + version: 5.0.3 devDependencies: '@ipld/car': specifier: ^5.1.1 @@ -9627,6 +9633,10 @@ packages: resolution: {integrity: sha512-eajQ/ZH7qXZQR2AgtfpmSMizQzmyYVmCql7pdhldPuYQi4atACekbJaQplk6dWyIi10jCaFnd6pqvcEFXjbaJw==} engines: {node: '>=16.0.0', npm: '>=7.0.0'} + /multiformats@13.1.0: + resolution: {integrity: sha512-HzdtdBwxsIkzpeXzhQ5mAhhuxcHbjEHH+JQoxt7hG/2HGFjjwyolLo7hbaexcnhoEuV4e0TNJ8kkpMjiEYY4VQ==} + dev: false + /multimatch@5.0.0: resolution: {integrity: sha512-ypMKuglUrZUD99Tk2bUQ+xNQj43lPEfAeX2o9cTteAmShXy2VHDJpuwu1o0xqoKCt9jLVAvwyFKdLTPXKAfJyA==} engines: {node: '>=10'} @@ -12442,6 +12452,12 @@ packages: dependencies: multiformats: 12.1.3 + /uint8arrays@5.0.3: + resolution: {integrity: sha512-6LBuKji28kHjgPJMkQ6GDaBb1lRwIhyOYq6pDGwYMoDPfImE9SkuYENVmR0yu9yGgs2clHUSY9fKDukR+AXfqQ==} + dependencies: + multiformats: 13.1.0 + dev: false + /unbox-primitive@1.0.2: resolution: {integrity: sha512-61pPlCD9h51VoreyJ0BReideM3MDKMKnh6+V9L08331ipq6Q8OFXZYiqP6n/tbHx4s5I9uRhcye6BrbkizkBDw==} dependencies: