diff --git a/db.go b/db.go index 3365fc6fa0..28a57700c6 100644 --- a/db.go +++ b/db.go @@ -745,27 +745,13 @@ func (db *DB) recover(ctx context.Context, wal WAL) error { } switch e := record.Entry.EntryType.(type) { case *walpb.Entry_TableBlockPersisted_: - persistedTables[e.TableBlockPersisted.TableName] = e.TableBlockPersisted.NextTx - if e.TableBlockPersisted.NextTx > snapshotTx { - // The loaded snapshot has data in a table that has been - // persisted. Delete all data in this table, since it has - // already been persisted. - db.mtx.Lock() - if table, ok := db.tables[e.TableBlockPersisted.TableName]; ok { - table.ActiveBlock().index, err = index.NewLSM( - filepath.Join(table.db.indexDir(), table.name, table.ActiveBlock().ulid.String()), // Any index files are found at // - table.schema, - table.IndexConfig(), - db.HighWatermark, - index.LSMWithMetrics(table.metrics.indexMetrics), - index.LSMWithLogger(table.logger), - ) - if err != nil { - return fmt.Errorf("create new lsm index: %w", err) - } - } - db.mtx.Unlock() + var id ulid.ULID + if err := id.UnmarshalBinary(e.TableBlockPersisted.BlockId); err != nil { + return err } + persistedTables[e.TableBlockPersisted.TableName] = e.TableBlockPersisted.NextTx + // The loaded snapshot might have persisted data, this is handled in + // the replay loop below. return nil default: return nil @@ -802,7 +788,8 @@ func (db *DB) recover(ctx context.Context, wal WAL) error { return err } - if nextNonPersistedTxn, ok := persistedTables[entry.TableName]; ok && tx <= nextNonPersistedTxn { + nextNonPersistedTxn, wasPersisted := persistedTables[entry.TableName] + if wasPersisted && tx < nextNonPersistedTxn { // This block has already been successfully persisted, so we can // skip it. Note that if this new table block is the active // block after persistence tx == nextNonPersistedTxn. @@ -849,15 +836,20 @@ func (db *DB) recover(ctx context.Context, wal WAL) error { return fmt.Errorf("get table: %w", err) } - // If we get to this point it means a block was finished but did - // not get persisted. level.Info(db.logger).Log( "msg", "writing unfinished block in recovery", "table", tableName, "tx", tx, ) - table.pendingBlocks[table.active] = struct{}{} - go table.writeBlock(table.active, tx, db.columnStore.manualBlockRotation, false) + if snapshotTx == 0 || tx != nextNonPersistedTxn { + // If we get to this point it means a block was finished but did + // not get persisted. If a snapshot was loaded, then the table + // already exists but the active block is outdated. If + // tx == nextNonPersistedTxn, we should not persist the active + // block, but just create a new block. + table.pendingBlocks[table.active] = struct{}{} + go table.writeBlock(table.active, tx, db.columnStore.manualBlockRotation, false) + } protoEqual := false switch schema.(type) {