Skip to content

Commit

Permalink
Iceberg storage (#824)
Browse files Browse the repository at this point in the history
* iceberg storage
  • Loading branch information
thorfour authored Apr 22, 2024
1 parent 205ebed commit 8995756
Show file tree
Hide file tree
Showing 4 changed files with 427 additions and 41 deletions.
85 changes: 85 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/apache/arrow/go/v15/arrow/memory"
"github.com/go-kit/log/level"
"github.com/google/uuid"
"github.com/polarsignals/iceberg-go/catalog"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
"golang.org/x/sync/errgroup"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/polarsignals/frostdb/query/logicalplan"
"github.com/polarsignals/frostdb/query/physicalplan"
"github.com/polarsignals/frostdb/recovery"
"github.com/polarsignals/frostdb/storage"
)

func TestDBWithWALAndBucket(t *testing.T) {
Expand Down Expand Up @@ -2962,3 +2964,86 @@ func Test_DB_SnapshotDuplicate_Corrupted(t *testing.T) {

validateRows(10)
}

func Test_Iceberg(t *testing.T) {
t.Parallel()

config := NewTableConfig(
dynparquet.SampleDefinition(),
)
logger := newTestLogger(t)
bucket := objstore.NewInMemBucket()
iceberg, err := storage.NewIceberg("/", catalog.NewHDFS("/", bucket), bucket)
require.NoError(t, err)

dir := t.TempDir()
c, err := New(
WithLogger(logger),
WithStoragePath(dir),
WithWAL(),
WithManualBlockRotation(),
WithReadWriteStorage(iceberg),
)
t.Cleanup(func() {
require.NoError(t, c.Close())
})
require.NoError(t, err)
db, err := c.DB(context.Background(), "test")
require.NoError(t, err)
table, err := db.Table("test", config)
require.NoError(t, err)

ctx := context.Background()
samples := dynparquet.GenerateTestSamples(10)
r, err := samples.ToRecord()
require.NoError(t, err)
_, err = table.InsertRecord(ctx, r)
require.NoError(t, err)

validateRows := func(expected int64) {
pool := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer pool.AssertSize(t, 0)
rows := int64(0)
engine := query.NewEngine(pool, db.TableProvider())
err = engine.ScanTable("test").
Execute(context.Background(), func(ctx context.Context, r arrow.Record) error {
rows += r.NumRows()
return nil
})
require.NoError(t, err)
require.Equal(t, expected, rows)
}

validateRows(10)

require.NoError(t, table.RotateBlock(ctx, table.ActiveBlock(), false))
require.Eventually(t, func() bool {
info, err := bucket.Attributes(ctx, filepath.Join("test", "test", "metadata", "v1.metadata.json"))
if err != nil {
return false
}
return info.Size > 0
}, 3*time.Second, 10*time.Millisecond)

validateRows(10)

// Insert sampels with a different schema
samples = dynparquet.NewTestSamples()
r, err = samples.ToRecord()
require.NoError(t, err)
_, err = table.InsertRecord(ctx, r)
require.NoError(t, err)

validateRows(13)

require.NoError(t, table.RotateBlock(ctx, table.ActiveBlock(), false))
require.Eventually(t, func() bool {
info, err := bucket.Attributes(ctx, filepath.Join("test", "test", "metadata", "v2.metadata.json"))
if err != nil {
return false
}
return info.Size > 0
}, 3*time.Second, 10*time.Millisecond)

validateRows(13)
}
14 changes: 11 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go 1.21
require (
github.com/RoaringBitmap/roaring v1.9.2
github.com/apache/arrow/go/v15 v15.0.2
github.com/apache/arrow/go/v16 v16.0.0
github.com/cespare/xxhash/v2 v2.3.0
github.com/cockroachdb/datadriven v1.0.2
github.com/dgryski/go-metro v0.0.0-20211217172704-adc40b04c140
Expand All @@ -16,6 +15,7 @@ require (
github.com/parquet-go/parquet-go v0.20.1
github.com/pingcap/tidb/parser v0.0.0-20231013125129-93a834a6bf8d
github.com/planetscale/vtprotobuf v0.6.0
github.com/polarsignals/iceberg-go v0.0.0-20240418153106-963db29628ae
github.com/polarsignals/wal v0.0.0-20231123092250-5d233119cfc9
github.com/prometheus/client_golang v1.19.0
github.com/stretchr/testify v1.9.0
Expand Down Expand Up @@ -44,13 +44,19 @@ require (
github.com/efficientgo/core v1.0.0-rc.2 // indirect
github.com/go-logfmt/logfmt v0.6.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v24.3.25+incompatible // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/hamba/avro/v2 v2.20.1 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pingcap/errors v0.11.5-0.20210425183316-da1aaba5fb63 // indirect
Expand All @@ -62,6 +68,7 @@ require (
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/rivo/uniseg v0.4.4 // indirect
github.com/segmentio/encoding v0.3.6 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.etcd.io/bbolt v1.3.6 // indirect
Expand All @@ -74,6 +81,7 @@ require (
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.20.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
gonum.org/v1/gonum v0.15.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
Loading

0 comments on commit 8995756

Please sign in to comment.