Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 33 additions & 13 deletions vortex-array/src/arrays/filter/execute/bitbuffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,30 +21,50 @@ pub(super) fn filter_bit_buffer(bb: &BitBuffer, mask: &MaskValues) -> BitBuffer
}

/// Gathers the bits of `bb` found at the positions in `indices` into a new [`BitBuffer`].
///
/// Output bit `n` is the input bit at position `indices[n]`. The indices are scanned for
/// maximal runs of consecutive values (`k, k + 1, k + 2, ...`): runs of at least
/// `BULK_COPY_MIN_RUN` indices are appended as one slice of `bb`, and shorter runs are
/// gathered one bit at a time.
///
/// An empty `indices` slice yields an empty buffer.
fn filter_bitbuffer_by_indices(bb: &BitBuffer, indices: &[usize]) -> BitBuffer {
    /// Minimum run length for which slicing `bb` and appending the slice is used instead
    /// of per-bit appends.
    const BULK_COPY_MIN_RUN: usize = 64;

    if indices.is_empty() {
        return BitBuffer::empty();
    }

    let mut out = BitBufferMut::with_capacity(indices.len());
    let bools = bb.inner().as_ref();
    let bit_offset = bb.offset();

    // Scan for contiguous runs in the indices and copy them in bulk.
    let mut i = 0;
    while i < indices.len() {
        let run_start = indices[i];
        let mut run_end = run_start + 1;
        let mut j = i + 1;
        while j < indices.len() && indices[j] == run_end {
            run_end += 1;
            j += 1;
        }

        if j - i >= BULK_COPY_MIN_RUN {
            // Long contiguous run: `run_start..run_end` covers exactly indices[i..j].
            out.append_buffer(&bb.slice(run_start..run_end));
        } else {
            // Short or scattered indices: gather bit by bit. Iterating the sub-slice keeps
            // this safe without a bounds check per element.
            for &idx in &indices[i..j] {
                out.append(get_bit(bools, bit_offset + idx)); // Panics if out of bounds.
            }
        }

        i = j;
    }

    out.freeze()
}

