Skip to content

Commit

Permalink
index: don't sort merge non-overlapping l0 parts (#559)
Browse files Browse the repository at this point in the history
* index: add wg to wait for pending compactions

This avoids leaking goroutines, especially in tests.

* index: don't sort merge non-overlapping l0 parts

This reduces compaction memory usage. Additionally, a check is done before
compaction to avoid compacting a single part since this is a no-op.
  • Loading branch information
asubiotto authored Oct 11, 2023
1 parent c6a06c5 commit 767b419
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 20 deletions.
1 change: 1 addition & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,7 @@ func (db *DB) recover(ctx context.Context, wal WAL) error {
if table, ok := db.tables[e.TableBlockPersisted.TableName]; ok {
table.ActiveBlock().index, err = index.NewLSM(
table.name,
table.schema,
table.configureLSMLevels(db.columnStore.indexConfig),
index.LSMWithMetrics(table.metrics.indexMetrics),
)
Expand Down
16 changes: 13 additions & 3 deletions index/lsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ import (
// [L0]->[record]->[record]->[L1]->[record/parquet]->[record/parquet] etc.
type LSM struct {
sync.RWMutex
compacting *atomic.Bool
compacting *atomic.Bool
compactionWg sync.WaitGroup

schema *dynparquet.Schema

prefix string
levels *Node
Expand Down Expand Up @@ -91,12 +94,13 @@ func NewLSMMetrics(reg prometheus.Registerer) *LSMMetrics {
}

// NewLSM returns an LSM-like index of len(levels) levels.
func NewLSM(prefix string, levels []*LevelConfig, options ...LSMOption) (*LSM, error) {
func NewLSM(prefix string, schema *dynparquet.Schema, levels []*LevelConfig, options ...LSMOption) (*LSM, error) {
if err := validateLevels(levels); err != nil {
return nil, err
}

lsm := &LSM{
schema: schema,
prefix: prefix,
levels: NewList(L0),
sizes: make([]atomic.Int64, len(levels)),
Expand Down Expand Up @@ -158,18 +162,24 @@ func (l *LSM) MaxLevel() SentinelType {
func (l *LSM) Add(tx uint64, record arrow.Record) {
record.Retain()
size := util.TotalRecordSize(record)
l.levels.Prepend(parts.NewArrowPart(tx, record, int(size), nil, parts.WithCompactionLevel(int(L0))))
l.levels.Prepend(parts.NewArrowPart(tx, record, int(size), l.schema, parts.WithCompactionLevel(int(L0))))
l0 := l.sizes[L0].Add(int64(size))
l.metrics.LevelSize.WithLabelValues(L0.String()).Set(float64(l0))
if l0 >= l.configs[L0].MaxSize {
if l.compacting.CompareAndSwap(false, true) {
l.compactionWg.Add(1)
go func() {
defer l.compactionWg.Done()
_ = l.compact()
}()
}
}
}

func (l *LSM) WaitForPendingCompactions() {
l.compactionWg.Wait()
}

// InsertPart inserts a part into the LSM tree. It will be inserted into the correct level. It does not check if the insert should cause a compaction.
// This should only be used during snapshot recovery.
func (l *LSM) InsertPart(level SentinelType, part *parts.Part) {
Expand Down
8 changes: 4 additions & 4 deletions index/lsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func check(t *testing.T, lsm *LSM, records, buffers int) {

func Test_LSM_Basic(t *testing.T) {
t.Parallel()
lsm, err := NewLSM("test", []*LevelConfig{
lsm, err := NewLSM("test", nil, []*LevelConfig{
{Level: L0, MaxSize: 1024 * 1024 * 1024, Compact: parquetCompaction},
{Level: L1, MaxSize: 1024 * 1024 * 1024, Compact: parquetCompaction},
{Level: L2, MaxSize: 1024 * 1024 * 1024},
Expand Down Expand Up @@ -135,7 +135,7 @@ func Test_LSM_Basic(t *testing.T) {

func Test_LSM_DuplicateSentinel(t *testing.T) {
t.Parallel()
lsm, err := NewLSM("test", []*LevelConfig{
lsm, err := NewLSM("test", nil, []*LevelConfig{
{Level: L0, MaxSize: 1024 * 1024 * 1024, Compact: parquetCompaction},
{Level: L1, MaxSize: 1024 * 1024 * 1024, Compact: parquetCompaction},
{Level: L2, MaxSize: 1024 * 1024 * 1024},
Expand All @@ -158,7 +158,7 @@ func Test_LSM_DuplicateSentinel(t *testing.T) {

func Test_LSM_Compaction(t *testing.T) {
t.Parallel()
lsm, err := NewLSM("test", []*LevelConfig{
lsm, err := NewLSM("test", nil, []*LevelConfig{
{Level: L0, MaxSize: 1, Compact: parquetCompaction},
{Level: L1, MaxSize: 1024 * 1024 * 1024},
})
Expand All @@ -176,7 +176,7 @@ func Test_LSM_Compaction(t *testing.T) {

func Test_LSM_CascadeCompaction(t *testing.T) {
t.Parallel()
lsm, err := NewLSM("test", []*LevelConfig{
lsm, err := NewLSM("test", nil, []*LevelConfig{
{Level: L0, MaxSize: 257, Compact: parquetCompaction},
{Level: L1, MaxSize: 2281, Compact: parquetCompaction},
{Level: L2, MaxSize: 2281, Compact: parquetCompaction},
Expand Down
4 changes: 2 additions & 2 deletions parts/part.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (p *Part) most() (*dynparquet.DynamicRow, error) {
return p.maxRow, nil
}

func (p Part) OverlapsWith(schema *dynparquet.Schema, otherPart *Part) (bool, error) {
func (p *Part) OverlapsWith(schema *dynparquet.Schema, otherPart *Part) (bool, error) {
a, err := p.Least()
if err != nil {
return false, err
Expand Down Expand Up @@ -240,7 +240,7 @@ func Tombstone(parts []*Part) {
}
}

func (p Part) HasTombstone() bool {
func (p *Part) HasTombstone() bool {
return p.tx == math.MaxUint64
}

Expand Down
74 changes: 63 additions & 11 deletions table.go
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,7 @@ func newTableBlock(table *Table, prevTx, tx uint64, id ulid.ULID) (*TableBlock,
var err error
tb.index, err = index.NewLSM(
table.name,
table.schema,
table.configureLSMLevels(table.db.columnStore.indexConfig),
index.LSMWithMetrics(table.metrics.indexMetrics),
)
Expand Down Expand Up @@ -1065,6 +1066,7 @@ func (t *Table) close() {

t.active.pendingWritersWg.Wait()
t.closing = true
t.active.index.WaitForPendingCompactions()
}

// configureLSMLevels configures the level configs for this table.
Expand All @@ -1086,17 +1088,36 @@ func (t *Table) configureLSMLevels(levels []*index.LevelConfig) []*index.LevelCo
}

func (t *Table) parquetCompaction(compact []*parts.Part, options ...parts.Option) ([]*parts.Part, int64, int64, error) {
b := &bytes.Buffer{}
size, err := t.compactParts(b, compact)
if err != nil {
return nil, 0, 0, err
var (
buf *dynparquet.SerializedBuffer
preCompactionSize, postCompactionSize int64
err error
)
if len(compact) > 1 {
var b bytes.Buffer
preCompactionSize, err = t.compactParts(&b, compact)
if err != nil {
return nil, 0, 0, err
}
buf, err = dynparquet.ReaderFromBytes(b.Bytes())
if err != nil {
return nil, 0, 0, err
}
postCompactionSize = int64(b.Len())
} else if len(compact) == 1 {
// It's more efficient to skip compactParts if there's only one part.
// The only thing we want to ensure is that this part is converted to
// parquet if it is an arrow part.
singlePart := compact[0]
preCompactionSize = singlePart.Size()
buf, err = compact[0].AsSerializedBuffer(t.schema)
if err != nil {
return nil, 0, 0, err
}
postCompactionSize = buf.ParquetFile().Size()
}

buf, err := dynparquet.ReaderFromBytes(b.Bytes())
if err != nil {
return nil, 0, 0, err
}
return []*parts.Part{parts.NewPart(0, buf, options...)}, size, int64(b.Len()), nil
return []*parts.Part{parts.NewPart(0, buf, options...)}, preCompactionSize, postCompactionSize, nil
}

func (t *Table) externalParquetCompaction(writer io.Writer) func(compact []*parts.Part) (*parts.Part, int64, int64, error) {
Expand All @@ -1113,9 +1134,40 @@ func (t *Table) externalParquetCompaction(writer io.Writer) func(compact []*part
// compactParts will compact the given parts into a Parquet file written to w.
// It returns the size in bytes of the compacted parts.
func (t *Table) compactParts(w io.Writer, compact []*parts.Part) (int64, error) {
bufs := []dynparquet.DynamicRowGroup{}
// To reduce the number of open cursors at the same time (which helps in
// memory usage reduction), find which parts do not overlap with any other
// part. These parts can be sorted and read one by one.
nonOverlappingParts, overlappingParts, err := parts.FindMaximumNonOverlappingSet(t.schema, compact)
if err != nil {
return 0, err
}
bufs := make([]dynparquet.DynamicRowGroup, 0, len(compact))
var size int64
for _, part := range compact {
if len(nonOverlappingParts) == 1 {
// Not worth doing anything if only one part does not overlap.
overlappingParts = append(overlappingParts, nonOverlappingParts[0])
} else if len(nonOverlappingParts) > 0 {
// nonOverlappingParts is already sorted.
rowGroups := make([]dynparquet.DynamicRowGroup, 0, len(nonOverlappingParts))
for _, p := range nonOverlappingParts {
size += p.Size()
buf, err := p.AsSerializedBuffer(t.schema)
if err != nil {
return 0, err
}
rowGroups = append(rowGroups, buf.MultiDynamicRowGroup())
}
// WithAlreadySorted ensures that a parquet.MultiRowGroup is created
// here, which is much cheaper than actually merging all these row
// groups.
merged, err := t.schema.MergeDynamicRowGroups(rowGroups, dynparquet.WithAlreadySorted())
if err != nil {
return 0, err
}
bufs = append(bufs, merged)
}

for _, part := range overlappingParts {
size += part.Size()
buf, err := part.AsSerializedBuffer(t.schema)
if err != nil {
Expand Down

0 comments on commit 767b419

Please sign in to comment.