Skip to content

Commit

Permalink
wal: don't wait for ticker to fire to clear pending txns (#563)
Browse files Browse the repository at this point in the history
Previously, if we found a txn below the nextTx, we would pop it but continue
waiting for the timer to fire again before popping the next one. This is not
necessary and can lead to high memory usage.

This commit also avoids pushing these requests to the queue in the first place.
  • Loading branch information
asubiotto authored Oct 13, 2023
1 parent ae16844 commit e2d768b
Showing 1 changed file with 33 additions and 1 deletion.
34 changes: 33 additions & 1 deletion wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ type fileWALMetrics struct {
walQueueSize prometheus.Gauge
}

const dirPerms = os.FileMode(0o750)
const (
dirPerms = os.FileMode(0o750)
progressLogTimeout = 10 * time.Second
)

type FileWAL struct {
logger log.Logger
Expand All @@ -94,6 +97,9 @@ type FileWAL struct {
// segmentSize indicates what the underlying WAL segment size is. This helps
// the run goroutine size batches more or less appropriately.
segmentSize int
// lastTimeProgressWasMade tracks just that and serves to log information in
// case the WAL has not made progress in some time.
lastTimeProgressWasMade time.Time

cancel func()
shutdownCh chan struct{}
Expand Down Expand Up @@ -271,7 +277,19 @@ func (w *FileWAL) run(ctx context.Context) {
)
_ = heap.Pop(&w.protected.queue)
w.metrics.walQueueSize.Sub(1)
// Keep on going since there might be other transactions
// below this one.
continue
}
if sinceProgress := time.Since(w.lastTimeProgressWasMade); sinceProgress > progressLogTimeout {
level.Info(w.logger).Log(
"msg", "wal has not made progress",
"since", sinceProgress,
"next_expected_tx", w.protected.nextTx,
"min_tx", minTx,
)
}
// Next expected tx has not yet been seen.
break
}
r := heap.Pop(&w.protected.queue).(*logRequest)
Expand All @@ -280,6 +298,7 @@ func (w *FileWAL) run(ctx context.Context) {
batchSize += len(r.data)
w.protected.nextTx++
}
w.lastTimeProgressWasMade = time.Now()
// truncateTx will be non-zero if we either are about to log a
// record with a txn past the txn to truncate, or we have logged one
// in the past.
Expand Down Expand Up @@ -434,6 +453,19 @@ func (w *FileWAL) writeRecord(buf *bytes.Buffer, record arrow.Record) error {
}

func (w *FileWAL) LogRecord(tx uint64, txnMetadata []byte, table string, record arrow.Record) error {
w.protected.Lock()
nextTx := w.protected.nextTx
w.protected.Unlock()
if tx < nextTx {
// Transaction should not be logged. This could happen if a truncation
// has been issued simultaneously as logging a WAL record.
level.Warn(w.logger).Log(
"msg", "attempted to log txn below next expected txn",
"tx", tx,
"next_tx", nextTx,
)
return nil
}
buf := w.getArrowBuf()
defer w.putArrowBuf(buf)
if err := w.writeRecord(buf, record); err != nil {
Expand Down

0 comments on commit e2d768b

Please sign in to comment.