feat: integrate aggregation client #22
 * @param {import('../types.js').StorefrontSignerCtx} ctx.storefront
 * @param {import('@ucanto/principal/ed25519').ConnectionView<any>} ctx.aggregationServiceConnection
 */
export async function setFerryOffer (ferryId, ctx) {
Note that, as implemented, this guarantees that even if the DynamoDB stream handler is called multiple times for the same thing (which is sadly possible), two different offers won't be created.
// Create Offer
/** @type {CarItem[]} */
const offers = await pipe(
Needed to get CAR details of all the cargo in ferry to include in aggregate/offer
Can you please add some code comments describing what's going on here? I had to read through this block several times before I got what's going on. In particular, as a reader I don't know exactly what the referenced tables hold and how they relate, so I need to build up that context before I can comprehend what is going on.
Ideally there will be a code comment providing that context. E.g. Here we pull CARs that were batched into "ferry" with a given id so we can create an aggregate offer for it....
I will add the comment for this. There are comments in `data-stack` with information about the role of each table in the picture. The architecture document that was shared and presented in Demos could also be a good read: https://www.notion.so/w3filecoin-pipeline-architecture-204bb37b550b4e46ab9ea0799396d96b . This document can be moved into GitHub as well.
@@ -9,7 +9,7 @@ export default function (app) {
   app.setDefaultFunctionProps({
     runtime: 'nodejs16.x',
     environment: {
-      NODE_OPTIONS: "--enable-source-maps",
+      NODE_OPTIONS: "--enable-source-maps --experimental-fetch",
Needed so that we can use `aggregate-client` with the ucanto client without passing a fetch implementation manually all over the place.
Please make the above a code comment so that reader outside of PR can also know why :P
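One way the requested code comment could look is sketched below. The `defaultFunctionProps` object is a hypothetical stand-in for the argument passed to `app.setDefaultFunctionProps` in the diff above, and the comment wording is only a suggestion.

```javascript
// Hypothetical stand-in for the props passed to app.setDefaultFunctionProps.
const defaultFunctionProps = {
  runtime: 'nodejs16.x',
  environment: {
    // --experimental-fetch exposes a global `fetch` on Node.js 16, so that
    // aggregate-client and the ucanto client can be used without passing a
    // fetch implementation around manually.
    NODE_OPTIONS: '--enable-source-maps --experimental-fetch'
  }
}
```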
import { CID } from 'multiformats/cid'
import * as raw from 'multiformats/codecs/raw'
import { sha256 } from 'multiformats/hashes/sha2'

/** @param {Uint8Array} bytes */
export async function toBlock(bytes) {
  const hash = await sha256.digest(bytes)
  const cid = CID.createV1(raw.code, hash)
  return { cid, bytes }
}
copied over from w3infra
import { CarWriter } from '@ipld/car'
import * as CAR from '@ucanto/transport/car'
import { toBlock } from './block.js'

/**
 * @param {Uint8Array} bytes
 **/
export async function toCAR(bytes) {
  const block = await toBlock(bytes)
  const { writer, out } = CarWriter.create(block.cid)
  writer.put(block)
  writer.close()

  const chunks = []
  for await (const chunk of out) {
    chunks.push(chunk)
  }
  const blob = new Blob(chunks)
  const cid = await CAR.codec.link(new Uint8Array(await blob.arrayBuffer()))

  return Object.assign(blob, { cid, roots: [block.cid] })
}
copied over from w3infra
import { toCAR } from './car.js'

/** @param {number} size */
export async function randomBytes(size) {
  const bytes = new Uint8Array(size)
  while (size) {
    const chunk = new Uint8Array(Math.min(size, 65_536))
    if (!globalThis.crypto) {
      try {
        const { webcrypto } = await import('node:crypto')
        webcrypto.getRandomValues(chunk)
      } catch (error) {
        throw new Error(
          'unknown environment - no global crypto and not Node.js',
          { cause: error }
        )
      }
    } else {
      crypto.getRandomValues(chunk)
    }
    // Advance by the chunk just generated (not by the whole buffer length),
    // so that sizes above 65,536 bytes are filled completely.
    size -= chunk.length
    bytes.set(chunk, size)
  }
  return bytes
}

/** @param {number} size */
export async function randomCAR(size) {
  const bytes = await randomBytes(size)
  return toCAR(bytes)
}

/**
 * @param {number} length
 * @param {number} size
copied over from w3infra
@vasco-santos I tried my best to provide helpful feedback here, but I'm afraid I've failed to do so. There is just too much context and too many design decisions (which I think were made a while back) that I'm not versed in to do a good job here.
This shows in my incoherent comments, which go all over the place and sometimes poke at things that have perhaps been settled for a while. I am afraid I will have to defer to @alanshaw here, or otherwise maybe we can do the review over a call so it's easier to build up the context I felt I was missing.
data/lib/index.js
Outdated
// Send offer
const nOffers = offers.map(offer => ({
  ...offer,
  link: CID.parse(offer.link).link()
This is very confusing. I assume we have links stored as strings, so we have to parse them. But then why do we call `.link()`? If there is a reason, plz add a code comment.
If it's just types, `Link` is an interface that `CID` implements, so it should be unnecessary. Furthermore, there is also the `multiformats/link` module, which I would recommend using unless CID is necessary for interop with libraries that have not loosened their requirements to `Link` yet.
It looks like it is redundant. I think there is no reason for it, good catch
 *
 * @param {import('@web3-storage/aggregate-client/types').Offer[]} offers
 */
offer: async function (offers) {
I am a bit confused: isn't this supposed to implement the aggregate service API?
This PR web3-storage/dealer#2 implements the aggregate service API. w3filecoin uses the client to create offers; in other words, this side is the Storefront.
const ferryId = await ferryTable.getFerryLoading()
await ferryTable.addCargo(ferryId, cars)
I'm struggling to keep up with all that is going on in the PR, and maybe I'm also tired, so I apologize if some of my comments aren't on point. Here specifically, doing an async read and then an async write makes me uncomfortable without the full picture in my head. Specifically, I worry about a race condition, because ferryID could change on concurrent calls and I'm not sure of the consequences.
Perhaps a code comment addressing the concurrency concern is all that's needed here.
With that said, I personally would go for a design (which may not be applicable here) where the writer simply appends to a queue (a well-known ferry id) and something else then moves a batch from the queue into an actual ferry. Such an approach tends to fare better, given that no writes depend on read state that can change in the meantime.
Here we deal with tables, however, so things may not apply exactly. That said, in relational DBs it's not uncommon to take a similar approach, where in one table you simply record things and in the other you record which record from the first table is in which group. That also ends up doing appends instead of updates, and consequently avoids coordination overhead.
Appreciate the concerns. All DynamoDB stream consumers (such as this one) should perform only one mutation, to avoid partial failure.
You are right that we need to be careful with read + write, and that is where the DynamoDB `ConditionExpression` comes into play. It could be interesting for you to go through the table abstraction code: https://github.com/web3-storage/w3filecoin/blob/main/data/tables/ferry.js
So, let's go through this case:
- CAR details are written into the `car` table
- The DynamoDB stream consumer kicks in from this insert https://github.com/web3-storage/w3filecoin/blob/main/stacks/data-stack.js#L57 with a batch of CARs to be added into an aggregate
- The `addCarsToFerry` function is called with the batch
- It gets a ferry in state `loading` and attempts to add the given batch to it
- This mutation is where things can succeed or fail, so let's look at the different cases
First, it is important to look at the `addCargo` function https://github.com/web3-storage/w3filecoin/blob/main/data/tables/ferry.js#L54 . It is a TransactWrite command, which means all or nothing. It also includes a `ConditionExpression`, which is essentially what solves the concern "because ferryID could change on concurrent calls and I'm not sure of the consequences." Basically, parallel calls might fill the ferry, or change its state to `ready` (not accepting more cargo). In that case, this DynamoDB stream consumer fails and will be retried (with half the load, so that a ferry that is almost full eventually gets filled and a new one is created https://github.com/web3-storage/w3filecoin/blob/main/stacks/data-stack.js#L74)
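The write-time condition described above can be modelled with a small in-memory stand-in. This is only a sketch of the semantics, not the real table code: `createFerryStore`, its fields, and the `ConditionalCheckFailed` error class are hypothetical, and no DynamoDB call is made.

```javascript
// Hypothetical error type standing in for a failed DynamoDB condition.
class ConditionalCheckFailed extends Error {
  constructor (message) {
    super(message)
    this.name = 'ConditionalCheckFailed'
  }
}

// In-memory stand-in for the ferry table; NOT the real abstraction.
function createFerryStore (maxSize) {
  const ferries = new Map()
  return {
    create (id) {
      ferries.set(id, { stat: 'loading', size: 0, cargo: [] })
    },
    get (id) {
      return ferries.get(id)
    },
    setStat (id, stat) {
      ferries.get(id).stat = stat
    },
    // All-or-nothing: either the whole batch is added, or nothing changes.
    addCargo (id, cars) {
      const ferry = ferries.get(id)
      const batchSize = cars.reduce((sum, car) => sum + car.size, 0)
      // The condition is evaluated at write time, so a stale earlier read
      // of "which ferry is loading" cannot cause an invalid write.
      if (!ferry || ferry.stat !== 'loading' || ferry.size + batchSize > maxSize) {
        throw new ConditionalCheckFailed(`ferry ${id} cannot take this batch`)
      }
      ferry.size += batchSize
      ferry.cargo.push(...cars.map(car => car.link))
    }
  }
}
```

A second batch that would overflow the ferry, or that arrives after the ferry left `loading`, throws and leaves the ferry untouched, which is the point at which the stream consumer would be retried.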
A code comment would of course help, but it is also difficult to say where the borderline for adding comments is. The table abstraction code already covers these conditions, so repeating them as code comments in callers would leave us maintaining a lot of duplicated documentation.
// Update state of ferry to ready
try {
  await ferryTable.setAsReady(ferryId)
} catch (/** @type {any} */ error) {
Can we please catch this error in `setAsReady` instead, so that its caller doesn't need to figure out which errors it must handle and which ones it can ignore? Reading this code I have no idea what conditional `ferryTable` is using, so its implementation details end up leaking here.
Ideally all the IO ops should return `Result` types with self-describing error types; that way you'd be able to do `if result.error...` here.
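A minimal sketch of the `Result` idea, assuming the table method throws an error named `ConditionalCheckFailedException` when its condition fails. The wrapper name `setAsReadyResult`, the `{ ok } | { error }` shape, and the `FerryStateConflict` error name are hypothetical, not part of the PR.

```javascript
// Hypothetical wrapper: turns the expected condition failure into a
// self-describing error value and rethrows anything unexpected.
async function setAsReadyResult (ferryTable, ferryId) {
  try {
    await ferryTable.setAsReady(ferryId)
    return { ok: {} }
  } catch (error) {
    if (error?.name === 'ConditionalCheckFailedException') {
      return {
        error: {
          name: 'FerryStateConflict',
          message: `ferry ${ferryId} could not be set as ready`,
          cause: error
        }
      }
    }
    throw error
  }
}
```

A caller could then branch on `result.error` and decide by `result.error.name` whether the failure is safe to ignore, without knowing which condition the table uses.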
`setAsReady` IMHO should not be opinionated about catching the error when it fails to set the ferry as ready because its `ConditionExpression` fails. It will only succeed if the ferry `stat` is `loading` and its size is enough (this will likely need to change with the commD calc).
Therefore, the consumer of this (the lambda function calling this lib function, `setFerryAsReady`) is the one that should decide whether this is an OK error to tolerate. Given that the consumer is driven by DynamoDB event streams that can run in parallel, multiple calls could happen concurrently. The first would succeed and the second would fail because the state would no longer be `loading`. The consumer is OK with this failure and won't need to retry it.
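The caller-side position can be sketched as follows. The signature of `setFerryAsReady`, its boolean return value, and the fake table are assumptions made for illustration; only the error name `ConditionalCheckFailedException` comes from DynamoDB itself.

```javascript
// Hypothetical caller: tolerate a failed condition, rethrow everything else.
async function setFerryAsReady (ferryId, ferryTable) {
  try {
    await ferryTable.setAsReady(ferryId)
    return true
  } catch (error) {
    // A concurrent invocation already moved the ferry out of `loading`,
    // so there is nothing left to do and no retry is needed.
    if (error?.name === 'ConditionalCheckFailedException') return false
    throw error
  }
}

// Stand-in table: only the first call sees the ferry in `loading`.
function createFakeFerryTable () {
  let stat = 'loading'
  return {
    async setAsReady () {
      if (stat !== 'loading') {
        const error = new Error('The conditional request failed')
        error.name = 'ConditionalCheckFailedException'
        throw error
      }
      stat = 'ready'
    }
  }
}
```

Running two calls concurrently against the same fake table, for example with `Promise.all`, lets the first one win while the second is ignored rather than retried.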
try {
  await ferryTable.setAsDealPending(ferryId)
} catch (/** @type {any} */ error) {
  // If error is for condition we can safely ignore it given this was changed in a concurrent operation
Seems like `setAsDealPending` is leaking implementation details. Why would it expect the caller to know what conditionals are used by the function? If those conditions change, callers will start ignoring errors they may have to handle.
I would argue that either:
- `setAsDealPending` catches the errors that callers can ignore.
- It returns a `Return` type with well-defined error types that tell the caller whose responsibility it is to handle them (and a lot more concrete error types than `ConditionalCheckFailed`).
same answer as above. We can add return types to make it easier if that is helpful.
Would like to see opinion from @alanshaw given this code was just moved around to individual file but we already covered it before in previous PRs
// Update state of ferry to deal processed
try {
  await ferryTable.setAsDealProcessed(ferryId, commP)
} catch (/** @type {any} */ error) {
Same sentiment as with `setAsDealPending`.
@@ -120,6 +120,38 @@ export function createFerryTable (region, tableName, options = {}) {

      await dynamoDb.send(cmd)
    },
    /**
     * Get cargo from a ferry
I think it would be helpful to call out what the cargo is here.
The pointers I can give are in the table schema and construct:
- https://github.com/web3-storage/w3filecoin/blob/main/stacks/data-stack.js#L35
- https://github.com/web3-storage/w3filecoin/blob/main/data/tables/index.js#L38
Basically we have CARs, we have Ferries, and cargo is a mapping table that we rely on to say which CARs go into a given ferry.
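The relationship between the three tables can be pictured with plain in-memory rows. Everything here is illustrative: the field names (`link`, `size`, `ferryId`), the placeholder ids, and the `getFerryCargo` helper are assumptions, not the real schema.

```javascript
// Hypothetical rows of the `car` table, keyed by CAR link.
const cars = new Map([
  ['car-a', { size: 100 }],
  ['car-b', { size: 200 }],
  ['car-c', { size: 50 }]
])

// Hypothetical rows of the `cargo` mapping table: one row per CAR in a ferry.
const cargo = [
  { ferryId: 'ferry-1', link: 'car-a' },
  { ferryId: 'ferry-1', link: 'car-b' },
  { ferryId: 'ferry-2', link: 'car-c' }
]

// Resolve the cargo of a ferry into CAR details via the mapping table.
function getFerryCargo (ferryId) {
  return cargo
    .filter(row => row.ferryId === ferryId)
    .map(row => ({ link: row.link, ...cars.get(row.link) }))
}
```

Under these assumptions, `getFerryCargo('ferry-1')` yields the details of `car-a` and `car-b`, which is the kind of lookup the offer creation needs.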
This was added into other PRs as the approach changed for the SQS-based architecture.
This PR integrates the aggregation client within w3filecoin. It is integrated in the DynamoDB stream handler that is triggered once the Ferry table has entries changing their `stat` to `ready`. The change to `ready` happens when an aggregate is ready to be offered, i.e. it has enough size (please note that a follow-up PR needs to address how we calculate enough size, given it takes into account CAR sizes, and not sizes with padding by the commD algorithm).
The aggregation client is configured with `spade-proxy` endpoints via ENV vars, while w3filecoin has `did:web:web3.storage` and therefore has its private key as a secret.
A few extra details for review:
- `lib/index.js` instead of a file within the lib for each thing, given they have super small context.
- `aggregate/offer`
Note that `commP` is for now still not in place, as we need #11 to land, as well as to have `commP` started to be computed first.
Closes #27
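The trigger described above, a Ferry row changing its `stat` to `ready`, could be detected in a stream handler roughly as follows. This is a sketch under assumptions: the record layout is the standard DynamoDB Streams event shape, but the attribute names `id` and `stat` as string attributes and the helper name `getReadyFerryIds` are hypothetical.

```javascript
// Returns the ids of ferries whose `stat` just changed to `ready`.
// Assumes string attributes named `id` and `stat`; the real schema may differ.
function getReadyFerryIds (event) {
  const ids = []
  for (const record of event.Records ?? []) {
    // Only updates to existing rows can be a transition into `ready`.
    if (record.eventName !== 'MODIFY') continue
    const newStat = record.dynamodb?.NewImage?.stat?.S
    const oldStat = record.dynamodb?.OldImage?.stat?.S
    if (newStat === 'ready' && oldStat !== 'ready') {
      ids.push(record.dynamodb.NewImage.id.S)
    }
  }
  return ids
}
```

Each returned id would then be handed to the offer step (`setFerryOffer` in this PR), which is the part that talks to the aggregation service.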