Skip to content

Commit

Permalink
db: correctly discard table block contained in snapshot if block was …
Browse files Browse the repository at this point in the history
…persisted

Previously, the code attempted to do this by resetting the active block index.
However, the ULID was left untouched. This would cause a previously persisted
block to lose data when the new empty block's rotation happened. This commit
updates the full block when a NewTableBlock entry is found and the table
exists.
  • Loading branch information
asubiotto committed May 21, 2024
1 parent cbb3e63 commit 755f5b3
Showing 1 changed file with 17 additions and 25 deletions.
42 changes: 17 additions & 25 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -745,27 +745,13 @@ func (db *DB) recover(ctx context.Context, wal WAL) error {
}
switch e := record.Entry.EntryType.(type) {
case *walpb.Entry_TableBlockPersisted_:
persistedTables[e.TableBlockPersisted.TableName] = e.TableBlockPersisted.NextTx
if e.TableBlockPersisted.NextTx > snapshotTx {
// The loaded snapshot has data in a table that has been
// persisted. Delete all data in this table, since it has
// already been persisted.
db.mtx.Lock()
if table, ok := db.tables[e.TableBlockPersisted.TableName]; ok {
table.ActiveBlock().index, err = index.NewLSM(
filepath.Join(table.db.indexDir(), table.name, table.ActiveBlock().ulid.String()), // Any index files are found at <db.indexDir>/<table.name>/<block.id>
table.schema,
table.IndexConfig(),
db.HighWatermark,
index.LSMWithMetrics(table.metrics.indexMetrics),
index.LSMWithLogger(table.logger),
)
if err != nil {
return fmt.Errorf("create new lsm index: %w", err)
}
}
db.mtx.Unlock()
var id ulid.ULID
if err := id.UnmarshalBinary(e.TableBlockPersisted.BlockId); err != nil {
return err
}
persistedTables[e.TableBlockPersisted.TableName] = e.TableBlockPersisted.NextTx
// The loaded snapshot might have persisted data, this is handled in
// the replay loop below.
return nil
default:
return nil
Expand Down Expand Up @@ -802,7 +788,8 @@ func (db *DB) recover(ctx context.Context, wal WAL) error {
return err
}

if nextNonPersistedTxn, ok := persistedTables[entry.TableName]; ok && tx <= nextNonPersistedTxn {
nextNonPersistedTxn, wasPersisted := persistedTables[entry.TableName]
if wasPersisted && tx < nextNonPersistedTxn {
// This block has already been successfully persisted, so we can
// skip it. Note that if this new table block is the active
// block after persistence tx == nextNonPersistedTxn.
Expand Down Expand Up @@ -849,15 +836,20 @@ func (db *DB) recover(ctx context.Context, wal WAL) error {
return fmt.Errorf("get table: %w", err)
}

// If we get to this point it means a block was finished but did
// not get persisted.
level.Info(db.logger).Log(
"msg", "writing unfinished block in recovery",
"table", tableName,
"tx", tx,
)
table.pendingBlocks[table.active] = struct{}{}
go table.writeBlock(table.active, tx, db.columnStore.manualBlockRotation, false)
if snapshotTx == 0 || tx != nextNonPersistedTxn {
// If we get to this point it means a block was finished but did
// not get persisted. If a snapshot was loaded, then the table
// already exists but the active block is outdated. If
// tx == nextNonPersistedTxn, we should not persist the active
// block, but just create a new block.
table.pendingBlocks[table.active] = struct{}{}
go table.writeBlock(table.active, tx, db.columnStore.manualBlockRotation, false)
}

protoEqual := false
switch schema.(type) {
Expand Down

0 comments on commit 755f5b3

Please sign in to comment.