Using the Apache Spark Runner

The Apache Spark Runner can be used to execute Beam pipelines using Apache Spark. The Spark Runner can execute Spark pipelines just like a native Spark application; deploying a self-contained application for local mode, running on Spark’s Standalone RM, or using YARN or Mesos.

The Spark Runner executes Beam pipelines on top of Apache Spark, providing:

The Beam Capability Matrix documents the currently supported capabilities of the Spark Runner.

Three flavors of the Spark runner

The Spark runner comes in three flavors:

  1. A legacy Runner which supports only Java (and other JVM-based languages) and that is based on Spark RDD/DStream
  2. An Structured Streaming Spark Runner which supports only Java (and other JVM-based languages) and that is based on Spark Datasets and the Apache Spark Structured Streaming framework.

Note: It is still experimental, its coverage of the Beam model is partial. As for now it only supports batch mode.

  1. A portable Runner which supports Java, Python, and Go

This guide is split into two parts to document the non-portable and the portable functionality of the Spark Runner. Please use the switcher below to select the appropriate Runner:

Which runner to use: portable or non portable runner?

Beam and its Runners originally only supported JVM-based languages (e.g. Java/Scala/Kotlin). Python and Go SDKs were added later on. The architecture of the Runners had to be changed significantly to support executing pipelines written in other languages.

If your applications only use Java, then you should currently go with one of the java based runners. If you want to run Python or Go pipelines with Beam on Spark, you need to use the portable Runner. For more information on portability, please visit the Portability page.

Spark Runner prerequisites and setup

The Spark runner currently supports Spark’s 3.2.x branch.

Note: Support for Spark 2.4.x was dropped with Beam 2.46.0.

You can add a dependency on the latest version of the Spark runner by adding to your pom.xml the following:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-spark-3</artifactId>
  <version>2.60.0</version>
</dependency>

Deploying Spark with your application

