dst: bug and test fixes #874

Merged · 6 commits · May 21, 2024
38 changes: 13 additions & 25 deletions db.go
@@ -746,26 +746,8 @@ func (db *DB) recover(ctx context.Context, wal WAL) error {
switch e := record.Entry.EntryType.(type) {
case *walpb.Entry_TableBlockPersisted_:
persistedTables[e.TableBlockPersisted.TableName] = e.TableBlockPersisted.NextTx
if e.TableBlockPersisted.NextTx > snapshotTx {
// The loaded snapshot has data in a table that has been
// persisted. Delete all data in this table, since it has
// already been persisted.
db.mtx.Lock()
if table, ok := db.tables[e.TableBlockPersisted.TableName]; ok {
table.ActiveBlock().index, err = index.NewLSM(
filepath.Join(table.db.indexDir(), table.name, table.ActiveBlock().ulid.String()), // Any index files are found at <db.indexDir>/<table.name>/<block.id>
table.schema,
table.IndexConfig(),
db.HighWatermark,
index.LSMWithMetrics(table.metrics.indexMetrics),
index.LSMWithLogger(table.logger),
)
if err != nil {
return fmt.Errorf("create new lsm index: %w", err)
}
}
db.mtx.Unlock()
}
// The loaded snapshot might have persisted data, this is handled in
// the replay loop below.
return nil
default:
return nil
@@ -802,7 +784,8 @@ func (db *DB) recover(ctx context.Context, wal WAL) error {
return err
}

if nextNonPersistedTxn, ok := persistedTables[entry.TableName]; ok && tx <= nextNonPersistedTxn {
nextNonPersistedTxn, wasPersisted := persistedTables[entry.TableName]
if wasPersisted && tx < nextNonPersistedTxn {
// This block has already been successfully persisted, so we can
// skip it. Note that if this new table block is the active
// block after persistence tx == nextNonPersistedTxn.
@@ -849,15 +832,20 @@ func (db *DB) recover(ctx context.Context, wal WAL) error {
return fmt.Errorf("get table: %w", err)
}

// If we get to this point it means a block was finished but did
// not get persisted.
level.Info(db.logger).Log(
"msg", "writing unfinished block in recovery",
"table", tableName,
"tx", tx,
)
table.pendingBlocks[table.active] = struct{}{}
go table.writeBlock(table.active, tx, db.columnStore.manualBlockRotation, false)
if snapshotTx == 0 || tx != nextNonPersistedTxn {
// If we get to this point it means a block was finished but did
// not get persisted. If a snapshot was loaded, then the table
// already exists but the active block is outdated. If
// tx == nextNonPersistedTxn, we should not persist the active
// block, but just create a new block.
table.pendingBlocks[table.active] = struct{}{}
go table.writeBlock(table.active, tx, db.columnStore.manualBlockRotation, false)
}

protoEqual := false
switch schema.(type) {
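Reviewer note: the db.go hunks above move snapshot/WAL reconciliation out of the `TableBlockPersisted` case and into the replay loop, and they tighten when a replayed block is skipped or re-persisted. Below is a minimal sketch of that decision logic; the standalone helper names and signatures are assumptions for illustration, not functions that exist in frostdb.

```go
package sketch

// shouldSkipNewTableBlock mirrors the tightened comparison in db.recover:
// entries strictly before NextTx were already persisted, while the entry at
// exactly nextNonPersistedTxn is the block that became active after
// persistence and must still be replayed.
func shouldSkipNewTableBlock(persistedTables map[string]uint64, tableName string, tx uint64) bool {
	nextNonPersistedTxn, wasPersisted := persistedTables[tableName]
	return wasPersisted && tx < nextNonPersistedTxn
}

// shouldWriteUnfinishedBlock captures the new guard around writeBlock: when a
// snapshot was loaded and this tx is exactly the first non-persisted
// transaction, the active block only needs to be recreated, not persisted
// again.
func shouldWriteUnfinishedBlock(snapshotTx, tx, nextNonPersistedTxn uint64) bool {
	return snapshotTx == 0 || tx != nextNonPersistedTxn
}
```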
25 changes: 24 additions & 1 deletion dst/dst_test.go
@@ -27,6 +27,7 @@ import (
"github.com/polarsignals/wal/types"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
"go.uber.org/goleak"
"golang.org/x/sync/errgroup"

"github.com/polarsignals/frostdb"
@@ -337,7 +338,6 @@ func TestDST(t *testing.T) {
}, walTicker,
)
require.NoError(t, err)
defer c.Close()

ctx := context.Background()
var db atomic.Pointer[frostdb.DB]
@@ -371,6 +371,8 @@ func TestDST(t *testing.T) {
errg := &errgroup.Group{}
errg.SetLimit(32)
commandDistribution := make(map[command]int)

ignoreGoroutinesAtStartOfTest := goleak.IgnoreCurrent()
for i := 0; i < numCommands; i++ {
cmd := genCommand()
commandDistribution[cmd]++
@@ -417,6 +419,23 @@ func TestDST(t *testing.T) {
time.Sleep(1 * time.Millisecond)
// Graceful shutdown.
require.NoError(t, c.Close())
_ = errg.Wait()

// Unfortunately frostdb doesn't have goroutine lifecycle management
// and adding it could lead to subtle issues (e.g. on Close with
// many DBs). Instead, this test simply verifies all goroutines
// spawned up until this restart eventually exit after n retries.
const maxRetries = 10
for i := 0; i < maxRetries; i++ {
if err := goleak.Find(ignoreGoroutinesAtStartOfTest); err == nil {
break
} else if i == maxRetries-1 {
t.Fatalf("leaked goroutines found on Close: %v", err)
} else {
time.Sleep(1 * time.Millisecond)
}
}

storeID++
c, err = newStore(
storageDir,
@@ -462,6 +481,10 @@ func TestDST(t *testing.T) {
t.Log("snapshot files:", listFiles("snapshots"))
t.Log("WAL files:", listFiles("wal"))

// Defer a close here. This is not done at the start of the test because
// the test run itself may close the store.
defer c.Close()

timestampSum := &int64checksum{}
readTimestamps := make(map[int64]int)
expectedTimestamps := make(map[int64]struct{})
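Reviewer note: the test change above pairs `goleak.IgnoreCurrent()` with a retried `goleak.Find()` so goroutines started before the restart get a bounded window to wind down after `Close`. A trimmed-down sketch of that pattern, with the store and workload details replaced by placeholders:

```go
package dstsketch

import (
	"testing"
	"time"

	"go.uber.org/goleak"
)

// verifyNoLeaksAfterClose is illustrative only; closeStore stands in for the
// real store shutdown performed in TestDST.
func verifyNoLeaksAfterClose(t *testing.T, closeStore func() error) {
	// Snapshot goroutines that already exist so only goroutines spawned after
	// this point are reported as leaks.
	ignoreExisting := goleak.IgnoreCurrent()

	// ... run the workload, then gracefully shut down ...
	if err := closeStore(); err != nil {
		t.Fatal(err)
	}

	// Background goroutines may take a moment to observe the shutdown, so poll
	// a bounded number of times before declaring a leak.
	const maxRetries = 10
	for i := 0; i < maxRetries; i++ {
		err := goleak.Find(ignoreExisting)
		if err == nil {
			return
		}
		if i == maxRetries-1 {
			t.Fatalf("leaked goroutines found on Close: %v", err)
		}
		time.Sleep(1 * time.Millisecond)
	}
}
```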
2 changes: 1 addition & 1 deletion index/lsm.go
@@ -551,7 +551,7 @@ func (l *LSM) merge(level SentinelType) error {
// Find the first part that is <= the watermark and reset the compact list to that part.
wm := l.watermark()
compact.Iterate(func(node *Node) bool {
if node.part != nil && node.sentinel != L0 {
if node.part == nil && node.sentinel != L0 {
return false
}
if node.part.TX() <= wm {
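Reviewer note: the one-character fix above flips the early-exit condition in `LSM.merge` so iteration stops at sentinel nodes (which carry no part) rather than at real parts, avoiding a nil `node.part` dereference on the following line. A self-contained sketch of the guard ordering, using stand-in types rather than the real `Node` and part types from `index/lsm.go`:

```go
package lsmsketch

// part and node are simplified stand-ins for the structures in index/lsm.go.
type part struct{ tx uint64 }

func (p *part) TX() uint64 { return p.tx }

type node struct {
	part  *part
	level int // 0 plays the role of L0 in this sketch
}

// firstAtOrBelowWatermark walks the compact list and returns the first part
// whose transaction is at or below the watermark. Sentinel nodes for deeper
// levels carry no part, so the nil check must run before any call to TX();
// the PR changes `node.part != nil` to `node.part == nil` for exactly that
// reason.
func firstAtOrBelowWatermark(nodes []*node, wm uint64) *part {
	for _, n := range nodes {
		if n.part == nil && n.level != 0 {
			return nil // stop: reached a sentinel below L0
		}
		if n.part != nil && n.part.TX() <= wm {
			return n.part
		}
	}
	return nil
}
```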
15 changes: 9 additions & 6 deletions table.go
@@ -497,12 +497,12 @@ func (t *Table) dropPendingBlock(block *TableBlock) {
}

func (t *Table) writeBlock(block *TableBlock, nextTxn uint64, skipPersist, snapshotDB bool) {
level.Debug(t.logger).Log("msg", "syncing block")
level.Debug(t.logger).Log("msg", "syncing block", "ulid", block.ulid, "size", block.index.Size())
block.pendingWritersWg.Wait()

// from now on, the block will no longer be modified, we can persist it to disk

level.Debug(t.logger).Log("msg", "done syncing block")
level.Debug(t.logger).Log("msg", "done syncing block", "ulid", block.ulid, "size", block.index.Size())

// Persist the block
var err error
@@ -614,10 +614,13 @@ func (t *Table) RotateBlock(_ context.Context, block *TableBlock, skipPersist bo
return nil
}

level.Debug(t.logger).Log("msg", "rotating block", "blockSize", block.Size(), "skipPersist", skipPersist)
defer func() {
level.Debug(t.logger).Log("msg", "done rotating block")
}()
level.Debug(t.logger).Log(
"msg", "rotating block",
"ulid", block.ulid,
"size", block.Size(),
"skip_persist", skipPersist,
)
defer level.Debug(t.logger).Log("msg", "done rotating block", "ulid", block.ulid)

tx, _, commit := t.db.begin()
defer commit()
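Reviewer note: the table.go hunks only enrich the debug logs with the block's ULID and size so block writes and rotations can be correlated across goroutines. For reference, a minimal example of the key/value logging style used in the diff (which appears to be go-kit's `log`/`level` packages); the ULID and size values here are placeholders:

```go
package main

import (
	"os"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
)

func main() {
	logger := log.NewLogfmtLogger(os.Stderr)
	// Key/value pairs instead of a formatted string: downstream tooling can
	// filter on ulid or size without parsing the message text.
	level.Debug(logger).Log(
		"msg", "rotating block",
		"ulid", "01HYXXXXXXXXXXXXXXXXXXXXXX",
		"size", 4096,
		"skip_persist", false,
	)
}
```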
6 changes: 3 additions & 3 deletions wal/wal.go
@@ -7,7 +7,6 @@ import (
"fmt"
"math"
"os"
"runtime"
"sync"
"time"

@@ -261,8 +260,9 @@
}

func (w *FileWAL) run(ctx context.Context) {
const defaultTickTime = 50 * time.Millisecond
if w.ticker == nil {
w.ticker = realTicker{Ticker: time.NewTicker(50 * time.Millisecond)}
w.ticker = realTicker{Ticker: time.NewTicker(defaultTickTime)}
}
defer w.ticker.Stop()
// lastQueueSize is only used on shutdown to reduce debug logging verbosity.
@@ -291,7 +291,7 @@ func (w *FileWAL) run(ctx context.Context) {

if n == lastQueueSize {
// No progress made.
runtime.Gosched()
time.Sleep(defaultTickTime)
continue
}

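Reviewer note: the wal.go change names the 50ms tick interval and, when a pass over the write queue makes no progress, sleeps for that interval instead of calling `runtime.Gosched()`, which spins hot on an idle queue. A rough sketch of that backoff pattern, with stand-ins for the real FileWAL queue inspection and log-writing steps:

```go
package walsketch

import "time"

// drainUntilEmpty is illustrative only: pending and flush stand in for the
// real FileWAL queue-size check and batch write.
func drainUntilEmpty(pending func() int, flush func()) {
	const defaultTickTime = 50 * time.Millisecond
	lastQueueSize := -1
	for {
		n := pending()
		if n == 0 {
			return
		}
		if n == lastQueueSize {
			// No progress made; sleeping avoids the busy yield loop that
			// runtime.Gosched() produced here before this PR.
			time.Sleep(defaultTickTime)
			continue
		}
		lastQueueSize = n
		flush()
	}
}
```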