Using the Apache Samza Runner

The Apache Samza Runner executes Beam pipelines using Apache Samza. It runs a Beam pipeline as a Samza application, which can run locally. The application can also be packaged into a .tgz file and deployed to a YARN cluster or to a Samza standalone cluster with ZooKeeper.

The Samza Runner and Samza are suitable for large-scale, stateful streaming jobs.

The Beam Capability Matrix documents the currently supported capabilities of the Samza Runner.

Samza Runner prerequisites and setup

The Samza Runner is built on Samza versions greater than 1.0.

Specify your dependency

You can specify your dependency on the Samza Runner by adding the following to your pom.xml:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-samza</artifactId>
  <version>2.60.0</version>
  <scope>runtime</scope>
</dependency>

<!-- Samza dependencies -->
<dependency>
  <groupId>org.apache.samza</groupId>
  <artifactId>samza-api</artifactId>
  <version>${samza.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.samza</groupId>
  <artifactId>samza-core_2.11</artifactId>
  <version>${samza.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.samza</groupId>
  <artifactId>samza-kafka_2.11</artifactId>
  <version>${samza.version}</version>
  <scope>runtime</scope>
</dependency>

<dependency>
  <groupId>org.apache.samza</groupId>
  <artifactId>samza-kv_2.11</artifactId>
  <version>${samza.version}</version>
  <scope>runtime</scope>
</dependency>

<dependency>
  <groupId>org.apache.samza</groupId>
  <artifactId>samza-kv-rocksdb_2.11</artifactId>
  <version>${samza.version}</version>
  <scope>runtime</scope>
</dependency>

Executing a pipeline with Samza Runner

If you run your pipeline locally or deploy it to a standalone cluster with all the jars and resource files, no packaging is required. For example, the following command runs the WordCount example:

$ mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Psamza-runner \
    -Dexec.args="--runner=SamzaRunner \
      --inputFile=/path/to/input \
      --output=/path/to/counts"

To deploy your pipeline to a YARN cluster, follow the instructions for deploying a sample Samza job. First, package your application jars and resource files into a .tgz archive and make it available for YARN containers to download. Then, in your config, specify the URI of this .tgz file:

yarn.package.path=${your_job_tgz_URI}

job.name=${your_job_name}
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.coordinator.system=${job_coordinator_system}
job.default.system=${job_default_system}

For more details on the configuration, see Samza Configuration Reference.

Pass the config file by setting the command-line argument --configFilePath=/path/to/config.properties. You can then run the main class of your Beam pipeline in a YARN Resource Manager, and the Samza Runner will submit a YARN job under the hood.
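The properties shown above can also be generated rather than written by hand. The following is a minimal sketch using only the JDK; the class name SamzaYarnConfigSketch, its method names, and the sample values in main are hypothetical and not part of Beam or Samza. It writes the keys listed above to a file and builds the matching --configFilePath argument.

```java
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

/** Sketch (hypothetical helper): write a Samza YARN config file and build the Beam argument. */
class SamzaYarnConfigSketch {

  /** Collects the YARN-related keys from the config above; all values are caller-supplied. */
  static Properties yarnConfig(String packageUri, String jobName,
                               String coordinatorSystem, String defaultSystem) {
    Properties props = new Properties();
    props.setProperty("yarn.package.path", packageUri);
    props.setProperty("job.name", jobName);
    props.setProperty("job.factory.class", "org.apache.samza.job.yarn.YarnJobFactory");
    props.setProperty("job.coordinator.system", coordinatorSystem);
    props.setProperty("job.default.system", defaultSystem);
    return props;
  }

  /** Writes the properties to the given file and returns the --configFilePath argument. */
  static String writeAndBuildArg(Properties props, Path file) throws IOException {
    try (Writer out = Files.newBufferedWriter(file, StandardCharsets.UTF_8)) {
      // Properties.store escapes characters such as ':'; Properties.load restores them.
      props.store(out, "Samza YARN deployment config (generated)");
    }
    return "--configFilePath=" + file.toAbsolutePath();
  }

  public static void main(String[] args) throws IOException {
    // Assumed sample values for illustration only; substitute your own.
    Properties props = yarnConfig(
        "hdfs://namenode/path/to/my-job.tgz", "my-beam-job", "kafka", "kafka");
    Path file = Files.createTempFile("samza-config", ".properties");
    System.out.println(writeAndBuildArg(props, file));
  }
}
```

The printed argument can then be appended to the pipeline's command-line arguments alongside --runner=SamzaRunner.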

Check out our Samza Beam example on GitHub.

Pipeline options for the Samza Runner

When executing your pipeline with the Samza Runner, you can use the following pipeline options.

Field | Description | Default Value
runner | The pipeline runner to use. This option allows you to determine the pipeline runner at runtime. | Set to SamzaRunner to run using Samza.
configFilePath | The config for Samza using a properties file. | empty, i.e. use local execution.
configFactory | The factory to read config file from config file path. | PropertiesConfigFactory, reading configs as a property file.
configOverride | The config override to set programmatically. | empty, i.e. use config file or local execution.
jobInstance | The instance name of the job. | 1
samzaExecutionEnvironment | Samza application execution environment. See SamzaExecutionEnvironment for more details. | LOCAL
watermarkInterval | The interval to check for watermarks in milliseconds. | 1000
systemBufferSize | The maximum number of messages to buffer for a given system. | 5000
eventTimerBufferSize | The maximum number of event-time timers to buffer in memory for a PTransform. | 5000
maxSourceParallelism | The maximum parallelism allowed for any data source. | 1
storeBatchGetSize | The batch get size limit for the state store. | 10000
enableMetrics | Enable/disable Beam metrics in Samza Runner. | true
stateDurable | The config for state to be durable. | false
maxBundleSize | The maximum number of elements in a bundle. | 1 (by default the auto bundling is disabled)
maxBundleTimeMs | The maximum time to wait before finalising a bundle (in milliseconds). | 1000
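
These options are passed as --name=value arguments, in the same way as --runner=SamzaRunner in the WordCount command. A minimal sketch of assembling such arguments follows; the class SamzaArgsSketch and the override values are hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch (hypothetical helper): turn option names and values into pipeline arguments. */
class SamzaArgsSketch {

  /** Renders each option as a --name=value argument, preserving insertion order. */
  static String[] toArgs(Map<String, String> options) {
    List<String> args = new ArrayList<>();
    for (Map.Entry<String, String> e : options.entrySet()) {
      args.add("--" + e.getKey() + "=" + e.getValue());
    }
    return args.toArray(new String[0]);
  }

  public static void main(String[] unused) {
    // Assumed overrides for illustration; option names come from the table above.
    Map<String, String> options = new LinkedHashMap<>();
    options.put("runner", "SamzaRunner");
    options.put("maxSourceParallelism", "4"); // table default is 1
    options.put("watermarkInterval", "500");  // milliseconds; table default is 1000
    options.put("stateDurable", "true");      // table default is false
    for (String arg : toArgs(options)) {
      System.out.println(arg);
    }
  }
}
```

In a project with the Beam SDK and the Samza Runner on the classpath, an array like this would typically be handed to PipelineOptionsFactory.fromArgs(args).as(SamzaPipelineOptions.class); that step is omitted here so the sketch stays dependency-free.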

Monitoring your job

You can monitor your pipeline job using metrics emitted from both Beam and Samza, e.g. Beam source metrics such as elements_read and backlog_elements, and Samza job metrics such as job-healthy and process-envelopes. A complete list of Samza metrics is in the Samza Metrics Reference. You can view your job’s metrics via JMX in development, and send the metrics to a graphing system such as Graphite. For more details, see Samza Metrics.

For a running Samza YARN job, you can use the YARN web UI to monitor the job status and check logs.