diff --git a/db.go b/db.go index 35d4b4090..d53fa6772 100644 --- a/db.go +++ b/db.go @@ -650,6 +650,7 @@ func (db *DB) recover(ctx context.Context, wal WAL) error { if table, ok := db.tables[e.TableBlockPersisted.TableName]; ok { table.ActiveBlock().index, err = index.NewLSM( table.name, + table.schema, table.configureLSMLevels(db.columnStore.indexConfig), index.LSMWithMetrics(table.metrics.indexMetrics), ) diff --git a/index/lsm.go b/index/lsm.go index 002c6ec26..21d01701a 100644 --- a/index/lsm.go +++ b/index/lsm.go @@ -28,7 +28,10 @@ import ( // [L0]->[record]->[record]->[L1]->[record/parquet]->[record/parquet] etc. type LSM struct { sync.RWMutex - compacting *atomic.Bool + compacting *atomic.Bool + compactionWg sync.WaitGroup + + schema *dynparquet.Schema prefix string levels *Node @@ -91,12 +94,13 @@ func NewLSMMetrics(reg prometheus.Registerer) *LSMMetrics { } // NewLSM returns an LSM-like index of len(levels) levels. -func NewLSM(prefix string, levels []*LevelConfig, options ...LSMOption) (*LSM, error) { +func NewLSM(prefix string, schema *dynparquet.Schema, levels []*LevelConfig, options ...LSMOption) (*LSM, error) { if err := validateLevels(levels); err != nil { return nil, err } lsm := &LSM{ + schema: schema, prefix: prefix, levels: NewList(L0), sizes: make([]atomic.Int64, len(levels)), @@ -158,18 +162,24 @@ func (l *LSM) MaxLevel() SentinelType { func (l *LSM) Add(tx uint64, record arrow.Record) { record.Retain() size := util.TotalRecordSize(record) - l.levels.Prepend(parts.NewArrowPart(tx, record, int(size), nil, parts.WithCompactionLevel(int(L0)))) + l.levels.Prepend(parts.NewArrowPart(tx, record, int(size), l.schema, parts.WithCompactionLevel(int(L0)))) l0 := l.sizes[L0].Add(int64(size)) l.metrics.LevelSize.WithLabelValues(L0.String()).Set(float64(l0)) if l0 >= l.configs[L0].MaxSize { if l.compacting.CompareAndSwap(false, true) { + l.compactionWg.Add(1) go func() { + defer l.compactionWg.Done() _ = l.compact() }() } } } +func (l *LSM) WaitForPendingCompactions() { + l.compactionWg.Wait() +} + // InsertPart inserts a part into the LSM tree. It will be inserted into the correct level. It does not check if the insert should cause a compaction. // This should only be used during snapshot recovery. func (l *LSM) InsertPart(level SentinelType, part *parts.Part) { diff --git a/index/lsm_test.go b/index/lsm_test.go index b855859b7..cf6008e4d 100644 --- a/index/lsm_test.go +++ b/index/lsm_test.go @@ -102,7 +102,7 @@ func check(t *testing.T, lsm *LSM, records, buffers int) { func Test_LSM_Basic(t *testing.T) { t.Parallel() - lsm, err := NewLSM("test", []*LevelConfig{ + lsm, err := NewLSM("test", nil, []*LevelConfig{ {Level: L0, MaxSize: 1024 * 1024 * 1024, Compact: parquetCompaction}, {Level: L1, MaxSize: 1024 * 1024 * 1024, Compact: parquetCompaction}, {Level: L2, MaxSize: 1024 * 1024 * 1024}, @@ -135,7 +135,7 @@ func Test_LSM_Basic(t *testing.T) { func Test_LSM_DuplicateSentinel(t *testing.T) { t.Parallel() - lsm, err := NewLSM("test", []*LevelConfig{ + lsm, err := NewLSM("test", nil, []*LevelConfig{ {Level: L0, MaxSize: 1024 * 1024 * 1024, Compact: parquetCompaction}, {Level: L1, MaxSize: 1024 * 1024 * 1024, Compact: parquetCompaction}, {Level: L2, MaxSize: 1024 * 1024 * 1024}, @@ -158,7 +158,7 @@ func Test_LSM_DuplicateSentinel(t *testing.T) { func Test_LSM_Compaction(t *testing.T) { t.Parallel() - lsm, err := NewLSM("test", []*LevelConfig{ + lsm, err := NewLSM("test", nil, []*LevelConfig{ {Level: L0, MaxSize: 1, Compact: parquetCompaction}, {Level: L1, MaxSize: 1024 * 1024 * 1024}, }) @@ -176,7 +176,7 @@ func Test_LSM_Compaction(t *testing.T) { func Test_LSM_CascadeCompaction(t *testing.T) { t.Parallel() - lsm, err := NewLSM("test", []*LevelConfig{ + lsm, err := NewLSM("test", nil, []*LevelConfig{ {Level: L0, MaxSize: 257, Compact: parquetCompaction}, {Level: L1, MaxSize: 2281, Compact: parquetCompaction}, {Level: L2, MaxSize: 2281, Compact: parquetCompaction}, diff --git a/parts/part.go b/parts/part.go index 6df7e3098..bb017fe04 100644 --- a/parts/part.go +++ b/parts/part.go @@ -210,7 +210,7 @@ func (p *Part) most() (*dynparquet.DynamicRow, error) { return p.maxRow, nil } -func (p Part) OverlapsWith(schema *dynparquet.Schema, otherPart *Part) (bool, error) { +func (p *Part) OverlapsWith(schema *dynparquet.Schema, otherPart *Part) (bool, error) { a, err := p.Least() if err != nil { return false, err @@ -240,7 +240,7 @@ func Tombstone(parts []*Part) { } } -func (p Part) HasTombstone() bool { +func (p *Part) HasTombstone() bool { return p.tx == math.MaxUint64 } diff --git a/table.go b/table.go index a241fdaf0..b66316de1 100644 --- a/table.go +++ b/table.go @@ -839,6 +839,7 @@ func newTableBlock(table *Table, prevTx, tx uint64, id ulid.ULID) (*TableBlock, var err error tb.index, err = index.NewLSM( table.name, + table.schema, table.configureLSMLevels(table.db.columnStore.indexConfig), index.LSMWithMetrics(table.metrics.indexMetrics), ) @@ -1065,6 +1066,7 @@ func (t *Table) close() { t.active.pendingWritersWg.Wait() t.closing = true + t.active.index.WaitForPendingCompactions() } // configureLSMLevels configures the level configs for this table. @@ -1086,17 +1088,36 @@ func (t *Table) configureLSMLevels(levels []*index.LevelConfig) []*index.LevelCo } func (t *Table) parquetCompaction(compact []*parts.Part, options ...parts.Option) ([]*parts.Part, int64, int64, error) { - b := &bytes.Buffer{} - size, err := t.compactParts(b, compact) - if err != nil { - return nil, 0, 0, err + var ( + buf *dynparquet.SerializedBuffer + preCompactionSize, postCompactionSize int64 + err error + ) + if len(compact) > 1 { + var b bytes.Buffer + preCompactionSize, err = t.compactParts(&b, compact) + if err != nil { + return nil, 0, 0, err + } + buf, err = dynparquet.ReaderFromBytes(b.Bytes()) + if err != nil { + return nil, 0, 0, err + } + postCompactionSize = int64(b.Len()) + } else if len(compact) == 1 { + // It's more efficient to skip compactParts if there's only one part. + // The only thing we want to ensure is that this part is converted to + // parquet if it is an arrow part. + singlePart := compact[0] + preCompactionSize = singlePart.Size() + buf, err = compact[0].AsSerializedBuffer(t.schema) + if err != nil { + return nil, 0, 0, err + } + postCompactionSize = buf.ParquetFile().Size() } - buf, err := dynparquet.ReaderFromBytes(b.Bytes()) - if err != nil { - return nil, 0, 0, err - } - return []*parts.Part{parts.NewPart(0, buf, options...)}, size, int64(b.Len()), nil + return []*parts.Part{parts.NewPart(0, buf, options...)}, preCompactionSize, postCompactionSize, nil } func (t *Table) externalParquetCompaction(writer io.Writer) func(compact []*parts.Part) (*parts.Part, int64, int64, error) { @@ -1113,9 +1134,40 @@ func (t *Table) externalParquetCompaction(writer io.Writer) func(compact []*part // compactParts will compact the given parts into a Parquet file written to w. // It returns the size in bytes of the compacted parts. func (t *Table) compactParts(w io.Writer, compact []*parts.Part) (int64, error) { - bufs := []dynparquet.DynamicRowGroup{} + // To reduce the number of open cursors at the same time (which helps in + // memory usage reduction), find which parts do not overlap with any other + // part. These parts can be sorted and read one by one. + nonOverlappingParts, overlappingParts, err := parts.FindMaximumNonOverlappingSet(t.schema, compact) + if err != nil { + return 0, err + } + bufs := make([]dynparquet.DynamicRowGroup, 0, len(compact)) var size int64 - for _, part := range compact { + if len(nonOverlappingParts) == 1 { + // Not worth doing anything if only one part does not overlap. + overlappingParts = append(overlappingParts, nonOverlappingParts[0]) + } else if len(nonOverlappingParts) > 0 { + // nonOverlappingParts is already sorted. + rowGroups := make([]dynparquet.DynamicRowGroup, 0, len(nonOverlappingParts)) + for _, p := range nonOverlappingParts { + size += p.Size() + buf, err := p.AsSerializedBuffer(t.schema) + if err != nil { + return 0, err + } + rowGroups = append(rowGroups, buf.MultiDynamicRowGroup()) + } + // WithAlreadySorted ensures that a parquet.MultiRowGroup is created + // here, which is much cheaper than actually merging all these row + // groups. + merged, err := t.schema.MergeDynamicRowGroups(rowGroups, dynparquet.WithAlreadySorted()) + if err != nil { + return 0, err + } + bufs = append(bufs, merged) + } + + for _, part := range overlappingParts { size += part.Size() buf, err := part.AsSerializedBuffer(t.schema) if err != nil {