Allow Python user type pass through Beam SQL #38206

Abacn wants to merge 1 commit into apache:master
Conversation
Force-pushed 335b7d9 to f670f41
* Complete the pythonsdk_any logical type representation definition; otherwise the Java-side SchemaTranslation for this logical type would fail
* Handle PassThroughLogicalType in Beam SQL, allowing Beam SQL to treat a PassThroughLogicalType as its base type
* Fix nested bytes in Beam SQL
* Introduce a schema option for compact encoding of static non-null schemas
Force-pushed f670f41 to a8fa6db
R: @ahmedabu98, would you mind taking a look, since you've been working on Beam SQL?
ahmedabu98 left a comment:

A lot of this was new to me, but I was mostly able to follow along. Left some comments.
    Schema.LogicalType<org.apache.beam.sdk.values.Row, org.apache.beam.sdk.values.Row> logicalType =
        new org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType<
readability nit: import these instead of using fully qualified names?
    def estimate_size(self, unused_value, nested=False):
      # type: (Any, bool) -> int
      # A short is encoded as 2 bytes, regardless of nesting.
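The constant-size claim in the comment above can be illustrated without Beam at all. This is a toy sketch using the standard `struct` module, not Beam's actual coder classes:

```python
import struct

# Toy sketch (not Beam's actual coder): a signed 16-bit "short" always
# occupies exactly 2 bytes on the wire, so its size estimate can be a
# constant, independent of both the value and the nesting context.
def encode_short(value):
    # ">h" = big-endian signed 16-bit integer
    return struct.pack(">h", value)

def estimate_size(unused_value, nested=False):
    # A short is encoded as 2 bytes, regardless of nesting.
    return 2

payload = encode_short(-300)
assert len(payload) == estimate_size(-300)
assert struct.unpack(">h", payload)[0] == -300
```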
        UserTypeRow(1, Aribitrary("abc"), -1j),
    ])
    | SqlTransform("SELECT arb, complex FROM PCOLLECTION")
    # TODO: recover to user type. Currently pipeline can run,
Should we create a GitHub issue for this TODO?
        name="type_byte",
        type=schema_pb2.FieldType(
            atomic_type=schema_pb2.BYTE, nullable=False)),
    schema_pb2.Field(
        name="payload",
Can we create static top-level variables for "type_byte" and "payload" too?
And maybe a comment on what they represent (IIUC it's for FastPrimitivesCoder?)
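If these two fields are indeed the FastPrimitivesCoder output split apart, the idea can be sketched without Beam. Here `pickle` is only a stand-in for FastPrimitivesCoder, and the split itself is my assumption for illustration:

```python
import pickle

# Hypothetical sketch: pickle stands in for Beam's FastPrimitivesCoder.
# The first byte of the encoded value becomes the "type_byte" field and
# the remaining bytes become the "payload" field of the representation row.
def split_encoded(value):
    blob = pickle.dumps(value)
    return {"type_byte": blob[0], "payload": blob[1:]}

parts = split_encoded("abc")
# Rejoining the two fields reproduces the original encoded bytes exactly.
assert bytes([parts["type_byte"]]) + parts["payload"] == pickle.dumps("abc")
```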
    def recover_to_python_type(input):
      fields = []
      for field in input:
        print(field)
        if hasattr(field, 'type_byte') and hasattr(field, 'payload'):
          obj = coders.FastPrimitivesCoder().decode(
              field.type_byte.to_bytes() + field.payload)
          fields.append(obj)
        else:
          fields.append(field)
      return tuple(fields)
Should we make this utility public?
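For reference, a dependency-free sketch of what a public version of this helper could look like. `pickle` and `SimpleNamespace` are stand-ins for FastPrimitivesCoder and the translated row fields; both are assumptions for illustration, not the SDK's actual API:

```python
import pickle
from types import SimpleNamespace

def recover_to_python_type(row_fields):
    # Rebuild original Python values from fields that were split into a
    # (type_byte, payload) pair; leave all other fields untouched.
    recovered = []
    for field in row_fields:
        if hasattr(field, 'type_byte') and hasattr(field, 'payload'):
            # Reattach the single type byte to the payload and decode.
            blob = field.type_byte.to_bytes(1, 'big') + field.payload
            recovered.append(pickle.loads(blob))
        else:
            recovered.append(field)
    return tuple(recovered)

# Simulate one field whose value was split into (type_byte, payload).
blob = pickle.dumps(3 - 1j)
wrapped = SimpleNamespace(type_byte=blob[0], payload=blob[1:])
assert recover_to_python_type([wrapped, "plain"]) == (3 - 1j, "plain")
```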
Currently the Python SDK's Row can carry arbitrary types and works within the Python SDK, but not across the language boundary. Many connections are still missing to make it work end-to-end from Python -> Java/Beam SQL -> Python.

This change fills in most of the missing pieces. A Beam Row containing a Python user type (backed by FastPrimitivesCoder and carrying the pythonsdk_any logical type urn) can now be recognized and decoded by Java.
* Complete the pythonsdk_any logical type representation definition; otherwise the Java-side SchemaTranslation for this logical type would fail
* Handle PassThroughLogicalType in Beam SQL, allowing Beam SQL to treat a PassThroughLogicalType as its base type
* Fix nested bytes in Beam SQL
* Introduce a schema option for compact encoding of static non-null schemas
* Support the BYTE atomic type in Python
However, because the Beam Row -> Calcite Row -> Beam Row mapping loses the original logical type schema, the output Row comes back as a plain Row instead of the language type (this limitation is related to #24019). At least pipeline expansion no longer fails for cross-language pipelines involving Any typehints.
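The schema loss in that round trip can be modeled without Beam at all. The dictionaries and function names below are toy stand-ins of my own, not Beam's real classes:

```python
# Toy model of the Beam Row -> Calcite Row -> Beam Row round trip. The
# Calcite side keeps only the base (representation) type, so the reverse
# mapping has no information left to restore the logical type.
beam_field = {"name": "arb", "logical_type": "pythonsdk_any", "base_type": "ROW"}

def to_calcite(field):
    # Calcite only sees the base type of a logical type.
    return {"name": field["name"], "type": field["base_type"]}

def to_beam(calcite_field):
    # Nothing here records that the field was originally pythonsdk_any.
    return {"name": calcite_field["name"], "logical_type": None,
            "base_type": calcite_field["type"]}

round_tripped = to_beam(to_calcite(beam_field))
assert round_tripped["logical_type"] is None  # the logical type is lost
```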
Fix #21024; fix #20738