Skip to content

Commit

Permalink
Add backfilling transactions bloom to freezer
Browse files Browse the repository at this point in the history
  • Loading branch information
stwiname committed Nov 19, 2024
1 parent 230f515 commit 4c9f7be
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 4 deletions.
94 changes: 93 additions & 1 deletion core/rawdb/chain_freezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ func (f *chainFreezer) freeze(db ethdb.KeyValueStore) {
return
}
}

threshold, err := f.freezeThreshold(nfdb)
if err != nil {
backoff = true
Expand All @@ -200,6 +201,50 @@ func (f *chainFreezer) freeze(db ethdb.KeyValueStore) {
}
frozen, _ := f.Ancients() // no error will occur, safe to ignore

// Back fill transactions bloom untill it catches up then resume normal freezing
txbFrozen, err := f.AncientItems(ChainFreezerTransactionBloomTable)
if err != nil {
log.Error("Failed to check frozen transaction bloom", "err", err)
backoff = true
continue
}

if txbFrozen < frozen {
var (
first = txbFrozen
last = threshold
)
if last-first+1 > freezerBatchLimit {
last = freezerBatchLimit + first - 1
}
// Don't go ahead of the rest of the frozen datas
if last > frozen {
last = frozen - 1
}

log.Debug("Freezing historical tx bloom", "from", first, "to", last)
txAncients, err := f.freezeTxBloomRange(nfdb, first, last)
if err != nil {
log.Error("Error in tx bloom freeze operation", "err", err)
backoff = true
continue
}

// Wipe out all data from the active database
batch := db.NewBatch()
for i := 0; i < len(txAncients); i++ {
// Always keep the genesis block in active database
if first+uint64(i) != 0 {
DeleteTxBloom(batch, txAncients[i], first+uint64(i))
}
}
if err := batch.Write(); err != nil {
log.Crit("Failed to delete frozen canonical blocks", "err", err)
}
batch.Reset()
continue
}

// Short circuit if the blocks below threshold are already frozen.
if frozen != 0 && frozen-1 >= threshold {
backoff = true
Expand Down Expand Up @@ -357,7 +402,7 @@ func (f *chainFreezer) freezeRange(nfdb *nofreezedb, number, limit uint64) (hash
// TODO this can throw an error when rewinding to a block
txBloom := ReadTxBloomRLP(nfdb, hash, number)
if len(txBloom) == 0 && !outOfShard {
return fmt.Errorf("total transaction bloom, can't freeze block %d", number)
return fmt.Errorf("total transaction bloom missing, can't freeze block %d", number)
}

// Write to the batch.
Expand Down Expand Up @@ -386,3 +431,50 @@ func (f *chainFreezer) freezeRange(nfdb *nofreezedb, number, limit uint64) (hash
})
return hashes, err
}

// Back fill transactions bloom data
func (f *chainFreezer) freezeTxBloomRange(nfdb *nofreezedb, number, limit uint64) (hashes []common.Hash, err error) {
hashes = make([]common.Hash, 0, limit-number+1)

dataConfig := ReadChainDataConfig(nfdb)

_, err = f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
for ; number <= limit; number++ {

// If the data is out of the shard range then we allow writing empty data, this will allow truncating the tail of the freezer later
outOfShard := false
if dataConfig != nil && dataConfig.DesiredChainDataStart != nil {
outOfShard = number < *dataConfig.DesiredChainDataStart
}

// Retrieve all the components of the canonical block.
hash := ReadCanonicalHash(nfdb, number)
if hash == (common.Hash{}) {
// Get the hash from the freezer, its probably already frozen
data, err := f.AncientStore.Ancient(ChainFreezerHashTable, number)
if err != nil || len(data) == 0 {
return fmt.Errorf("canonical hash missing from freezer, can't freeze block %d", number)
}
hash = common.BytesToHash(data)
if hash == (common.Hash{}) && !outOfShard {
return fmt.Errorf("canonical hash missing, can't freeze block %d", number)
}
}
// TODO this can throw an error when rewinding to a block
// This can happen when the tx bloom indexer has not yet indexed the block, it will abort the current batch but eventually complete
txBloom := ReadTxBloomRLP(nfdb, hash, number)
if len(txBloom) == 0 && !outOfShard {
return fmt.Errorf("total transaction bloom missing, can't freeze block %d", number)
}

// Write to the batch.
if err := op.AppendRaw(ChainFreezerTransactionBloomTable, number, txBloom); err != nil {
return fmt.Errorf("can't write transaction bloom to Freezer: %v", err)
}

hashes = append(hashes, hash)
}
return nil
})
return hashes, err
}
5 changes: 5 additions & 0 deletions core/rawdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ func (db *nofreezedb) AncientSize(kind string) (uint64, error) {
return 0, errNotSupported
}

// AncientItems returns the number of items the ancient of the specified category.
func (db *nofreezedb) AncientItems(kind string) (uint64, error) {
return 0, errNotSupported
}

