Skip to content

Commit

Permalink
feat: integrate aggregation client
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Jun 8, 2023
1 parent e4af788 commit e0d5f38
Show file tree
Hide file tree
Showing 26 changed files with 3,593 additions and 12,884 deletions.
7 changes: 7 additions & 0 deletions .env.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,10 @@

# uncomment to set SENTRY_DSN
# SENTRY_DSN = ''

# web3.storage DID
# DID = 'did:web:staging.web3.storage'

# Aggregation service
# AGGREGATION_SERVICE_DID = ''
# AGGREGATION_SERVICE_URL = ''
52 changes: 52 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,58 @@ Ensure the following variables are set in the env when deploying

The root domain to deploy the API to. e.g `filecoin.web3.storage`. The value should match a hosted zone configured in route53 that your aws account has access to.

#### `DID`

[DID](https://www.w3.org/TR/did-core/) of the ucanto server. e.g. `did:web:web3.storage`. Optional: if omitted, a `did:key` will be derrived from `PRIVATE_KEY`

#### `AGGREGATION_SERVICE_DID`

DID of the aggregation service.

#### `AGGREGATION_SERVICE_URL`

URL of the aggregation service.

### Secrets

Set production secrets in aws SSM via [`sst secrets`](https://docs.sst.dev/config#sst-secrets). The region must be set to the one you deploy that stage to

```sh
# set `PRIVATE_KEY` for prod
$ npx sst secrets set --region us-west-2 --stage prod PRIVATE_KEY "MgCblCY...="
```

To set a fallback value for `staging` or an ephmeral PR build use [`sst secrets set-fallback`](https://docs.sst.dev/config#fallback-values)

```sh
# set `PRIVATE_KEY` for any stage in us-east-2
$ npx sst secrets set --fallback --region us-east-2 PRIVATE_KEY "MgCZG7...="
```

**note** The fallback value can only be inherited by stages deployed in the same AWS account and region.

Confirm the secret value using [`sst secrets list`](https://docs.sst.dev/config#sst-secrets)

```sh
$ npx sst secrets list --region us-east-2
PRIVATE_KEY MgCZG7...= (fallback)

$ npx sst secrets list --region us-west-2 --stage prod
PRIVATE_KEY M...=
```

#### `PRIVATE_KEY`

The [`multibase`](https://github.com/multiformats/multibase) encoded ED25519 keypair used as the signing key for the upload-api.

Generated by [@ucanto/principal `EdSigner`](https://github.com/web3-storage/ucanto) via [`ucan-key`](https://www.npmjs.com/package/ucan-key)

_Example:_ `MgCZG7EvaA...1pX9as=`

## License

Dual-licensed under [MIT + Apache 2.0](LICENSE.md)

</p>

[SST]: https://sst.dev
Expand Down
2 changes: 1 addition & 1 deletion data/functions/add-cars-to-ferry.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import * as Sentry from '@sentry/serverless'
import { unmarshall } from '@aws-sdk/util-dynamodb'

import { addCarsToFerry } from '../lib/add-cars-to-ferry.js'
import { addCarsToFerry } from '../lib/index.js'
import { parseDynamoDbEvent } from '../utils/parse-dynamodb-event.js'
import { mustGetEnv } from '../lib/utils.js'

Expand Down
72 changes: 72 additions & 0 deletions data/functions/offer-ferry-for-aggregate.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import * as Sentry from '@sentry/serverless'
import { unmarshall } from '@aws-sdk/util-dynamodb'
import { Config } from '@serverless-stack/node/config/index.js'

import { setFerryOffer } from '../lib/index.js'
import { parseDynamoDbEvent } from '../utils/parse-dynamodb-event.js'
import { mustGetEnv, getAggregationServiceConnection } from '../lib/utils.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0,
})

const AWS_REGION = mustGetEnv('AWS_REGION')

/**
* @param {import('aws-lambda').DynamoDBStreamEvent} event
*/
async function handler(event) {
const {
CAR_TABLE_NAME,
FERRY_TABLE_NAME,
DID,
AGGREGATION_SERVICE_DID,
AGGREGATION_SERVICE_URL,
} = getEnv()
const { PRIVATE_KEY } = Config

const records = parseDynamoDbEvent(event)
if (records.length > 1) {
throw new Error('Should only receive one ferry to update')
}

// @ts-expect-error can't figure out type of new
const newRecord = unmarshall(records[0].new)

const ctx = {
car: {
region: AWS_REGION,
tableName: CAR_TABLE_NAME
},
ferry: {
region: AWS_REGION,
tableName: FERRY_TABLE_NAME
},
storefront: {
DID,
PRIVATE_KEY
},
aggregationServiceConnection: getAggregationServiceConnection({
DID: AGGREGATION_SERVICE_DID,
URL: AGGREGATION_SERVICE_URL
})
}
await setFerryOffer(newRecord.id, ctx)
}

export const consumer = Sentry.AWSLambda.wrapHandler(handler)

/**
* Get Env validating it is set.
*/
function getEnv() {
return {
DID: process.env.DID,
CAR_TABLE_NAME: mustGetEnv('CAR_TABLE_NAME'),
FERRY_TABLE_NAME: mustGetEnv('FERRY_TABLE_NAME'),
AGGREGATION_SERVICE_DID: mustGetEnv('AGGREGATION_SERVICE_DID'),
AGGREGATION_SERVICE_URL: mustGetEnv('AGGREGATION_SERVICE_URL'),
}
}
2 changes: 1 addition & 1 deletion data/functions/set-ferry-as-ready.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import * as Sentry from '@sentry/serverless'
import { unmarshall } from '@aws-sdk/util-dynamodb'

import { setFerryAsReady } from '../lib/set-ferry-as-ready.js'
import { setFerryAsReady } from '../lib/index.js'
import { parseDynamoDbEvent } from '../utils/parse-dynamodb-event.js'
import { mustGetEnv } from '../lib/utils.js'

Expand Down
28 changes: 0 additions & 28 deletions data/lib/add-cars-to-ferry.js

This file was deleted.

45 changes: 45 additions & 0 deletions data/lib/aggregate-service.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import * as ed25519 from '@ucanto/principal/ed25519'
import * as DID from '@ipld/dag-ucan/did'
import { Aggregate } from '@web3-storage/aggregate-client'

/**
* @param {import('../types').StorefrontSignerCtx} serviceSignerCtx
* @param {ed25519.ConnectionView<any>} aggregationServiceConnection
*/
export async function createAggregateService (serviceSignerCtx, aggregationServiceConnection) {
const issuer = getStorefrontSigner(serviceSignerCtx)
const audience = aggregationServiceConnection.id

/** @type {import('@web3-storage/aggregate-client/types').InvocationConfig} */
const InvocationConfig = {
issuer,
audience,
with: issuer.did(),
}

return {
/**
*
* @param {import('@web3-storage/aggregate-client/types').Offer[]} offers
*/
offer: async function (offers) {
return await Aggregate.aggregateOffer(
InvocationConfig,
offers,
{ connection: aggregationServiceConnection }
)
}
}
}

/**
* @param {import('../types').StorefrontSignerCtx} config
*/
function getStorefrontSigner(config) {
const signer = ed25519.parse(config.PRIVATE_KEY)
if (config.DID) {
const did = DID.parse(config.DID).did()
return signer.withDID(did)
}
return signer
}
139 changes: 139 additions & 0 deletions data/lib/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import { pipe } from 'it-pipe'
import { batch } from 'streaming-iterables'
import { CID } from 'multiformats/cid'

import { MAX_BATCH_GET_ITEMS } from '../tables/constants.js'
import { createCarTable } from '../tables/car.js'
import { createFerryTable } from '../tables/ferry.js'
import { createAggregateService } from './aggregate-service.js'

/**
* @typedef {import('../types.js').FerryOpts} FerryOpts
* @typedef {import('../types.js').CarItem} CarItem
* @typedef {import('../types.js').CarItemFerry} CarItemFerry
*
* @typedef {object} FerryCtx
* @property {string} region
* @property {string} tableName
* @property {FerryOpts} [options]
*
* @typedef {object} CarTableCtx
* @property {string} region
* @property {string} tableName
* @property {import('../types.js').CarOpts} [options]
*/

/**
* Add cars to a loading ferry.
*
* @param {CarItemFerry[]} cars
* @param {FerryCtx} ferryCtx
*/
export async function addCarsToFerry (cars, ferryCtx) {
const ferryTable = createFerryTable(ferryCtx.region, ferryCtx.tableName, ferryCtx.options)

const ferryId = await ferryTable.getFerryLoading()
await ferryTable.addCargo(ferryId, cars)

return {
id: ferryId
}
}

/**
* Sets current Ferry as ready if not previously done
*
* @param {string} ferryId
* @param {FerryCtx} ferryCtx
*/
export async function setFerryAsReady (ferryId, ferryCtx) {
const ferryTable = createFerryTable(ferryCtx.region, ferryCtx.tableName, ferryCtx.options)

// Update state of ferry to ready
try {
await ferryTable.setAsReady(ferryId)
} catch (/** @type {any} */ error) {
// If error is for condition we can safely ignore it given this was changed in a concurrent operation
if (error.name !== 'ConditionalCheckFailedException') {
throw error
}
}
}

/**
* Sets current Ferry offer.
*
* @param {string} ferryId
* @param {object} ctx
* @param {FerryCtx} ctx.car
* @param {FerryCtx} ctx.ferry
* @param {import('../types.js').StorefrontSignerCtx} ctx.storefront
* @param {import('@ucanto/principal/ed25519').ConnectionView<any>} ctx.aggregationServiceConnection
*/
export async function setFerryOffer (ferryId, ctx) {
const carTable = createCarTable(ctx.car.region, ctx.car.tableName, ctx.car.options)
const ferryTable = createFerryTable(ctx.ferry.region, ctx.ferry.tableName, ctx.ferry.options)
const aggregateService = await createAggregateService(ctx.storefront, ctx.aggregationServiceConnection)

// Create Offer
/** @type {CarItem[]} */
const offers = await pipe(
ferryTable.getCargo(ferryId, { limit: MAX_BATCH_GET_ITEMS }),
batch(MAX_BATCH_GET_ITEMS),
/**
* @param {AsyncGenerator<CarItemFerry[], any, unknown> | Generator<CarItemFerry[], any, unknown>} source
*/
// @ts-expect-error type not inferred
async function (source) {
/** @type {CarItemFerry[]} */
const cars = []
for await (const items of source) {
const pageCars = await carTable.batchGet(items)
for (const car of pageCars) {
cars.push(car)
}
}

return cars
}
)

// Send offer
const nOffers = offers.map(offer => ({
...offer,
link: CID.parse(offer.link).link()
}))
// @ts-expect-error CID versions
await aggregateService.offer(nOffers)

// Update state of ferry to ready
try {
await ferryTable.setAsDealPending(ferryId)
} catch (/** @type {any} */ error) {
// If error is for condition we can safely ignore it given this was changed in a concurrent operation
if (error.name !== 'ConditionalCheckFailedException') {
throw error
}
}
}

/**
* Sets current Ferry as deal processed
*
* @param {string} ferryId
* @param {string} commP
* @param {FerryCtx} ferryCtx
*/
export async function setFerryAsProcessed (ferryId, commP, ferryCtx) {
const ferryTable = createFerryTable(ferryCtx.region, ferryCtx.tableName, ferryCtx.options)

// Update state of ferry to deal processed
try {
await ferryTable.setAsDealProcessed(ferryId, commP)
} catch (/** @type {any} */ error) {
// If error is for condition we can safely ignore it given this was changed in a concurrent operation
if (error.name !== 'ConditionalCheckFailedException') {
throw error
}
}
}
Loading

0 comments on commit e0d5f38

Please sign in to comment.