Skip to content

Commit

Permalink
*: do not use SortingWriter for compaction (#560)
Browse files Browse the repository at this point in the history
This commit removes the use of SortingWriter for compaction, since it increases our
memory usage. Because we now rely on L0 parts being sorted, we can use a more
efficient non-sorting writer for compaction.
  • Loading branch information
asubiotto authored Oct 11, 2023
1 parent 767b419 commit 8627989
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 42 deletions.
41 changes: 0 additions & 41 deletions dynparquet/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -1267,47 +1267,6 @@ type ParquetWriter interface {
Reset(writer io.Writer)
}

// NewSortingWriter returns a ParquetWriter that sorts rows by the
// schema's sorting columns before writing them to w, dropping duplicate
// rows when the schema has a unique primary index. Bloom filters are
// configured for every resolvable sorting column except boolean ones.
func (s *Schema) NewSortingWriter(w io.Writer, dynamicColumns map[string][]string) (ParquetWriter, error) {
	ps, err := s.GetDynamicParquetSchema(dynamicColumns)
	if err != nil {
		return nil, err
	}
	defer s.PutPooledParquetSchema(ps)

	sortingCols := s.ParquetSortingColumns(dynamicColumns)
	filters := make([]parquet.BloomFilterColumn, 0, len(sortingCols))
	for _, sc := range sortingCols {
		// Skip columns that can't be resolved by name, and boolean
		// columns, which don't get bloom filters.
		name := strings.Split(sc.Path()[0], ".")[0]
		colDef, found := s.ColumnByName(name)
		if !found || colDef.StorageLayout.Type().Kind() == parquet.Boolean {
			continue
		}
		filters = append(
			filters, parquet.SplitBlockFilter(bloomFilterBitsPerValue, sc.Path()...),
		)
	}

	return parquet.NewSortingWriter[any](w,
		32*1024,
		ps.Schema,
		parquet.ColumnIndexSizeLimit(ColumnIndexSize),
		parquet.BloomFilters(filters...),
		parquet.KeyValueMetadata(
			DynamicColumnsKey,
			serializeDynamicColumns(dynamicColumns),
		),
		parquet.SortingWriterConfig(
			parquet.SortingColumns(sortingCols...),
			parquet.DropDuplicatedRows(s.uniquePrimaryIndex),
		),
	), nil
}

type PooledWriter struct {
pool *sync.Pool
ParquetWriter
Expand Down
4 changes: 3 additions & 1 deletion table.go
Original file line number Diff line number Diff line change
Expand Up @@ -912,7 +912,7 @@ type parquetRowWriterOption func(p *parquetRowWriter)

// rowWriter returns a new Parquet row writer with the given dynamic columns.
func (t *TableBlock) rowWriter(writer io.Writer, dynCols map[string][]string, options ...parquetRowWriterOption) (*parquetRowWriter, error) {
w, err := t.table.schema.NewSortingWriter(writer, dynCols)
w, err := t.table.schema.NewWriter(writer, dynCols)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1137,6 +1137,8 @@ func (t *Table) compactParts(w io.Writer, compact []*parts.Part) (int64, error)
// To reduce the number of open cursors at the same time (which helps in
// memory usage reduction), find which parts do not overlap with any other
// part. These parts can be sorted and read one by one.
// This compaction code assumes the invariant that rows are sorted within
// parts. This helps reduce memory usage in various ways.
nonOverlappingParts, overlappingParts, err := parts.FindMaximumNonOverlappingSet(t.schema, compact)
if err != nil {
return 0, err
Expand Down

0 comments on commit 8627989

Please sign in to comment.