Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions vortex-python/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
// SPDX-FileCopyrightText: Copyright the Vortex contributors

fn main() {
let target_os = std::env::var("CARGO_CFG_TARGET_OS").unwrap_or_default();

if target_os == "macos" {
#[cfg(target_os = "macos")]
{
// For pyo3 to successfully link on macOS.
// See https://stackoverflow.com/a/77382609
println!("cargo:rustc-link-arg=-undefined");
Expand Down
22 changes: 13 additions & 9 deletions vortex-python/src/arrays/from_arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use arrow_data::ArrayData as ArrowArrayData;
use arrow_schema::DataType;
use arrow_schema::Field;
use pyo3::exceptions::PyValueError;
use pyo3::intern;
use pyo3::prelude::*;
use vortex::array::ArrayRef;
use vortex::array::IntoArray;
Expand All @@ -20,23 +21,26 @@ use vortex::error::VortexResult;

use crate::arrays::PyArrayRef;
use crate::arrow::FromPyArrow;
use crate::classes::array_class;
use crate::classes::chunked_array_class;
use crate::classes::table_class;
use crate::error::PyVortexError;
use crate::error::PyVortexResult;

/// Convert an Arrow object to a Vortex array.
pub(super) fn from_arrow(obj: &Borrowed<'_, '_, PyAny>) -> PyVortexResult<PyArrayRef> {
let pa = obj.py().import("pyarrow")?;
let pa_array = pa.getattr("Array")?;
let chunked_array = pa.getattr("ChunkedArray")?;
let table = pa.getattr("Table")?;
let py = obj.py();
let pa_array = array_class(py)?;
let chunked_array = chunked_array_class(py)?;
let table = table_class(py)?;

if obj.is_instance(&pa_array)? {
if obj.is_instance(pa_array)? {
let arrow_array = ArrowArrayData::from_pyarrow(&obj.as_borrowed()).map(make_array)?;
let is_nullable = arrow_array.is_nullable();
let enc_array = ArrayRef::from_arrow(arrow_array.as_ref(), is_nullable)?;
Ok(PyArrayRef::from(enc_array))
} else if obj.is_instance(&chunked_array)? {
let chunks: Vec<Bound<PyAny>> = obj.getattr("chunks")?.extract()?;
} else if obj.is_instance(chunked_array)? {
let chunks: Vec<Bound<PyAny>> = obj.getattr(intern!(py, "chunks"))?.extract()?;
let encoded_chunks = chunks
.iter()
.map(|a| {
Expand All @@ -45,13 +49,13 @@ pub(super) fn from_arrow(obj: &Borrowed<'_, '_, PyAny>) -> PyVortexResult<PyArra
})
.collect::<PyVortexResult<Vec<_>>>()?;
let dtype: DType = obj
.getattr("type")
.getattr(intern!(py, "type"))
.and_then(|v| DataType::from_pyarrow(&v.as_borrowed()))
.map(|dt| DType::from_arrow(&Field::new("_", dt, false)))?;
Ok(PyArrayRef::from(
ChunkedArray::try_new(encoded_chunks, dtype)?.into_array(),
))
} else if obj.is_instance(&table)? {
} else if obj.is_instance(table)? {
let array_stream = ArrowArrayStreamReader::from_pyarrow(&obj.as_borrowed())?;
let dtype = DType::from_arrow(array_stream.schema());
let chunks = array_stream
Expand Down
7 changes: 4 additions & 3 deletions vortex-python/src/arrays/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use pyo3::IntoPyObjectExt;
use pyo3::exceptions::PyIndexError;
use pyo3::exceptions::PyTypeError;
use pyo3::exceptions::PyValueError;
use pyo3::intern;
use pyo3::prelude::*;
use pyo3::types::PyDict;
use pyo3::types::PyList;
Expand Down Expand Up @@ -742,7 +743,7 @@ impl PyArray {
let dtype_buffers: Vec<Vec<u8>> = dtype_buffers.iter().map(|b| b.to_vec()).collect();

let vortex_module = PyModule::import(py, "vortex")?;
let unpickle_fn = vortex_module.getattr("_unpickle_array")?;
let unpickle_fn = vortex_module.getattr(intern!(py, "_unpickle_array"))?;

let args = (array_buffers, dtype_buffers).into_pyobject(py)?;
Ok((unpickle_fn, args.into_any()))
Expand All @@ -769,7 +770,7 @@ impl PyArray {
let dtype_buffers = encoder.encode(EncoderMessage::DType(array.dtype()))?;

let pickle_module = PyModule::import(py, "pickle")?;
let pickle_buffer_class = pickle_module.getattr("PickleBuffer")?;
let pickle_buffer_class = pickle_module.getattr(intern!(py, "PickleBuffer"))?;

let mut pickle_buffers = Vec::new();
for buf in array_buffers.into_iter() {
Expand All @@ -788,7 +789,7 @@ impl PyArray {
}

let vortex_module = PyModule::import(py, "vortex")?;
let unpickle_fn = vortex_module.getattr("_unpickle_array")?;
let unpickle_fn = vortex_module.getattr(intern!(py, "_unpickle_array"))?;

let args = (pickle_buffers, dtype_pickle_buffers).into_pyobject(py)?;
Ok((unpickle_fn, args.into_any()))
Expand Down
3 changes: 2 additions & 1 deletion vortex-python/src/arrays/py/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub(crate) use array::*;
use pyo3::Bound;
use pyo3::PyAny;
use pyo3::exceptions::PyValueError;
use pyo3::intern;
use pyo3::prelude::PyAnyMethods;
pub(crate) use python::*;
use vortex::array::vtable::ArrayId;
Expand All @@ -19,7 +20,7 @@ use crate::error::PyVortexResult;
/// Extract the array id from a Python class `id` attribute.
pub fn id_from_obj(cls: &Bound<PyAny>) -> PyVortexResult<ArrayId> {
Ok(ArrayId::new_arc(
cls.getattr("id")
cls.getattr(intern!(cls.py(), "id"))
.map_err(|_| {
PyValueError::new_err(format!(
"PyEncoding subclass {cls:?} must have an 'id' attribute"
Expand Down
2 changes: 1 addition & 1 deletion vortex-python/src/arrays/py/vtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl VTable for PythonVTable {
}

let bytes = obj
.call_method("__vx_metadata__", (), None)
.call_method(intern!(py, "__vx_metadata__"), (), None)
.map_err(|e| vortex_err!("{}", e))?
.cast::<PyBytes>()
.map_err(|_| vortex_err!("Expected array metadata to be Python bytes"))?
Expand Down
72 changes: 42 additions & 30 deletions vortex-python/src/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,17 @@ use pyo3::exceptions::PyValueError;
use pyo3::ffi::Py_uintptr_t;
use pyo3::ffi::c_str;
use pyo3::import_exception;
use pyo3::intern;
use pyo3::prelude::*;
use pyo3::types::PyCapsule;
use pyo3::types::PyTuple;

use crate::classes::array_class;
use crate::classes::data_type_class;
use crate::classes::field_class;
use crate::classes::record_batch_reader_class;
use crate::classes::schema_class;

const SCHEMA_NAME: &CStr = c_str!("arrow_schema");
const ARRAY_NAME: &CStr = c_str!("arrow_array");
const ARRAY_STREAM_NAME: &CStr = c_str!("arrow_array_stream");
Expand Down Expand Up @@ -69,13 +76,14 @@ pub trait IntoPyArrow {

impl<'py> FromPyArrow<'_, 'py> for DataType {
fn from_pyarrow(value: &Borrowed<'_, 'py, PyAny>) -> PyResult<Self> {
if !value.hasattr("__arrow_c_schema__")? {
let py = value.py();
if !value.hasattr(intern!(py, "__arrow_c_schema__"))? {
return Err(PyValueError::new_err(
"Expected __arrow_c_schema__ attribute to be set.",
));
}

let capsule = value.getattr("__arrow_c_schema__")?.call0()?;
let capsule = value.getattr(intern!(py, "__arrow_c_schema__"))?.call0()?;
let capsule = capsule.cast::<PyCapsule>()?;

let schema_ptr = unsafe {
Expand All @@ -92,22 +100,24 @@ impl<'py> FromPyArrow<'_, 'py> for DataType {
impl ToPyArrow for DataType {
fn to_pyarrow(&self, py: Python) -> PyResult<Py<PyAny>> {
let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?;
let module = py.import("pyarrow")?;
let class = module.getattr("DataType")?;
let dtype = class.call_method1("_import_from_c", (&raw const c_schema as Py_uintptr_t,))?;
let dtype = data_type_class(py)?.call_method1(
intern!(py, "_import_from_c"),
(&raw const c_schema as Py_uintptr_t,),
)?;
Ok(dtype.into())
}
}

impl<'py> FromPyArrow<'_, 'py> for Field {
fn from_pyarrow(value: &Borrowed<'_, 'py, PyAny>) -> PyResult<Self> {
if !value.hasattr("__arrow_c_schema__")? {
let py = value.py();
if !value.hasattr(intern!(py, "__arrow_c_schema__"))? {
return Err(PyValueError::new_err(
"Expected __arrow_c_schema__ attribute to be set.",
));
}

let capsule = value.getattr("__arrow_c_schema__")?.call0()?;
let capsule = value.getattr(intern!(py, "__arrow_c_schema__"))?.call0()?;
let capsule = capsule.cast::<PyCapsule>()?;

let schema_ptr = unsafe {
Expand All @@ -124,22 +134,24 @@ impl<'py> FromPyArrow<'_, 'py> for Field {
impl ToPyArrow for Field {
fn to_pyarrow(&self, py: Python) -> PyResult<Py<PyAny>> {
let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?;
let module = py.import("pyarrow")?;
let class = module.getattr("Field")?;
let dtype = class.call_method1("_import_from_c", (&raw const c_schema as Py_uintptr_t,))?;
let dtype = field_class(py)?.call_method1(
intern!(py, "_import_from_c"),
(&raw const c_schema as Py_uintptr_t,),
)?;
Ok(dtype.into())
}
}

impl<'py> FromPyArrow<'_, 'py> for Schema {
fn from_pyarrow(value: &Borrowed<'_, 'py, PyAny>) -> PyResult<Self> {
if !value.hasattr("__arrow_c_schema__")? {
let py = value.py();
if !value.hasattr(intern!(py, "__arrow_c_schema__"))? {
return Err(PyValueError::new_err(
"Expected __arrow_c_schema__ attribute to be set.",
));
}

let capsule = value.getattr("__arrow_c_schema__")?.call0()?;
let capsule = value.getattr(intern!(py, "__arrow_c_schema__"))?.call0()?;
let capsule = capsule.cast::<PyCapsule>()?;

let schema_ptr = unsafe {
Expand All @@ -157,23 +169,24 @@ impl<'py> FromPyArrow<'_, 'py> for Schema {
impl ToPyArrow for Schema {
fn to_pyarrow(&self, py: Python) -> PyResult<Py<PyAny>> {
let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?;
let module = py.import("pyarrow")?;
let class = module.getattr("Schema")?;
let schema =
class.call_method1("_import_from_c", (&raw const c_schema as Py_uintptr_t,))?;
let schema = schema_class(py)?.call_method1(
intern!(py, "_import_from_c"),
(&raw const c_schema as Py_uintptr_t,),
)?;
Ok(schema.into())
}
}

impl<'py> FromPyArrow<'_, 'py> for ArrayData {
fn from_pyarrow(value: &Borrowed<'_, 'py, PyAny>) -> PyResult<Self> {
if !value.hasattr("__arrow_c_array__")? {
let py = value.py();
if !value.hasattr(intern!(py, "__arrow_c_array__"))? {
return Err(PyValueError::new_err(
"Expected __arrow_c_array__ attribute to be set.",
));
}

let tuple = value.getattr("__arrow_c_array__")?.call0()?;
let tuple = value.getattr(intern!(py, "__arrow_c_array__"))?.call0()?;

if !tuple.is_instance_of::<PyTuple>() {
return Err(PyTypeError::new_err(
Expand Down Expand Up @@ -207,10 +220,8 @@ impl ToPyArrow for ArrayData {
let array = FFI_ArrowArray::new(self);
let schema = FFI_ArrowSchema::try_from(self.data_type()).map_err(to_py_err)?;

let module = py.import("pyarrow")?;
let class = module.getattr("Array")?;
let array = class.call_method1(
"_import_from_c",
let array = array_class(py)?.call_method1(
intern!(py, "_import_from_c"),
(
addr_of!(array) as Py_uintptr_t,
addr_of!(schema) as Py_uintptr_t,
Expand All @@ -222,13 +233,14 @@ impl ToPyArrow for ArrayData {

impl<'py> FromPyArrow<'_, 'py> for RecordBatch {
fn from_pyarrow(value: &Borrowed<'_, 'py, PyAny>) -> PyResult<Self> {
if !value.hasattr("__arrow_c_array__")? {
let py = value.py();
if !value.hasattr(intern!(py, "__arrow_c_array__"))? {
return Err(PyValueError::new_err(
"Expected __arrow_c_array__ attribute to be set.",
));
}

let tuple = value.getattr("__arrow_c_array__")?.call0()?;
let tuple = value.getattr(intern!(py, "__arrow_c_array__"))?.call0()?;

if !tuple.is_instance_of::<PyTuple>() {
return Err(PyTypeError::new_err(
Expand Down Expand Up @@ -286,20 +298,21 @@ impl ToPyArrow for RecordBatch {
let reader = RecordBatchIterator::new(vec![Ok(self.clone())], self.schema());
let reader: Box<dyn RecordBatchReader + Send> = Box::new(reader);
let py_reader = reader.into_pyarrow(py)?;
py_reader.call_method0(py, "read_next_batch")
py_reader.call_method0(py, intern!(py, "read_next_batch"))
}
}

/// Supports conversion from `pyarrow.RecordBatchReader` to [ArrowArrayStreamReader].
impl<'py> FromPyArrow<'_, 'py> for ArrowArrayStreamReader {
fn from_pyarrow(value: &Borrowed<'_, 'py, PyAny>) -> PyResult<Self> {
if !value.hasattr("__arrow_c_stream__")? {
let py = value.py();
if !value.hasattr(intern!(py, "__arrow_c_stream__"))? {
return Err(PyValueError::new_err(
"Expected __arrow_c_stream__ attribute to be set.",
));
}

let capsule = value.getattr("__arrow_c_stream__")?.call0()?;
let capsule = value.getattr(intern!(py, "__arrow_c_stream__"))?.call0()?;
let capsule = capsule.cast::<PyCapsule>()?;

let array_ptr = capsule
Expand All @@ -323,10 +336,9 @@ impl IntoPyArrow for Box<dyn RecordBatchReader + Send> {
fn into_pyarrow(self, py: Python) -> PyResult<Py<PyAny>> {
let mut stream = FFI_ArrowArrayStream::new(self);

let module = py.import("pyarrow")?;
let class = module.getattr("RecordBatchReader")?;
let args = PyTuple::new(py, [&raw mut stream as Py_uintptr_t])?;
let reader = class.call_method1("_import_from_c", args)?;
let reader =
record_batch_reader_class(py)?.call_method1(intern!(py, "_import_from_c"), args)?;

Ok(Py::from(reader))
}
Expand Down
59 changes: 59 additions & 0 deletions vortex-python/src/classes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! Caching of often-accessed Python classes that are used across the C ABI

use pyo3::Bound;
use pyo3::Py;
use pyo3::PyResult;
use pyo3::Python;
use pyo3::sync::PyOnceLock;
use pyo3::types::PyType;

/// Returns the pyarrow.DataType class
/// Returns the cached `pyarrow.DataType` class object.
///
/// The class is resolved once per interpreter via [`PyOnceLock`] and the
/// cached handle is returned on every later call, avoiding a repeated
/// module import and attribute lookup.
pub fn data_type_class(py: Python<'_>) -> PyResult<&Bound<'_, PyType>> {
    static CLASS: PyOnceLock<Py<PyType>> = PyOnceLock::new();
    CLASS.import(py, "pyarrow", "DataType")
}

/// Returns the pyarrow.Field class
/// Returns the cached `pyarrow.Field` class object.
///
/// Resolved once per interpreter via [`PyOnceLock`]; subsequent calls
/// reuse the cached handle instead of re-importing `pyarrow`.
pub fn field_class(py: Python<'_>) -> PyResult<&Bound<'_, PyType>> {
    static CLASS: PyOnceLock<Py<PyType>> = PyOnceLock::new();
    CLASS.import(py, "pyarrow", "Field")
}

/// Returns the pyarrow.Schema class
/// Returns the cached `pyarrow.Schema` class object.
///
/// Resolved once per interpreter via [`PyOnceLock`]; subsequent calls
/// reuse the cached handle instead of re-importing `pyarrow`.
pub fn schema_class(py: Python<'_>) -> PyResult<&Bound<'_, PyType>> {
    static CLASS: PyOnceLock<Py<PyType>> = PyOnceLock::new();
    CLASS.import(py, "pyarrow", "Schema")
}

/// Returns the pyarrow.Array class
/// Returns the cached `pyarrow.Array` class object.
///
/// Resolved once per interpreter via [`PyOnceLock`]; subsequent calls
/// reuse the cached handle instead of re-importing `pyarrow`.
pub fn array_class(py: Python<'_>) -> PyResult<&Bound<'_, PyType>> {
    static CLASS: PyOnceLock<Py<PyType>> = PyOnceLock::new();
    CLASS.import(py, "pyarrow", "Array")
}

/// Returns the pyarrow.ChunkedArray class
/// Returns the cached `pyarrow.ChunkedArray` class object.
///
/// Resolved once per interpreter via [`PyOnceLock`]; subsequent calls
/// reuse the cached handle instead of re-importing `pyarrow`.
pub fn chunked_array_class(py: Python<'_>) -> PyResult<&Bound<'_, PyType>> {
    static CLASS: PyOnceLock<Py<PyType>> = PyOnceLock::new();
    CLASS.import(py, "pyarrow", "ChunkedArray")
}

/// Returns the pyarrow.RecordBatchReader class
/// Returns the cached `pyarrow.RecordBatchReader` class object.
///
/// Resolved once per interpreter via [`PyOnceLock`]; subsequent calls
/// reuse the cached handle instead of re-importing `pyarrow`.
pub fn record_batch_reader_class(py: Python<'_>) -> PyResult<&Bound<'_, PyType>> {
    static CLASS: PyOnceLock<Py<PyType>> = PyOnceLock::new();
    CLASS.import(py, "pyarrow", "RecordBatchReader")
}

/// Returns the pyarrow.Table class
/// Returns the cached `pyarrow.Table` class object.
///
/// Resolved once per interpreter via [`PyOnceLock`]; subsequent calls
/// reuse the cached handle instead of re-importing `pyarrow`.
pub fn table_class(py: Python<'_>) -> PyResult<&Bound<'_, PyType>> {
    static CLASS: PyOnceLock<Py<PyType>> = PyOnceLock::new();
    CLASS.import(py, "pyarrow", "Table")
}

/// Returns the pyarrow.Decimal class
/// Returns the cached `decimal.Decimal` class from Python's standard
/// library (note: this comes from the built-in `decimal` module, not
/// from pyarrow, unlike the other accessors in this file).
pub fn decimal_class(py: Python<'_>) -> PyResult<&Bound<'_, PyType>> {
    static TYPE: PyOnceLock<Py<PyType>> = PyOnceLock::new();
    TYPE.import(py, "decimal", "Decimal")
}
Loading
Loading