Skip to content

Commit

Permalink
Provide a columnar implementation for Row
Browse files Browse the repository at this point in the history
With no functional changes, this PR adds a `Columnar` implementation for
`Row`. It's not used yet, but will be in subsequent changes. The
implementation follows the `String` implementation in columnar, because both
types are very similar.

Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Jan 13, 2025
1 parent 6d39009 commit 980b098
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 1 deletion.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/repr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ arrow = { version = "53.3.0", default-features = false }
bitflags = "1.3.2"
bytes = "1.3.0"
cfg-if = "1.0.0"
columnar = "0.2.0"
columnation = "0.1.0"
chrono = { version = "0.4.35", default-features = false, features = ["serde", "std"] }
chrono-tz = { version = "0.8.1", features = ["serde", "case-insensitive"] }
Expand Down
148 changes: 147 additions & 1 deletion src/repr/src/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::cell::RefCell;
use std::cmp::Ordering;
use std::convert::{TryFrom, TryInto};
use std::fmt::{self, Debug};
use std::hash::{Hash, Hasher};
use std::mem::{size_of, transmute};
use std::ops::Deref;
use std::rc::Rc;
Expand Down Expand Up @@ -162,7 +163,7 @@ impl Row {
/// This method clears the existing contents of the row, but retains the
/// allocation.
pub fn packer(&mut self) -> RowPacker<'_> {
    // Empty the row through the shared `clear` helper, which clears the
    // backing buffer, before handing the row to the packer. Clearing once is
    // sufficient; a second direct clear of `self.data` would be redundant.
    self.clear();
    RowPacker { row: self }
}

Expand Down Expand Up @@ -247,6 +248,12 @@ impl Row {
pub fn as_row_ref(&self) -> &RowRef {
    // Expose the row's backing bytes as a borrowed `RowRef`.
    let bytes = self.data.as_slice();
    RowRef::from_slice(bytes)
}

/// Clear the contents of the [`Row`], leaving any allocation in place.
///
/// This is a thin wrapper that forwards to `clear` on the row's backing
/// `data` buffer.
#[inline]
fn clear(&mut self) {
self.data.clear();
}
}

impl Borrow<RowRef> for Row {
Expand Down Expand Up @@ -402,6 +409,138 @@ mod columnation {
}
}

mod columnar {
use crate::{Row, RowRef};
use columnar::{
AsBytes, Clear, Columnar, Container, FromBytes, HeapSize, Index, IndexAs, Len, Push,
};
use mz_ore::cast::CastFrom;

/// Columnar storage for a sequence of rows, split into a bounds container and
/// a values container.
///
/// The `Push` implementations in this module append each row's bytes to
/// `values` and then record the resulting length of `values` in `bounds`, so
/// entry `i` of `bounds` is the offset in `values` at which row `i` ends.
#[derive(Copy, Clone, Debug, Default, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct Rows<BC = Vec<u64>, VC = Vec<u8>> {
/// Bounds container; provides indexed access to offsets.
pub bounds: BC,
/// Values container; provides slice access to bytes.
pub values: VC,
}

impl Columnar for Row {
    /// A row borrowed out of a container is handed out as a `&RowRef`.
    type Ref<'a> = &'a RowRef;
    /// Rows are stored in a [`Rows`] container with its default parameters.
    type Container = Rows;

    /// Replaces the contents of `self` with the bytes of `other`.
    fn copy_from(&mut self, other: Self::Ref<'_>) {
        let bytes = other.data();
        // Drop the old contents first, then copy the new bytes in.
        self.clear();
        self.data.extend_from_slice(bytes);
    }

    /// Produces an owned [`Row`] from a borrowed reference.
    fn into_owned(other: Self::Ref<'_>) -> Self {
        ToOwned::to_owned(other)
    }
}

impl<'b, BC: Container<u64>> Container<Row> for Rows<BC, &'b [u8]> {
    /// Borrowed form: borrowed bounds paired with a byte slice.
    type Borrowed<'a>
        = Rows<BC::Borrowed<'a>, &'a [u8]>
    where
        Self: 'a;

    /// Borrows the bounds container and copies the already-borrowed byte slice.
    fn borrow<'a>(&'a self) -> Self::Borrowed<'a> {
        let bounds = self.bounds.borrow();
        // `values` is already a slice reference, so it is copied as-is.
        let values = self.values;
        Rows { bounds, values }
    }
}
impl<BC: Container<u64>> Container<Row> for Rows<BC, Vec<u8>> {
    /// Borrowed form: borrowed bounds paired with a byte slice.
    type Borrowed<'a>
        = Rows<BC::Borrowed<'a>, &'a [u8]>
    where
        BC: 'a;

    /// Borrows both the bounds container and the owned byte vector.
    fn borrow<'a>(&'a self) -> Self::Borrowed<'a> {
        let bounds = self.bounds.borrow();
        let values = self.values.borrow();
        Rows { bounds, values }
    }
}

