Skip to content

Commit 414149e

Browse files
committed
clean up
Signed-off-by: Connor Tsui <connor.tsui20@gmail.com>
1 parent 2ca61ed commit 414149e

File tree

9 files changed

+494
-93
lines changed

9 files changed

+494
-93
lines changed

benchmarks/compress-bench/src/main.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,19 @@ struct Args {
7070
output_path: Option<PathBuf>,
7171
#[arg(long)]
7272
tracing: bool,
73+
/// Format for the primary stderr log sink. `text` is the default human-readable format;
74+
/// `json` emits one JSON object per event, suitable for piping into `jq`.
75+
///
76+
/// See [`LogFormat`] for the full details.
77+
#[arg(long, value_enum, default_value_t = LogFormat::Text)]
78+
log_format: LogFormat,
7379
}
7480

7581
#[tokio::main]
7682
async fn main() -> anyhow::Result<()> {
7783
let args = Args::parse();
7884

79-
setup_logging_and_tracing(args.verbose, args.tracing, LogFormat::Text)?;
85+
setup_logging_and_tracing(args.verbose, args.tracing, args.log_format)?;
8086

8187
run_compress(
8288
args.iterations,

vortex-bench/src/bin/data-gen.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,9 @@ struct Args {
3838
#[arg(long)]
3939
tracing: bool,
4040

41-
/// Format for the primary stderr log sink. `text` is the default human
42-
/// readable format; `json` emits newline-delimited JSON suitable for `jq`.
41+
/// Format for the primary stderr log sink. `text` is the default
42+
/// human-readable format; `json` emits one JSON object per event, suitable
43+
/// for piping into `jq`. See [`LogFormat`] for the full details.
4344
#[arg(long, value_enum, default_value_t = LogFormat::Text)]
4445
log_format: LogFormat,
4546

vortex-btrblocks/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ num-traits = { workspace = true }
2020
pco = { workspace = true, optional = true }
2121
rand = { workspace = true }
2222
rustc-hash = { workspace = true }
23-
tracing = { workspace = true, features = ["std", "attributes"] }
23+
tracing = { workspace = true }
2424
vortex-alp = { workspace = true }
2525
vortex-array = { workspace = true }
2626
vortex-buffer = { workspace = true }

vortex-btrblocks/src/canonical_compressor.rs

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -39,23 +39,9 @@ pub struct BtrBlocksCompressor(
3939
impl BtrBlocksCompressor {
4040
/// Compresses an array using BtrBlocks-inspired compression.
4141
///
42-
/// This call is instrumented with a `vortex_compressor::cascade` span named
43-
/// `BtrBlocksCompressor::compress` so that downstream tracing consumers
44-
/// (e.g. `tracing-perfetto`) have a distinct entry frame to pivot on, nested
45-
/// above the generic [`CascadingCompressor::compress`] span that actually
46-
/// runs the pipeline. See the `Observability` section of the
47-
/// [`vortex_compressor`] crate docs for the full tracing reference.
48-
#[tracing::instrument(
49-
target = "vortex_compressor::cascade",
50-
name = "BtrBlocksCompressor::compress",
51-
level = "trace",
52-
skip_all,
53-
fields(
54-
len = array.len(),
55-
nbytes = array.nbytes(),
56-
dtype = %array.dtype(),
57-
),
58-
)]
42+
/// This is a thin delegate to [`CascadingCompressor::compress`], which owns the tracing
43+
/// instrumentation. See the `Observability` section of the [`vortex_compressor`] crate
44+
/// docs for the full tracing reference.
5945
pub fn compress(&self, array: &ArrayRef) -> VortexResult<ArrayRef> {
6046
self.0.compress(array)
6147
}

vortex-btrblocks/src/lib.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,16 +54,12 @@
5454
//!
5555
//! # Observability
5656
//!
57-
//! [`BtrBlocksCompressor`] participates in the [`vortex_compressor`] tracing target system.
58-
//! See the [`vortex_compressor` crate docs](vortex_compressor#observability) for the full
59-
//! reference on targets, spans, and events.
57+
//! [`BtrBlocksCompressor`] is a thin delegate to [`CascadingCompressor`], which owns all
58+
//! tracing instrumentation. See the
59+
//! [`vortex_compressor` crate docs](vortex_compressor#observability) for the full reference
60+
//! on targets, spans, and events.
6061
//!
61-
//! The top-level [`BtrBlocksCompressor::compress`] call adds its own
62-
//! `vortex_compressor::cascade` span (named `BtrBlocksCompressor::compress`) that nests
63-
//! above the generic cascading-compressor pipeline, giving downstream trace consumers a
64-
//! distinct entry frame.
65-
//!
66-
//! Quick start — one line per leaf with scheme, estimated ratio, actual ratio, accepted?:
62+
//! Quick start: one line per leaf with scheme, estimated ratio, actual ratio, accepted?:
6763
//!
6864
//! ```text
6965
//! RUST_LOG=vortex_compressor::encode=debug cargo test -p vortex-btrblocks

vortex-btrblocks/src/schemes/integer.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -294,10 +294,6 @@ impl Scheme for ZigZagScheme {
294294

295295
let compressed = compressor.compress_child(&encoded.into_array(), &ctx, self.id(), 0)?;
296296

297-
// NOTE: scheme-level compression results are emitted centrally as the
298-
// `scheme.compress_result` event on the `vortex_compressor::encode`
299-
// target. See the `Observability` section of the `vortex_compressor` crate docs.
300-
301297
Ok(ZigZag::try_new(compressed)?.into_array())
302298
}
303299
}

vortex-compressor/src/compressor.rs

Lines changed: 82 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use crate::builtins::IntDictScheme;
3838
use crate::ctx::CompressorContext;
3939
use crate::estimate::CompressionEstimate;
4040
use crate::estimate::DeferredEstimate;
41+
use crate::estimate::EstimateFn;
4142
use crate::estimate::EstimateVerdict;
4243
use crate::estimate::estimate_compression_ratio_with_sampling;
4344
use crate::estimate::is_better_ratio;
@@ -72,13 +73,15 @@ const TARGET_CASCADE: &str = "vortex_compressor::cascade";
7273
/// Emits a structured `scheme.evaluated` trace event on [`TARGET_SELECT`] for one scheme's
7374
/// initial estimation verdict.
7475
///
75-
/// For `Ratio(r)` the numeric estimate is recorded directly. For `Sample` and `Estimate`
76-
/// the ratio is not yet known at this point; a follow-up `scheme.evaluated.resolved` event
77-
/// is emitted by the caller after the deferred computation finishes.
76+
/// For [`CompressionEstimate::Verdict(EstimateVerdict::Ratio)`] the numeric estimate is recorded
77+
/// directly as a typed `f64`, so JSON subscribers get a proper number. For all other variants the
78+
/// `ratio` field is omitted entirely. The `kind` field distinguishes the variants. For deferred
79+
/// estimates a follow-up `scheme.evaluated.resolved` event is emitted by the caller once the
80+
/// deferred computation finishes.
7881
///
79-
/// Defined as a standalone helper (rather than inlined) because the `match` expression that
80-
/// extracts `kind` and the optional `ratio` field is the only repetition worth factoring out
81-
/// of [`CascadingCompressor::choose_best_scheme`].
82+
/// Defined as a standalone helper (rather than inlined) because the `match` expression
83+
/// that extracts `kind` is the only repetition worth factoring out of
84+
/// [`CascadingCompressor::choose_best_scheme`].
8285
fn emit_scheme_evaluated(scheme: &'static dyn Scheme, estimate: &CompressionEstimate) {
8386
let kind: &'static str = match estimate {
8487
CompressionEstimate::Verdict(EstimateVerdict::Skip) => "Skip",
@@ -542,7 +545,6 @@ impl CascadingCompressor {
542545
/// registration order (earlier in the list wins).
543546
///
544547
/// [`expected_compression_ratio`]: Scheme::expected_compression_ratio
545-
#[allow(clippy::cognitive_complexity, reason = "tracing sometimes enabled")]
546548
fn choose_best_scheme(
547549
&self,
548550
schemes: &[&'static dyn Scheme],
@@ -571,53 +573,16 @@ impl CascadingCompressor {
571573
}
572574
}
573575
CompressionEstimate::Deferred(DeferredEstimate::Sample) => {
574-
let sample_ratio = estimate_compression_ratio_with_sampling(
575-
scheme,
576-
self,
577-
data.array(),
578-
ctx.clone(),
579-
)?;
580-
581-
tracing::trace!(
582-
target: TARGET_SELECT,
583-
scheme = %scheme.id(),
584-
kind = "Sample",
585-
ratio = sample_ratio,
586-
"scheme.evaluated.resolved",
587-
);
588-
589-
if is_better_ratio(sample_ratio, &best) {
590-
best = Some((scheme, sample_ratio));
591-
}
576+
self.check_sample_scheme(data, &ctx, &mut best, scheme)?;
592577
}
593578
CompressionEstimate::Deferred(DeferredEstimate::Callback(estimate_callback)) => {
594-
let verdict = estimate_callback(self, data, ctx.clone())?;
595-
let resolved_kind = match verdict {
596-
EstimateVerdict::Skip => "Skip",
597-
EstimateVerdict::AlwaysUse => "AlwaysUse",
598-
EstimateVerdict::Ratio(_) => "Ratio",
599-
};
600-
if let EstimateVerdict::Ratio(ratio) = verdict {
601-
tracing::trace!(
602-
target: TARGET_SELECT,
603-
scheme = %scheme.id(),
604-
kind = "Estimate",
605-
resolved_kind,
606-
ratio,
607-
"scheme.evaluated.resolved",
608-
);
609-
} else {
610-
tracing::trace!(
611-
target: TARGET_SELECT,
612-
scheme = %scheme.id(),
613-
kind = "Estimate",
614-
resolved_kind,
615-
"scheme.evaluated.resolved",
616-
);
617-
}
618-
if let Some(winner_estimate) =
619-
Self::check_and_update_estimate_verdict(&mut best, scheme, verdict)
620-
{
579+
if let Some(winner_estimate) = self.check_estimate_callback(
580+
data,
581+
&ctx,
582+
&mut best,
583+
scheme,
584+
estimate_callback,
585+
)? {
621586
return Ok(Some((scheme, winner_estimate)));
622587
}
623588
}
@@ -627,6 +592,71 @@ impl CascadingCompressor {
627592
Ok(best.map(|(scheme, ratio)| (scheme, WinnerEstimate::Ratio(ratio))))
628593
}
629594

595+
/// Helper function for sampling a scheme to get an estimated compression ratio.
596+
fn check_sample_scheme(
597+
&self,
598+
data: &mut ArrayAndStats,
599+
ctx: &CompressorContext,
600+
best: &mut Option<(&'static dyn Scheme, f64)>,
601+
scheme: &'static dyn Scheme,
602+
) -> VortexResult<()> {
603+
let sample_ratio =
604+
estimate_compression_ratio_with_sampling(scheme, self, data.array(), ctx.clone())?;
605+
606+
tracing::trace!(
607+
target: TARGET_SELECT,
608+
scheme = %scheme.id(),
609+
kind = "Sample",
610+
ratio = sample_ratio,
611+
"scheme.evaluated.resolved",
612+
);
613+
614+
if is_better_ratio(sample_ratio, &*best) {
615+
*best = Some((scheme, sample_ratio));
616+
}
617+
618+
Ok(())
619+
}
620+
621+
/// Helper function for running a custom compression ratio estimation callback for a scheme.
622+
fn check_estimate_callback(
623+
&self,
624+
data: &mut ArrayAndStats,
625+
ctx: &CompressorContext,
626+
best: &mut Option<(&'static dyn Scheme, f64)>,
627+
scheme: &'static dyn Scheme,
628+
estimate_callback: Box<EstimateFn>,
629+
) -> VortexResult<Option<WinnerEstimate>> {
630+
let verdict = estimate_callback(self, data, ctx.clone())?;
631+
let resolved_kind = match verdict {
632+
EstimateVerdict::Skip => "Skip",
633+
EstimateVerdict::AlwaysUse => "AlwaysUse",
634+
EstimateVerdict::Ratio(_) => "Ratio",
635+
};
636+
if let EstimateVerdict::Ratio(ratio) = verdict {
637+
tracing::trace!(
638+
target: TARGET_SELECT,
639+
scheme = %scheme.id(),
640+
kind = "Estimate",
641+
resolved_kind,
642+
ratio,
643+
"scheme.evaluated.resolved",
644+
);
645+
} else {
646+
tracing::trace!(
647+
target: TARGET_SELECT,
648+
scheme = %scheme.id(),
649+
kind = "Estimate",
650+
resolved_kind,
651+
"scheme.evaluated.resolved",
652+
);
653+
}
654+
655+
Ok(Self::check_and_update_estimate_verdict(
656+
best, scheme, verdict,
657+
))
658+
}
659+
630660
/// Updates `best` from a terminal estimate verdict.
631661
fn check_and_update_estimate_verdict(
632662
best: &mut Option<(&'static dyn Scheme, f64)>,

vortex-compressor/src/lib.rs

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -82,16 +82,33 @@
8282
//!
8383
//! | Event | Target | Level | Fields |
8484
//! |-----------------------------|-------------------|-------|-------------------------------------------------------------------------------------------------|
85-
//! | `scheme.evaluated` | select | trace | `scheme`, `kind`, `ratio` (Option) |
85+
//! | `scheme.evaluated` | select | trace | `scheme`, `kind`, `ratio` (only when `kind = "Ratio"`) |
8686
//! | `scheme.evaluated.resolved` | select | trace | `scheme`, `kind`, `resolved_kind`?, `ratio`? |
87-
//! | `scheme.winner` | select | debug | `scheme`, `estimated_ratio`, `candidate_count` |
88-
//! | `scheme.compress_result` | encode | debug | `scheme`, `before_nbytes`, `after_nbytes`, `estimated_ratio`, `actual_ratio`, `accepted` |
87+
//! | `scheme.winner` | select | debug | `scheme`, `candidate_count`, and either `estimated_ratio` or `always_use = true` |
88+
//! | `scheme.compress_result` | encode | debug | `scheme`, `before_nbytes`, `after_nbytes`, `actual_ratio`, `accepted`, and either `estimated_ratio` or `always_use = true` |
8989
//! | `sample.collected` | estimate | trace | `scheme`, `sample_count`, `sample_size`, `sampled_len`, `source_len` |
9090
//! | `sample.result` | estimate | debug | `scheme`, `sampled_before`, `sampled_after`, `sampled_ratio` |
91-
//! | `short_circuit` | select / cascade | debug | `reason` (`cascade_exhausted` \| `no_schemes` \| `empty` \| `all_null` \| `fell_through` \| `larger_output`), scheme?/parent? |
92-
//!
93-
//! An `estimated_ratio` of [`f64::INFINITY`] indicates a scheme that returned
94-
//! [`CompressionEstimate::AlwaysUse`](estimate::CompressionEstimate::AlwaysUse).
91+
//! | `short_circuit` | select / cascade | debug | `reason` plus reason-specific fields (see below) |
92+
//!
93+
//! ### `short_circuit` reasons and fields
94+
//!
95+
//! The `short_circuit` event reports six distinct reasons, each carrying a different set of
96+
//! fields. Downstream tooling should branch on `reason` before reading the other fields.
97+
//!
98+
//! | `reason` | Target | Additional fields |
99+
//! |---------------------|-----------|---------------------------------------------------------------------------------------|
100+
//! | `cascade_exhausted` | cascade | `parent`, `child_index` |
101+
//! | `no_schemes` | select | — (no additional fields) |
102+
//! | `empty` | select | — (no additional fields) |
103+
//! | `all_null` | select | — (no additional fields) |
104+
//! | `fell_through` | select | `candidate_count` |
105+
//! | `larger_output` | select | `scheme`, `before_nbytes`, `after_nbytes`, `actual_ratio`, plus either `estimated_ratio` or `always_use = true` |
106+
//!
107+
//! The `always_use` boolean (emitted on `scheme.winner`, `scheme.compress_result`, and the
108+
//! `larger_output` short-circuit) indicates the winner was a scheme that returned
109+
//! [`EstimateVerdict::AlwaysUse`](estimate::EstimateVerdict::AlwaysUse) and therefore
110+
//! did not produce a numeric estimate. It is emitted in place of `estimated_ratio` so that
111+
//! JSON subscribers never see a non-finite number in the `estimated_ratio` slot.
95112
//!
96113
//! Field names are considered stable and are meant to be matched directly by downstream
97114
//! observability tooling. This means `tracing-opentelemetry`, `tracing-perfetto`, and

0 commit comments

Comments
 (0)