feat: integrate aggregation client #22

Closed · wants to merge 2 commits
7 changes: 7 additions & 0 deletions .env.tpl
@@ -6,3 +6,10 @@

# uncomment to set SENTRY_DSN
# SENTRY_DSN = ''

# web3.storage DID
# DID = 'did:web:staging.web3.storage'

# Aggregation service
# AGGREGATION_SERVICE_DID = ''
# AGGREGATION_SERVICE_URL = ''
52 changes: 52 additions & 0 deletions README.md
@@ -41,6 +41,58 @@ Ensure the following variables are set in the env when deploying

The root domain to deploy the API to. e.g `filecoin.web3.storage`. The value should match a hosted zone configured in route53 that your aws account has access to.

#### `DID`

[DID](https://www.w3.org/TR/did-core/) of the ucanto server, e.g. `did:web:web3.storage`. Optional: if omitted, a `did:key` will be derived from `PRIVATE_KEY`.

#### `AGGREGATION_SERVICE_DID`

DID of the aggregation service.

#### `AGGREGATION_SERVICE_URL`

URL of the aggregation service.

### Secrets

Set production secrets in AWS SSM via [`sst secrets`](https://docs.sst.dev/config#sst-secrets). The region must be set to the one you deploy that stage to.

```sh
# set `PRIVATE_KEY` for prod
$ npx sst secrets set --region us-west-2 --stage prod PRIVATE_KEY "MgCblCY...="
```

To set a fallback value for `staging` or an ephemeral PR build, use [`sst secrets set-fallback`](https://docs.sst.dev/config#fallback-values):

```sh
# set `PRIVATE_KEY` for any stage in us-east-2
$ npx sst secrets set --fallback --region us-east-2 PRIVATE_KEY "MgCZG7...="
```

**Note:** The fallback value can only be inherited by stages deployed in the same AWS account and region.

Confirm the secret value using [`sst secrets list`](https://docs.sst.dev/config#sst-secrets)

```sh
$ npx sst secrets list --region us-east-2
PRIVATE_KEY MgCZG7...= (fallback)

$ npx sst secrets list --region us-west-2 --stage prod
PRIVATE_KEY M...=
```

#### `PRIVATE_KEY`

The [`multibase`](https://github.com/multiformats/multibase) encoded ED25519 keypair used as the signing key for the upload-api.

Generated by [@ucanto/principal `EdSigner`](https://github.com/web3-storage/ucanto) via [`ucan-key`](https://www.npmjs.com/package/ucan-key)

_Example:_ `MgCZG7EvaA...1pX9as=`

## License

Dual-licensed under [MIT + Apache 2.0](LICENSE.md)

</p>

[SST]: https://sst.dev
2 changes: 1 addition & 1 deletion data/functions/add-cars-to-ferry.js
@@ -1,7 +1,7 @@
import * as Sentry from '@sentry/serverless'
import { unmarshall } from '@aws-sdk/util-dynamodb'

import { addCarsToFerry } from '../lib/add-cars-to-ferry.js'
import { addCarsToFerry } from '../lib/index.js'
import { parseDynamoDbEvent } from '../utils/parse-dynamodb-event.js'
import { mustGetEnv } from '../lib/utils.js'

77 changes: 77 additions & 0 deletions data/functions/offer-ferry-for-aggregate.js
@@ -0,0 +1,77 @@
import * as Sentry from '@sentry/serverless'
import { unmarshall } from '@aws-sdk/util-dynamodb'
import { Config } from '@serverless-stack/node/config/index.js'

import { setFerryOffer } from '../lib/index.js'
import { parseDynamoDbEvent } from '../utils/parse-dynamodb-event.js'
import { mustGetEnv, getAggregationServiceConnection } from '../lib/utils.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0,
})

const AWS_REGION = mustGetEnv('AWS_REGION')

/**
* @param {import('aws-lambda').DynamoDBStreamEvent} event
*/
async function handler(event) {
const {
CAR_TABLE_NAME,
CARGO_TABLE_NAME,
FERRY_TABLE_NAME,
DID,
AGGREGATION_SERVICE_DID,
AGGREGATION_SERVICE_URL,
} = getEnv()
const { PRIVATE_KEY } = Config

const records = parseDynamoDbEvent(event)
if (records.length > 1) {
throw new Error('Should only receive one ferry to update')
}

// @ts-expect-error can't figure out type of new
const newRecord = unmarshall(records[0].new)

const ctx = {
car: {
region: AWS_REGION,
tableName: CAR_TABLE_NAME
},
ferry: {
region: AWS_REGION,
tableName: FERRY_TABLE_NAME,
options: {
cargoTableName: CARGO_TABLE_NAME
}
},
storefront: {
DID,
PRIVATE_KEY
},
aggregationServiceConnection: getAggregationServiceConnection({
DID: AGGREGATION_SERVICE_DID,
URL: AGGREGATION_SERVICE_URL
})
}
await setFerryOffer(newRecord.id, ctx)
}

export const consumer = Sentry.AWSLambda.wrapHandler(handler)
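The handler above relies on `parseDynamoDbEvent` from `../utils/parse-dynamodb-event.js`, which is not part of this diff. A minimal sketch of what it is assumed to do, flattening the stream event into records that expose the raw `NewImage`/`OldImage` attribute maps ready for `unmarshall` (the shape here is an assumption, not the repo's actual helper):

```javascript
/**
 * Hypothetical sketch of parseDynamoDbEvent (the real helper lives in
 * ../utils/parse-dynamodb-event.js and is not shown in this diff).
 * Flattens a DynamoDB stream event into { new, old } record images.
 *
 * @param {{ Records: Array<{ dynamodb?: { NewImage?: object, OldImage?: object } }> }} event
 */
function parseDynamoDbEvent (event) {
  return event.Records.map(record => ({
    new: record.dynamodb?.NewImage,
    old: record.dynamodb?.OldImage
  }))
}
```

Each `new` image is still in DynamoDB attribute-value form, which is why the handler passes it through `unmarshall` before use.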

/**
* Get Env validating it is set.
*/
function getEnv() {
return {
DID: mustGetEnv('DID'),
CAR_TABLE_NAME: mustGetEnv('CAR_TABLE_NAME'),
CARGO_TABLE_NAME: mustGetEnv('CARGO_TABLE_NAME'),
FERRY_TABLE_NAME: mustGetEnv('FERRY_TABLE_NAME'),
AGGREGATION_SERVICE_DID: mustGetEnv('AGGREGATION_SERVICE_DID'),
AGGREGATION_SERVICE_URL: mustGetEnv('AGGREGATION_SERVICE_URL'),
}
}
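`getEnv` wraps `mustGetEnv` from `../lib/utils.js`, which is also not included in this diff. A minimal sketch of the assumed behavior, reading an environment variable and throwing if it is unset so misconfiguration fails fast at cold start:

```javascript
/**
 * Minimal sketch of the mustGetEnv helper imported from ../lib/utils.js
 * (the helper itself is not part of this diff): return the value of an
 * environment variable, throwing if it is unset or empty.
 *
 * @param {string} name
 */
function mustGetEnv (name) {
  const value = process.env[name]
  if (!value) {
    throw new Error(`missing required environment variable: ${name}`)
  }
  return value
}
```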
2 changes: 1 addition & 1 deletion data/functions/set-ferry-as-ready.js
@@ -1,7 +1,7 @@
import * as Sentry from '@sentry/serverless'
import { unmarshall } from '@aws-sdk/util-dynamodb'

import { setFerryAsReady } from '../lib/set-ferry-as-ready.js'
import { setFerryAsReady } from '../lib/index.js'
import { parseDynamoDbEvent } from '../utils/parse-dynamodb-event.js'
import { mustGetEnv } from '../lib/utils.js'

28 changes: 0 additions & 28 deletions data/lib/add-cars-to-ferry.js

This file was deleted.

45 changes: 45 additions & 0 deletions data/lib/aggregate-service.js
@@ -0,0 +1,45 @@
import * as ed25519 from '@ucanto/principal/ed25519'
import * as DID from '@ipld/dag-ucan/did'
import { Aggregate } from '@web3-storage/aggregate-client'

/**
* @param {import('../types').StorefrontSignerCtx} serviceSignerCtx
* @param {ed25519.ConnectionView<any>} aggregationServiceConnection
*/
export async function createAggregateService (serviceSignerCtx, aggregationServiceConnection) {
const issuer = getStorefrontSigner(serviceSignerCtx)
const audience = aggregationServiceConnection.id

/** @type {import('@web3-storage/aggregate-client/types').InvocationConfig} */
const InvocationConfig = {
issuer,
audience,
with: issuer.did(),
}

return {
/**
*
* @param {import('@web3-storage/aggregate-client/types').Offer[]} offers
*/
offer: async function (offers) {
> **Reviewer:** I am a bit confused: isn't this supposed to implement the aggregate service API?

> **@vasco-santos (Contributor Author, Jun 13, 2023):** This PR web3-storage/dealer#2 implements the aggregate service API. w3filecoin uses the client to create offers; in other words, this side is the Storefront.

return await Aggregate.aggregateOffer(
InvocationConfig,
offers,
{ connection: aggregationServiceConnection }
)
}
}
}

/**
* @param {import('../types').StorefrontSignerCtx} config
*/
function getStorefrontSigner(config) {
const signer = ed25519.parse(config.PRIVATE_KEY)
if (config.DID) {
const did = DID.parse(config.DID).did()
return signer.withDID(did)
}
return signer
}
142 changes: 142 additions & 0 deletions data/lib/index.js
@@ -0,0 +1,142 @@
import { pipe } from 'it-pipe'
import { batch } from 'streaming-iterables'
import { CID } from 'multiformats/cid'

import { MAX_BATCH_GET_ITEMS } from '../tables/constants.js'
import { createCarTable } from '../tables/car.js'
import { createFerryTable } from '../tables/ferry.js'
import { createAggregateService } from './aggregate-service.js'

/**
* @typedef {import('../types.js').FerryOpts} FerryOpts
* @typedef {import('../types.js').CarItem} CarItem
* @typedef {import('../types.js').CarItemFerry} CarItemFerry
*
* @typedef {object} FerryCtx
* @property {string} region
* @property {string} tableName
* @property {FerryOpts} [options]
*
* @typedef {object} CarTableCtx
* @property {string} region
* @property {string} tableName
* @property {import('../types.js').CarOpts} [options]
*/

/**
* Add cars to a loading ferry.
*
* @param {CarItemFerry[]} cars
* @param {FerryCtx} ferryCtx
*/
export async function addCarsToFerry (cars, ferryCtx) {
const ferryTable = createFerryTable(ferryCtx.region, ferryCtx.tableName, ferryCtx.options)

const ferryId = await ferryTable.getFerryLoading()
await ferryTable.addCargo(ferryId, cars)
Comment on lines +35 to +36

> **Reviewer:** I'm struggling to keep up with all that is going on in the PR, and maybe I'm also tired, so I apologize if some of my comments aren't on point. Here specifically, doing an async read and then an async write makes me uncomfortable without the full picture in my head. I worry about a race condition, because the ferry ID could change on concurrent calls and I'm not sure of the consequences.
>
> Perhaps a code comment addressing the concurrency concern is all that's needed here.
>
> With that said, I personally would go for a design (which may not be applicable here) where the writer simply appends to a queue (a well-known ferry ID) and something else then moves batches off the queue into the actual ferry. Such an approach tends to fare better, given that no writes depend on read state that can change in the meantime.
>
> Here we deal with tables, however, so things may not exactly apply. That said, in relational DBs it's not uncommon to take a similar approach where one table simply records things and another records which record from the first table is in which group. That also ends up doing appends instead of updates, and consequently avoids coordination overhead.

> **@vasco-santos (Contributor Author):** Appreciate the concerns. All DynamoDB stream consumers (as is the case here) should only perform one mutation, to avoid partial failure.
>
> You are right that we need to be careful with read + write, and that is where the DynamoDB ConditionExpression comes into play. It could be interesting for you to go through the table abstraction code: https://github.com/web3-storage/w3filecoin/blob/main/data/tables/ferry.js
>
> So, let's go through this case:
>
> 1. CAR details are written into the car table.
> 2. The DynamoDB stream consumer kicks in from this insert (https://github.com/web3-storage/w3filecoin/blob/main/stacks/data-stack.js#L57) with a batch of CARs to be added into an aggregate.
> 3. The `addCarsToFerry` function is called with the batch.
> 4. It gets a ferry in the loading state and attempts to add the given batch to it.
> 5. That mutation is where things can succeed or fail, so let's look at the cases.
>
> First, it is important to look at the `addCargo` function (https://github.com/web3-storage/w3filecoin/blob/main/data/tables/ferry.js#L54). It is a TransactWrite command, which means all or nothing. It also includes a ConditionExpression, which is essentially what resolves the concern that the ferry ID could change on concurrent calls. Parallel calls might fill the ferry, or change its state to ready (no longer accepting cargo). In that case, this DynamoDB stream consumer fails and will be retried, with half the load, so that when a ferry is almost full it eventually gets filled and a new one is created (https://github.com/web3-storage/w3filecoin/blob/main/stacks/data-stack.js#L74).

> **@vasco-santos (Contributor Author):** A code comment would of course help, but it is also difficult to draw the line on where to add comments. The table abstraction code already covers these conditions, so also adding these comments in callers would mean maintaining a lot of duplicated documentation.

return {
id: ferryId
}
}
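The concurrency discussion above hinges on `addCargo` failing once the ferry has left the loading state. A toy in-memory sketch of those conditional-write semantics (the real table uses a DynamoDB TransactWrite with a ConditionExpression; the `state`/`cargo` field names here are illustrative assumptions):

```javascript
/**
 * In-memory model of the conditional write discussed above. Cargo can only
 * be added while the ferry is in the 'loading' state; otherwise the write
 * fails with the same error name a DynamoDB conditional check raises, so
 * callers can distinguish "lost the race" from real failures.
 *
 * @param {{ state: string, cargo: any[] }} ferry
 * @param {any[]} cars
 */
function addCargo (ferry, cars) {
  if (ferry.state !== 'loading') {
    const error = new Error('ferry is no longer accepting cargo')
    error.name = 'ConditionalCheckFailedException'
    throw error
  }
  ferry.cargo.push(...cars)
}
```

This mirrors why the lambda consumers can safely retry: a concurrent call that flips the ferry to `ready` makes later writes fail the condition rather than corrupt state.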

/**
* Sets current Ferry as ready if not previously done
*
* @param {string} ferryId
* @param {FerryCtx} ferryCtx
*/
export async function setFerryAsReady (ferryId, ferryCtx) {
const ferryTable = createFerryTable(ferryCtx.region, ferryCtx.tableName, ferryCtx.options)

// Update state of ferry to ready
try {
await ferryTable.setAsReady(ferryId)
} catch (/** @type {any} */ error) {
> **Reviewer:** Can we please catch this error in `setAsReady` instead, so that its caller doesn't need to figure out which errors it must handle and which it can ignore? Reading this code I have no idea what conditional `ferryTable` is using, so its implementation details end up leaking here.
>
> Ideally all the IO ops should return Result types with self-describing error types; that way here you'd be able to do `if (result.error) ...`.

> **@vasco-santos (Contributor Author):** `setAsReady`, IMHO, should not be opinionated and swallow the error when its ConditionExpression fails... It will only succeed if the ferry state is loading and its size is large enough (this will likely need to change with commD calculation).
>
> Therefore, the consumer (the lambda function calling the lib function `setFerryAsReady`) is the one that should decide whether this is an error it can tolerate. Given that the consumer is driven by DynamoDB event streams that can run in parallel, multiple calls can happen concurrently. The first would succeed and the second fail, because the state would no longer be loading. The consumer is OK with this failure and won't need to retry it.

// If error is for condition we can safely ignore it given this was changed in a concurrent operation
if (error.name !== 'ConditionalCheckFailedException') {
throw error
}
}
}

/**
* Sets current Ferry offer.
*
* @param {string} ferryId
* @param {object} ctx
* @param {FerryCtx} ctx.car
* @param {FerryCtx} ctx.ferry
* @param {import('../types.js').StorefrontSignerCtx} ctx.storefront
* @param {import('@ucanto/principal/ed25519').ConnectionView<any>} ctx.aggregationServiceConnection
*/
export async function setFerryOffer (ferryId, ctx) {
> **@vasco-santos (Contributor Author):** Note that, as this is implemented, there are guarantees that even if the DynamoDB stream handler is (sadly, possibly) called multiple times for the same thing, two different offers won't be created.

const carTable = createCarTable(ctx.car.region, ctx.car.tableName, ctx.car.options)
const ferryTable = createFerryTable(ctx.ferry.region, ctx.ferry.tableName, ctx.ferry.options)
const aggregateService = await createAggregateService(ctx.storefront, ctx.aggregationServiceConnection)

// Given ferry is ready to get unloaded to invoke `aggregate/offer`.
// Ferry cargo (reference to CARs in ferry) is read via async iterable
// followed by getting the CAR information of each cargo item in batches.
// Batches are limited by DynamoDB maximum batch sizes.
/** @type {CarItem[]} */
const offers = await pipe(
> **@vasco-santos (Contributor Author):** Needed to get the CAR details of all the cargo in the ferry to include in `aggregate/offer`.

> **Reviewer:** Can you please add some code comments describing what's going on here? I read through this block several times before I got it. In particular, as a reader I don't exactly know what the referenced tables hold and how they relate, so I need to build up that context before I can comprehend what is going on.
>
> Ideally there would be a code comment providing that context, e.g. "Here we pull CARs that were batched into a 'ferry' with a given id so we can create an aggregate offer for it...".

> **@vasco-santos (Contributor Author, Jun 13, 2023):** I will add the comment for this. There are comments in data-stack with information about the role of each table in the picture. The architecture document that was shared and presented in Demos could also be a good read: https://www.notion.so/w3filecoin-pipeline-architecture-204bb37b550b4e46ab9ea0799396d96b. This document can be moved into GitHub as well.

ferryTable.getCargo(ferryId, { limit: MAX_BATCH_GET_ITEMS }),
batch(MAX_BATCH_GET_ITEMS),
/**
* @param {AsyncGenerator<CarItemFerry[], any, unknown> | Generator<CarItemFerry[], any, unknown>} source
*/
// @ts-expect-error type not inferred
async function (source) {
/** @type {CarItemFerry[]} */
const cars = []
for await (const items of source) {
const pageCars = await carTable.batchGet(items)
for (const car of pageCars) {
cars.push(car)
}
}

return cars
}
)

// Send offer
const nOffers = offers.map(offer => ({
...offer,
link: CID.parse(offer.link)
}))
// @ts-expect-error CID versions
await aggregateService.offer(nOffers)

// Update state of ferry to ready
try {
await ferryTable.setAsDealPending(ferryId)
} catch (/** @type {any} */ error) {
// If error is for condition we can safely ignore it given this was changed in a concurrent operation
> **Reviewer:** Seems like `setAsDealPending` is leaking implementation details. Why would it expect the caller to know what conditionals are used by the function? If those conditions change, callers will start ignoring errors they may have to handle.
>
> I would argue that either:
>
> 1. `setAsDealPending` catches the errors that callers can ignore, or
> 2. it returns a Result type with well-defined error types that tell the caller whose responsibility it is to handle them (and with far more concrete error types than ConditionalCheckFailed).

> **@vasco-santos (Contributor Author):** Same answer as above. We can add return types to make it easier if that is helpful.
>
> Would like to see an opinion from @alanshaw, given this code was just moved into an individual file, but we already covered it in previous PRs.

if (error.name !== 'ConditionalCheckFailedException') {
throw error
}
}
}
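The pipe/batch stage in `setFerryOffer` can be sketched without the `it-pipe` and `streaming-iterables` dependencies. This hypothetical standalone version shows the two moving parts: grouping an async iterable into fixed-size batches, and flattening the per-batch lookups (with `getMany` standing in for `carTable.batchGet`):

```javascript
/**
 * Group an (async) iterable into fixed-size arrays, mirroring the
 * batch(MAX_BATCH_GET_ITEMS) stage of the pipeline above.
 *
 * @param {AsyncIterable<any> | Iterable<any>} source
 * @param {number} size
 */
async function * batch (source, size) {
  let buffer = []
  for await (const item of source) {
    buffer.push(item)
    if (buffer.length === size) {
      yield buffer
      buffer = []
    }
  }
  if (buffer.length > 0) yield buffer
}

/**
 * Consume the batches, look each one up via getMany (a stand-in for
 * carTable.batchGet), and flatten the results, as the final pipe stage does.
 *
 * @param {AsyncIterable<any> | Iterable<any>} source
 * @param {number} size
 * @param {(items: any[]) => Promise<any[]>} getMany
 */
async function collectInBatches (source, size, getMany) {
  const results = []
  for await (const items of batch(source, size)) {
    results.push(...await getMany(items))
  }
  return results
}
```

The batch size matters because DynamoDB caps BatchGetItem requests, which is what `MAX_BATCH_GET_ITEMS` encodes in the real code.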

/**
* Sets current Ferry as deal processed
*
* @param {string} ferryId
* @param {string} commP
* @param {FerryCtx} ferryCtx
*/
export async function setFerryAsProcessed (ferryId, commP, ferryCtx) {
const ferryTable = createFerryTable(ferryCtx.region, ferryCtx.tableName, ferryCtx.options)

// Update state of ferry to deal processed
try {
await ferryTable.setAsDealProcessed(ferryId, commP)
} catch (/** @type {any} */ error) {
> **Reviewer:** Same sentiment as with `setAsDealPending`.

// If error is for condition we can safely ignore it given this was changed in a concurrent operation
if (error.name !== 'ConditionalCheckFailedException') {
throw error
}
}
}