Handle canceled partitioned hash join dynamic filters lazily #21666

adriangb wants to merge 11 commits into apache:main
Conversation
run benchmarks

🤖 Benchmark running (GKE): comparing codex/hash-join-empty-partition-reporting (d17d5e4) to 5c653be (merge-base) using tpcds

🤖 Benchmark running (GKE): comparing codex/hash-join-empty-partition-reporting (d17d5e4) to 5c653be (merge-base) using clickbench_partitioned

🤖 Benchmark running (GKE): comparing codex/hash-join-empty-partition-reporting (d17d5e4) to 5c653be (merge-base) using tpch
🤖 Benchmark completed (GKE): clickbench_partitioned — base (merge-base) vs branch

🤖 Benchmark completed (GKE): tpcds — base (merge-base) vs branch
run benchmark tpcds

🤖 Benchmark running (GKE): comparing codex/hash-join-empty-partition-reporting (d17d5e4) to 5c653be (merge-base) using tpcds

🤖 Benchmark completed (GKE): tpcds — base (merge-base) vs branch
query 72 took a bit of a hit here. Query 72:

select i_item_desc
,w_warehouse_name
,d1.d_week_seq
,sum(case when p_promo_sk is null then 1 else 0 end) no_promo
,sum(case when p_promo_sk is not null then 1 else 0 end) promo
,count(*) total_cnt
from catalog_sales
join inventory on (cs_item_sk = inv_item_sk)
join warehouse on (w_warehouse_sk=inv_warehouse_sk)
join item on (i_item_sk = cs_item_sk)
join customer_demographics on (cs_bill_cdemo_sk = cd_demo_sk)
join household_demographics on (cs_bill_hdemo_sk = hd_demo_sk)
join date_dim d1 on (cs_sold_date_sk = d1.d_date_sk)
join date_dim d2 on (inv_date_sk = d2.d_date_sk)
join date_dim d3 on (cs_ship_date_sk = d3.d_date_sk)
left outer join promotion on (cs_promo_sk=p_promo_sk)
left outer join catalog_returns on (cr_item_sk = cs_item_sk and cr_order_number = cs_order_number)
where d1.d_week_seq = d2.d_week_seq
and inv_quantity_on_hand < cs_quantity
and d3.d_date > (d1.d_date + INTERVAL '5 days')
and hd_buy_potential = '1001-5000'
and d1.d_year = 2001
and cd_marital_status = 'M'
group by i_item_desc,w_warehouse_name,d1.d_week_seq
order by total_cnt desc, i_item_desc, w_warehouse_name, d_week_seq
limit 100;
Yes. And I think this was another bandaid, but it's closer to the root cause than previous attempts. This has to do with cancellation when multiple joins are involved. TL;DR: when you have multiple joins you end up with a tree of operators. One of the joins higher up in the tree hits the new optimization and aborts work, dropping tasks that would have polled downstream joins. But now the downstream join is stuck waiting for all of its partition tasks to finish, even though they never will. I think we were all operating under the assumption that the issue was within a single join operator, but really it's an issue any time an upstream operator cancels on a join. I think the real solution is to track when a join build partition task gets dropped and report that to the dynamic filter building so that it doesn't wait for that partition to report.
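The "track dropped partition tasks" idea can be sketched with std primitives. Everything below is a simplified stand-in, not DataFusion code: the PR coordinates via tokio's `Notify` and its real accumulator types, while this sketch uses a `Condvar` and plain booleans-per-partition.

```rust
use std::sync::{Arc, Condvar, Mutex};

#[derive(Clone, Copy, PartialEq, Debug)]
enum PartitionStatus {
    Pending,
    Reported,
    Canceled,
}

struct Coordinator {
    statuses: Mutex<Vec<PartitionStatus>>,
    cv: Condvar,
}

impl Coordinator {
    fn new(n: usize) -> Arc<Self> {
        Arc::new(Self {
            statuses: Mutex::new(vec![PartitionStatus::Pending; n]),
            cv: Condvar::new(),
        })
    }

    // First transition out of Pending wins; later attempts are ignored.
    fn set(&self, partition: usize, status: PartitionStatus) {
        let mut statuses = self.statuses.lock().unwrap();
        if statuses[partition] == PartitionStatus::Pending {
            statuses[partition] = status;
            self.cv.notify_all();
        }
    }

    // Block until no partition is still Pending. Canceled partitions count
    // as settled, so a dropped partition no longer hangs the coordinator.
    fn wait_all_settled(&self) -> Vec<PartitionStatus> {
        let statuses = self.statuses.lock().unwrap();
        let statuses = self
            .cv
            .wait_while(statuses, |s| {
                s.iter().any(|p| *p == PartitionStatus::Pending)
            })
            .unwrap();
        statuses.clone()
    }
}

// Owned by each partition's stream. If the stream is dropped before it
// reported build data, Drop marks the partition Canceled.
struct ReportGuard {
    coord: Arc<Coordinator>,
    partition: usize,
}

impl Drop for ReportGuard {
    fn drop(&mut self) {
        self.coord.set(self.partition, PartitionStatus::Canceled);
    }
}
```

With a fixed-count barrier, dropping one guard before it reports would leave every other partition waiting forever; with explicit per-partition statuses the drop itself settles the partition and `wait_all_settled` can return.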
run benchmark tpcds
🤖 Benchmark running (GKE): comparing codex/hash-join-empty-partition-reporting (0584854) to 5c653be (merge-base) using tpcds
Pull request overview
Fixes a hang in partitioned hash join dynamic filter coordination by allowing partitions that are dropped early to be treated as “canceled” and not block filter finalization (DataFusion issue #21625).
Changes:
- Add cancellation tracking for partitioned build-side reports and treat canceled partitions as `true` in the synthesized partitioned dynamic filter.
- Mark partitioned `HashJoinStream` partitions as canceled on `Drop` when they never reported build data.
- Add a regression test covering the early-completing `RightSemi` parent join scenario that previously hung.
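The first bullet's "treat canceled partitions as `true`" behavior can be pictured with booleans. This is a deliberate simplification for illustration: the real contributions are bounds predicates, and the combination rule shown here is an assumption, not the PR's exact filter expression.

```rust
// Per-partition contribution to the synthesized dynamic filter for one probe
// row: Some(pred) if the partition reported build bounds, None if it was
// canceled. A canceled partition contributes `true` (pass-through), so it
// can never wrongly discard probe rows; contributing `false` (as an empty
// build side would) could filter out rows the dropped partition might have
// matched.
fn synthesized_filter(contributions: &[Option<bool>]) -> bool {
    contributions.iter().any(|c| c.unwrap_or(true))
}
```

A single canceled partition therefore conservatively disables the filter for that probe row rather than blocking finalization or over-filtering.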
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| datafusion/physical-plan/src/joins/hash_join/stream.rs | Track whether build info was reported and report partition cancellation to the coordinator on Drop. |
| datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs | Replace barrier-based coordination with explicit partition status + notify-based completion; synthesize filters that handle canceled partitions. |
| datafusion/physical-plan/src/joins/hash_join/exec.rs | Refactor dynamic-filter accumulator initialization and add a regression test for the cancellation/hang scenario. |
```rust
enum CompletionState {
    Pending,
    Finalizing,
    Ready(std::result::Result<(), String>),
}
```
CompletionState::Ready stores errors as Result<(), String>, which forces later callers to lose the original DataFusionError variant/backtrace/context. Consider storing Result<(), Arc<DataFusionError>> (or datafusion_common::SharedResult<()>) in CompletionState instead, so you can propagate DataFusionError::Shared(...) to all waiters without stringifying.
Suggested change:

```diff
- Ready(std::result::Result<(), String>),
+ Ready(std::result::Result<(), Arc<datafusion_common::DataFusionError>>),
```
```rust
let guard = self.inner.lock();
match &guard.completion {
    CompletionState::Ready(Ok(())) => return Ok(()),
    CompletionState::Ready(Err(err)) => {
        return Err(DataFusionError::Execution(err.clone()));
    }
```
wait_for_completion converts stored failures into DataFusionError::Execution(err.clone()), which changes the error category and can make debugging harder. If you keep an Arc<DataFusionError> in CompletionState, you can return Err(DataFusionError::Shared(err_arc)) here and preserve the original error semantics.
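The `Arc`-based propagation both comments suggest looks roughly like this. `JoinError` is a hypothetical stand-in for `DataFusionError` (whose real `Shared` variant wraps `Arc<DataFusionError>`); the `Finalizing` state is omitted for brevity.

```rust
use std::sync::Arc;

// Stand-in for DataFusionError; the real enum has (among others) an
// Execution(String) variant and a Shared(Arc<DataFusionError>) variant.
#[derive(Debug)]
enum JoinError {
    Execution(String),
    Shared(Arc<JoinError>),
}

enum CompletionState {
    Pending,
    Ready(Result<(), Arc<JoinError>>),
}

// Every waiter gets a cheap Arc clone wrapped in Shared, so the original
// error variant (and any context it carries) is preserved instead of being
// flattened into a fresh Execution(String).
fn result_for_waiter(state: &CompletionState) -> Option<Result<(), JoinError>> {
    match state {
        CompletionState::Pending => None,
        CompletionState::Ready(Ok(())) => Some(Ok(())),
        CompletionState::Ready(Err(e)) => Some(Err(JoinError::Shared(Arc::clone(e)))),
    }
}
```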
```diff
  /// # Returns
  /// * `Result<()>` - Ok if successful, Err if filter update failed or mode mismatch
  pub(crate) async fn report_build_data(&self, data: PartitionBuildData) -> Result<()> {
      // Store data in the accumulator
-     {
+     let finalize_input = {
```
The report_build_data doc comment still says "have reported (barrier wait)", but this method no longer uses tokio::sync::Barrier (it uses Notify/CompletionState). Please update the docs to match the current synchronization mechanism so the comment doesn’t mislead future changes.
🤖 Benchmark completed (GKE): tpcds — base (merge-base) vs branch
run benchmarks

🤖 Benchmark running (GKE): comparing codex/hash-join-empty-partition-reporting (7011c5d) to 5c653be (merge-base) using tpch

🤖 Benchmark running (GKE): comparing codex/hash-join-empty-partition-reporting (7011c5d) to 5c653be (merge-base) using tpcds

🤖 Benchmark running (GKE): comparing codex/hash-join-empty-partition-reporting (7011c5d) to 5c653be (merge-base) using clickbench_partitioned

🤖 Benchmark completed (GKE): clickbench_partitioned — base (merge-base) vs branch

🤖 Benchmark completed (GKE): tpcds — base (merge-base) vs branch
Okay, I think this addresses the root cause with no performance regression or behavior changes.
@RatulDawar could you let me know what you think of this solution?
@adriangb went through the solution and the PR; it makes much more sense to remove the barrier and track states instead, and the explicit states also give much more predictable behaviour. Just one concern with the code: reaching the correct state depends on whether the caller remembers to call report_build_data. Can we have a state-transition method so that build data is automatically reported, and we would just need to call the state-change method, like the existing state_after_build_ready?
Replaces the manual PartitionBuildData construction + report_build_data call + build_reported flag set in collect_build_side with a single transition_after_build_collected method, making it impossible to forget to report build data when transitioning state. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
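The commit's single-transition idea, reduced to stand-in types: `PartitionBuildData` and `transition_after_build_collected` are the PR's names, but the surrounding structs are simplified for illustration (a `Vec` plays the role of the shared coordinator).

```rust
// Simplified stand-ins: the real code reports PartitionBuildData to a shared
// coordinator; here a Vec plays that role.
struct PartitionBuildData {
    partition: usize,
    row_count: usize,
}

enum BuildPhase {
    Collecting,
    BuildReported,
}

struct PartitionStream {
    partition: usize,
    phase: BuildPhase,
    coordinator: Vec<PartitionBuildData>,
}

impl PartitionStream {
    // One method performs both halves of the transition, so a caller cannot
    // advance the state machine while forgetting to report build data
    // (previously: manual PartitionBuildData construction, a separate
    // report_build_data call, and a build_reported flag, spread across the
    // call site).
    fn transition_after_build_collected(&mut self, row_count: usize) {
        self.coordinator.push(PartitionBuildData {
            partition: self.partition,
            row_count,
        });
        self.phase = BuildPhase::BuildReported;
    }
}
```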
@RatulDawar does 5ad96d9 help?
Which issue does this PR close?

Closes #21625.
Rationale for this change
Partitioned hash join dynamic filters assumed every build-side partition would eventually report build data to the shared coordinator. That assumption breaks when an upstream partitioned operator legally short-circuits and drops a child hash-join partition before it is ever polled far enough to report.
In the original reproducer, a parent `RightSemi` join completes early for partitions whose own build side is empty. That causes child partitioned hash-join streams to be dropped while still waiting to build/report their dynamic-filter contribution. Sibling partitions then wait forever for reports that will never arrive.

What changes are included in this PR?

- Mark partitioned hash-join streams as canceled on `Drop` when they never reported build data.
- Treat canceled partitions as `true` in the synthesized partitioned filter so they do not block completion or incorrectly filter probe rows (rather than contributing `false` like an empty build side).

Are these changes tested?
- `cargo fmt --all`
- `cargo test -p datafusion-physical-plan test_partitioned_dynamic_filter_reports_empty_canceled_partitions -- --nocapture`
- `cargo test -p datafusion --test core_integration physical_optimizer::filter_pushdown::test_hashjoin_dynamic_filter_pushdown_partitioned -- --nocapture`
- `cargo test -p datafusion --test core_integration physical_optimizer::filter_pushdown::test_hashjoin_dynamic_filter_pushdown_partitioned --features force_hash_collisions -- --nocapture`
- `test_partitioned_dynamic_filter_reports_empty_canceled_partitions` times out on the pre-fix revision and passes on this branch.
- `cargo clippy --all-targets --all-features -- -D warnings` still fails on an unrelated existing workspace lint in `datafusion/expr/src/logical_plan/plan.rs:3773` (`clippy::mutable_key_type`).

Are there any user-facing changes?
No.