impl<'a, BC: AsBytes<'a>, VC: AsBytes<'a>> AsBytes<'a> for Rows<BC, VC> {
    /// Yields the byte slices of `bounds` first, followed by those of `values`.
    fn as_bytes(&self) -> impl Iterator<Item = (u64, &'a [u8])> {
        let bounds = self.bounds.as_bytes();
        let values = self.values.as_bytes();
        bounds.chain(values)
    }
}
impl<'a, BC: FromBytes<'a>, VC: FromBytes<'a>> FromBytes<'a> for Rows<BC, VC> {
    /// Rebuilds a container by reading `bounds` and then `values` from `bytes`.
    fn from_bytes(bytes: &mut impl Iterator<Item = &'a [u8]>) -> Self {
        // Both reads consume from the same iterator, so `bounds` must be
        // decoded before `values`.
        let bounds = BC::from_bytes(bytes);
        let values = VC::from_bytes(bytes);
        Self { bounds, values }
    }
}

impl<BC: Len, VC> Len for Rows<BC, VC> {
#[inline(always)]
fn len(&self) -> usize {
self.bounds.len()
}
}

impl<'a, BC: Len + IndexAs<u64>> Index for Rows<BC, &'a [u8]> {
    type Ref = &'a RowRef;

    /// Returns the row at `index` as a view into the shared byte slice.
    ///
    /// The row's bytes span from the previous entry in `bounds` (or 0 for the
    /// first row) up to the entry at `index`.
    #[inline(always)]
    fn get(&self, index: usize) -> Self::Ref {
        let start = match index.checked_sub(1) {
            Some(previous) => self.bounds.index_as(previous),
            // The first row has no predecessor and starts at offset 0.
            None => 0,
        };
        let end = self.bounds.index_as(index);
        let start = usize::try_from(start).unwrap();
        let end = usize::try_from(end).unwrap();
        RowRef::from_slice(&self.values[start..end])
    }
}
impl<'a, BC: Len + IndexAs<u64>> Index for &'a Rows<BC, Vec<u8>> {
    type Ref = &'a RowRef;

    /// Returns the row at `index` as a view into the owned byte vector.
    ///
    /// The row's bytes span from the previous entry in `bounds` (or 0 for the
    /// first row) up to the entry at `index`.
    #[inline(always)]
    fn get(&self, index: usize) -> Self::Ref {
        let start = match index.checked_sub(1) {
            Some(previous) => self.bounds.index_as(previous),
            // The first row has no predecessor and starts at offset 0.
            None => 0,
        };
        let end = self.bounds.index_as(index);
        let start = usize::try_from(start).unwrap();
        let end = usize::try_from(end).unwrap();
        RowRef::from_slice(&self.values[start..end])
    }
}

impl<BC: Push<u64>> Push<&Row> for Rows<BC> {
    /// Appends the bytes of `item` and records the new end offset.
    #[inline(always)]
    fn push(&mut self, item: &Row) {
        let bytes = item.data.as_slice();
        self.values.extend_from_slice(bytes);
        // The bound is the total length of `values` after the append.
        let end = self.values.len();
        self.bounds.push(u64::cast_from(end));
    }
}
impl<BC: Push<u64>> Push<&RowRef> for Rows<BC> {
    /// Appends the bytes of `item` and records the new end offset.
    fn push(&mut self, item: &RowRef) {
        let bytes = item.data();
        self.values.extend_from_slice(bytes);
        // The bound is the total length of `values` after the append.
        let end = self.values.len();
        self.bounds.push(u64::cast_from(end));
    }
}
impl<BC: Clear, VC: Clear> Clear for Rows<BC, VC> {
fn clear(&mut self) {
self.bounds.clear();
self.values.clear();
}
}
impl<BC: HeapSize, VC: HeapSize> HeapSize for Rows<BC, VC> {
    /// Adds the heap-size pairs reported by `bounds` and `values` component-wise.
    fn heap_size(&self) -> (usize, usize) {
        let bounds = self.bounds.heap_size();
        let values = self.values.heap_size();
        (bounds.0 + values.0, bounds.1 + values.1)
    }
}
}

/// A contiguous slice of bytes that are row data.
///
/// A [`RowRef`] is to [`Row`] as [`prim@str`] is to [`String`].
Expand Down Expand Up @@ -501,6 +640,13 @@ impl Ord for RowRef {
}
}

// This implementation must stay in lockstep with `Row`'s `Hash` implementation.
impl Hash for RowRef {
    /// Feeds the row's raw bytes into `state`.
    fn hash<H: Hasher>(&self, state: &mut H) {
        let bytes = self.data();
        Hash::hash(bytes, state)
    }
}

impl fmt::Debug for RowRef {
/// Debug representation using the internal datums
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
Expand Down

0 comments on commit 980b098

Please sign in to comment.