Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide a columnar implementation for Row #31022

Merged
merged 1 commit into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/repr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ arrow = { version = "53.3.0", default-features = false }
bitflags = "1.3.2"
bytes = "1.3.0"
cfg-if = "1.0.0"
columnar = "0.2.0"
columnation = "0.1.0"
chrono = { version = "0.4.35", default-features = false, features = ["serde", "std"] }
compact_bytes = "0.1.2"
Expand Down
152 changes: 149 additions & 3 deletions src/repr/src/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ include!(concat!(env!("OUT_DIR"), "/mz_repr.row.rs"));
/// Rows are dynamically sized, but up to a fixed size their data is stored in-line.
/// It is best to re-use a `Row` across multiple `Row` creation calls, as this
/// avoids the allocations involved in `Row::new()`.
#[derive(Default, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[derive(Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct Row {
data: CompactBytes,
}
Expand Down Expand Up @@ -162,7 +162,7 @@ impl Row {
/// This method clears the existing contents of the row, but retains the
/// allocation.
pub fn packer(&mut self) -> RowPacker<'_> {
    // Empty the row once, via the shared `clear` helper, before handing out
    // the packer; the existing allocation is kept for reuse.
    self.clear();
    RowPacker { row: self }
}

Expand Down Expand Up @@ -247,6 +247,12 @@ impl Row {
pub fn as_row_ref(&self) -> &RowRef {
    // Borrow the backing bytes and reinterpret that slice as a `RowRef`.
    let bytes = self.data.as_slice();
    RowRef::from_slice(bytes)
}

/// Clear the contents of the [`Row`], leaving any allocation in place.
///
/// Private helper: `packer` and the columnar `copy_from` implementation both
/// call it to reset the row before writing new contents.
#[inline]
fn clear(&mut self) {
// Delegate to the backing byte buffer's own `clear`.
self.data.clear();
}
}

impl Borrow<RowRef> for Row {
Expand Down Expand Up @@ -287,6 +293,13 @@ impl Clone for Row {
}
}

// `Row` hashes through its `RowRef` view, so an owned row and its borrowed
// form (`Row` implements `Borrow<RowRef>`) always produce the same hash.
impl std::hash::Hash for Row {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        let row_ref = self.as_row_ref();
        std::hash::Hash::hash(row_ref, state)
    }
}

impl Arbitrary for Row {
type Parameters = prop::collection::SizeRange;
type Strategy = BoxedStrategy<Row>;
Expand Down Expand Up @@ -402,10 +415,143 @@ mod columnation {
}
}

mod columnar {
use columnar::{
AsBytes, Clear, Columnar, Container, FromBytes, HeapSize, Index, IndexAs, Len, Push,
};
use mz_ore::cast::CastFrom;

use crate::{Row, RowRef};

#[derive(Copy, Clone, Debug, Default, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct Rows<BC = Vec<u64>, VC = Vec<u8>> {
antiguru marked this conversation as resolved.
Show resolved Hide resolved
/// Bounds container; provides indexed access to offsets.
pub bounds: BC,
/// Values container; provides slice access to bytes.
pub values: VC,
}

/// Makes `Row` usable with the `columnar` crate: rows are borrowed as
/// `&RowRef` and stored in a `Rows` container.
impl Columnar for Row {
    type Ref<'a> = &'a RowRef;
    type Container = Rows;

    /// Overwrites `self` with the bytes of `other`, reusing `self`'s buffer.
    fn copy_from(&mut self, other: Self::Ref<'_>) {
        let bytes = other.data();
        self.clear();
        self.data.extend_from_slice(bytes);
    }

    /// Produces an owned `Row` from a borrowed one.
    fn into_owned(other: Self::Ref<'_>) -> Self {
        other.to_owned()
    }
}

/// Container implementation for `Rows` whose values are already a borrowed
/// byte slice: borrowing re-borrows the bounds and copies the slice reference.
impl<'b, BC: Container<u64>> Container<Row> for Rows<BC, &'b [u8]> {
    type Borrowed<'a>
        = Rows<BC::Borrowed<'a>, &'a [u8]>
    where
        Self: 'a;

    fn borrow<'a>(&'a self) -> Self::Borrowed<'a> {
        let bounds = self.bounds.borrow();
        let values = self.values;
        Rows { bounds, values }
    }
}
/// Container implementation for `Rows` that owns its bytes in a `Vec<u8>`:
/// borrowing yields a `Rows` over borrowed bounds and a borrowed byte slice.
impl<BC: Container<u64>> Container<Row> for Rows<BC, Vec<u8>> {
    type Borrowed<'a>
        = Rows<BC::Borrowed<'a>, &'a [u8]>
    where
        BC: 'a;

    fn borrow<'a>(&'a self) -> Self::Borrowed<'a> {
        let bounds = self.bounds.borrow();
        let values = self.values.borrow();
        Rows { bounds, values }
    }
}

