From 755f5b355c44330bb07fa2e14b035cff026f7df0 Mon Sep 17 00:00:00 2001 From: Alfonso Subiotto Marques Date: Tue, 21 May 2024 17:32:27 +0200 Subject: [PATCH] db: correctly discard table block contained in snapshot if block was persisted Previously, the code attempted to do this by resetting the active block index. However, the ULID was left untouched. This would cause a previously persisted block to lose data when the new empty block's rotation happened. This commit updates the full block when a NewTableBlock entry is found and the table exists. --- db.go | 42 +++++++++++++++++------------------------- 1 file changed, 17 insertions(+), 25 deletions(-) diff --git a/db.go b/db.go index 3365fc6fa0..28a57700c6 100644 --- a/db.go +++ b/db.go @@ -745,27 +745,13 @@ func (db *DB) recover(ctx context.Context, wal WAL) error { } switch e := record.Entry.EntryType.(type) { case *walpb.Entry_TableBlockPersisted_: - persistedTables[e.TableBlockPersisted.TableName] = e.TableBlockPersisted.NextTx - if e.TableBlockPersisted.NextTx > snapshotTx { - // The loaded snapshot has data in a table that has been - // persisted. Delete all data in this table, since it has - // already been persisted. - db.mtx.Lock() - if table, ok := db.tables[e.TableBlockPersisted.TableName]; ok { - table.ActiveBlock().index, err = index.NewLSM( - filepath.Join(table.db.indexDir(), table.name, table.ActiveBlock().ulid.String()), // Any index files are found at <db.indexDir>/<table.name>/<block.id> - table.schema, - table.IndexConfig(), - db.HighWatermark, - index.LSMWithMetrics(table.metrics.indexMetrics), - index.LSMWithLogger(table.logger), - ) - if err != nil { - return fmt.Errorf("create new lsm index: %w", err) - } - } - db.mtx.Unlock() + var id ulid.ULID + if err := id.UnmarshalBinary(e.TableBlockPersisted.BlockId); err != nil { + return err } + persistedTables[e.TableBlockPersisted.TableName] = e.TableBlockPersisted.NextTx + // The loaded snapshot might have persisted data; this is handled in + // the replay loop below. 
return nil default: return nil @@ -802,7 +788,8 @@ func (db *DB) recover(ctx context.Context, wal WAL) error { return err } - if nextNonPersistedTxn, ok := persistedTables[entry.TableName]; ok && tx <= nextNonPersistedTxn { + nextNonPersistedTxn, wasPersisted := persistedTables[entry.TableName] + if wasPersisted && tx < nextNonPersistedTxn { // This block has already been successfully persisted, so we can // skip it. Note that if this new table block is the active // block after persistence tx == nextNonPersistedTxn. @@ -849,15 +836,20 @@ func (db *DB) recover(ctx context.Context, wal WAL) error { return fmt.Errorf("get table: %w", err) } - // If we get to this point it means a block was finished but did - // not get persisted. level.Info(db.logger).Log( "msg", "writing unfinished block in recovery", "table", tableName, "tx", tx, ) - table.pendingBlocks[table.active] = struct{}{} - go table.writeBlock(table.active, tx, db.columnStore.manualBlockRotation, false) + if snapshotTx == 0 || tx != nextNonPersistedTxn { + // If we get to this point it means a block was finished but did + // not get persisted. If a snapshot was loaded, then the table + // already exists but the active block is outdated. If + // tx == nextNonPersistedTxn, we should not persist the active + // block, but just create a new block. + table.pendingBlocks[table.active] = struct{}{} + go table.writeBlock(table.active, tx, db.columnStore.manualBlockRotation, false) + } protoEqual := false switch schema.(type) {