// ModifyAncients is not supported.
func (db *nofreezedb) ModifyAncients(func(ethdb.AncientWriteOp) error) (int64, error) {
return 0, errNotSupported
Expand Down
19 changes: 18 additions & 1 deletion core/rawdb/freezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,19 @@ func (f *Freezer) AncientSize(kind string) (uint64, error) {
return 0, errUnknownTable
}

// AncientItems returns the number of items the ancient of the specified category.
func (f *Freezer) AncientItems(kind string) (uint64, error) {
// This needs the write lock to avoid data races on table fields.
// Speed doesn't matter here, AncientSize is for debugging.
f.writeLock.RLock()
defer f.writeLock.RUnlock()

if table := f.tables[kind]; table != nil {
return table.items.Load(), nil
}
return 0, errUnknownTable
}

// ReadAncients runs the given read operation while ensuring that no writes take place
// on the underlying freezer.
func (f *Freezer) ReadAncients(fn func(ethdb.AncientReaderOp) error) (err error) {
Expand Down Expand Up @@ -369,9 +382,13 @@ func (f *Freezer) repair() error {
head = uint64(math.MaxUint64)
tail = uint64(0)
)
for _, table := range f.tables {
for kind, table := range f.tables {
items := table.items.Load()
if head > items {
// Allow the transactions bloom to be behind, this will be backfilled later on
if kind == ChainFreezerTransactionBloomTable {
continue
}
head = items
}
hidden := table.itemHidden.Load()
Expand Down
19 changes: 17 additions & 2 deletions core/rawdb/freezer_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,26 @@ func (batch *freezerBatch) reset() {
func (batch *freezerBatch) commit() (item uint64, writeSize int64, err error) {
// Check that count agrees on all batches.
item = uint64(math.MaxUint64)
maxData := uint64(math.MaxUint64)

// This has been modified for transactions bloom back filling
// Check that tables with data have the same count
for name, tb := range batch.tables {
if tb.totalBytes > 0 {
if maxData == uint64(math.MaxUint64) {
maxData = tb.curItem
} else if maxData != tb.curItem {
return 0, 0, fmt.Errorf("table %s is at item %d, want %d", name, tb.curItem, item)
}
}
}
// Find the max value
for name, tb := range batch.tables {
if item < math.MaxUint64 && tb.curItem != item {
if item == uint64(math.MaxUint64) || item < tb.curItem {
item = tb.curItem
} else if item != tb.curItem && tb.totalBytes == 0 {
return 0, 0, fmt.Errorf("table %s is at item %d, want %d", name, tb.curItem, item)
}
item = tb.curItem
}

// Commit all table batches.
Expand Down
11 changes: 11 additions & 0 deletions core/rawdb/freezer_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,17 @@ func (f *MemoryFreezer) AncientSize(kind string) (uint64, error) {
return 0, errUnknownTable
}

// AncientItems returns the number of items the ancient of the specified category.
func (f *MemoryFreezer) AncientItems(kind string) (uint64, error) {
f.lock.RLock()
defer f.lock.RUnlock()

if table := f.tables[kind]; table != nil {
return table.items, nil
}
return 0, errUnknownTable
}

// ReadAncients runs the given read operation while ensuring that no writes take place
// on the underlying freezer.
func (f *MemoryFreezer) ReadAncients(fn func(ethdb.AncientReaderOp) error) (err error) {
Expand Down
8 changes: 8 additions & 0 deletions core/rawdb/freezer_resettable.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,14 @@ func (f *resettableFreezer) AncientSize(kind string) (uint64, error) {
return f.freezer.AncientSize(kind)
}

// AncientItems returns the number of items the ancient of the specified category.
func (f *resettableFreezer) AncientItems(kind string) (uint64, error) {
f.lock.RLock()
defer f.lock.RUnlock()

return f.freezer.AncientItems(kind)
}

// ReadAncients runs the given read operation while ensuring that no writes take place
// on the underlying freezer.
func (f *resettableFreezer) ReadAncients(fn func(ethdb.AncientReaderOp) error) (err error) {
Expand Down
6 changes: 6 additions & 0 deletions core/rawdb/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ func (t *table) AncientSize(kind string) (uint64, error) {
return t.db.AncientSize(kind)
}

// AncientItems is a noop passthrough that just forwards the request to the underlying
// database.
func (t *table) AncientItems(kind string) (uint64, error) {
return t.db.AncientItems(kind)
}

// ModifyAncients runs an ancient write operation on the underlying database.
func (t *table) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (int64, error) {
return t.db.ModifyAncients(fn)
Expand Down
3 changes: 3 additions & 0 deletions ethdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ type AncientReaderOp interface {

// AncientSize returns the ancient size of the specified category.
AncientSize(kind string) (uint64, error)

// AncientItems returns the number of items the ancient of the specified category.
AncientItems(kind string) (uint64, error)
}

// AncientReader is the extended ancient reader interface including 'batched' or 'atomic' reading.
Expand Down
4 changes: 4 additions & 0 deletions ethdb/remotedb/remotedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ func (db *Database) AncientSize(kind string) (uint64, error) {
panic("not supported")
}

func (db *Database) AncientItems(kind string) (uint64, error) {
panic("not supported")
}

func (db *Database) ReadAncients(fn func(op ethdb.AncientReaderOp) error) (err error) {
return fn(db)
}
Expand Down

0 comments on commit 4c9f7be

Please sign in to comment.