#[allow(unused)]
fn filter_bitbuffer_by_slices(bb: &BitBuffer, slices: &[(usize, usize)]) -> BitBuffer {
let bools = bb.inner().as_ref();
let bit_offset = bb.offset();
let output_len: usize = slices.iter().map(|(start, end)| end - start).sum();

let mut out = BitBufferMut::with_capacity(output_len);

// FIXME(ngates): this is slower than it could be!
for &(start, end) in slices {
for idx in start..end {
out.append(get_bit(bools, bit_offset + idx)); // Panics if out of bounds.
}
out.append_buffer(&bb.slice(start..end));
}

out.freeze()
Expand Down
99 changes: 57 additions & 42 deletions vortex-buffer/src/bit/buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ use crate::bit::UnalignedBitChunk;
use crate::bit::count_ones::count_ones;
use crate::bit::get_bit_unchecked;
use crate::bit::ops::bitwise_binary_op;
use crate::bit::ops::bitwise_binary_op_lhs_owned;
use crate::bit::ops::bitwise_unary_op;
use crate::bit::ops::bitwise_unary_op_copy;
use crate::buffer;

/// An immutable bitset stored as a packed byte buffer.
Expand Down Expand Up @@ -57,6 +59,29 @@ impl PartialEq for BitBuffer {
return false;
}

if self.len == 0 {
return true;
}

// Fast path: both byte-aligned and same length — direct byte comparison.
if self.offset == 0 && other.offset == 0 {
let full_bytes = self.len / 8;
let self_bytes = &self.buffer.as_slice()[..full_bytes];
let other_bytes = &other.buffer.as_slice()[..full_bytes];
if self_bytes != other_bytes {
return false;
}
// Compare remaining bits in the last partial byte.
let rem = self.len % 8;
if rem != 0 {
let mask = (1u8 << rem) - 1;
let a = self.buffer.as_slice()[full_bytes] & mask;
let b = other.buffer.as_slice()[full_bytes] & mask;
return a == b;
}
return true;
}

self.chunks()
.iter_padded()
.zip(other.chunks().iter_padded())
Expand Down Expand Up @@ -315,11 +340,13 @@ impl BitBuffer {
}

/// Count the bits in the buffer that are set.
///
/// The count covers the `len` bits starting at bit `offset` of the backing bytes.
#[inline]
pub fn true_count(&self) -> usize {
    let bytes = self.buffer.as_slice();
    count_ones(bytes, self.offset, self.len)
}

/// Count the bits in the buffer that are unset.
///
/// Computed as the buffer length minus [`true_count`](Self::true_count).
#[inline]
pub fn false_count(&self) -> usize {
    let set_bits = self.true_count();
    self.len - set_bits
}
Expand All @@ -343,12 +370,14 @@ impl BitBuffer {
pub fn sliced(&self) -> Self {
    if self.offset.is_multiple_of(8) {
        // Byte-aligned offset: re-slice the backing bytes without touching any bits.
        // The end byte must account for the offset as well as the length; using
        // `self.len.div_ceil(8)` alone would cut the range short whenever offset > 0.
        let first_byte = self.offset / 8;
        let end_byte = (self.offset + self.len).div_ceil(8);
        return Self::new(self.buffer.slice(first_byte..end_byte), self.len);
    }

    // Unaligned offset: the bits have to be copied into a fresh buffer.
    // Allocate directly rather than clone + identity op which would fail try_into_mut.
    bitwise_unary_op_copy(self, |a| a)
}
}

Expand Down Expand Up @@ -392,7 +421,7 @@ impl BitOr for BitBuffer {

#[inline]
fn bitor(self, rhs: Self) -> Self::Output {
    // `self` is owned, so pass it by value to the lhs-owned helper rather than
    // delegating to the by-reference `BitOr` impl.
    bitwise_binary_op_lhs_owned(self, &rhs, |a, b| a | b)
}
}

Expand All @@ -410,7 +439,7 @@ impl BitOr<&BitBuffer> for BitBuffer {

#[inline]
fn bitor(self, rhs: &BitBuffer) -> Self::Output {
    // `self` is owned, so pass it by value to the lhs-owned helper rather than
    // re-borrowing it for the by-reference `BitOr` impl.
    bitwise_binary_op_lhs_owned(self, rhs, |a, b| a | b)
}
}

Expand All @@ -437,7 +466,7 @@ impl BitAnd<&BitBuffer> for BitBuffer {

#[inline]
fn bitand(self, rhs: &BitBuffer) -> Self::Output {
    // `self` is owned, so pass it by value to the lhs-owned helper rather than
    // re-borrowing it for the by-reference `BitAnd` impl.
    bitwise_binary_op_lhs_owned(self, rhs, |a, b| a & b)
}
}

Expand All @@ -446,7 +475,7 @@ impl BitAnd<BitBuffer> for BitBuffer {

#[inline]
fn bitand(self, rhs: BitBuffer) -> Self::Output {
    // `self` is owned, so pass it by value to the lhs-owned helper rather than
    // delegating to the by-reference `BitAnd` impl.
    bitwise_binary_op_lhs_owned(self, &rhs, |a, b| a & b)
}
}

Expand All @@ -455,7 +484,9 @@ impl Not for &BitBuffer {

#[inline]
fn not(self) -> Self::Output {
    // Allocate directly rather than clone+try_into_mut, which always fails
    // since the clone shares the Arc with the original reference.
    bitwise_unary_op_copy(self, |a| !a)
}
}

Expand All @@ -482,7 +513,7 @@ impl BitXor<&BitBuffer> for BitBuffer {

#[inline]
fn bitxor(self, rhs: &BitBuffer) -> Self::Output {
    // `self` is owned, so pass it by value to the lhs-owned helper rather than
    // re-borrowing it for the by-reference `BitXor` impl.
    bitwise_binary_op_lhs_owned(self, rhs, |a, b| a ^ b)
}
}

Expand All @@ -495,6 +526,11 @@ impl BitBuffer {
bitwise_binary_op(self, rhs, |a, b| a & !b)
}

/// Consuming counterpart of [`bitand_not`](Self::bitand_not): computes `self & !rhs`,
/// taking `self` by value so the operation can mutate in-place when possible.
pub fn into_bitand_not(self, rhs: &BitBuffer) -> BitBuffer {
    bitwise_binary_op_lhs_owned(self, rhs, |lhs, mask| lhs & !mask)
}

/// Iterate through bits in a buffer.
///
/// # Arguments
Expand All @@ -514,44 +550,23 @@ impl BitBuffer {
return;
}

let is_bit_set = |byte: u8, bit_idx: usize| (byte & (1 << bit_idx)) != 0;
let bit_offset = self.offset % 8;
let mut buffer_ptr = unsafe { self.buffer.as_ptr().add(self.offset / 8) };
let mut callback_idx = 0;

// Handle incomplete first byte.
if bit_offset > 0 {
let bits_in_first_byte = (8 - bit_offset).min(total_bits);
let byte = unsafe { *buffer_ptr };

for bit_idx in 0..bits_in_first_byte {
f(callback_idx, is_bit_set(byte, bit_offset + bit_idx));
callback_idx += 1;
}

buffer_ptr = unsafe { buffer_ptr.add(1) };
}

// Process complete bytes.
let complete_bytes = (total_bits - callback_idx) / 8;
for _ in 0..complete_bytes {
let byte = unsafe { *buffer_ptr };
// Process in 64-bit chunks for better ILP and fewer loop iterations.
let chunks = self.chunks();
let chunks_count = total_bits / 64;
let remainder = total_bits % 64;

for bit_idx in 0..8 {
f(callback_idx, is_bit_set(byte, bit_idx));
callback_idx += 1;
for (chunk_idx, chunk) in chunks.iter().enumerate() {
let base = chunk_idx * 64;
for bit_idx in 0..64 {
f(base + bit_idx, (chunk >> bit_idx) & 1 == 1);
}
buffer_ptr = unsafe { buffer_ptr.add(1) };
}

// Handle remaining bits at the end.
let remaining_bits = total_bits - callback_idx;
if remaining_bits > 0 {
let byte = unsafe { *buffer_ptr };

for bit_idx in 0..remaining_bits {
f(callback_idx, is_bit_set(byte, bit_idx));
callback_idx += 1;
if remainder != 0 {
let rem_chunk = chunks.remainder_bits();
let base = chunks_count * 64;
for bit_idx in 0..remainder {
f(base + bit_idx, (rem_chunk >> bit_idx) & 1 == 1);
}
}
}
Expand Down
Loading
Loading