add: columns to Eth2Processor and BlockProcessor #6862

Draft: wants to merge 35 commits into base: unstable.

Changes shown from 27 of 35 commits.

Commits:
- 6eec230 init: columns to the block/eth2 processor (agnxsh, Jan 19, 2025)
- e67ee1a add columns to message router (agnxsh, Jan 20, 2025)
- d30e893 add columns to initializers of Eth2 and BlockProcessor (agnxsh, Jan 20, 2025)
- 1f222c6 save progress (agnxsh, Jan 20, 2025)
- 632f7a2 save progress 2 (agnxsh, Jan 20, 2025)
- eac6fa1 add column to block verifier (agnxsh, Jan 20, 2025)
- a5d0362 save progress, need to rework untrusted syncing (agnxsh, Jan 21, 2025)
- 24397c8 add column support to light forward sync (agnxsh, Jan 21, 2025)
- 5ae6bf1 save progress test sync manager (agnxsh, Jan 21, 2025)
- 00d8322 fix createDataColumns (agnxsh, Jan 21, 2025)
- 444b915 fix more (agnxsh, Jan 21, 2025)
- f46733a added fulu message handlers for column subnets (agnxsh, Jan 21, 2025)
- eba1f76 activated data column sidecar processing at Fulu (agnxsh, Jan 21, 2025)
- 3f9edd3 fix compilation issues (agnxsh, Jan 22, 2025)
- 7104108 added to T list (agnxsh, Jan 22, 2025)
- 45fa2fa other fixes (agnxsh, Jan 22, 2025)
- c2f8ea5 fix test (agnxsh, Jan 22, 2025)
- 1196e54 fix result situation in get data column sidecars (agnxsh, Jan 24, 2025)
- 26e2fee fix message router issue (agnxsh, Jan 24, 2025)
- c86cef3 gate blob publishing upto deneb (agnxsh, Jan 24, 2025)
- 8369661 fix message router blob and column progressions (agnxsh, Jan 24, 2025)
- 4052e6a drop dataColumnOpt from message router (agnxsh, Jan 24, 2025)
- b31706e reversing rman blockVerifier order (agnxsh, Jan 25, 2025)
- 512a740 fixes (agnxsh, Jan 25, 2025)
- a38be17 several fixes (agnxsh, Jan 25, 2025)
- 947a12f added debug logs for devnet testing (agnxsh, Jan 25, 2025)
- 5f169d9 add blobsOpt isSome check (agnxsh, Jan 25, 2025)
- 3ea4f12 fix copyright years (agnxsh, Jan 25, 2025)
- 2250fab couple of fixes and debug logs (agnxsh, Jan 26, 2025)
- 6251bce fix issue (agnxsh, Jan 27, 2025)
- 2a4f495 resolved review comments, enabled more debug logs, fixed a couple of … (agnxsh, Jan 27, 2025)
- f7728ca fix indentation (agnxsh, Jan 27, 2025)
- 7fa339c limit processBlobSidecar < Fulu (agnxsh, Jan 27, 2025)
- ea5c2a0 try to gate a few operations to < Fulu (agnxsh, Jan 27, 2025)
- bd0d1b7 gate more (agnxsh, Jan 27, 2025)
71 changes: 69 additions & 2 deletions beacon_chain/beacon_chain_file.nim
@@ -79,6 +79,8 @@ const
int(ConsensusFork.Phase0) .. int(high(ConsensusFork))
BlobForkCodeRange =
MaxForksCount .. (MaxForksCount + int(high(ConsensusFork)) - int(ConsensusFork.Deneb))
DataColumnForkCodeRange =
MaxForksCount .. (MaxForksCount + int(high(ConsensusFork)) - int(ConsensusFork.Fulu))
cheatfate (Contributor) commented:

This is an incorrect range because it overlaps with the blobs range. The idea is:

[blocks range][blobs range][columns range] ... high(int)

The allocated range for blocks is [0 .. 16383] and the allocated range for blobs is [16384 .. 32767], so for columns you should allocate [32768 .. 49151].

agnxsh (Contributor, Author) replied:

  DataColumnForkCodeRange =
    (BlobForkCodeRange.high + 1) .. (BlobForkCodeRange.high + 1 + int(high(ConsensusFork)) - int(ConsensusFork.Fulu))

Something around this value?

cheatfate (Contributor) replied on Jan 27, 2025:

This looks like a sane range, but I already gave my recommendation: it's better if you start from 32768.

  DataColumnForkCodeRange =
    MaxForksCount * 2 .. (MaxForksCount * 2 + int(high(ConsensusFork)) - int(ConsensusFork.Fulu))
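
As a standalone illustration of the layout discussed above, here is a minimal sketch, assuming MaxForksCount = 16384; the *Window names are hypothetical and denote the full allocated windows rather than the fork-limited spans the module actually uses:

  const
    MaxForksCount = 16384                                       # assumed from the ranges quoted above
    BlockWindow = 0 .. (MaxForksCount - 1)                      # [0 .. 16383], blocks
    BlobWindow = MaxForksCount .. (2 * MaxForksCount - 1)       # [16384 .. 32767], blobs
    ColumnWindow = 2 * MaxForksCount .. (3 * MaxForksCount - 1) # [32768 .. 49151], columns

  static:
    # The windows are disjoint and ordered, so block, blob and column
    # chunk codes can never collide with one another.
    doAssert BlockWindow.b < BlobWindow.a
    doAssert BlobWindow.b < ColumnWindow.a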


func getBlockForkCode(fork: ConsensusFork): uint64 =
uint64(fork)
@@ -94,6 +96,13 @@ func getBlobForkCode(fork: ConsensusFork): uint64 =
of ConsensusFork.Phase0 .. ConsensusFork.Capella:
raiseAssert "Blobs are not supported for the fork"

func getDataColumnForkCode(fork: ConsensusFork): uint64 =
case fork
of ConsensusFork.Fulu:
uint64(MaxForksCount)
cheatfate (Contributor) commented:

Because of the invalid code range, this produces invalid codes that overlap with the blobs range.

of ConsensusFork.Phase0 .. ConsensusFork.Electra:
raiseAssert "Data columns are not supported for the fork"

proc init(t: typedesc[ChainFileError], k: ChainFileErrorType,
m: string): ChainFileError =
ChainFileError(kind: k, message: m)
@@ -134,7 +143,8 @@ proc checkKind(kind: uint64): Result[void, string] =
if res > uint64(high(int)):
return err("Unsuppoted chunk kind value")
int(res)
if (hkind in BlockForkCodeRange) or (hkind in BlobForkCodeRange):
if (hkind in BlockForkCodeRange) or (hkind in BlobForkCodeRange) or
(hkind in DataColumnForkCodeRange):
ok()
else:
err("Unsuppoted chunk kind value")
@@ -260,6 +270,12 @@ template getBlobChunkKind(kind: ConsensusFork, last: bool): uint64 =
else:
getBlobForkCode(kind)

template getDataColumnChunkKind(kind: ConsensusFork, last: bool): uint64 =
if last:
maskKind(getDataColumnForkCode(kind))
else:
getDataColumnForkCode(kind)

proc getBlockConsensusFork(header: ChainFileHeader): ConsensusFork =
let hkind = unmaskKind(header.kind)
if int(hkind) in BlockForkCodeRange:
@@ -275,6 +291,10 @@ template isBlob(h: ChainFileHeader | ChainFileFooter): bool =
let hkind = unmaskKind(h.kind)
int(hkind) in BlobForkCodeRange

template isDataColumn(h: ChainFileHeader | ChainFileFooter): bool =
let hkind = unmaskKind(h.kind)
int(hkind) in DataColumnForkCodeRange

template isLast(h: ChainFileHeader | ChainFileFooter): bool =
h.kind.isLast()

@@ -291,7 +311,8 @@ proc setTail*(chandle: var ChainFileHandle, bdata: BlockData) =
chandle.data.tail = Opt.some(bdata)

proc store*(chandle: ChainFileHandle, signedBlock: ForkedSignedBeaconBlock,
blobs: Opt[BlobSidecars]): Result[void, string] =
blobs: Opt[BlobSidecars], dataColumns: Opt[DataColumnSidecars]):
Result[void, string] =
let origOffset =
updateFilePos(chandle.handle, 0'i64, SeekPosition.SeekEnd).valueOr:
return err(ioErrorMsg(error))
@@ -342,6 +363,36 @@ proc store*(chandle: ChainFileHandle, signedBlock: ForkedSignedBeaconBlock,
discard fsync(chandle.handle)
return err(IncompleteWriteError)

if dataColumns.isSome():
let dataColumnSidecars =
dataColumns.get
for index, dataColumn in dataColumnSidecars.pairs():
let
kind =
getDataColumnChunkKind(signedBlock.kind, (index + 1) ==
len(dataColumnSidecars))
(data, plainSize) =
block:
let res = SSZ.encode(dataColumn[])
(snappy.encode(res), len(res))
slot = dataColumn[].signed_block_header.message.slot
buffer = Chunk.init(kind, uint64(slot), uint32(plainSize), data)

setFilePos(chandle.handle, 0'i64, SeekPosition.SeekEnd).isOkOr:
discard truncate(chandle.handle, origOffset)
discard fsync(chandle.handle)
return err(ioErrorMsg(error))

let
wrote = writeFile(chandle.handle, buffer).valueOr:
discard truncate(chandle.handle, origOffset)
discard fsync(chandle.handle)
return err(ioErrorMsg(error))
if wrote != uint(len(buffer)):
discard truncate(chandle.handle, origOffset)
discard fsync(chandle.handle)
return err(IncompleteWriteError)

fsync(chandle.handle).isOkOr:
discard truncate(chandle.handle, origOffset)
return err(ioErrorMsg(error))
@@ -550,6 +601,22 @@ proc decodeBlob(
return err("Incorrect blob format")
ok(blob)

proc decodeDataColumn(
header: ChainFileHeader,
data: openArray[byte],
): Result[DataColumnSidecar, string] =
if header.plainSize > uint32(MaxChunkSize):
return err("Size of data column is enormously big")

let
decompressed = snappy.decode(data, uint32(header.plainSize))
dataColumn =
try:
SSZ.decode(decompressed, DataColumnSidecar)
except SerializationError:
return err("Incorrect data column format")
ok(dataColumn)
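
For intuition, a hedged round-trip sketch of the chunk payload format that store and decodeDataColumn agree on (snappy-compressed SSZ); sidecar stands for an assumed, already-populated DataColumnSidecar, with the module's imports in scope:

  let
    plain = SSZ.encode(sidecar)                    # SSZ bytes; len(plain) is the chunk's plainSize
    compressed = snappy.encode(plain)              # what store() writes into the chunk
    restored = snappy.decode(compressed, uint32(len(plain)))
    decoded = SSZ.decode(restored, DataColumnSidecar)
  doAssert SSZ.encode(decoded) == plain            # byte-for-byte round trip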

proc getChainFileTail*(handle: IoHandle): Result[Opt[BlockData], string] =
var sidecars: BlobSidecars
while true:
1 change: 1 addition & 0 deletions beacon_chain/consensus_object_pools/block_pools_types.nim
@@ -288,6 +288,7 @@ type
BlockData* = object
blck*: ForkedSignedBeaconBlock
blob*: Opt[BlobSidecars]
dataColumn*: Opt[DataColumnSidecars]

OnBlockAdded*[T: ForkyTrustedSignedBeaconBlock] = proc(
blckRef: BlockRef, blck: T, epochRef: EpochRef,
47 changes: 38 additions & 9 deletions beacon_chain/consensus_object_pools/blockchain_list.nim
@@ -9,6 +9,7 @@

import std/sequtils, stew/io2, chronicles, chronos, metrics,
../spec/forks,
../spec/peerdas_helpers,
../[beacon_chain_file, beacon_clock],
../sszdump

@@ -128,16 +129,17 @@ proc setTail*(clist: ChainListRef, bdata: BlockData) =
clist.handle = Opt.some(handle)

proc store*(clist: ChainListRef, signedBlock: ForkedSignedBeaconBlock,
blobs: Opt[BlobSidecars]): Result[void, string] =
blobs: Opt[BlobSidecars], dataColumns: Opt[DataColumnSidecars]):
Result[void, string] =
if clist.handle.isNone():
let
filename = clist.path.chainFilePath()
flags = {ChainFileFlag.Repair, ChainFileFlag.OpenAlways}
handle = ? ChainFileHandle.init(filename, flags)
clist.handle = Opt.some(handle)
store(handle, signedBlock, blobs)
store(handle, signedBlock, blobs, dataColumns)
else:
store(clist.handle.get(), signedBlock, blobs)
store(clist.handle.get(), signedBlock, blobs, dataColumns)

proc checkBlobs(signedBlock: ForkedSignedBeaconBlock,
blobsOpt: Opt[BlobSidecars]): Result[void, VerifierError] =
@@ -167,9 +169,31 @@ proc checkBlobs(signedBlock: ForkedSignedBeaconBlock,
return err(VerifierError.Invalid)
ok()

proc checkDataColumns*(signedBlock: ForkedSignedBeaconBlock,
dataColumnsOpt: Opt[DataColumnSidecars]):
Result[void, VerifierError] =
withBlck(signedBlock):
when consensusFork >= ConsensusFork.Fulu:
if dataColumnsOpt.isSome:
let dataColumns = dataColumnsOpt.get()
if dataColumns.len > 0:
for i in 0..<dataColumns.len:
let r =
verify_data_column_sidecar_kzg_proofs(dataColumns[i][])
if r.isErr:
debug "Data column validation failed",
blockRoot = shortLog(forkyBlck.root),
dataColumn = shortLog(dataColumns[i][]),
blck = shortLog(forkyBlck.message),
signature = shortLog(forkyBlck.signature),
msg = r.error()
return err(VerifierError.Invalid)
ok()

proc addBackfillBlockData*(
clist: ChainListRef, signedBlock: ForkedSignedBeaconBlock,
blobsOpt: Opt[BlobSidecars]): Result[void, VerifierError] =
blobsOpt: Opt[BlobSidecars], dataColumnsOpt: Opt[DataColumnSidecars]):
Result[void, VerifierError] =
doAssert(not(isNil(clist)))

logScope:
@@ -182,15 +206,17 @@

if clist.tail.isNone():
? checkBlobs(signedBlock, blobsOpt)
? checkDataColumns(signedBlock, dataColumnsOpt)

let storeBlockTick = Moment.now()

store(clist, signedBlock, blobsOpt).isOkOr:
store(clist, signedBlock, blobsOpt, dataColumnsOpt).isOkOr:
fatal "Unexpected failure while trying to store data",
filename = chainFilePath(clist.path), reason = error
quit 1

let bdata = BlockData(blck: signedBlock, blob: blobsOpt)
let bdata = BlockData(blck: signedBlock, blob: blobsOpt,
dataColumn: dataColumnsOpt)
clist.setTail(bdata)
if clist.head.isNone():
clist.setHead(bdata)
@@ -219,10 +245,11 @@
return err(VerifierError.MissingParent)

? checkBlobs(signedBlock, blobsOpt)
? checkDataColumns(signedBlock, dataColumnsOpt)

let storeBlockTick = Moment.now()

store(clist, signedBlock, blobsOpt).isOkOr:
store(clist, signedBlock, blobsOpt, dataColumnsOpt).isOkOr:
fatal "Unexpected failure while trying to store data",
filename = chainFilePath(clist.path), reason = error
quit 1
@@ -231,17 +258,19 @@
verify_block_duration = shortLog(storeBlockTick - verifyBlockTick),
store_block_duration = shortLog(Moment.now() - storeBlockTick)

clist.setTail(BlockData(blck: signedBlock, blob: blobsOpt))
clist.setTail(BlockData(blck: signedBlock, blob: blobsOpt, dataColumn: dataColumnsOpt))

ok()

proc untrustedBackfillVerifier*(
clist: ChainListRef,
signedBlock: ForkedSignedBeaconBlock,
blobs: Opt[BlobSidecars],
dataColumns: Opt[DataColumnSidecars],
maybeFinalized: bool
): Future[Result[void, VerifierError]] {.
async: (raises: [CancelledError], raw: true).} =
let retFuture = newFuture[Result[void, VerifierError]]()
retFuture.complete(clist.addBackfillBlockData(signedBlock, blobs))
retFuture.complete(clist.addBackfillBlockData(signedBlock, blobs,
dataColumns))
retFuture
11 changes: 5 additions & 6 deletions beacon_chain/consensus_object_pools/data_column_quarantine.nim
@@ -139,13 +139,12 @@ func hasMissingDataColumns*(quarantine: DataColumnQuarantine,
index: idx)
if dc_identifier notin quarantine.data_columns:
inc col_counter
if quarantine.supernode and col_counter != NUMBER_OF_COLUMNS:
return false
elif quarantine.supernode == false and
col_counter != max(SAMPLES_PER_SLOT, CUSTODY_REQUIREMENT):
return false
else:
if quarantine.supernode and col_counter == NUMBER_OF_COLUMNS:
return true
if quarantine.supernode == false and
col_counter == max(SAMPLES_PER_SLOT, CUSTODY_REQUIREMENT):
return true
false
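
For context, a small hedged sketch of the two custody thresholds this predicate compares against; the constant values are assumptions taken from the PeerDAS draft spec at the time, not from this diff:

  const
    NUMBER_OF_COLUMNS = 128   # assumed: total data columns per slot
    SAMPLES_PER_SLOT = 8      # assumed: per-slot sampling requirement
    CUSTODY_REQUIREMENT = 4   # assumed: minimum columns a node must custody

  # A supernode custodies every column; a regular node only needs
  # max(SAMPLES_PER_SLOT, CUSTODY_REQUIREMENT) columns.
  doAssert max(SAMPLES_PER_SLOT, CUSTODY_REQUIREMENT) == 8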

func hasEnoughDataColumns*(quarantine: DataColumnQuarantine,
blck: fulu.SignedBeaconBlock): bool =