feat: add sort_pushdown_inexact benchmark for RG reorder #21674
zhuqi-lucas wants to merge 1 commit into apache:main
Conversation
Pull request overview
Adds a new benchmark scenario to measure DataFusion’s sort-pushdown Inexact path behavior (TopK preserved) and the impact of row-group reorder by statistics, while also enabling Parquet filter pushdown so TopK’s dynamic filters can benefit from late materialization.
Changes:
- Enable `execution.parquet.pushdown_filters` in the sort_pushdown benchmark runner to allow TopK dynamic filter pushdown to Parquet decoding.
- Add `sort_pushdown_inexact` to `bench.sh`, including data generation (single Parquet file) and a dedicated run target.
- Add 4 new SQL queries under `benchmarks/queries/sort_pushdown_inexact/` covering narrow/wide and small/large `LIMIT` variants for `ORDER BY ... DESC`.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| benchmarks/src/sort_pushdown.rs | Turns on Parquet filter pushdown for sort_pushdown benchmark runs. |
| benchmarks/bench.sh | Wires in sort_pushdown_inexact data + run targets and generates a single-file dataset. |
| benchmarks/queries/sort_pushdown_inexact/q1.sql | DESC TopK (narrow) query for Inexact path coverage. |
| benchmarks/queries/sort_pushdown_inexact/q2.sql | DESC TopK (narrow) with larger LIMIT to stress threshold convergence. |
| benchmarks/queries/sort_pushdown_inexact/q3.sql | DESC TopK (wide SELECT *) to show late materialization benefits. |
| benchmarks/queries/sort_pushdown_inexact/q4.sql | Wide SELECT * with larger LIMIT for cumulative effects. |
```bash
run_sort_pushdown_inexact() {
    INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact"
    RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact.json"
    echo "Running sort pushdown Inexact benchmark (row group reorder by statistics)..."
    debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${INEXACT_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
}
```
run_sort_pushdown_inexact passes --sorted, which tells DataFusion the file is ordered by l_orderkey (via WITH ORDER). However, the generated shuffled.parquet is intentionally not globally sorted by l_orderkey (row groups are arranged by the bucket key). This benchmark therefore relies on the planner staying on an Inexact path (keeping TopK/Sort for correctness). Please document this assumption explicitly and/or add a guard to ensure the benchmark can’t silently become incorrect if future optimizations start treating the reversed scan as Exact and eliminate the ordering operator.
I'm not sure what the --sorted flag is supposed to do but worth checking @zhuqi-lucas
--sorted adds WITH ORDER (l_orderkey ASC) which is needed to trigger the reverse scan path in try_pushdown_sort — currently the only path where reorder_by_statistics is called.
After reviewing this more carefully, I realized we need two benchmark suites to cover the full optimization path:
- `sort_pushdown_inexact` (with `--sorted`, DESC queries) — tests the reverse scan path where RG reorder is already supported
- `sort_pushdown_inexact_unsorted` (without `--sorted`, ASC+DESC queries) — tests the Unsupported path where RG reorder will be supported in a follow-up PR (feat: reorder row groups by statistics during sort pushdown #21580)
Updated in the latest push. This way each follow-up PR can run its corresponding benchmark to show the improvement.
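The effect the Inexact suite is meant to measure can be sketched with a small Python toy model (this is not DataFusion code; all names and numbers below are illustrative assumptions). Row groups are described only by the (min, max) stats of the sort column; for a DESC TopK, visiting the group with the largest max first lets the dynamic filter threshold converge after one group:

```python
# Toy model: why reordering row groups by statistics helps a DESC TopK
# query converge its dynamic filter threshold quickly. Each row group
# is represented only by the (min, max) stats of the sort column.

def scan_order(row_groups, descending=True):
    """Visit row groups with the largest max first for a DESC TopK."""
    return sorted(row_groups, key=lambda rg: rg[1], reverse=descending)

# Four row groups in scrambled file order: (min, max) of l_orderkey.
groups = [(200, 299), (0, 99), (300, 399), (100, 199)]

reordered = scan_order(groups)
print(reordered)  # [(300, 399), (200, 299), (100, 199), (0, 99)]

# After scanning the first reordered group, a TopK with LIMIT <= 100
# already holds the 100 largest keys; every remaining group's max
# (299, 199, 99) falls below the threshold (300), so their rows can be
# filtered out early via the pushed-down dynamic filter.
threshold = reordered[0][0]  # smallest key kept so far: 300
prunable = [rg for rg in reordered[1:] if rg[1] < threshold]
print(len(prunable))  # 3
```

In the original (scrambled) scan order the threshold only reaches 300 after the third group, so far fewer rows can be skipped.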
```rust
let mut config = self.common.config()?;
// Enable filter pushdown so TopK's dynamic filter is pushed to the
// parquet reader for late materialization — only sort-column rows
// pass the filter, non-sort columns are skipped for filtered rows.
config.options_mut().execution.parquet.pushdown_filters = true;
```
Does this enable it for all benchmarks? Generally we control this on a per-run basis.
Good catch @adriangb ! Fixed — reverted the sort_pushdown.rs change and moved pushdown_filters to an env var only for the inexact benchmark run. Other sort_pushdown benchmarks are unaffected.
Force-pushed ab5fe05 to bc87f85.
```bash
# Use datafusion-cli to bucket rows into 64 groups by a deterministic
# scrambler, then sort within each bucket by orderkey. This produces
# ~64 RG-sized segments where each has a tight orderkey range but the
# segments appear in scrambled (non-sorted) order in the file.
```
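The scheme described in that comment can be modeled with a short Python sketch (the real generation happens via datafusion-cli SQL; the scrambler function, row counts, and bucket sizes below are assumptions standing in for the actual expressions):

```python
# Hypothetical analogue of the shuffled.parquet generation: range-bucket
# a sorted key column, then write the buckets in scrambled order so each
# ~RG-sized segment has a tight min/max range but the file is not
# globally sorted.

def scramble(bucket_id: int) -> int:
    # Deterministic "scrambler": an odd multiplier mod 64 permutes
    # bucket ids (an assumption, not the expression used in bench.sh).
    return (bucket_id * 37) % 64

n, buckets = 6400, 64
rows = list(range(n))              # orderkey values, globally sorted
per_bucket = n // buckets

# Each bucket spans a tight, contiguous orderkey range...
segments = [rows[i * per_bucket:(i + 1) * per_bucket] for i in range(buckets)]
# ...and the buckets land in the file in scrambled order.
file_order = sorted(range(buckets), key=scramble)
shuffled = [segments[b] for b in file_order]

# Every segment keeps a tight min/max span (good RG statistics)...
spans = [(seg[0], seg[-1]) for seg in shuffled]
assert all(mx - mn == per_bucket - 1 for mn, mx in spans)
# ...but the concatenated file is no longer globally sorted.
flat = [k for seg in shuffled for k in seg]
assert flat != rows and sorted(flat) == rows
```

This is exactly the shape that makes the scan Inexact: per-row-group stats are tight and disjoint, so reordering by statistics recovers most of the benefit of a sorted file.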
I think another interesting benchmark, which is at least our case, is when there is overlap between the row groups but still some general ordering. E.g.:
| rg | min | max |
|---|---|---|
| 0 | 0 | 5 |
| 1 | 4 | 7 |
| 2 | 5 | 32 |
In our case, this happens because there's a stream of data coming in with timestamps, and it should arrive at around the same time it was created, but there are always some network delays, time skew, etc. that mean it's not perfect. But data that arrived 30 minutes later is guaranteed to have timestamps in a different range than data that arrived 30 minutes before it.
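A toy model of that pattern (every name and constant here is illustrative, chosen only to show the shape of the statistics):

```python
# Sketch: per-row-group (min, max) stats for a timestamp column written
# from a stream. Data arrives in rough time order, but bounded network
# delay / clock skew makes adjacent row groups overlap slightly, while
# row groups written far apart stay disjoint.

def streaming_rg_stats(num_rgs: int, span: int, max_skew: int):
    """(min, max) per row group: each covers its time window plus skew."""
    return [(rg * span, (rg + 1) * span + max_skew) for rg in range(num_rgs)]

stats = streaming_rg_stats(num_rgs=4, span=100, max_skew=10)
print(stats)  # [(0, 110), (100, 210), (200, 310), (300, 410)]

# Adjacent row groups overlap, so reorder-by-statistics must tolerate
# partially overlapping ranges rather than assume disjoint segments...
assert all(a_max > b_min for (_, a_max), (b_min, _) in zip(stats, stats[1:]))
# ...but any two row groups more than one apart are disjoint, because
# max_skew is much smaller than the per-row-group time span.
assert all(stats[i][1] < stats[i + 2][0] for i in range(len(stats) - 2))
```

A data variant generated with this shape would exercise the reorder path under realistic, partially overlapping statistics instead of the fully disjoint shuffled buckets.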
Great suggestion! Partially overlapping RGs from streaming data is a very realistic scenario. I will add a benchmark variant for this pattern when I update the PR — something like time-ordered chunks with small overlaps between adjacent chunks to simulate network delays / time skew.
Did you plan to include it in this PR or a followup?
Will add it in this PR — I'll create a third data variant with partially overlapping RGs (simulating streaming data with network delays) alongside the current shuffled data.
Add two benchmark suites for the Inexact sort pushdown path:

1. sort_pushdown_inexact (--sorted, DESC queries): Tests reverse scan path where RG reorder by statistics is applied. 4 queries: narrow/wide rows x LIMIT 100/1000.
2. sort_pushdown_inexact_unsorted (no WITH ORDER): Tests Unsupported path for future RG reorder support without declared file ordering. 6 queries: ASC/DESC x narrow/wide.

Both use a single shuffled parquet file with out-of-order row groups and enable pushdown_filters via env var for late materialization.

Closes apache#21582
Force-pushed bc87f85 to 9290773.
Which issue does this PR close?
Closes #21582
Rationale for this change
The existing sort_pushdown benchmarks only cover the Exact path (sort elimination). The Inexact path — where TopK is preserved and row group reorder by statistics helps threshold convergence — had no benchmark to measure its impact.
What changes are included in this PR?
- New benchmark suite `sort_pushdown_inexact` with 4 DESC LIMIT queries (narrow/wide rows, small/large LIMIT)
- Enable `pushdown_filters` in sort_pushdown benchmarks so TopK's dynamic filter is pushed to the parquet reader for late materialization

How to run
Or on GKE:
@alamb benchmark sort_pushdown_inexact

Are these changes tested?
Benchmark code only — validated locally.
Are there any user-facing changes?
No. New benchmark only.