Skip to content

Commit

Permalink
Add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed Jan 8, 2025
1 parent 8a4ffbc commit 2a82065
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 17 deletions.
30 changes: 30 additions & 0 deletions cpp/src/parquet/size_statistics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include "parquet/size_statistics.h"

#include <algorithm>
#include <ostream>
#include <string_view>

#include "arrow/util/logging.h"
#include "parquet/exception.h"
Expand Down Expand Up @@ -98,6 +100,34 @@ std::unique_ptr<SizeStatistics> SizeStatistics::Make(const ColumnDescriptor* des
return size_stats;
}

/// Write a compact, human-readable description of `size_stats` to `os`.
///
/// The output has the form `SizeStatistics{field=value, ...}`. A field is only
/// listed when it carries data: the byte-array size when the optional is set,
/// and each level histogram when it is non-empty. Histograms are rendered as
/// `name={v0, v1, ...}`.
///
/// \return `os`, to allow chaining.
std::ostream& operator<<(std::ostream& os, const SizeStatistics& size_stats) {
  // Becomes true once a field has been written, so that every subsequent field
  // is preceded by ", ".
  bool wrote_field = false;
  const auto begin_field = [&]() {
    if (wrote_field) {
      os << ", ";
    }
    wrote_field = true;
  };

  const auto write_histogram = [&](std::string_view field_name,
                                   const std::vector<int64_t>& counts) {
    if (counts.empty()) {
      return;  // Empty histograms are left out entirely.
    }
    begin_field();
    os << field_name << "={";
    auto it = counts.begin();
    os << *it;
    for (++it; it != counts.end(); ++it) {
      os << ", " << *it;
    }
    os << "}";
  };

  os << "SizeStatistics{";
  if (size_stats.unencoded_byte_array_data_bytes.has_value()) {
    begin_field();
    os << "unencoded_byte_array_data_bytes="
       << *size_stats.unencoded_byte_array_data_bytes;
  }
  write_histogram("repetition_level_histogram", size_stats.repetition_level_histogram);
  write_histogram("definition_level_histogram", size_stats.definition_level_histogram);
  os << "}";
  return os;
}

