-
Notifications
You must be signed in to change notification settings - Fork 145
Expand file tree
/
Copy pathconversions.rs
More file actions
229 lines (200 loc) · 7.75 KB
/
conversions.rs
File metadata and controls
229 lines (200 loc) · 7.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors
use std::fs;
use std::path::Path;
use std::path::PathBuf;
use futures::StreamExt;
use futures::TryStreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::arrow::async_reader::ParquetRecordBatchStream;
use sysinfo::System;
use tokio::fs::File;
use tokio::fs::OpenOptions;
use tokio::fs::create_dir_all;
use tokio::io::AsyncWriteExt;
use tracing::Instrument;
use tracing::info;
use tracing::trace;
use vortex::VortexSessionDefault;
use vortex::array::ArrayRef;
use vortex::array::IntoArray;
use vortex::array::VortexSessionExecute;
use vortex::array::arrays::ChunkedArray;
use vortex::array::arrow::FromArrowArray;
use vortex::array::builders::builder_with_capacity;
use vortex::array::stream::ArrayStreamAdapter;
use vortex::array::stream::ArrayStreamExt;
use vortex::dtype::DType;
use vortex::dtype::arrow::FromArrowType;
use vortex::error::VortexResult;
use vortex::error::vortex_err;
use vortex::file::WriteOptionsSessionExt;
use vortex::session::VortexSession;
use crate::CompactionStrategy;
use crate::Format;
use crate::SESSION;
use crate::utils::file::idempotent_async;
/// Memory budget per concurrent conversion stream in GB. This is somewhat arbitrary.
const MEMORY_PER_STREAM_GB: u64 = 4;
/// Minimum number of concurrent conversion streams.
const MIN_CONCURRENCY: u64 = 1;
/// Maximum number of concurrent conversion streams. This is somewhat arbitrary.
const MAX_CONCURRENCY: u64 = 16;
/// Returns the available system memory in bytes.
///
/// Refreshes only the memory statistics instead of calling `System::new_all()`,
/// which also collects process, CPU, disk, and network information that this
/// function never reads.
fn available_memory_bytes() -> u64 {
    let mut system = System::new();
    system.refresh_memory();
    system.available_memory()
}
/// Calculate appropriate concurrency based on available memory.
///
/// Allots [`MEMORY_PER_STREAM_GB`] per stream and clamps the result to the
/// `[MIN_CONCURRENCY, MAX_CONCURRENCY]` range.
fn calculate_concurrency() -> usize {
    const BYTES_PER_GB: u64 = 1024 * 1024 * 1024;
    let available_gb = available_memory_bytes() / BYTES_PER_GB;
    let streams = (available_gb / MEMORY_PER_STREAM_GB).clamp(MIN_CONCURRENCY, MAX_CONCURRENCY);
    info!(
        "Available memory: {}GB, maximum concurrency is: {}",
        available_gb, streams
    );
    streams as usize
}
/// Read a Parquet file and return it as a Vortex [`ChunkedArray`].
///
/// Note: This loads the entire file into memory. For large files, use the streaming conversion like
/// in [`parquet_to_vortex_stream`] instead.
pub async fn parquet_to_vortex_chunks(parquet_path: PathBuf) -> anyhow::Result<ChunkedArray> {
let file = File::open(parquet_path).await?;
let builder = ParquetRecordBatchStreamBuilder::new(file).await?;
let reader = builder.build()?;
let chunks: Vec<ArrayRef> = parquet_to_vortex_stream(reader)
.map(|r| r.map_err(anyhow::Error::from))
.try_collect()
.await?;
Ok(ChunkedArray::from_iter(chunks))
}
/// Create a streaming Vortex array from a Parquet reader.
///
/// Streams record batches and converts them to Vortex arrays on-the-fly, avoiding loading the
/// entire file into memory. Each yielded item is one canonicalized chunk per Parquet record
/// batch, or the error that occurred while reading/converting it.
pub fn parquet_to_vortex_stream(
    reader: ParquetRecordBatchStream<File>,
) -> impl futures::Stream<Item = VortexResult<ArrayRef>> {
    reader.map(move |result| {
        // Wrap Parquet read errors as external Vortex errors so the stream has a uniform
        // `VortexResult` item type.
        result.map_err(|e| vortex_err!(External: e)).and_then(|rb| {
            // Convert the Arrow RecordBatch into a Vortex array. The `false` flag presumably
            // controls top-level nullability — confirm against `FromArrowArray::from_arrow`.
            let chunk = ArrayRef::from_arrow(rb, false)?;
            let mut builder = builder_with_capacity(chunk.dtype(), chunk.len());
            // Canonicalize the chunk.
            // NOTE(review): a fresh `VortexSession` (and execution ctx) is constructed for
            // every record batch. If `create_execution_ctx` takes `&self`, the session could
            // be built once outside the closure — confirm and hoist.
            chunk.append_to_builder(
                builder.as_mut(),
                &mut VortexSession::default().create_execution_ctx(),
            )?;
            Ok(builder.finish())
        })
    })
}
/// Convert a single Parquet file to Vortex format using streaming.
///
/// Streams data directly from Parquet to Vortex without loading the entire file into memory.
///
/// # Errors
///
/// Returns an error if either file cannot be opened/created, or if reading, converting, or
/// writing fails.
pub async fn convert_parquet_file_to_vortex(
    parquet_path: &Path,
    output_path: &Path,
    compaction: CompactionStrategy,
) -> anyhow::Result<()> {
    let file = File::open(parquet_path).await?;
    let builder = ParquetRecordBatchStreamBuilder::new(file).await?;
    // Capture the Vortex dtype from the Parquet (Arrow) schema before consuming the builder.
    let dtype = DType::from_arrow(builder.schema().as_ref());
    let stream = parquet_to_vortex_stream(builder.build()?);
    let mut output_file = OpenOptions::new()
        .write(true)
        .truncate(true)
        .create(true)
        .open(output_path)
        .await?;
    compaction
        .apply_options(SESSION.write_options())
        .write(
            &mut output_file,
            ArrayStreamExt::boxed(ArrayStreamAdapter::new(dtype, stream)),
        )
        .await?;
    // Flush explicitly so buffered bytes are pushed to the OS before returning; this also
    // matches the behavior of `write_parquet_as_vortex`, which flushes after writing.
    output_file.flush().await?;
    Ok(())
}
/// Convert all Parquet files in a directory to Vortex format.
///
/// This function reads Parquet files from `{input_path}/parquet/` and writes Vortex files to
/// `{input_path}/vortex-file-compressed/` (for Default compaction) or
/// `{input_path}/vortex-compact/` (for Compact compaction).
///
/// The conversion is idempotent: existing Vortex files will not be regenerated.
pub async fn convert_parquet_directory_to_vortex(
    input_path: &Path,
    compaction: CompactionStrategy,
) -> anyhow::Result<()> {
    let (format, dir_name) = match compaction {
        CompactionStrategy::Compact => (Format::VortexCompact, Format::VortexCompact.name()),
        CompactionStrategy::Default => (Format::OnDiskVortex, Format::OnDiskVortex.name()),
    };
    let vortex_dir = input_path.join(dir_name);
    let parquet_path = input_path.join(Format::Parquet.name());
    create_dir_all(&vortex_dir).await?;
    let parquet_inputs = fs::read_dir(&parquet_path)?.collect::<std::io::Result<Vec<_>>>()?;
    trace!(
        "Found {} parquet files in {}",
        parquet_inputs.len(),
        parquet_path.to_str().unwrap()
    );
    let iter = parquet_inputs
        .iter()
        .filter(|entry| entry.path().extension().is_some_and(|e| e == "parquet"));
    let concurrency = calculate_concurrency();
    futures::stream::iter(iter)
        .map(|dir_entry| {
            // File stem without the ".parquet" extension; used to name the output file.
            let filename = {
                let mut temp = dir_entry.path();
                temp.set_extension("");
                temp.file_name().unwrap().to_str().unwrap().to_string()
            };
            // BUG FIX: the input and output paths previously used a literal "(unknown)"
            // placeholder instead of the per-entry file name, so every file resolved to the
            // same input/output path. Use the entry's actual path and the derived stem.
            let parquet_file_path = dir_entry.path();
            let output_path = vortex_dir.join(format!("{filename}.{}", format.ext()));
            let span = tracing::info_span!(
                "compress_file",
                file = %filename,
                strategy = ?compaction,
            );
            tokio::spawn(
                async move {
                    idempotent_async(output_path.as_path(), move |vtx_file| async move {
                        info!(
                            "Processing file '{filename}' with {:?} strategy",
                            compaction
                        );
                        convert_parquet_file_to_vortex(&parquet_file_path, &vtx_file, compaction)
                            .await
                    })
                    .await
                    .expect("Failed to write Vortex file")
                }
                .instrument(span),
            )
        })
        .buffer_unordered(concurrency)
        .try_collect::<Vec<_>>()
        .await?;
    Ok(())
}
/// Convert a Parquet file to Vortex format with the specified compaction strategy.
///
/// Uses `idempotent_async` to skip conversion if the output file already exists.
pub async fn write_parquet_as_vortex(
    parquet_path: PathBuf,
    vortex_path: &str,
    compaction: CompactionStrategy,
) -> anyhow::Result<PathBuf> {
    idempotent_async(vortex_path, |output_fname| async move {
        let mut vortex_file = File::create(&output_fname).await?;
        // Note: loads the entire Parquet file into memory; see `parquet_to_vortex_chunks`.
        let chunks = parquet_to_vortex_chunks(parquet_path).await?;
        compaction
            .apply_options(SESSION.write_options())
            .write(&mut vortex_file, chunks.into_array().to_array_stream())
            .await?;
        vortex_file.flush().await?;
        Ok(())
    })
    .await
}