In some cases, such as running in local mode/Standalone, your (self-contained) application would be required to pack Spark by explicitly adding the following dependencies in your pom.xml:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.12</artifactId>
  <version>${spark.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.12</artifactId>
  <version>${spark.version}</version>
</dependency>

And shading the application jar using the maven shade plugin:

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <createDependencyReducedPom>false</createDependencyReducedPom>
    <filters>
      <filter>
        <artifact>*:*</artifact>
        <excludes>
          <exclude>META-INF/*.SF</exclude>
          <exclude>META-INF/*.DSA</exclude>
          <exclude>META-INF/*.RSA</exclude>
        </excludes>
      </filter>
    </filters>
  </configuration>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <shadedArtifactAttached>true</shadedArtifactAttached>
        <shadedClassifierName>shaded</shadedClassifierName>
        <transformers>
          <transformer
            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>

After running mvn package, run ls target and you should see (assuming your artifactId is beam-examples and the version is 1.0.0):

beam-examples-1.0.0-shaded.jar

To run against a Standalone cluster simply run:


For RDD/DStream based runner:

spark-submit --class com.beam.examples.BeamPipeline --master spark://HOST:PORT target/beam-examples-1.0.0-shaded.jar --runner=SparkRunner


For Structured Streaming based runner:

spark-submit --class com.beam.examples.BeamPipeline --master spark://HOST:PORT target/beam-examples-1.0.0-shaded.jar --runner=SparkStructuredStreamingRunner

You will need Docker to be installed in your execution environment. To develop Apache Beam with Python you have to install the Apache Beam Python SDK: pip install apache_beam. Please refer to the Python documentation on how to create a Python pipeline.

pip install apache_beam

Starting from Beam 2.20.0, pre-built Spark Job Service Docker images are available at Docker Hub.

For older Beam versions, you will need a copy of Apache Beam’s source code. You can download it on the Downloads page.

  1. Start the JobService endpoint:
    • with Docker (preferred): docker run --net=host apache/beam_spark_job_server:latest
    • or from Beam source code: ./gradlew :runners:spark:3:job-server:runShadow

The JobService is the central instance where you submit your Beam pipeline. The JobService will create a Spark job for the pipeline and execute the job. To execute the job on a Spark cluster, the Beam JobService needs to be provided with the Spark master address.

  1. Submit the Python pipeline to the above endpoint by using the PortableRunner, job_endpoint set to localhost:8099 (this is the default address of the JobService), and environment_type set to LOOPBACK. For example:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=LOOPBACK"
])
with beam.Pipeline(options) as p:
    ...

Running on a pre-deployed Spark cluster

Deploying your Beam pipeline on a cluster that already has a Spark deployment (Spark classes are available in container classpath) does not require any additional dependencies. For more details on the different deployment modes see: Standalone, YARN, or Mesos.

  1. Start a Spark cluster which exposes the master on port 7077 by default.

  1. Start JobService that will connect with the Spark master:
    • with Docker (preferred): docker run --net=host apache/beam_spark_job_server:latest --spark-master-url=spark://localhost:7077
    • or from Beam source code: ./gradlew :runners:spark:3:job-server:runShadow -PsparkMasterUrl=spark://localhost:7077

  1. Submit the pipeline as above. Note however that environment_type=LOOPBACK is only intended for local testing. See here for details.

(Note that, depending on your cluster setup, you may need to change the environment_type option. See here for details.)

Running on Dataproc cluster (YARN backed)

To run Beam jobs written in Python, Go, and other supported languages, you can use the SparkRunner and PortableRunner as described on the Beam’s Spark Runner page (also see Portability Framework Roadmap).

The following example runs a portable Beam job in Python from the Dataproc cluster’s master node with Yarn backed.

Note: This example executes successfully with Dataproc 2.0, Spark 3.1.2 and Beam 2.37.0.

  1. Create a Dataproc cluster with Docker component enabled.
gcloud dataproc clusters create CLUSTER_NAME \
    --optional-components=DOCKER \
    --image-version=DATAPROC_IMAGE_VERSION \
    --region=REGION \
    --enable-component-gateway \
    --scopes=https://www.googleapis.com/auth/cloud-platform \
    --properties spark:spark.master.rest.enabled=true
  1. Create a Cloud Storage bucket.
gsutil mb BUCKET_NAME
  1. Install the necessary Python libraries for the job in your local environment.
python -m pip install apache-beam[gcp]==BEAM_VERSION
  1. Bundle the word count example pipeline along with all dependencies, artifacts, etc. required to run the pipeline into a jar that can be executed later.
python -m apache_beam.examples.wordcount \
    --runner=SparkRunner \
    --output_executable_path=OUTPUT_JAR_PATH \
    --output=gs://BUCKET_NAME/python-wordcount-out \
    --spark_version=3
  1. Submit spark job to Dataproc cluster’s master node.
gcloud dataproc jobs submit spark \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --class=org.apache.beam.runners.spark.SparkPipelineRunner \
        --jars=OUTPUT_JAR_PATH
  1. Check that the results were written to your bucket.
gsutil cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID

Pipeline options for the Spark Runner

When executing your pipeline with the Spark Runner, you should consider the following pipeline options.


For RDD/DStream based runner:

FieldDescriptionDefault Value
runnerThe pipeline runner to use. This option allows you to determine the pipeline runner at runtime.Set to SparkRunner to run using Spark.
sparkMasterThe url of the Spark Master. This is the equivalent of setting SparkConf#setMaster(String) and can either be local[x] to run local with x cores, spark://host:port to connect to a Spark Standalone cluster, mesos://host:port to connect to a Mesos cluster, or yarn to connect to a yarn cluster.local[4]
storageLevelThe StorageLevel to use when caching RDDs in batch pipelines. The Spark Runner automatically caches RDDs that are evaluated repeatedly. This is a batch-only property as streaming pipelines in Beam are stateful, which requires Spark DStream's StorageLevel to be MEMORY_ONLY.MEMORY_ONLY
batchIntervalMillisThe StreamingContext's batchDuration - setting Spark's batch interval.1000
enableSparkMetricSinksEnable reporting metrics to Spark's metrics Sinks.true
cacheDisabledDisable caching of reused PCollections for whole Pipeline. It's useful when it's faster to recompute RDD rather than save.false


For Structured Streaming based runner:

FieldDescriptionDefault Value
runnerThe pipeline runner to use. This option allows you to determine the pipeline runner at runtime.Set to SparkStructuredStreamingRunner to run using Spark Structured Streaming.
sparkMasterThe url of the Spark Master. This is the equivalent of setting SparkConf#setMaster(String) and can either be local[x] to run local with x cores, spark://host:port to connect to a Spark Standalone cluster, mesos://host:port to connect to a Mesos cluster, or yarn to connect to a yarn cluster.local[4]
testModeEnable test mode that gives useful debugging information: catalyst execution plans and Beam DAG printingfalse
enableSparkMetricSinksEnable reporting metrics to Spark's metrics Sinks.true
checkpointDirA checkpoint directory for streaming resilience, ignored in batch. For durability, a reliable filesystem such as HDFS/S3/GS is necessary.local dir in /tmp
filesToStageJar-Files to send to all workers and put on the classpath.all files from the classpath
EnableSparkMetricSinksEnable/disable sending aggregator values to Spark's metric sinkstrue
FieldDescriptionValue
--runnerThe pipeline runner to use. This option allows you to determine the pipeline runner at runtime.Set to PortableRunner to run using Spark.
--job_endpointJob service endpoint to use. Should be in the form hostname:port, e.g. localhost:3000Set to match your job service endpoint (localhost:8099 by default)

Additional notes

Using spark-submit

When submitting a Spark application to cluster, it is common (and recommended) to use the spark-submit script that is provided with the spark installation. The PipelineOptions described above are not to replace spark-submit, but to complement it. Passing any of the above mentioned options could be done as one of the application-arguments, and setting –master takes precedence. For more on how to generally use spark-submit checkout Spark documentation.

Monitoring your job

You can monitor a running Spark job using the Spark Web Interfaces. By default, this is available at port 4040 on the driver node. If you run Spark on your local machine that would be http://localhost:4040. Spark also has a history server to view after the fact.

Metrics are also available via REST API. Spark provides a metrics system that allows reporting Spark metrics to a variety of Sinks. The Spark runner reports user-defined Beam Aggregators using this same metrics system and currently supports GraphiteSink and CSVSink. Providing support for additional Sinks supported by Spark is easy and straight-forward.

Spark metrics are not yet supported on the portable runner.

Streaming Execution


For RDD/DStream based runner:
If your pipeline uses an UnboundedSource the Spark Runner will automatically set streaming mode. Forcing streaming mode is mostly used for testing and is not recommended.

For Structured Streaming based runner:
Streaming mode is not implemented yet in the Spark Structured Streaming runner.

Streaming is not yet supported on the Spark portable runner.

Using a provided SparkContext and StreamingListeners


For RDD/DStream based runner:
If you would like to execute your Spark job with a provided SparkContext, such as when using the spark-jobserver, or use StreamingListeners, you can’t use SparkPipelineOptions (the context or a listener cannot be passed as a command-line argument anyway). Instead, you should use SparkContextOptions which can only be used programmatically and is not a common PipelineOptions implementation.

For Structured Streaming based runner:
Provided SparkSession and StreamingListeners are not supported on the Spark Structured Streaming runner

Provided SparkContext and StreamingListeners are not supported on the Spark portable runner.

Kubernetes

Submit beam job without job server

To submit a beam job directly on spark kubernetes cluster without spinning up an extra job server, you can do:

spark-submit --master MASTER_URL \
  --conf spark.kubernetes.driver.podTemplateFile=driver_pod_template.yaml \
  --conf spark.kubernetes.executor.podTemplateFile=executor_pod_template.yaml \
  --class org.apache.beam.runners.spark.SparkPipelineRunner \
  --conf spark.kubernetes.container.image=apache/spark:v3.3.2 \
  ./wc_job.jar

Similar to run the beam job on Dataproc, you can bundle the job jar like below. The example use the PROCESS type of SDK harness to execute the job by processes.

python -m beam_example_wc \
    --runner=SparkRunner \
    --output_executable_path=./wc_job.jar \
    --environment_type=PROCESS \
    --environment_config='{\"command\": \"/opt/apache/beam/boot\"}' \
    --spark_version=3

And below is an example of kubernetes executor pod template, the initContainer is required to download the beam SDK harness to run the beam pipelines.

spec:
  containers:
    - name: spark-kubernetes-executor
      volumeMounts:
      - name: beam-data
        mountPath: /opt/apache/beam/
  initContainers:
  - name: init-beam
    image: apache/beam_python3.7_sdk
    command:
    - cp
    - /opt/apache/beam/boot
    - /init-container/data/boot
    volumeMounts:
    - name: beam-data
      mountPath: /init-container/data
  volumes:
  - name: beam-data
    emptyDir: {}

Submit beam job with job server

An example of configuring Spark to run Apache beam job with a job server.