Skip to content

[Runner] Add Spark 4 runner#38212

Open
tkaymak wants to merge 10 commits intoapache:masterfrom
tkaymak:spark4-runner
Open

[Runner] Add Spark 4 runner#38212
tkaymak wants to merge 10 commits intoapache:masterfrom
tkaymak:spark4-runner

Conversation

@tkaymak
Copy link
Copy Markdown
Contributor

@tkaymak tkaymak commented Apr 16, 2026

Addresses #36841

This PR is split into smaller commits for easier review.

  1. Prepare the build system as we need to deal with Scala 2.13 and JDK 17+ is required
  2. Add Spark 4 runner
  3. Add Spark 4 job server

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

Add spark4_version (4.0.2) to BeamModulePlugin alongside the existing
spark3_version. Update spark_runner.gradle to conditionally select the
correct Scala library (2.13 vs 2.12), Jackson module, Kafka test
dependency, and require Java 17 when building against Spark 4.

Register the new :runners:spark:4 module in settings.gradle.kts.

These changes are purely additive — all conditionals gate on
spark_version.startsWith("4") or spark_scala_version == '2.13', leaving
the Spark 3 build path untouched.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@gemini-code-assist
Copy link
Copy Markdown
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request lays the groundwork for supporting Spark 4 as an execution engine for Apache Beam pipelines. It introduces a new runner built on Spark's Structured Streaming framework, initially focusing on batch processing capabilities. The changes encompass comprehensive updates to the build configuration, core pipeline component translations, and a robust metrics integration to ensure compatibility and observability within the Spark 4 ecosystem.

