perf: Implement physical execution of uncorrelated scalar subqueries#21240
perf: Implement physical execution of uncorrelated scalar subqueries#21240neilconway wants to merge 69 commits intoapache:mainfrom
Conversation
| pub struct DefaultPhysicalProtoConverter; | ||
| #[derive(Default)] | ||
| pub struct DefaultPhysicalProtoConverter { | ||
| scalar_subquery_results: RefCell<Option<ScalarSubqueryResults>>, |
There was a problem hiding this comment.
I don't know the serialization/deserialization code well; would love feedback on whether this is the right way to do this.
There was a problem hiding this comment.
This feels like a bit of an anti-pattern. I'm going to need a bit of time to dive into what's going on here, but hopefully will get to it either this afternoon or maybe Sunday evening.
There was a problem hiding this comment.
I put up this PR targeting you branch as an explanation of what I mean.
The problem I have with adding state data to DefaultPhysicalProtoConverter is that now any time we have a custom proto converter that doesn't call the default, we will not be able to process these scalar subquery results.
Instead I think we just have to plumb this data member through the deserialization process. I haven't taken a super deep look into exactly how this ends up getting used to see if there's another way to take advantage. The method I used in the PR was basically to add a struct that contains all of the parts we pass through deserialization and add the scalar_subquery_results to it.
In regards to switching from FunctionRegistry -> TaskContext that's a great change. It was done part way in recent releases for the physical side but not on the logical side. It makes perfect sense to do it the way you have on the logical side.
| // Create the shared results container and register it (along with | ||
| // the index map) in ExecutionProps so that `create_physical_expr` | ||
| // can resolve `Expr::ScalarSubquery` into `ScalarSubqueryExpr` | ||
| // nodes. We clone the SessionState so these are available | ||
| // throughout physical planning without mutating the caller's state. | ||
| // | ||
| // Ideally, the subquery state would live in a dedicated planning | ||
| // context rather than on ExecutionProps (which is meant for | ||
| // session-level configuration). It's here because | ||
| // `create_physical_expr` only receives `&ExecutionProps`, and | ||
| // changing that signature would be a breaking public API change. | ||
| let results: Arc<Vec<OnceLock<ScalarValue>>> = | ||
| Arc::new((0..links.len()).map(|_| OnceLock::new()).collect()); | ||
| let session_state = if links.is_empty() { | ||
| Cow::Borrowed(session_state) | ||
| } else { | ||
| let mut owned = session_state.clone(); | ||
| owned.execution_props_mut().subquery_indexes = index_map; | ||
| owned.execution_props_mut().subquery_results = Arc::clone(&results); | ||
| Cow::Owned(owned) | ||
| }; |
There was a problem hiding this comment.
This seemed a bit kludgy but I couldn't think of a better way to do it; feedback/suggestions welcome.
|
run benchmarks |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing neilc/scalar-subquery-expr (b9bce91) to 0be5982 (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing neilc/scalar-subquery-expr (b9bce91) to 0be5982 (merge-base) diff using: tpcds File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing neilc/scalar-subquery-expr (b9bce91) to 0be5982 (merge-base) diff using: tpch File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpch — base (merge-base)
tpch — branch
File an issue against this benchmark runner |
|
run benchmark tpch10 |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing neilc/scalar-subquery-expr (b9bce91) to 0be5982 (merge-base) diff using: tpch10 File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
Which issue does this PR close?
array_has#18181.Rationale for this change
Previously, DataFusion evaluated uncorrelated scalar subqueries by transforming them into joins. This has three shortcomings:
array_has#18181). It also makes filter pushdown for scalar subquery filters more difficult (Scalar subquery filters not pushed down to TableScan #21324)ORDER BYorJOIN ON, or as arguments to an aggregate function. Those cases are now supported.This PR introduces physical execution of uncorrelated scalar subqueries:
ScalarSubqueryExecplan node to the top of any physical plan with uncorrelated subqueries: it has N+1 children, N subqueries and its "main" input, which is the rest of the query plan. The subquery expression in the parent plan is replaced with aScalarSubqueryExpr.ScalarSubqueryExecmanages the execution of the subqueries. Subquery evaluation is done in parallel (for a given query level), but at present it happens strictly before evaluation of the parent query. This might be improved in the future (Consider overlapping scalar subquery and parent query computation #21591).ScalarSubqueryExprreads its value from a shared slot thatScalarSubqueryExecpopulates when the subquery finishes; the physical planner assigns each subquery its slot index viaExecutionProps.This architecture makes it easy to avoid the two shortcomings described above. Performance seems roughly unchanged (benchmarks added in this PR), but in situations like #18181, we can now leverage scalar fast-paths; in the case of #18181 specifically, this improves performance from ~800 ms to ~30 ms.
What changes are included in this PR?
ScalarSubqueryExprPhysicalProtoConverterExtensionto wire upScalarSubqueryExprcorrectlyAre these changes tested?
Yes. New SLT coverage for cardinality errors, ORDER BY / JOIN ON / aggregate-arg contexts, nested uncorrelated subqueries, duplicate-subquery deduplication, and partition-pruning filters; new roundtrip tests for logical and physical plan serialization.
Are there any user-facing changes?
SQL:
ORDER BY,JOIN ON, and as aggregate function arguments.Rust APIs:
datafusion-proto, breaking changes toSerializeable::from_bytes_with_registry(renamed tofrom_bytes_with_ctx),parse_expr/parse_sorts/parse_exprs, and thePhysicalProtoConverterExtensiontrait.Plan shape:
LogicalPlan::Subquerynodes will now be preserved in the logical planScalarSubqueryExecplan node andScalarSubqueryExprexpressionsThe wire format has also changed to include scalar subqueries.