From ae168445d83add0a76a7b3a176571328520141f3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Alfonso=20Subiotto=20Marqu=C3=A9s?=
Date: Thu, 12 Oct 2023 18:13:37 +0200
Subject: [PATCH] wal: reduce in-use bytes in scenarios with many (and possibly large) writes (#562)

* wal: remove reference to logRequest when popping from slice

This allows the GC to collect logRequests that are no longer in use.

* wal: size batches according to segment size and track WAL queue size

Previously, we did not limit the total size of the WAL entries written in a
single batch. The underlying WAL does not rotate to a new segment file until
after a batch is written, so it could allocate a very large commit buffer (we
have seen more than a gigabyte in production). This commit should reduce
in-use bytes in cases where many, possibly large, entries are written.
---
 wal/wal.go | 38 ++++++++++++++++++++++++++++++++++----
 1 file changed, 34 insertions(+), 4 deletions(-)

diff --git a/wal/wal.go b/wal/wal.go
index 97b4d45b3..3e6458c90 100644
--- a/wal/wal.go
+++ b/wal/wal.go
@@ -64,6 +64,7 @@ type fileWALMetrics struct {
 	walRepairs            prometheus.Counter
 	walRepairsLostRecords prometheus.Counter
 	walCloseTimeouts      prometheus.Counter
+	walQueueSize          prometheus.Gauge
 }
 
 const dirPerms = os.FileMode(0o750)
@@ -90,6 +91,10 @@ type FileWAL struct {
 		nextTx uint64
 	}
 
+	// segmentSize indicates what the underlying WAL segment size is. This helps
+	// the run goroutine size batches more or less appropriately.
+	segmentSize int
+
 	cancel       func()
 	shutdownCh   chan struct{}
 	closeTimeout time.Duration
@@ -118,6 +123,10 @@ func (q *logRequestQueue) Pop() any {
 	old := *q
 	n := len(old)
 	x := old[n-1]
+	// Remove this reference to a logRequest since the GC considers the popped
+	// element still accessible otherwise. Since these are sync pooled, we want
+	// to defer object lifetime management to the pool without interfering.
+	old[n-1] = nil
 	*q = old[0 : n-1]
 	return x
 }
@@ -132,7 +141,8 @@ func Open(
 	}
 
 	reg = prometheus.WrapRegistererWithPrefix("frostdb_wal_", reg)
-	logStore, err := wal.Open(path, wal.WithLogger(logger), wal.WithMetricsRegisterer(reg))
+	segmentSize := wal.DefaultSegmentSize
+	logStore, err := wal.Open(path, wal.WithLogger(logger), wal.WithMetricsRegisterer(reg), wal.WithSegmentSize(segmentSize))
 	if err != nil {
 		return nil, err
 	}
@@ -181,8 +191,13 @@ func Open(
 				Name: "close_timeouts_total",
 				Help: "The number of times the WAL failed to close due to a timeout",
 			}),
+			walQueueSize: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
+				Name: "queue_size",
+				Help: "The number of unprocessed requests in the WAL queue",
+			}),
 		},
-		shutdownCh: make(chan struct{}),
+		segmentSize: segmentSize,
+		shutdownCh:  make(chan struct{}),
 	}
 
 	w.protected.nextTx = lastIndex + 1
@@ -242,7 +257,8 @@ func (w *FileWAL) run(ctx context.Context) {
 		case <-ticker.C:
 			batch := batch[:0]
 			w.protected.Lock()
-			for w.protected.queue.Len() > 0 {
+			batchSize := 0
+			for w.protected.queue.Len() > 0 && batchSize < w.segmentSize {
 				if minTx := w.protected.queue[0].tx; minTx != w.protected.nextTx {
 					if minTx < w.protected.nextTx {
 						// The next entry must be dropped otherwise progress
@@ -254,11 +270,14 @@ func (w *FileWAL) run(ctx context.Context) {
 							"found", minTx,
 						)
 						_ = heap.Pop(&w.protected.queue)
+						w.metrics.walQueueSize.Sub(1)
 					}
 					break
 				}
 				r := heap.Pop(&w.protected.queue).(*logRequest)
+				w.metrics.walQueueSize.Sub(1)
 				batch = append(batch, r)
+				batchSize += len(r.data)
 				w.protected.nextTx++
 			}
 			// truncateTx will be non-zero if we either are about to log a
@@ -319,7 +338,16 @@ func (w *FileWAL) run(ctx context.Context) {
 				}
 			}
 
-			for _, r := range batch {
+			// Remove references to a logRequest since the GC considers the
+			// popped element still accessible otherwise. Since these are sync
+			// pooled, we want to defer object lifetime management to the pool
+			// without interfering.
+			for i := range walBatch {
+				walBatch[i].Data = nil
+			}
+
+			for i, r := range batch {
+				batch[i] = nil
 				w.logRequestPool.Put(r)
 			}
 
@@ -380,6 +408,7 @@ func (w *FileWAL) Log(tx uint64, record *walpb.Record) error {
 
 	w.protected.Lock()
 	heap.Push(&w.protected.queue, r)
+	w.metrics.walQueueSize.Add(1)
 	w.protected.Unlock()
 
 	return nil
@@ -438,6 +467,7 @@ func (w *FileWAL) LogRecord(tx uint64, txnMetadata []byte, table string, record
 
 	w.protected.Lock()
 	heap.Push(&w.protected.queue, r)
+	w.metrics.walQueueSize.Add(1)
 	w.protected.Unlock()
 
 	return nil
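
Editor's note (illustration, not part of the patch): the logRequestQueue.Pop
change relies on a detail of container/heap. heap.Pop swaps the minimum element
to the end of the slice and then calls the interface's Pop, so unless that last
slot is cleared, the slice's backing array keeps the popped element reachable
and neither the GC nor the sync.Pool that owns these objects can fully reclaim
it. Below is a minimal, self-contained sketch of the same pattern; the item and
itemQueue names are hypothetical and stand in for logRequest/logRequestQueue.

package main

import (
	"container/heap"
	"fmt"
)

// item stands in for a pooled logRequest (hypothetical name for this sketch).
type item struct {
	tx   uint64
	data []byte
}

// itemQueue is a min-heap of *item ordered by tx.
type itemQueue []*item

func (q itemQueue) Len() int           { return len(q) }
func (q itemQueue) Less(i, j int) bool { return q[i].tx < q[j].tx }
func (q itemQueue) Swap(i, j int)      { q[i], q[j] = q[j], q[i] }
func (q *itemQueue) Push(x any)        { *q = append(*q, x.(*item)) }

func (q *itemQueue) Pop() any {
	old := *q
	n := len(old)
	x := old[n-1]
	// Clear the vacated slot: otherwise the backing array keeps the popped
	// element reachable and prevents the GC (or an owning sync.Pool) from
	// reclaiming it.
	old[n-1] = nil
	*q = old[:n-1]
	return x
}

func main() {
	q := &itemQueue{}
	heap.Push(q, &item{tx: 2, data: []byte("b")})
	heap.Push(q, &item{tx: 1, data: []byte("a")})
	r := heap.Pop(q).(*item)
	fmt.Println(r.tx, len(*q)) // 1 1
}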
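
Similarly, the batchSize < w.segmentSize bound caps how much data the run
goroutine hands to the underlying WAL in a single write. A rough standalone
sketch of that size-bounded draining is below; drainBatch, request, and the
byte budget standing in for the segment size are all hypothetical names, not
frostdb APIs.

package main

import "fmt"

// request stands in for a queued WAL logRequest (hypothetical name).
type request struct {
	tx   uint64
	data []byte
}

// drainBatch takes requests in order until the queue is empty or the batch's
// cumulative payload reaches maxBatchBytes, so a single write never buffers
// much more than one segment's worth of data.
func drainBatch(queue []*request, maxBatchBytes int) (batch, rest []*request) {
	size := 0
	i := 0
	for ; i < len(queue) && size < maxBatchBytes; i++ {
		batch = append(batch, queue[i])
		size += len(queue[i].data)
	}
	return batch, queue[i:]
}

func main() {
	queue := []*request{
		{tx: 1, data: make([]byte, 600)},
		{tx: 2, data: make([]byte, 600)},
		{tx: 3, data: make([]byte, 600)},
	}
	batch, rest := drainBatch(queue, 1024) // 1024 plays the role of the segment size
	fmt.Println(len(batch), len(rest))     // 2 1
}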