Highlights

  • Spark 4 Runner Introduction: Introduced an experimental Spark 4 runner for Apache Beam, leveraging Spark's Structured Streaming API for pipeline execution.
  • Build System Updates: Updated the Gradle build system to support Spark 4.0.2 with Scala 2.13, including conditional Java 17 requirement and Kafka version adjustments for Scala 2.13 compatibility.
  • Core Pipeline Component Translation: Implemented translators for fundamental Beam transforms such as Impulse, ParDo, Combine (Globally, PerKey, GroupedValues), GroupByKey, Flatten, Window.Assign, and ReadSource, focusing on batch mode execution.
  • Enhanced Metrics Integration: Integrated Beam metrics with Spark's metrics system through new MetricsAccumulator, SparkBeamMetricSource, and custom sinks (CSV, Graphite), allowing Beam metrics to be exposed via Spark's monitoring interfaces.
  • Advanced Data Handling: Developed sophisticated data handling mechanisms including a custom Spark Session factory with Kryo serialization for Beam types, optimized Spark Encoder helpers for various Beam data structures (WindowedValue, KV, Collections, Maps), and a caching SideInputReader.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Ignored Files
  • Ignored by pattern: .github/workflows/** (2)
    • .github/workflows/beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming.yml
    • .github/workflows/beam_PreCommit_Java_Spark4_Versions.yml
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

apply plugin: 'org.apache.beam.module'
applyJavaNature(
enableStrictDependencies: true,
requireJavaVersion: (spark_version.startsWith("4") ? org.gradle.api.JavaVersion.VERSION_17 : null),
Copy link
Copy Markdown
Contributor Author

@tkaymak tkaymak Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spark 4 requires Java Version 17

Comment thread runners/spark/4/build.gradle Outdated
@tkaymak
Copy link
Copy Markdown
Contributor Author

tkaymak commented Apr 16, 2026

/gemini review

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces support for Spark 4.0.2 to the Apache Beam Spark runner, which includes a new structured streaming module, dependency updates for Java 17 and Scala 2.13, and various cross-version compatibility fixes. The code review identifies a critical bug where an incorrect function cast in the stateful runner would lead to a ClassCastException at runtime. Additionally, the feedback points out that the pipeline cancellation logic is incomplete because it does not interrupt the underlying execution future, and it identifies redundant code in the BoundedDatasetFactory that should be removed for clarity.

@tkaymak
Copy link
Copy Markdown
Contributor Author

tkaymak commented Apr 16, 2026

If the file SparkGroupAlsoByWindowViaWindowSet.java is to remain shared, the current implementation is likely the best way to handle the cross-version compatibility issue without over-engineering.

If we prefer a cleaner implementation for Spark 4 at the cost of file duplication (or creating a version adapter), I can move the file and refactor it.

Tobias Kaymak and others added 9 commits April 16, 2026 16:07
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add the Gradle build file for the Spark 4 structured streaming runner.
The module mirrors runners/spark/3/ — it inherits the shared RDD-base
source from runners/spark/src/ via copySourceBase and adds its own
Structured Streaming implementation in src/main/java.

Key differences from the Spark 3 build:
- Uses spark4_version (4.0.2) with Scala 2.13.
- Excludes DStream-based streaming tests (Spark 4 supports only
  structured streaming batch).
- Unconditionally adds --add-opens JVM flags required by Kryo on
  Java 17 (Spark 4's minimum).
- Binds Spark driver to 127.0.0.1 for macOS compatibility.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add the Spark 4 structured streaming runner implementation and tests.
Most files are adapted from the Spark 3 structured streaming runner
with targeted changes for Spark 4 / Scala 2.13 API compatibility.

Key Spark 4-specific changes (diff against runners/spark/3/src/):

EncoderFactory — Replaced the direct ExpressionEncoder constructor
  (removed in Spark 4) with BeamAgnosticEncoder, a named class
  implementing both AgnosticExpressionPathEncoder (for expression
  delegation via toCatalyst/fromCatalyst) and AgnosticEncoders
  .StructEncoder (so Dataset.select(TypedColumn) creates an N-attribute
  plan, preventing FIELD_NUMBER_MISMATCH). The toCatalyst/fromCatalyst
  methods substitute the provided input expression via transformUp,
  enabling correct nesting inside composite encoders like
  Encoders.tuple().

EncoderHelpers — Added toExpressionEncoder() helper to handle Spark 4
  built-in encoders that are AgnosticEncoder subclasses rather than
  ExpressionEncoder.

GroupByKeyTranslatorBatch — Migrated from internal catalyst Expression
  API (CreateNamedStruct, Literal$) to public Column API (struct(),
  lit(), array()), as required by Spark 4.

BoundedDatasetFactory — Use classic.Dataset$.MODULE$.ofRows() as
  Dataset moved to org.apache.spark.sql.classic in Spark 4.

ScalaInterop — Replace WrappedArray.ofRef (removed in Scala 2.13)
  with JavaConverters.asScalaBuffer().toList() in seqOf().

GroupByKeyHelpers, CombinePerKeyTranslatorBatch — Replace
  TraversableOnce with IterableOnce (Scala 2.13 rename).

SparkStructuredStreamingPipelineResult — Replace sparkproject.guava
  with Beam's vendored Guava.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add GitHub Actions workflows for the Spark 4 runner module:

- beam_PreCommit_Java_Spark4_Versions: runs sparkVersionsTest on
  changes to runners/spark/**.  Currently a no-op (the sparkVersions
  map is empty) but scaffolds future patch version coverage.
- beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming: runs
  the structured streaming test suite on Java 17.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Remove endOfData() call in close method.
Add job-server and container build configurations for Spark 4,
mirroring the existing Spark 3 job-server setup. The container
uses eclipse-temurin:17 (Spark 4 requires Java 17). The shared
spark_job_server.gradle gains a requireJavaVersion conditional
for Spark 4 parent projects.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The hostname binding hack is no longer needed now that the local
machine resolves its hostname to 127.0.0.1 via /etc/hosts.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@tkaymak tkaymak changed the title [WIP] Add Spark 4 runner [Runner] Add Spark 4 runner Apr 16, 2026
@tkaymak
Copy link
Copy Markdown
Contributor Author

tkaymak commented Apr 16, 2026

assign set of reviewers

@github-actions
Copy link
Copy Markdown
Contributor

Assigning reviewers:

R: @Abacn for label build.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant