diff --git a/go/parquet/file/file_reader_test.go b/go/parquet/file/file_reader_test.go index 35f4da4e8667c..74926c958e2f7 100644 --- a/go/parquet/file/file_reader_test.go +++ b/go/parquet/file/file_reader_test.go @@ -452,6 +452,55 @@ func TestRleBooleanEncodingFileRead(t *testing.T) { assert.Equal(t, expected, values[:len(expected)]) } +type mockBadReader struct { + cnt int + reader *os.File +} + +func (m *mockBadReader) Seek(offset int64, whence int) (int64, error) { + return m.reader.Seek(offset, whence) +} + +func (m *mockBadReader) ReadAt(p []byte, off int64) (n int, err error) { + if m.cnt == 0 { + return 0, fmt.Errorf("mock error") + } + m.cnt-- + return m.reader.ReadAt(p, off) +} + +func TestBadReader(t *testing.T) { + dir := os.Getenv("PARQUET_TEST_DATA") + if dir == "" { + t.Skip("no path supplied with PARQUET_TEST_DATA") + } + require.DirExists(t, dir) + + filePath := path.Join(dir, "byte_stream_split_extended.gzip.parquet") + f, err := os.Open(filePath) + assert.NoError(t, err) + defer f.Close() + + reader := &mockBadReader{ + cnt: 2, + reader: f, + } + r, err := file.NewParquetReader(reader, file.WithReadProps(&parquet.ReaderProperties{ + BufferSize: int64(1024), + BufferedStreamEnabled: true, + })) + assert.NoError(t, err) + + fileReader, err := pqarrow.NewFileReader(r, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator) + assert.NoError(t, err) + + columnReader, err := fileReader.GetColumn(context.Background(), 0) + assert.NoError(t, err) + + _, err = columnReader.NextBatch(1) + assert.ErrorContains(t, err, "mock error") // Expect an error to occur. +} + func TestByteStreamSplitEncodingFileRead(t *testing.T) { dir := os.Getenv("PARQUET_TEST_DATA") if dir == "" { diff --git a/go/parquet/file/record_reader.go b/go/parquet/file/record_reader.go index 667ffca77a8d1..765f4a9d34b33 100755 --- a/go/parquet/file/record_reader.go +++ b/go/parquet/file/record_reader.go @@ -645,7 +645,7 @@ func (rr *recordReader) ReadRecords(numRecords int64) (int64, error) { } } - return recordsRead, nil + return recordsRead, rr.Err() } func (rr *recordReader) ReleaseValidBits() *memory.Buffer {