/// Exposes a `Rows` as a sequence of byte slices: everything the bounds
/// container yields, followed by everything the values container yields.
impl<'a, BC: AsBytes<'a>, VC: AsBytes<'a>> AsBytes<'a> for Rows<BC, VC> {
    fn as_bytes(&self) -> impl Iterator<Item = (u64, &'a [u8])> {
        // Bounds come first; `from_bytes` reads the parts back in this order.
        let bounds_bytes = self.bounds.as_bytes();
        let values_bytes = self.values.as_bytes();
        bounds_bytes.chain(values_bytes)
    }
}
/// Rebuilds a `Rows` from a sequence of byte slices, consuming the slices for
/// the bounds container first and then those for the values container.
impl<'a, BC: FromBytes<'a>, VC: FromBytes<'a>> FromBytes<'a> for Rows<BC, VC> {
    fn from_bytes(bytes: &mut impl Iterator<Item = &'a [u8]>) -> Self {
        // Order matters: it mirrors the order in which `as_bytes` emits the parts.
        let bounds = FromBytes::from_bytes(bytes);
        let values = FromBytes::from_bytes(bytes);
        Self { bounds, values }
    }
}

impl<BC: Len, VC> Len for Rows<BC, VC> {
#[inline(always)]
fn len(&self) -> usize {
self.bounds.len()
}
}

impl<'a, BC: Len + IndexAs<u64>> Index for Rows<BC, &'a [u8]> {
type Ref = &'a RowRef;
#[inline(always)]
fn get(&self, index: usize) -> Self::Ref {
let lower = if index == 0 {
0
} else {
self.bounds.index_as(index - 1)
};
let upper = self.bounds.index_as(index);
let lower = usize::cast_from(lower);
let upper = usize::cast_from(upper);
RowRef::from_slice(&self.values[lower..upper])
antiguru marked this conversation as resolved.
Show resolved Hide resolved
}
}
/// Indexed access into a reference to a `Rows` that owns its bytes.
impl<'a, BC: Len + IndexAs<u64>> Index for &'a Rows<BC, Vec<u8>> {
    type Ref = &'a RowRef;

    /// Returns the row at position `index`: the bytes between the previous
    /// row's bound (0 for the first row) and this row's bound.
    #[inline(always)]
    fn get(&self, index: usize) -> Self::Ref {
        // The start offset is looked up before the end offset.
        let start = match index {
            0 => 0,
            position => usize::cast_from(self.bounds.index_as(position - 1)),
        };
        let end = usize::cast_from(self.bounds.index_as(index));
        RowRef::from_slice(&self.values[start..end])
    }
}

/// Appends an owned row's bytes to the container.
impl<BC: Push<u64>> Push<&Row> for Rows<BC> {
    #[inline(always)]
    fn push(&mut self, item: &Row) {
        let bytes = item.data.as_slice();
        self.values.extend_from_slice(bytes);
        // Record where this row ends: the buffer length after appending it.
        let end = u64::cast_from(self.values.len());
        self.bounds.push(end);
    }
}
/// Appends a borrowed row's bytes to the container.
impl<BC: Push<u64>> Push<&RowRef> for Rows<BC> {
    fn push(&mut self, item: &RowRef) {
        let bytes = item.data();
        self.values.extend_from_slice(bytes);
        // Record where this row ends: the buffer length after appending it.
        let end = u64::cast_from(self.values.len());
        self.bounds.push(end);
    }
}
/// Clearing a `Rows` clears both of its containers, bounds first.
impl<BC: Clear, VC: Clear> Clear for Rows<BC, VC> {
    fn clear(&mut self) {
        let Self { bounds, values } = self;
        bounds.clear();
        values.clear();
    }
}
/// The heap size of a `Rows` is the component-wise sum of the pairs reported
/// by its bounds and values containers.
impl<BC: HeapSize, VC: HeapSize> HeapSize for Rows<BC, VC> {
    fn heap_size(&self) -> (usize, usize) {
        // NOTE(review): the pair is presumably (bytes in use, bytes allocated),
        // as defined by the `HeapSize` trait; confirm against its docs.
        let (bounds_first, bounds_second) = self.bounds.heap_size();
        let (values_first, values_second) = self.values.heap_size();
        (bounds_first + values_first, bounds_second + values_second)
    }
}
}

/// A contiguous slice of bytes that are row data.
///
/// A [`RowRef`] is to [`Row`] as [`prim@str`] is to [`String`].
// `Hash` is derived here because `Row`'s hand-written `Hash` impl defers to
// `RowRef`, which keeps the owned and borrowed forms hashing identically.
#[derive(PartialEq, Eq, Hash)]
#[repr(transparent)]
pub struct RowRef([u8]);

Expand Down
Loading