Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dynparquet: remove rowWriter #561

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dynparquet/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -1210,7 +1210,7 @@ const bloomFilterBitsPerValue = 10

// NewWriter returns a new parquet writer with a concrete parquet schema
// generated using the given concrete dynamic column names.
func (s *Schema) NewWriter(w io.Writer, dynamicColumns map[string][]string) (ParquetWriter, error) {
func (s *Schema) NewWriter(w io.Writer, dynamicColumns map[string][]string, opts ...parquet.WriterOption) (ParquetWriter, error) {
ps, err := s.GetDynamicParquetSchema(dynamicColumns)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1248,6 +1248,7 @@ func (s *Schema) NewWriter(w io.Writer, dynamicColumns map[string][]string) (Par
parquet.DropDuplicatedRows(s.uniquePrimaryIndex),
),
}
writerOptions = append(writerOptions, opts...)
if s.uniquePrimaryIndex {
return parquet.NewSortingWriter[any](
w,
Expand Down
92 changes: 6 additions & 86 deletions table.go
Original file line number Diff line number Diff line change
Expand Up @@ -895,89 +895,6 @@ type ParquetWriter interface {
io.Closer
}

// parquetRowWriter is a stateful parquet row group writer.
type parquetRowWriter struct {
schema *dynparquet.Schema
w ParquetWriter

rowGroupSize int
maxNumRows int

rowGroupRowsWritten int
totalRowsWritten int
rowsBuf []parquet.Row
}

type parquetRowWriterOption func(p *parquetRowWriter)

// rowWriter returns a new Parquet row writer with the given dynamic columns.
func (t *TableBlock) rowWriter(writer io.Writer, dynCols map[string][]string, options ...parquetRowWriterOption) (*parquetRowWriter, error) {
w, err := t.table.schema.NewWriter(writer, dynCols)
if err != nil {
return nil, err
}

buffSize := 256
if t.table.config.RowGroupSize > 0 {
buffSize = int(t.table.config.RowGroupSize)
}

p := &parquetRowWriter{
w: w,
schema: t.table.schema,
rowsBuf: make([]parquet.Row, buffSize),
rowGroupSize: int(t.table.config.RowGroupSize),
}

for _, option := range options {
option(p)
}

return p, nil
}

// WriteRows will write the given rows to the underlying Parquet writer. It returns the number of rows written.
func (p *parquetRowWriter) writeRows(rows parquet.Rows) (int, error) {
written := 0
for p.maxNumRows == 0 || p.totalRowsWritten < p.maxNumRows {
if p.rowGroupSize > 0 && p.rowGroupRowsWritten+len(p.rowsBuf) > p.rowGroupSize {
// Read only as many rows as we need to complete the row group size limit.
p.rowsBuf = p.rowsBuf[:p.rowGroupSize-p.rowGroupRowsWritten]
}
if p.maxNumRows != 0 && p.totalRowsWritten+len(p.rowsBuf) > p.maxNumRows {
// Read only as many rows as we need to write if they would bring
// us over the limit.
p.rowsBuf = p.rowsBuf[:p.maxNumRows-p.totalRowsWritten]
}
n, err := rows.ReadRows(p.rowsBuf)
if err != nil && err != io.EOF {
return 0, err
}
if n == 0 {
break
}

if _, err = p.w.WriteRows(p.rowsBuf[:n]); err != nil {
return 0, err
}
written += n
p.rowGroupRowsWritten += n
p.totalRowsWritten += n
if p.rowGroupSize > 0 && p.rowGroupRowsWritten >= p.rowGroupSize {
if err := p.w.Flush(); err != nil {
return 0, err
}
p.rowGroupRowsWritten = 0
}
}

return written, nil
}

func (p *parquetRowWriter) close() error {
return p.w.Close()
}

// memoryBlocks collects the active and pending blocks that are currently resident in memory.
// The pendingReadersWg.Done() function must be called on all blocks returned once processing is finished.
func (t *Table) memoryBlocks() ([]*TableBlock, uint64) {
Expand Down Expand Up @@ -1182,15 +1099,18 @@ func (t *Table) compactParts(w io.Writer, compact []*parts.Part) (int64, error)
return 0, err
}
err = func() error {
p, err := t.active.rowWriter(w, merged.DynamicColumns())
pw, err := t.schema.NewWriter(
w, merged.DynamicColumns(), parquet.MaxRowsPerRowGroup(int64(t.config.RowGroupSize)),
)
if err != nil {
return err
}
defer p.close()
defer pw.Close()

rows := merged.Rows()
defer rows.Close()
if _, err := p.writeRows(rows); err != nil {

if _, err := parquet.CopyRows(pw, rows); err != nil {
return err
}

Expand Down
57 changes: 0 additions & 57 deletions table_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package frostdb

import (
"bytes"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -721,62 +720,6 @@ func Test_Serialize_DisparateDynamicColumns(t *testing.T) {
require.NoError(t, table.active.Serialize(io.Discard))
}

func Test_RowWriter(t *testing.T) {
config := NewTableConfig(
dynparquet.SampleDefinition(),
WithRowGroupSize(5),
)

logger := newTestLogger(t)

c, err := New(WithLogger(logger))
require.NoError(t, err)

db, err := c.DB(context.Background(), "test")
require.NoError(t, err)
table, err := db.Table("test", config)
require.NoError(t, err)
defer c.Close()

b := &bytes.Buffer{}
rowWriter, err := table.ActiveBlock().rowWriter(b, map[string][]string{
"labels": {"node"},
})
require.NoError(t, err)

// Write 17(8,9) rows, expect 3 row groups of 5 rows and 1 row group of 2 rows
samples := dynparquet.GenerateTestSamples(8)
buf, err := samples.ToBuffer(table.Schema())
require.NoError(t, err)
rows := buf.Rows()
_, err = rowWriter.writeRows(rows)
require.NoError(t, err)
require.NoError(t, rows.Close())

samples = dynparquet.GenerateTestSamples(9)
buf, err = samples.ToBuffer(table.Schema())
require.NoError(t, err)
rows = buf.Rows()
_, err = rowWriter.writeRows(rows)
require.NoError(t, err)
require.NoError(t, rows.Close())

require.NoError(t, rowWriter.close())

f, err := parquet.OpenFile(bytes.NewReader(b.Bytes()), int64(b.Len()))
require.NoError(t, err)

require.Equal(t, 4, len(f.Metadata().RowGroups))
for i, rg := range f.Metadata().RowGroups {
switch i {
case 3:
require.Equal(t, int64(2), rg.NumRows)
default:
require.Equal(t, int64(5), rg.NumRows)
}
}
}

// Test_Table_Size ensures the size of the table increases by the size of the inserted data.
func Test_Table_Size(t *testing.T) {
c, table := basicTable(t)
Expand Down