diff --git a/dynparquet/schema.go b/dynparquet/schema.go index d2fbc831c..52f730c26 100644 --- a/dynparquet/schema.go +++ b/dynparquet/schema.go @@ -1267,47 +1267,6 @@ type ParquetWriter interface { Reset(writer io.Writer) } -func (s *Schema) NewSortingWriter(w io.Writer, dynamicColumns map[string][]string) (ParquetWriter, error) { - ps, err := s.GetDynamicParquetSchema(dynamicColumns) - if err != nil { - return nil, err - } - defer s.PutPooledParquetSchema(ps) - - cols := s.ParquetSortingColumns(dynamicColumns) - bloomFilterColumns := make([]parquet.BloomFilterColumn, 0, len(cols)) - for _, col := range cols { - // Don't add bloom filters to boolean columns - colName := strings.Split(col.Path()[0], ".")[0] - def, ok := s.ColumnByName(colName) - if !ok { - continue - } - if def.StorageLayout.Type().Kind() == parquet.Boolean { - continue - } - - bloomFilterColumns = append( - bloomFilterColumns, parquet.SplitBlockFilter(bloomFilterBitsPerValue, col.Path()...), - ) - } - - return parquet.NewSortingWriter[any](w, - 32*1024, - ps.Schema, - parquet.ColumnIndexSizeLimit(ColumnIndexSize), - parquet.BloomFilters(bloomFilterColumns...), - parquet.KeyValueMetadata( - DynamicColumnsKey, - serializeDynamicColumns(dynamicColumns), - ), - parquet.SortingWriterConfig( - parquet.SortingColumns(cols...), - parquet.DropDuplicatedRows(s.uniquePrimaryIndex), - ), - ), nil -} - type PooledWriter struct { pool *sync.Pool ParquetWriter diff --git a/table.go b/table.go index b66316de1..68cc4af23 100644 --- a/table.go +++ b/table.go @@ -912,7 +912,7 @@ type parquetRowWriterOption func(p *parquetRowWriter) // rowWriter returns a new Parquet row writer with the given dynamic columns. func (t *TableBlock) rowWriter(writer io.Writer, dynCols map[string][]string, options ...parquetRowWriterOption) (*parquetRowWriter, error) { - w, err := t.table.schema.NewSortingWriter(writer, dynCols) + w, err := t.table.schema.NewWriter(writer, dynCols) if err != nil { return nil, err } @@ -1137,6 +1137,8 @@ func (t *Table) compactParts(w io.Writer, compact []*parts.Part) (int64, error) // To reduce the number of open cursors at the same time (which helps in // memory usage reduction), find which parts do not overlap with any other // part. These parts can be sorted and read one by one. + // This compaction code assumes the invariant that rows are sorted within + // parts. This helps reduce memory usage in various ways. nonOverlappingParts, overlappingParts, err := parts.FindMaximumNonOverlappingSet(t.schema, compact) if err != nil { return 0, err