Skip to content

Commit

Permalink
apacheGH-43860: [Go][Parquet] Handle the error correctly (apache#43861)
Browse files Browse the repository at this point in the history
### Rationale for this change
Fixes: apache#43860

### What changes are included in this PR?
Return error correctly

### Are these changes tested?
Yes

### Are there any user-facing changes?
Nope

* GitHub Issue: apache#43860

Authored-by: bigsheeper <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
  • Loading branch information
bigsheeper authored and khwilson committed Sep 14, 2024
1 parent a48f69b commit 2fc8423
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 1 deletion.
49 changes: 49 additions & 0 deletions go/parquet/file/file_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,55 @@ func TestRleBooleanEncodingFileRead(t *testing.T) {
assert.Equal(t, expected, values[:len(expected)])
}

type mockBadReader struct {
cnt int
reader *os.File
}

func (m *mockBadReader) Seek(offset int64, whence int) (int64, error) {
return m.reader.Seek(offset, whence)
}

func (m *mockBadReader) ReadAt(p []byte, off int64) (n int, err error) {
if m.cnt == 0 {
return 0, fmt.Errorf("mock error")
}
m.cnt--
return m.reader.ReadAt(p, off)
}

func TestBadReader(t *testing.T) {
dir := os.Getenv("PARQUET_TEST_DATA")
if dir == "" {
t.Skip("no path supplied with PARQUET_TEST_DATA")
}
require.DirExists(t, dir)

filePath := path.Join(dir, "byte_stream_split_extended.gzip.parquet")
f, err := os.Open(filePath)
assert.NoError(t, err)
defer f.Close()

reader := &mockBadReader{
cnt: 2,
reader: f,
}
r, err := file.NewParquetReader(reader, file.WithReadProps(&parquet.ReaderProperties{
BufferSize: int64(1024),
BufferedStreamEnabled: true,
}))
assert.NoError(t, err)

fileReader, err := pqarrow.NewFileReader(r, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
assert.NoError(t, err)

columnReader, err := fileReader.GetColumn(context.Background(), 0)
assert.NoError(t, err)

_, err = columnReader.NextBatch(1)
assert.ErrorContains(t, err, "mock error") // Expect an error to occur.
}

func TestByteStreamSplitEncodingFileRead(t *testing.T) {
dir := os.Getenv("PARQUET_TEST_DATA")
if dir == "" {
Expand Down
2 changes: 1 addition & 1 deletion go/parquet/file/record_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ func (rr *recordReader) ReadRecords(numRecords int64) (int64, error) {
}
}

return recordsRead, nil
return recordsRead, rr.Err()
}

func (rr *recordReader) ReleaseValidBits() *memory.Buffer {
Expand Down

0 comments on commit 2fc8423

Please sign in to comment.