void UpdateLevelHistogram(::arrow::util::span<const int16_t> levels,
::arrow::util::span<int64_t> histogram) {
const int64_t num_levels = static_cast<int64_t>(levels.size());
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/parquet/size_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#pragma once

#include <cstdint>
#include <iosfwd>
#include <optional>
#include <vector>

Expand Down Expand Up @@ -91,6 +92,9 @@ struct PARQUET_EXPORT SizeStatistics {
static std::unique_ptr<SizeStatistics> Make(const ColumnDescriptor* descr);
};

/// Write a human-readable representation of a SizeStatistics to a stream.
///
/// The text has the form `SizeStatistics{field=value, ...}`; only fields that
/// hold data (a set optional, a non-empty histogram) are listed.
///
/// \return the stream that was passed in
PARQUET_EXPORT
std::ostream& operator<<(std::ostream&, const SizeStatistics&);

/// Add the number of occurrences of each value in `levels` to `histogram`.
///
/// Counts accumulate across calls, and passing no levels leaves `histogram`
/// unchanged (both behaviours are exercised by the unit tests).
///
/// NOTE(review): presumably `histogram` must provide one slot per possible
/// level value (i.e. max_level + 1 entries) -- confirm against the
/// implementation.
PARQUET_EXPORT
void UpdateLevelHistogram(::arrow::util::span<const int16_t> levels,
::arrow::util::span<int64_t> histogram);
Expand Down
104 changes: 87 additions & 17 deletions cpp/src/parquet/size_statistics_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,14 @@
#include "gtest/gtest.h"

#include <algorithm>
#include <ostream>
#include <random>

#include "arrow/buffer.h"
#include "arrow/table.h"
#include "arrow/testing/builder.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/span.h"
#include "parquet/arrow/reader.h"
#include "parquet/arrow/reader_internal.h"
#include "parquet/arrow/schema.h"
#include "parquet/arrow/writer.h"
#include "parquet/column_writer.h"
Expand All @@ -42,6 +40,29 @@

namespace parquet {

TEST(SizeStatistics, UpdateLevelHistogram) {
  // Batches shared by both scenarios below. Counts are expected to accumulate
  // from one call to the next, and an empty batch must not change anything.
  const std::vector<int16_t> followup_batch = {1, 1, 0};
  const std::vector<int16_t> no_levels = {};

  // Scenario 1: max_level = 1, hence two histogram slots.
  {
    const std::vector<int16_t> first_batch = {0, 1, 1, 1, 0};
    std::vector<int64_t> counts(2, 0);

    UpdateLevelHistogram(first_batch, counts);
    EXPECT_THAT(counts, ::testing::ElementsAre(2, 3));

    UpdateLevelHistogram(followup_batch, counts);
    EXPECT_THAT(counts, ::testing::ElementsAre(3, 5));

    UpdateLevelHistogram(no_levels, counts);
    EXPECT_THAT(counts, ::testing::ElementsAre(3, 5));
  }

  // Scenario 2: max_level > 1, here three histogram slots.
  {
    const std::vector<int16_t> first_batch = {0, 1, 2, 2, 0};
    std::vector<int64_t> counts(3, 0);

    UpdateLevelHistogram(first_batch, counts);
    EXPECT_THAT(counts, ::testing::ElementsAre(2, 1, 2));

    UpdateLevelHistogram(followup_batch, counts);
    EXPECT_THAT(counts, ::testing::ElementsAre(3, 3, 2));

    UpdateLevelHistogram(no_levels, counts);
    EXPECT_THAT(counts, ::testing::ElementsAre(3, 3, 2));
  }
}

TEST(SizeStatistics, ThriftSerDe) {
const std::vector<int64_t> kDefLevels = {128, 64, 32, 16};
const std::vector<int64_t> kRepLevels = {100, 80, 60, 40, 20};
Expand Down Expand Up @@ -88,13 +109,38 @@ struct PageSizeStatistics {
}
};

/// Write `page_stats` to `os` as `PageSizeStatistics{name={v0, v1, ...}, ...}`.
///
/// Only non-empty vectors are listed, in the order def_levels, rep_levels,
/// byte_array_bytes.
///
/// \return `os`, to allow chaining.
std::ostream& operator<<(std::ostream& os, const PageSizeStatistics& page_stats) {
  os << "PageSizeStatistics{";

  // True once at least one vector has been printed; later ones get a leading ", ".
  bool need_separator = false;
  const auto write_values = [&](std::string_view label,
                                const std::vector<int64_t>& items) {
    if (items.empty()) {
      return;  // Nothing is printed for an empty vector, not even its name.
    }
    if (need_separator) {
      os << ", ";
    }
    need_separator = true;

    os << label << "={";
    bool first_item = true;
    for (const int64_t item : items) {
      if (!first_item) {
        os << ", ";
      }
      first_item = false;
      os << item;
    }
    os << "}";
  };

  write_values("def_levels", page_stats.def_levels);
  write_values("rep_levels", page_stats.rep_levels);
  write_values("byte_array_bytes", page_stats.byte_array_bytes);

  os << "}";
  return os;
}

class SizeStatisticsRoundTripTest : public ::testing::Test {
public:
void WriteFile(SizeStatisticsLevel level,
const std::shared_ptr<::arrow::Table>& table) {
void WriteFile(SizeStatisticsLevel level, const std::shared_ptr<::arrow::Table>& table,
int max_row_group_length, int page_size,
int write_batch_size = DEFAULT_WRITE_BATCH_SIZE) {
auto writer_properties = WriterProperties::Builder()
.max_row_group_length(2) /* every row group has 2 rows */
->data_pagesize(1) /* every page has 1 row */
.max_row_group_length(max_row_group_length)
->data_pagesize(page_size)
->write_batch_size(write_batch_size)
->enable_write_page_index()
->enable_statistics()
->set_size_statistics_level(level)
Expand Down Expand Up @@ -127,6 +173,7 @@ class SizeStatisticsRoundTripTest : public ::testing::Test {
ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer_));

// Read row group size statistics in order.
row_group_stats_.clear();
auto metadata = reader->metadata();
for (int i = 0; i < metadata->num_row_groups(); ++i) {
auto row_group_metadata = metadata->RowGroup(i);
Expand All @@ -138,6 +185,7 @@ class SizeStatisticsRoundTripTest : public ::testing::Test {
}

// Read page size statistics in order.
page_stats_.clear();
auto page_index_reader = reader->GetPageIndexReader();
ASSERT_NE(page_index_reader, nullptr);

Expand Down Expand Up @@ -168,11 +216,7 @@ class SizeStatisticsRoundTripTest : public ::testing::Test {
}
}

void Reset() {
buffer_.reset();
row_group_stats_.clear();
page_stats_.clear();
}
void Reset() { buffer_.reset(); }

protected:
std::shared_ptr<Buffer> buffer_;
Expand All @@ -187,7 +231,7 @@ TEST_F(SizeStatisticsRoundTripTest, EnableSizeStats) {
::arrow::field("a", ::arrow::list(::arrow::list(::arrow::int32()))),
::arrow::field("b", ::arrow::list(::arrow::list(::arrow::utf8()))),
});
// First two rows are in one row group, and the other two rows are in another row group.
// First two rows will be in one row group, and the other two rows in another row group.
auto table = ::arrow::TableFromJSON(schema, {R"([
[ [[1],[1,1],[1,1,1]], [["a"],["a","a"],["a","a","a"]] ],
[ [[0,1,null]], [["foo","bar",null]] ],
Expand All @@ -198,7 +242,7 @@ TEST_F(SizeStatisticsRoundTripTest, EnableSizeStats) {
for (auto size_stats_level :
{SizeStatisticsLevel::None, SizeStatisticsLevel::ColumnChunk,
SizeStatisticsLevel::PageAndColumnChunk}) {
WriteFile(size_stats_level, table);
WriteFile(size_stats_level, table, /*max_row_group_length=*/2, /*page_size=*/1);
ReadSizeStatistics();

if (size_stats_level == SizeStatisticsLevel::None) {
Expand Down Expand Up @@ -251,8 +295,8 @@ TEST_F(SizeStatisticsRoundTripTest, WriteDictionaryArray) {
{::arrow::field("a", ::arrow::dictionary(::arrow::int16(), ::arrow::utf8()))});
WriteFile(
SizeStatisticsLevel::PageAndColumnChunk,
::arrow::TableFromJSON(schema, {R"([["aa"],["aaa"],[null],["a"],["aaa"],["a"]])"}));

::arrow::TableFromJSON(schema, {R"([["aa"],["aaa"],[null],["a"],["aaa"],["a"]])"}),
/*max_row_group_length=*/2, /*page_size=*/1);
ReadSizeStatistics();
EXPECT_THAT(row_group_stats_,
::testing::ElementsAre(SizeStatistics{/*def_levels=*/{0, 2},
Expand All @@ -276,6 +320,32 @@ TEST_F(SizeStatisticsRoundTripTest, WriteDictionaryArray) {
/*byte_array_bytes=*/{4}}));
}

// TODO add tests for UpdateLevelHistogram
TEST_F(SizeStatisticsRoundTripTest, WritePageInBatches) {
  // Within a single page, rep/def level histograms are updated once per batch of
  // `write_batch_size` levels. Write the same table with several batch sizes,
  // including ones small enough that a page spans more than one batch, and check
  // that the resulting statistics are the same every time.
  auto schema = ::arrow::schema({::arrow::field("a", ::arrow::list(::arrow::utf8()))});
  auto table = ::arrow::TableFromJSON(schema, {R"([
[ [null,"a","ab"] ],
[ null ],
[ [] ],
[ [null,"d","de"] ],
[ ["g","gh",null] ],
[ ["j","jk",null] ]
])"});

  // The row group and page limits used below are large, and exactly one row
  // group holding exactly one page is expected, with identical statistics at
  // both levels.
  const SizeStatistics expected_row_group_stats{/*def_levels=*/{1, 1, 4, 8},
                                                /*rep_levels=*/{6, 8},
                                                /*byte_array_bytes=*/12};
  const PageSizeStatistics expected_page_stats{/*def_levels=*/{1, 1, 4, 8},
                                               /*rep_levels=*/{6, 8},
                                               /*byte_array_bytes=*/{12}};

  for (const int batch_size : {100, 5, 4, 3, 2, 1}) {
    ARROW_SCOPED_TRACE("write_batch_size = ", batch_size);

    WriteFile(SizeStatisticsLevel::PageAndColumnChunk, table,
              /*max_row_group_length=*/1000, /*page_size=*/1000,
              /*write_batch_size=*/batch_size);
    ReadSizeStatistics();

    EXPECT_THAT(row_group_stats_, ::testing::ElementsAre(expected_row_group_stats));
    EXPECT_THAT(page_stats_, ::testing::ElementsAre(expected_page_stats));
  }
}

} // namespace parquet

0 comments on commit 2a82065

Please sign in to comment.