From 89957561be66313797ebcfb8f193a6ea705d18a7 Mon Sep 17 00:00:00 2001 From: Thor <8681572+thorfour@users.noreply.github.com> Date: Mon, 22 Apr 2024 11:00:09 -0500 Subject: [PATCH] Iceberg storage (#824) * iceberg storage --- db_test.go | 85 +++++++++++++ go.mod | 14 ++- go.sum | 63 ++++------ storage/iceberg.go | 306 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 427 insertions(+), 41 deletions(-) create mode 100644 storage/iceberg.go diff --git a/db_test.go b/db_test.go index e1e8ebbf7..cc12b51e8 100644 --- a/db_test.go +++ b/db_test.go @@ -18,6 +18,7 @@ import ( "github.com/apache/arrow/go/v15/arrow/memory" "github.com/go-kit/log/level" "github.com/google/uuid" + "github.com/polarsignals/iceberg-go/catalog" "github.com/stretchr/testify/require" "github.com/thanos-io/objstore" "golang.org/x/sync/errgroup" @@ -30,6 +31,7 @@ import ( "github.com/polarsignals/frostdb/query/logicalplan" "github.com/polarsignals/frostdb/query/physicalplan" "github.com/polarsignals/frostdb/recovery" + "github.com/polarsignals/frostdb/storage" ) func TestDBWithWALAndBucket(t *testing.T) { @@ -2962,3 +2964,86 @@ func Test_DB_SnapshotDuplicate_Corrupted(t *testing.T) { validateRows(10) } + +func Test_Iceberg(t *testing.T) { + t.Parallel() + + config := NewTableConfig( + dynparquet.SampleDefinition(), + ) + logger := newTestLogger(t) + bucket := objstore.NewInMemBucket() + iceberg, err := storage.NewIceberg("/", catalog.NewHDFS("/", bucket), bucket) + require.NoError(t, err) + + dir := t.TempDir() + c, err := New( + WithLogger(logger), + WithStoragePath(dir), + WithWAL(), + WithManualBlockRotation(), + WithReadWriteStorage(iceberg), + ) + t.Cleanup(func() { + require.NoError(t, c.Close()) + }) + require.NoError(t, err) + db, err := c.DB(context.Background(), "test") + require.NoError(t, err) + table, err := db.Table("test", config) + require.NoError(t, err) + + ctx := context.Background() + samples := dynparquet.GenerateTestSamples(10) + r, err := samples.ToRecord() + require.NoError(t, err) + _, err = table.InsertRecord(ctx, r) + require.NoError(t, err) + + validateRows := func(expected int64) { + pool := memory.NewCheckedAllocator(memory.DefaultAllocator) + defer pool.AssertSize(t, 0) + rows := int64(0) + engine := query.NewEngine(pool, db.TableProvider()) + err = engine.ScanTable("test"). + Execute(context.Background(), func(ctx context.Context, r arrow.Record) error { + rows += r.NumRows() + return nil + }) + require.NoError(t, err) + require.Equal(t, expected, rows) + } + + validateRows(10) + + require.NoError(t, table.RotateBlock(ctx, table.ActiveBlock(), false)) + require.Eventually(t, func() bool { + info, err := bucket.Attributes(ctx, filepath.Join("test", "test", "metadata", "v1.metadata.json")) + if err != nil { + return false + } + return info.Size > 0 + }, 3*time.Second, 10*time.Millisecond) + + validateRows(10) + + // Insert sampels with a different schema + samples = dynparquet.NewTestSamples() + r, err = samples.ToRecord() + require.NoError(t, err) + _, err = table.InsertRecord(ctx, r) + require.NoError(t, err) + + validateRows(13) + + require.NoError(t, table.RotateBlock(ctx, table.ActiveBlock(), false)) + require.Eventually(t, func() bool { + info, err := bucket.Attributes(ctx, filepath.Join("test", "test", "metadata", "v2.metadata.json")) + if err != nil { + return false + } + return info.Size > 0 + }, 3*time.Second, 10*time.Millisecond) + + validateRows(13) +} diff --git a/go.mod b/go.mod index cafcc8a88..a5ae378af 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.21 require ( github.com/RoaringBitmap/roaring v1.9.2 github.com/apache/arrow/go/v15 v15.0.2 - github.com/apache/arrow/go/v16 v16.0.0 github.com/cespare/xxhash/v2 v2.3.0 github.com/cockroachdb/datadriven v1.0.2 github.com/dgryski/go-metro v0.0.0-20211217172704-adc40b04c140 @@ -16,6 +15,7 @@ require ( github.com/parquet-go/parquet-go v0.20.1 github.com/pingcap/tidb/parser v0.0.0-20231013125129-93a834a6bf8d github.com/planetscale/vtprotobuf v0.6.0 + github.com/polarsignals/iceberg-go v0.0.0-20240418153106-963db29628ae github.com/polarsignals/wal v0.0.0-20231123092250-5d233119cfc9 github.com/prometheus/client_golang v1.19.0 github.com/stretchr/testify v1.9.0 @@ -44,13 +44,19 @@ require ( github.com/efficientgo/core v1.0.0-rc.2 // indirect github.com/go-logfmt/logfmt v0.6.0 // indirect github.com/goccy/go-json v0.10.2 // indirect - github.com/golang/protobuf v1.5.4 // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/google/flatbuffers v24.3.25+incompatible // indirect github.com/google/go-cmp v0.6.0 // indirect + github.com/hamba/avro/v2 v2.20.1 // indirect + github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.17.7 // indirect github.com/klauspost/cpuid/v2 v2.2.7 // indirect - github.com/mattn/go-runewidth v0.0.9 // indirect + github.com/mattn/go-runewidth v0.0.15 // indirect + github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mschoch/smat v0.2.0 // indirect + github.com/oklog/ulid v1.3.1 // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pingcap/errors v0.11.5-0.20210425183316-da1aaba5fb63 // indirect @@ -62,6 +68,7 @@ require ( github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect + github.com/rivo/uniseg v0.4.4 // indirect github.com/segmentio/encoding v0.3.6 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect go.etcd.io/bbolt v1.3.6 // indirect @@ -74,6 +81,7 @@ require ( golang.org/x/text v0.14.0 // indirect golang.org/x/tools v0.20.0 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect + gonum.org/v1/gonum v0.15.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index ae7b4bb27..98ef0c65c 100644 --- a/go.sum +++ b/go.sum @@ -1,17 +1,12 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU= github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk= -github.com/RoaringBitmap/roaring v1.9.1 h1:LXcSqGGGMKm+KAzUyWn7ZeREqoOkoMX+KwLOK1thc4I= -github.com/RoaringBitmap/roaring v1.9.1/go.mod h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90= github.com/RoaringBitmap/roaring v1.9.2 h1:TjoelXOmLrpjbDTzXwr6F17pusrgqUeBE2lp9N6YHRg= github.com/RoaringBitmap/roaring v1.9.2/go.mod h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90= -github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= -github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= github.com/apache/arrow/go/v15 v15.0.2 h1:60IliRbiyTWCWjERBCkO1W4Qun9svcYoZrSLcyOsMLE= github.com/apache/arrow/go/v15 v15.0.2/go.mod h1:DGXsR3ajT524njufqf95822i+KTh+yea1jass9YXgjA= -github.com/apache/arrow/go/v16 v16.0.0/go.mod h1:9wnc9mn6vEDTRIm4+27pEjQpRKuTvBaessPoEXQzxWA= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o= github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= @@ -50,29 +45,25 @@ github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrt github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= -github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= -github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= -github.com/google/flatbuffers v23.5.26+incompatible h1:M9dgRyhJemaM4Sw8+66GHBu8ioaQmyPLg1b8VwK5WJg= -github.com/google/flatbuffers v23.5.26+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/flatbuffers v24.3.25+incompatible h1:CX395cjN9Kke9mmalRoL3d81AtFUxJM+yDthflgJGkI= github.com/google/flatbuffers v24.3.25+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= -github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hamba/avro/v2 v2.20.1 h1:3WByQiVn7wT7d27WQq6pvBRC00FVOrniP6u67FLA/2E= +github.com/hamba/avro/v2 v2.20.1/go.mod h1:xHiKXbISpb3Ovc809XdzWow+XGTn+Oyf/F9aZbTLAig= github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= -github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= -github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg= github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= -github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= -github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -82,10 +73,20 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= +github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U= +github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= +github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= +github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= +github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU= github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= @@ -93,8 +94,6 @@ github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6 github.com/parquet-go/parquet-go v0.20.1 h1:r5UqeMqyH2DrahZv6dlT41hH2NpS2F8atJWmX1ST1/U= github.com/parquet-go/parquet-go v0.20.1/go.mod h1:4YfUo8TkoGoqwzhA/joZKZ8f77wSMShOLHESY4Ys0bY= github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o= -github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= -github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= @@ -114,6 +113,8 @@ github.com/planetscale/vtprotobuf v0.6.0 h1:nBeETjudeJ5ZgBHUz1fVHvbqUKnYOXNhsIEa github.com/planetscale/vtprotobuf v0.6.0/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/polarsignals/iceberg-go v0.0.0-20240418153106-963db29628ae h1:1ysfduQl7FjsM4XCGLCSzAlE+5QsMQUw4SFwrOTPBBQ= +github.com/polarsignals/iceberg-go v0.0.0-20240418153106-963db29628ae/go.mod h1:Pr0quSmxIq62T5G+tEkU/wtnvNIPxpLTWdhhZ4sqxEo= github.com/polarsignals/wal v0.0.0-20231123092250-5d233119cfc9 h1:SwUso/MRikI7aLlEelX4k6N107fT4uTAzmtyMTfjr44= github.com/polarsignals/wal v0.0.0-20231123092250-5d233119cfc9/go.mod h1:EVDHAAe+7GQ33A1/x+/gE+sBPN4toQ0XG5RoLD49xr8= github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU= @@ -126,6 +127,9 @@ github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= +github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/segmentio/asm v1.1.3/go.mod h1:Ld3L4ZXGNcSLRg4JBsZ3//1+f/TjYl0Mzen/DQy1EJg= @@ -166,19 +170,13 @@ go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= go.uber.org/zap v1.25.0 h1:4Hvk6GtkucQ790dqmj7l1eEnRdKm3k3ZUrUMS2d5+5c= go.uber.org/zap v1.25.0/go.mod h1:JIAUzQIH94IC4fOJQm7gMmBJP5k7wQfdcnYdPoEXJYk= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 h1:aAcj0Da7eBAtrTp03QXWvm88pSyOt+UgdZw2BFZ+lEw= -golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8/go.mod h1:CQ1k9gNrJ50XIzaKCRR2hssIjF07kZFEiieALBM/ARQ= golang.org/x/exp v0.0.0-20240409090435-93d18d7e34b8 h1:ESSUROHIBHg7USnszlcdmjBEwdMj9VUvU+OPk4yl2mc= golang.org/x/exp v0.0.0-20240409090435-93d18d7e34b8/go.mod h1:/lliqkxwWAhPjf5oSOIJup2XcqJaw8RGS6k3TGEc7GI= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= -golang.org/x/mod v0.16.0 h1:QX4fJ0Rr5cPQCF7O9lh9Se4pmwfwskqZfq5moyldzic= -golang.org/x/mod v0.16.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc= -golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w= golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -188,8 +186,6 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20200923182605-d9f96fdee20d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20211110154304-99a53858aa08/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= -golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -198,26 +194,17 @@ golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.19.0 h1:tfGCXNR1OsFG+sVdLAitlpjAvD/I6dHDKnYrpEZUHkw= -golang.org/x/tools v0.19.0/go.mod h1:qoJWxmGSIBmAeriMx19ogtrEPrGtDbPK634QFIcLAhc= golang.org/x/tools v0.20.0 h1:hz/CVckiOxybQvFw6h7b/q80NTr9IUQb4s1IIzW7KNY= golang.org/x/tools v0.20.0/go.mod h1:WvitBU7JJf6A4jOdg4S1tviW9bhUxkgeCui/0JHctQg= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU= golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= -gonum.org/v1/gonum v0.12.0 h1:xKuo6hzt+gMav00meVPUlXwSdoEJP46BR+wdxQEFK2o= -gonum.org/v1/gonum v0.12.0/go.mod h1:73TDxJfAAHeA8Mk9mf8NlIppyhQNo5GLTcYeqgo2lvY= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 h1:Jyp0Hsi0bmHXG6k9eATXoYtjd6e2UzZ1SCn/wIupY14= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:oQ5rr10WTTMvP4A36n8JpR1OrO1BEiV4f78CneXZxkA= +gonum.org/v1/gonum v0.15.0 h1:2lYxjRbTYyxkJxlhC+LvJIx3SsANPdRybu1tGj9/OrQ= +gonum.org/v1/gonum v0.15.0/go.mod h1:xzZVBJBtS+Mz4q0Yl2LJTk+OxOg4jiXZ7qBoM0uISGo= google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de h1:cZGRis4/ot9uVm639a+rHCUaG0JJHEsdyzSQTMX+suY= google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:H4O17MA/PE9BsGx3w+a+W2VOLLD1Qf7oJneAoU6WktY= -google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= -google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM= google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA= -google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/storage/iceberg.go b/storage/iceberg.go new file mode 100644 index 000000000..09e9d85bb --- /dev/null +++ b/storage/iceberg.go @@ -0,0 +1,306 @@ +package storage + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + "io" + "path/filepath" + + "github.com/parquet-go/parquet-go" + "github.com/polarsignals/iceberg-go" + "github.com/polarsignals/iceberg-go/catalog" + "github.com/polarsignals/iceberg-go/table" + "github.com/thanos-io/objstore" + + "github.com/polarsignals/frostdb/dynparquet" + "github.com/polarsignals/frostdb/query/expr" + "github.com/polarsignals/frostdb/query/logicalplan" +) + +/* + Iceberg is an Apache Iceberg backed DataSink/DataSource. + + The Iceberg layout is as follows: + ///.metadata.json // Metadata file + //
/data/.parquet // data files + //
/metadata/snap..avro // Manifest list file (snapshot) + //
/metadata/.avro // Manifest file + //
/metadata/version-hint.text // Version hint file + + On Upload a new snapshot is created and the data file is added to a manifest (or a new manifest is created depending on settings). + This manifest is then added to the existing manifest list, and a new version of the metadata file is created. + + Once the new metadata file is written the version hint file is updated with the latest version number of the table. + This version-hint file is used to determine the latest version of the table. + + On Scan the latest snapshot is loaded and the manifest list is read. + If the manifests are partitioned; the manifests are filtered out based on the given filter against the partition data. + The remaining manifests are then read, and the data files are filtered out based on the given filter and the min/max columns of the data file. + + Remaining data files are then read and the filter is applied to each row group in the data file. + +*/ + +// Iceberg is an Apache Iceberg backed DataSink/DataSource. +type Iceberg struct { + catalog catalog.Catalog + bucketURI string // bucketURI is the URI of the bucket i.e gs://, s3:// etc. + bucket objstore.Bucket +} + +// NewIceberg creates a new Iceberg DataSink/DataSource. +// You must provide the URI of the warehouse and the objstore.Bucket that points to that warehouse. +func NewIceberg(uri string, ctlg catalog.Catalog, bucket objstore.Bucket) (*Iceberg, error) { + return &Iceberg{ + catalog: ctlg, + bucketURI: uri, + bucket: catalog.NewIcebucket(uri, bucket), + }, nil +} + +func (i *Iceberg) String() string { + return "Iceberg" +} + +// Scan will load the latest Iceberg table. It will filter out any manifests that do not contain useful data. +// Then it will read the manifests that may contain useful data. It will then filter out the data file that dot not contain useful data. +// Finally it has a set of data files that may contain useful data. It will then read the data files and apply the filter to each row group in the data file. +func (i *Iceberg) Scan(ctx context.Context, prefix string, _ *dynparquet.Schema, filter logicalplan.Expr, _ uint64, callback func(context.Context, any) error) error { + t, err := i.catalog.LoadTable(ctx, []string{i.bucketURI, prefix}, iceberg.Properties{}) + if err != nil { + if errors.Is(catalog.ErrorTableNotFound, err) { + return nil + } + return err + } + + // Get the latest snapshot + snapshot := t.CurrentSnapshot() + list, err := snapshot.Manifests(i.bucket) + if err != nil { + return fmt.Errorf("error reading manifest list: %w", err) + } + + fltr, err := expr.BooleanExpr(filter) + if err != nil { + return err + } + + for _, manifest := range list { + // TODO(thor): manifests can be filtered by partition data + entries, err := manifest.FetchEntries(i.bucket, false) + if err != nil { + return err + } + + for _, e := range entries { + ok, err := manifestEntryMayContainUsefulData(icebergSchemaToParquetSchema(t.Schema()), e, fltr) + if err != nil { + return fmt.Errorf("failed to filter entry: %w", err) + } + if !ok { + continue + } + + // TODO(thor): data files can be processed in parallel + bkt := NewBucketReaderAt(i.bucket) + r, err := bkt.GetReaderAt(ctx, e.DataFile().FilePath()) + if err != nil { + return err + } + + file, err := parquet.OpenFile( + r, + e.DataFile().FileSizeBytes(), + parquet.FileReadMode(parquet.ReadModeAsync), + ) + if err != nil { + return err + } + + // Get a reader from the file bytes + buf, err := dynparquet.NewSerializedBuffer(file) + if err != nil { + return err + } + + for i := 0; i < buf.NumRowGroups(); i++ { + rg := buf.DynamicRowGroup(i) + mayContainUsefulData, err := fltr.Eval(rg) + if err != nil { + return err + } + if mayContainUsefulData { + if err := callback(ctx, rg); err != nil { + return err + } + } + } + } + } + + return nil +} + +// Prefixes lists all the tables found in the warehouse for the given database(prefix). +func (i *Iceberg) Prefixes(ctx context.Context, prefix string) ([]string, error) { + tables, err := i.catalog.ListTables(ctx, []string{filepath.Join(i.bucketURI, prefix)}) + if err != nil { + return nil, err + } + + tableNames := make([]string, 0, len(tables)) + for _, t := range tables { + tableNames = append(tableNames, filepath.Join(t...)) + } + return tableNames, nil +} + +// Upload a parquet file into the Iceberg table. +func (i *Iceberg) Upload(ctx context.Context, name string, r io.Reader) error { + tablePath := filepath.Join(i.bucketURI, filepath.Dir(filepath.Dir(name))) + t, err := i.catalog.LoadTable(ctx, []string{tablePath}, iceberg.Properties{}) + if err != nil { + if !errors.Is(catalog.ErrorTableNotFound, err) { + return err + } + + // Table doesn't exist, create it + t, err = i.catalog.CreateTable(ctx, tablePath, iceberg.NewSchema(0), iceberg.Properties{}) + if err != nil { + return fmt.Errorf("failed to create table: %w", err) + } + } + + w, err := t.SnapshotWriter( + table.WithMergeSchema(), + table.WithManifestSizeBytes(8*1024*1024), // 8MiB manifest size + ) + if err != nil { + return err + } + + if err := w.Append(ctx, r); err != nil { + return err + } + + return w.Close(ctx) +} + +func (i *Iceberg) Delete(_ context.Context, _ string) error { + // Noop + // NOTE: Deletes are used in DataSinks when an upload fails for any reason. Because an Iceberg table is not updated + // until a full upload is successfull there is no risk of partial data being left in the table, or a corrupted file being read. + return nil +} + +func icebergSchemaToParquetSchema(schema *iceberg.Schema) *parquet.Schema { + g := parquet.Group{} + for _, f := range schema.Fields() { + switch f.Type.Type() { + case "long": + g[f.Name] = parquet.Int(64) + case "binary": + g[f.Name] = parquet.String() + } + } + return parquet.NewSchema("iceberg", g) +} + +func manifestEntryMayContainUsefulData(schema *parquet.Schema, entry iceberg.ManifestEntry, filter expr.TrueNegativeFilter) (bool, error) { + return filter.Eval(dataFileToParticulate(schema, entry.DataFile())) +} + +func dataFileToParticulate(schema *parquet.Schema, d iceberg.DataFile) expr.Particulate { + return &dataFileParticulate{ + schema: schema, + data: d, + } +} + +type dataFileParticulate struct { + schema *parquet.Schema + data iceberg.DataFile +} + +func (d *dataFileParticulate) Schema() *parquet.Schema { + return d.schema +} + +func (d *dataFileParticulate) ColumnChunks() []parquet.ColumnChunk { + virtualColumnChunks := make([]parquet.ColumnChunk, 0, len(d.schema.Fields())) + for i := range d.schema.Fields() { + virtualColumnChunks = append(virtualColumnChunks, &virtualColumnChunk{ + pType: d.schema.Fields()[i].Type(), + nulls: d.data.NullValueCounts()[i], + column: i, + lowerBounds: d.data.LowerBoundValues()[i], + upperBounds: d.data.UpperBoundValues()[i], + numValues: d.data.Count(), + }) + } + return virtualColumnChunks +} + +type virtualColumnChunk struct { + pType parquet.Type + column int + numValues int64 + nulls int64 + lowerBounds []byte + upperBounds []byte +} + +func (v *virtualColumnChunk) Type() parquet.Type { return nil } +func (v *virtualColumnChunk) Column() int { return v.column } +func (v *virtualColumnChunk) Pages() parquet.Pages { return nil } +func (v *virtualColumnChunk) ColumnIndex() (parquet.ColumnIndex, error) { + return &virtualColumnIndex{ + pType: v.pType, + nulls: v.nulls, + lowerBounds: v.lowerBounds, + upperBounds: v.upperBounds, + }, nil +} +func (v *virtualColumnChunk) OffsetIndex() (parquet.OffsetIndex, error) { return nil, nil } +func (v *virtualColumnChunk) BloomFilter() parquet.BloomFilter { return nil } +func (v *virtualColumnChunk) NumValues() int64 { return v.numValues } + +type virtualColumnIndex struct { + lowerBounds []byte + upperBounds []byte + nulls int64 + pType parquet.Type +} + +func (v *virtualColumnIndex) NumPages() int { return 1 } +func (v *virtualColumnIndex) NullCount(int) int64 { return v.nulls } +func (v *virtualColumnIndex) NullPage(int) bool { return false } +func (v *virtualColumnIndex) MinValue(int) parquet.Value { + switch v.pType.Kind() { + case parquet.Int64: + i := binary.LittleEndian.Uint64(v.lowerBounds) + return parquet.Int64Value(int64(i)) + case parquet.ByteArray: + return parquet.ByteArrayValue(v.lowerBounds) + default: + return parquet.ByteArrayValue(v.lowerBounds) + } +} + +func (v *virtualColumnIndex) MaxValue(int) parquet.Value { + switch v.pType.Kind() { + case parquet.Int64: + i := binary.LittleEndian.Uint64(v.upperBounds) + return parquet.Int64Value(int64(i)) + case parquet.ByteArray: + return parquet.ByteArrayValue(v.upperBounds) + default: + return parquet.ByteArrayValue(v.upperBounds) + } +} + +func (v *virtualColumnIndex) IsAscending() bool { return true } +func (v *virtualColumnIndex) IsDescending() bool { return false }