Runner Authoring Guide

This guide walks through how to implement a new runner. It is aimed at someone who has a data processing system and wants to use it to execute a Beam pipeline. The guide starts from the basics, to help you evaluate the work ahead. Then the sections become more and more detailed, to be a resource throughout the development of your runner.

Implementing the Beam Primitives

Aside from encoding and persisting data - which presumably your engine already does in some way or another - most of what you need to do is implement the Beam primitives. This section provides a detailed look at each primitive, covering what you need to know that might not be obvious and what support code is provided.

The primitives are designed for the benefit of pipeline authors, not runner authors. Each represents a different conceptual mode of operation (external IO, element-wise, grouping, windowing, union) rather than a specific implementation decision. The same primitive may require a very different implementation based on how the user instantiates it. For example, a ParDo that uses state or timers may require key partitioning, and a GroupByKey with speculative triggering may require a more costly or complex implementation.

What if you haven’t implemented some of these features?

That’s OK! You don’t have to do it all at once, and there may even be features that don’t make sense for your runner to ever support. We maintain a capability matrix on the Beam site so you can tell users what you support. When you receive a Pipeline, you should traverse it and determine whether or not you can execute each DoFn that you find. If you cannot execute some DoFn in the pipeline (or if there is any other requirement that your runner lacks) you should reject the pipeline. In your native environment, this may look like throwing an UnsupportedOperationException. The Runner API RPCs will make this explicit, for cross-language portability.
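
For a Java-based runner inspecting a pipeline built with the Java SDK, this check can be a simple pipeline visitor. The sketch below is illustrative only: the set of unsupported transform class names is a placeholder for whatever capabilities your runner actually lacks.

import java.util.Collections;
import java.util.Set;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.TransformHierarchy;

public class SupportChecker extends Pipeline.PipelineVisitor.Defaults {
  // Placeholder: whatever transforms your runner cannot execute.
  private static final Set<String> UNSUPPORTED =
      Collections.singleton("org.example.SomeTransformMyRunnerCannotRun");

  @Override
  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
    if (node.getTransform() != null
        && UNSUPPORTED.contains(node.getTransform().getClass().getName())) {
      throw new UnsupportedOperationException(
          "MyRunner cannot execute " + node.getFullName());
    }
  }
}

Before translating the pipeline, call pipeline.traverseTopologically(new SupportChecker()) and let the exception propagate to the user.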

Implementing the Impulse primitive

Impulse is a PTransform that takes no inputs and produces exactly one output during the lifetime of the pipeline: the empty byte array, in the global window, with the minimum timestamp. Encoded with the standard windowed value coder, this element is the byte sequence 7f df 3b 64 5a 1c ac 09 00 00 00 01 0f 00.
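
For a Java-based runner, producing this element is a one-liner with the Java SDK's WindowedValue helper; note that its package has moved across Beam versions, so the import below may differ for you.

import org.apache.beam.sdk.util.WindowedValue;

public class ImpulseSource {
  // The only element Impulse ever emits: an empty byte array in the global
  // window, at the minimum timestamp (BoundedWindow.TIMESTAMP_MIN_VALUE).
  public static WindowedValue<byte[]> impulseElement() {
    return WindowedValue.valueInGlobalWindow(new byte[0]);
  }
}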

Though Impulse is generally not invoked by a user, it is the only root primitive operation, and other root operations (like Reads and Create) are composite operations constructed from an Impulse followed by a series of (possibly Splittable) ParDos.

Implementing the ParDo primitive

The ParDo primitive describes element-wise transformation for a PCollection. ParDo is the most complex primitive, because it is where any per-element processing is described. In addition to very simple operations like standard map or flatMap from functional programming, ParDo also supports multiple outputs, side inputs, initialization, flushing, teardown, and stateful processing.

The UDF that is applied to each element is called a DoFn. The exact APIs for a DoFn can vary per language/SDK but generally follow the same pattern, so we can discuss it with pseudocode. I will also often refer to the Java support code, since I know it and most of our current and future runners are Java-based.

Generally, rather than applying a series of ParDos one at a time over the entire input data set, it is more efficient to fuse several ParDos together in a single executable stage that consists of a whole series (in general, a DAG) of mapping operations. In addition to ParDos, windowing operations, local (pre- or post-GBK) combining operations, and other mapping operations may be fused into these stages as well.

As DoFns may execute code in a different language, or require a different environment, than the runner itself, Beam provides the ability to call these in a cross-process way. This is the crux of the Beam Fn API, for which more detail can be found below. It is, however, perfectly acceptable for a runner to invoke this user code in process (for simplicity or efficiency) when the environments are compatible.

Bundles

For correctness, a DoFn should represent an element-wise function, but in most SDKs this is a long-lived object that processes elements in small groups called bundles.

Your runner decides how many elements, and which elements, to include in a bundle, and can even decide dynamically in the middle of processing that the current bundle has “ended”. How a bundle is processed ties in with the rest of a DoFn’s lifecycle.

It will generally improve throughput to make the largest bundles possible, so that initialization and finalization costs are amortized over many elements. But if your data is arriving as a stream, then you will want to terminate a bundle in order to achieve appropriate latency, so bundles may be just a few elements.

A bundle is the unit of commitment in Beam. If an error is encountered while processing a bundle, all the prior outputs of that bundle (including any modifications to state or timers) must be discarded by the runner and the entire bundle retried. Upon successful completion of a bundle, its outputs, together with any state/timer modifications and watermark updates, must be committed atomically.
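
A sketch of this contract is below; BundleT, ResultT, and the abstract methods are hypothetical runner-side types, not Beam APIs.

public abstract class BundleExecutor<BundleT, ResultT> {

  // Run the user code over the bundle, buffering outputs, state writes, timer
  // updates, and watermark holds locally; nothing is visible downstream yet.
  protected abstract ResultT processBundle(BundleT bundle) throws Exception;

  // Publish the buffered outputs and state/timer/watermark changes in one
  // atomic step.
  protected abstract void commitAtomically(ResultT result);

  // Throw away everything the failed attempt buffered.
  protected abstract void discardBufferedEffects(BundleT bundle);

  public ResultT processWithRetries(BundleT bundle, int maxAttempts) throws Exception {
    for (int attempt = 1; ; attempt++) {
      try {
        ResultT result = processBundle(bundle);
        commitAtomically(result);
        return result;
      } catch (Exception e) {
        discardBufferedEffects(bundle);
        if (attempt >= maxAttempts) {
          throw e;
        }
        // Otherwise retry the entire bundle from scratch.
      }
    }
  }
}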

The DoFn Lifecycle

DoFns in many SDKs have several methods such as setup, start_bundle, finish_bundle, teardown, etc. in addition to the standard, element-wise process calls. Generally, proper invocation of this lifecycle is handled for you when invoking one or more DoFns from the standard bundle processors (either via the FnAPI or directly using a BundleProcessor (Java, Python)). SDK-independent runners should never have to worry about these details directly.

Side Inputs

Main design document: https://s.apache.org/beam-side-inputs-1-pager

A side input is a global view of a window of a PCollection. This distinguishes it from the main input, which is processed one element at a time. The SDK/user prepares a PCollection adequately, the runner materializes it, and then the runner feeds it to the DoFn.

Unlike main input data, which is pushed by the runner to the ParDo (generally via the FnApi Data channel), side input data is pulled by the ParDo from the runner (generally over the FnAPI State channel).

A side input is accessed via a specific access_pattern. There are currently two access patterns enumerated in the StandardSideInputTypes proto: beam:side_input:iterable:v1, which indicates the runner must return all values in a PCollection corresponding to a specific window, and beam:side_input:multimap:v1, which indicates the runner must return all values corresponding to a specific key and window. Being able to serve these access patterns efficiently may influence how a runner materializes this PCollection.
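
As one illustration, a runner serving the multimap pattern might index the materialized PCollection by encoded window and key. The class below is a sketch with illustrative names only, not a Beam API.

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MultimapSideInputIndex {
  // encoded window -> encoded key -> encoded values
  private final Map<ByteBuffer, Map<ByteBuffer, List<byte[]>>> index = new HashMap<>();

  public void put(byte[] encodedWindow, byte[] encodedKey, byte[] encodedValue) {
    index
        .computeIfAbsent(ByteBuffer.wrap(encodedWindow), w -> new HashMap<>())
        .computeIfAbsent(ByteBuffer.wrap(encodedKey), k -> new ArrayList<>())
        .add(encodedValue);
  }

  // What the runner would return over the State channel for a
  // beam:side_input:multimap:v1 request for a given (window, key).
  public List<byte[]> get(byte[] encodedWindow, byte[] encodedKey) {
    return index
        .getOrDefault(ByteBuffer.wrap(encodedWindow), Collections.emptyMap())
        .getOrDefault(ByteBuffer.wrap(encodedKey), Collections.emptyList());
  }
}

An iterable side input is the same structure without the per-key level.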

SideInputs can be detected by looking at the side_inputs map in the ParDoPayload of ParDo transforms. The ParDo operation itself is responsible for invoking the window_mapping_fn (before invoking the runner) and view_fn (on the runner-returned values), so the runner need not concern itself with these fields.

When a side input is needed but the side input has no data associated with it for a given window, elements in that window must be deferred until the side input has some data or the watermark has advanced sufficiently that we can be sure there will be no data for that window. The PushBackSideInputDoFnRunner is an example of implementing this.

State and Timers

Main design document: https://s.apache.org/beam-state

When a ParDo includes state and timers, its execution on your runner is usually very different. In particular, the state must be persisted when the bundle completes and retrieved for future bundles. Timers that are set must also be injected into future bundles as the watermark advances sufficiently.

State and timers are partitioned per key and window, that is, a DoFn processing a given key must have a consistent view of the state and timers across all elements that share this key. You may need or want to explicitly shuffle data to support this. Once the watermark has passed the end of the window (plus an allowance for allowed lateness, if any), state associated with this window can be dropped.
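
A sketch of the runner-side bookkeeping this implies is below; the types are illustrative only, and a real implementation would also persist this table durably and scope it to the executing stage.

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.TreeMap;
import org.joda.time.Instant;

public class StateAndTimerTable {
  // One partition per (encoded key, encoded window) pair.
  public record Partition(ByteBuffer key, ByteBuffer window) {}

  // Partition -> state cell id -> encoded state value.
  private final Map<Partition, Map<String, byte[]>> state = new HashMap<>();
  // Event-time timers, ordered by firing timestamp.
  private final NavigableMap<Instant, List<Partition>> eventTimeTimers = new TreeMap<>();

  public void writeState(Partition partition, String cellId, byte[] encodedValue) {
    state.computeIfAbsent(partition, p -> new HashMap<>()).put(cellId, encodedValue);
  }

  public void setTimer(Partition partition, Instant fireAt) {
    eventTimeTimers.computeIfAbsent(fireAt, t -> new ArrayList<>()).add(partition);
  }

  // As the input watermark advances, due timers are removed here and injected
  // into future bundles for their partitions.
  public Map<Instant, List<Partition>> extractFiredTimers(Instant newWatermark) {
    SortedMap<Instant, List<Partition>> due = eventTimeTimers.headMap(newWatermark, true);
    Map<Instant, List<Partition>> fired = new TreeMap<>(due);
    due.clear();
    return fired;
  }

  // Once the watermark passes the window's expiry, the partition's state can go.
  public void dropExpired(Partition partition) {
    state.remove(partition);
  }
}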

State setting and retrieval is performed on the FnAPI State channel, whereas timer setting and firing happens on the FnAPI Data channel.

Splittable DoFn

Main design document: https://s.apache.org/splittable-do-fn

Splittable DoFn is a generalization of ParDo that is useful for high-fanout mappings that can be done in parallel. The prototypical example of such an operation is reading from a file, where a single file name (as an input element) can be mapped to all the elements contained in that file. The DoFn is considered splittable in the sense that an element representing, say, a single file can be split (e.g. into ranges of that file) to be processed (e.g. read) by different workers. The full power of this primitive is in the fact that these splits can happen dynamically rather than just statically (i.e. ahead of time), avoiding the problem of over- or undersplitting.

A full explanation of Splittable DoFn is out of scope for this doc, but here is a brief overview as it pertains to its execution.

A Splittable DoFn can participate in the dynamic splitting protocol by splitting within an element as well as between elements. Dynamic splitting is triggered by the runner issuing ProcessBundleSplitRequest messages on the control channel. The SDK will commit to process just a portion of the indicated element and return a description of the remainder (i.e. the unprocessed portion) to the runner in the ProcessBundleSplitResponse to be scheduled by the runner (e.g. on a different worker or as part of a different bundle).

A Splittable DoFn can also initiate its own splitting, indicating it has processed an element as far as it can for the moment (e.g. when tailing a file) but more remains. These splits most often occur when reading unbounded sources. In this case a set of elements representing the deferred work are passed back in the residual_roots field of the ProcessBundleResponse. At a future time, the runner must re-invoke these same operations with the elements given in residual_roots.
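
For a runner driving the Fn API from Java, handling these residuals might look like the sketch below, assuming the generated classes in org.apache.beam.model.fnexecution.v1; scheduleLater is a hypothetical runner hook.

import org.apache.beam.model.fnexecution.v1.BeamFnApi;

public abstract class ResidualHandler {
  // Hypothetical hook: arrange for the deferred work to be re-invoked later,
  // possibly on a different worker or in a different bundle.
  protected abstract void scheduleLater(
      BeamFnApi.BundleApplication application, com.google.protobuf.Duration delay);

  public void handleResiduals(BeamFnApi.ProcessBundleResponse response) {
    for (BeamFnApi.DelayedBundleApplication residual : response.getResidualRootsList()) {
      scheduleLater(residual.getApplication(), residual.getRequestedTimeDelay());
    }
  }
}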

Implementing the GroupByKey (and window) primitive

The GroupByKey operation (sometimes called GBK for short) groups a PCollection of key-value pairs by key and window, emitting results according to the PCollection’s triggering configuration.

It is quite a bit more elaborate than simply colocating elements with the same key, and uses many fields from the PCollection’s windowing strategy.

Group By Encoded Bytes

For both the key and window, your runner sees them as “just bytes”. So you need to group in a way that is consistent with grouping by those bytes, even if you have some special knowledge of the types involved.

The elements you are processing will be key-value pairs, and you’ll need to extract the keys. For this reason, the format of key-value pairs is standardized and shared across all SDKs. See either KvCoder in Java or TupleCoder in Python for documentation on the binary format.
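
For a Java-based runner that has the key coder in hand, grouping by encoded bytes can be as simple as the sketch below; the class itself is illustrative, not a Beam API.

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;

public class EncodedKeyGrouper<K, V> {
  private final Coder<K> keyCoder;
  // Two keys belong to the same group iff their encodings are byte-for-byte equal.
  private final Map<ByteBuffer, List<V>> groups = new HashMap<>();

  public EncodedKeyGrouper(Coder<K> keyCoder) {
    this.keyCoder = keyCoder;
  }

  public void add(KV<K, V> element) throws Exception {
    ByteBuffer encodedKey =
        ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, element.getKey()));
    groups.computeIfAbsent(encodedKey, k -> new ArrayList<>()).add(element.getValue());
  }

  public Map<ByteBuffer, List<V>> groups() {
    return groups;
  }
}

Window grouping, discussed next, adds the encoded window to this grouping key.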

Window Merging

As well as grouping by key, your runner must group elements by their window. A WindowFn has the option of declaring that it merges windows on a per-key basis. For example, session windows for the same key will be merged if they overlap. So your runner must invoke the merge method of the WindowFn during grouping.

Implementing via GroupByKeyOnly + GroupAlsoByWindow

The Java and Python codebases include support code for a particularly common way of implementing the full GroupByKey operation: first group by key only, and then group also by window. For merging windows, this is essentially required, since merging is per key.

Often presenting the set of values in timestamp order can allow more efficient grouping of these values into their final windows.

Dropping late data

Main design document: https://s.apache.org/beam-lateness

A window is expired in a PCollection if the watermark of the input PCollection has exceeded the end of the window by at least the input PCollection’s allowed lateness.

Data for an expired window can be dropped at any time, and should be dropped at a GroupByKey. If you are using GroupAlsoByWindow, drop it just before executing that transform. You may shuffle less data if you drop data prior to GroupByKeyOnly, but this is only safe for non-merging windows, as a window that appears expired may merge and become not expired.
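
The check itself is small; a sketch using the Java SDK's windowing types:

import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Instant;

public class LateDataFilter {
  // True once the input watermark has passed the end of the window plus the
  // allowed lateness; data in such a window may be dropped at the GroupByKey.
  public static boolean isExpired(
      BoundedWindow window, WindowingStrategy<?, ?> strategy, Instant inputWatermark) {
    return inputWatermark.isAfter(window.maxTimestamp().plus(strategy.getAllowedLateness()));
  }
}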

Triggering

Main design document: https://s.apache.org/beam-triggers

The input PCollection’s trigger and accumulation mode specify when and how outputs should be emitted from the GroupByKey operation.

In Java, there is a lot of support code for executing triggers in the GroupAlsoByWindow implementations, ReduceFnRunner (legacy name), and TriggerStateMachine, which is an obvious way of implementing all triggers as an event-driven machine over elements and timers. In Python this is supported by the TriggerDriver classes.

TimestampCombiner

When an aggregated output is produced from multiple inputs, the GroupByKey operation has to choose a timestamp for the combination. To do so, first the WindowFn has a chance to shift timestamps - this is needed to ensure watermarks do not prevent progress of windows like sliding windows (the details are beyond this doc). Then, the shifted timestamps need to be combined - this is specified by a TimestampCombiner, which can either select the minimum or maximum of its inputs, or just ignore inputs and choose the end of the window.
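
A sketch of the combination step only is below; the enum is illustrative rather than the SDK's TimestampCombiner class, and the WindowFn's timestamp shifting is assumed to have already happened.

import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

public enum OutputTimestampPolicy {
  EARLIEST, LATEST, END_OF_WINDOW;

  public Instant combine(List<Instant> shiftedTimestamps, BoundedWindow window) {
    switch (this) {
      case EARLIEST:
        return Collections.min(shiftedTimestamps);
      case LATEST:
        return Collections.max(shiftedTimestamps);
      default:
        // Ignore the inputs entirely and use the window's maximum timestamp.
        return window.maxTimestamp();
    }
  }
}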

Implementing the Window primitive

The window primitive applies a WindowFn UDF to place each input element into one or more windows of its output PCollection. Note that the primitive also generally configures other aspects of the windowing strategy for a PCollection, but the fully constructed graph that your runner receives will already have a complete windowing strategy for each PCollection.

To implement this primitive, you need to invoke the provided WindowFn on each element, which will return some set of windows for that element to be a part of in the output PCollection.

Most runners implement this by fusing these window-altering mappings in with the DoFns.

Implementation considerations

A “window” is just a second grouping key that has a “maximum timestamp”. It can be any arbitrary user-defined type. The WindowFn provides the coder for the window type.

Beam’s support code provides WindowedValue, which is a compressed representation of an element in multiple windows. You may want to use this, or your own compressed representation. Remember that it simply represents multiple elements at the same time; there is no such thing as an element “in multiple windows”.
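
For example, using the Java SDK's WindowedValue (whose package may differ across Beam versions), a value in two windows expands to two single-window values:

import java.util.Arrays;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class CompressedRepresentationExample {
  public static void main(String[] args) {
    Instant ts = new Instant(0);
    IntervalWindow w1 = new IntervalWindow(ts, ts.plus(Duration.standardMinutes(10)));
    IntervalWindow w2 = new IntervalWindow(ts, ts.plus(Duration.standardMinutes(20)));

    // One compressed value standing for the same element in two windows.
    WindowedValue<String> compressed =
        WindowedValue.of("element", ts, Arrays.asList(w1, w2), PaneInfo.NO_FIRING);

    // Logically this is two elements; explode it to get one value per window.
    for (WindowedValue<String> single : compressed.explodeWindows()) {
      System.out.println(single.getWindows());
    }
  }
}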

For values in the global window, you may want to use an even further compressed representation that doesn’t bother including the window at all.

We provide coders with these optimizations such as PARAM_WINDOWED_VALUE that can be used to reduce the size of serialized data.

In the future, this primitive may be retired as it can be implemented as a ParDo if the capabilities of ParDo are enhanced to allow output to new windows.

Implementing the Flatten primitive

This one is easy - it takes as input a finite set of PCollections and outputs their bag union, keeping windows intact.

For this operation to make sense, it is the SDK’s responsibility to make sure the windowing strategies are compatible.

Also note that there is no requirement that the coders for all the PCollections be the same. If your runner wants to require that (to avoid tedious re-encoding) you have to enforce it yourself. Or you could just implement the fast path as an optimization.

Special mention: the Combine composite

A composite transform that is almost always treated specially by a runner is CombinePerKey, which applies an associative and commutative operator to the elements of a PCollection. This composite is not a primitive. It is implemented in terms of ParDo and GroupByKey, so your runner will work without treating it specially - but it does carry additional information that you probably want to use for optimizations: the associative-commutative operator, known as a CombineFn.

Generally runners will want to implement this via what is called combiner lifting, where a new operation is placed before the GroupByKey that does partial (within-bundle) combining, which often requires a slight modification of what comes after the GroupByKey as well. An example of this transformation can be found in the Python or Go implementations of this optimization. The resulting pre- and post-GroupByKey operations are generally fused in with the ParDos and executed as above.
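
A sketch of the within-bundle half of this optimization, assuming a Java runner that has extracted the CombineFn from the CombinePayload (the class itself is illustrative):

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.transforms.Combine.CombineFn;

public class PartialCombiner<GroupKeyT, InputT, AccumT> {
  private final CombineFn<InputT, AccumT, ?> combineFn;
  // One accumulator per (key, window) grouping key seen in this bundle.
  private final Map<GroupKeyT, AccumT> accumulators = new HashMap<>();

  public PartialCombiner(CombineFn<InputT, AccumT, ?> combineFn) {
    this.combineFn = combineFn;
  }

  public void add(GroupKeyT keyAndWindow, InputT value) {
    AccumT acc =
        accumulators.computeIfAbsent(keyAndWindow, k -> combineFn.createAccumulator());
    // addInput may mutate and/or return a new accumulator; keep what it returns.
    accumulators.put(keyAndWindow, combineFn.addInput(acc, value));
  }

  // At the end of the bundle, emit one (key, accumulator) pair per entry. The
  // post-GroupByKey step merges accumulators and extracts the final output via
  // mergeAccumulators and extractOutput.
  public Map<GroupKeyT, AccumT> finishBundle() {
    Map<GroupKeyT, AccumT> out = new HashMap<>(accumulators);
    accumulators.clear();
    return out;
  }
}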

Working with pipelines

When you receive a pipeline from a user, you will need to translate it. An explanation of how Beam pipelines are represented can be found here, which complements the official proto declarations.

Testing your runner

The Beam Java SDK and Python SDK have suites of runner validation tests. The configuration may evolve faster than this document, so check the configuration of other Beam runners. But be aware that we have tests and you can use them very easily! To enable these tests in a Java-based runner using Gradle, you scan the dependencies of the SDK for tests with the JUnit category ValidatesRunner.

task validatesRunner(type: Test) {
  group = "Verification"
  description = "Validates the runner"
  def pipelineOptions = JsonOutput.toJson(["--runner=MyRunner", ... misc test options ...])
  systemProperty "beamTestPipelineOptions", pipelineOptions
  classpath = configurations.validatesRunner
  testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs)
  useJUnit {
    includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
  }
}

Enabling these tests in other languages is unexplored.

Integrating your runner nicely with SDKs

Whether or not your runner is based in the same language as an SDK (such as Java), you will want to provide a shim to invoke it from another SDK if you want the users of that SDK (such as Python) to use it.

Integrating with the Java SDK

Allowing users to pass options to your runner

The mechanism for configuration is PipelineOptions, an interface that works completely differently than normal Java objects. Forget what you know, and follow the rules, and PipelineOptions will treat you well.

You must implement a sub-interface for your runner with getters and setters with matching names, like so:

public interface MyRunnerOptions extends PipelineOptions {
  @Description("The Foo to use with MyRunner")
  @Required
  public Foo getMyRequiredFoo();
  public void setMyRequiredFoo(Foo newValue);

  @Description("Enable Baz; on by default")
  @Default.Boolean(true)
  public Boolean isBazEnabled();
  public void setBazEnabled(Boolean newValue);
}

You can set up defaults, etc. See the javadoc for details. When your runner is instantiated with a PipelineOptions object, you access your interface by options.as(MyRunnerOptions.class).

To make these options available on the command line, you register your options with a PipelineOptionsRegistrar. It is easy if you use @AutoService:

@AutoService(PipelineOptionsRegistrar.class)
public static class MyOptionsRegistrar implements PipelineOptionsRegistrar {
  @Override
  public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
    return ImmutableList.<Class<? extends PipelineOptions>>of(MyRunnerOptions.class);
  }
}

Registering your runner with SDKs for command line use

To make your runner available on the command line, you register your runner with a PipelineRunnerRegistrar. It is easy if you use @AutoService:

@AutoService(PipelineRunnerRegistrar.class)
public static class MyRunnerRegistrar implements PipelineRunnerRegistrar {
  @Override
  public Iterable<Class<? extends PipelineRunner>> getPipelineRunners() {
    return ImmutableList.<Class<? extends PipelineRunner>>of(MyRunner.class);
  }
}

Integrating with the Python SDK

In the Python SDK, the registration of the code is not automatic, so there are a few things to keep in mind when creating a new runner.

Any dependencies on packages for the new runner should be optional, so create a new target in extras_require in setup.py listing the packages the new runner needs.

All runner code should go in its own package in the apache_beam/runners directory.

Register the new runner in the create_runner function of runner.py so that the partial name is matched with the correct class to be used.

Python Runners can also be identified (e.g. when passing the runner parameter) by their fully qualified name whether or not they live in the Beam repository.

Writing an SDK-independent runner

There are two aspects to making your runner SDK-independent, able to run pipelines written in other languages: the Fn API and the Runner API.

The Fn API

Design documents:

To run a user’s pipeline, you need to be able to invoke their UDFs. The Fn API is an RPC interface for the standard UDFs of Beam, implemented using protocol buffers over gRPC.

The Fn API includes:

You are fully welcome to also use the SDK for your language for utility code, or provide optimized implementations of bundle processing for same-language UDFs.

The Runner API

The Runner API is an SDK-independent schema for a pipeline along with RPC interfaces for launching a pipeline and checking the status of a job. By examining a pipeline only through Runner API interfaces, you remove your runner’s dependence on the SDK for its language for pipeline analysis and job translation.

To execute such an SDK-independent pipeline, you will need to support the Fn API. UDFs are embedded in the pipeline as a specification of the function (often just opaque serialized bytes for a particular language) plus a specification of an environment that can execute it (essentially a particular SDK). So far, this specification is expected to be a URI for a Docker container hosting the SDK’s Fn API harness.

You are fully welcome to also use the SDK for your language, which may offer useful utility code.

The language-independent definition of a pipeline is described via a protocol buffers schema, covered below for reference. But your runner need not directly manipulate protobuf messages. Instead, the Beam codebase provides utilities for working with pipelines so that you don’t need to be aware of whether or not the pipeline has ever been serialized or transmitted, or what language it may have been written in to begin with.

Java

If your runner is Java-based, the tools to interact with pipelines in an SDK-agnostic manner are in the beam-runners-core-construction-java artifact, in the org.apache.beam.sdk.util.construction namespace. The utilities are named consistently, like so:

By inspecting transforms only through these classes, your runner will not depend on the particulars of the Java SDK.

The Runner API protos

The Runner API refers to a specific manifestation of the concepts in the Beam model, as a protocol buffers schema. Even though you should not manipulate these messages directly, it can be helpful to know the canonical data that makes up a pipeline.

Most of the API is exactly the same as the high-level description; you can get started implementing a runner without understanding all the low-level details.

The most important takeaway of the Runner API for you is that it is a language-independent definition of a Beam pipeline. You will probably always interact via a particular SDK’s support code wrapping these definitions with sensible idiomatic APIs, but always be aware that this is the specification and any other data is not necessarily inherent to the pipeline, but may be SDK-specific enrichments (or bugs!).

The UDFs in the pipeline may be written for any Beam SDK, or even multiple in the same pipeline. So this is where we will start, taking a bottom-up approach to understanding the protocol buffers definitions for UDFs before going back to the higher-level, mostly obvious, record definitions.

FunctionSpec proto

The heart of cross-language portability is the FunctionSpec. This is a language-independent specification of a function, in the usual programming sense that includes side effects, etc.

message FunctionSpec {
  string urn;
  bytes payload;
}

A FunctionSpec includes a URN identifying the function as well as an arbitrary fixed parameter. For example the (hypothetical) “max” CombineFn might have the URN beam:combinefn:max:0.1 and a parameter that indicates by what comparison to take the max.

For most UDFs in a pipeline constructed using a particular language’s SDK, the URN will indicate that the SDK must interpret it, for example beam:dofn:javasdk:0.1 or beam:dofn:pythonsdk:0.1. The parameter will contain serialized code, such as a Java-serialized DoFn or a Python pickled DoFn.

A FunctionSpec is not only for UDFs. It is just a generic way to name/specify any function. It is also used as the specification for a PTransform. But when used in a PTransform it describes a function from PCollection to PCollection and cannot be specific to an SDK because the runner is in charge of evaluating transforms and producing PCollections.

It goes without saying that not every environment will be able to deserialize every function spec. For this reason, PTransforms have an environment_id parameter that indicates at least one environment that is capable of interpreting the contained URNs. This is a reference to an environment in the environments map of the Pipeline proto and is typically defined by a Docker image (possibly with some extra dependencies). There may be other environments that are also capable of doing so, and a runner is free to use them if it has this knowledge.

Primitive transform payload protos

The payloads for the primitive transforms are just proto serializations of their specifications. Rather than reproduce their full code here, I will just highlight the important pieces to show how they fit together.

It is worth emphasizing again that while you probably will not interact directly with these payloads, they are the only data that is inherently part of the transform.

ParDoPayload proto

A ParDo transform carries its DoFn in a FunctionSpec and then provides language-independent specifications for its other features - side inputs, state declarations, timer declarations, etc.

message ParDoPayload {
  FunctionSpec do_fn;
  map<string, SideInput> side_inputs;
  map<string, StateSpec> state_specs;
  map<string, TimerSpec> timer_specs;
  ...
}

CombinePayload proto

Combine is not a primitive. But non-primitives are perfectly able to carry additional information for better optimization. The most important thing that a Combine transform carries is the CombineFn in a FunctionSpec record. In order to effectively carry out the optimizations desired, it is also necessary to know the coder for intermediate accumulations, so it also carries a reference to this coder.

message CombinePayload {
  FunctionSpec combine_fn;
  string accumulator_coder_id;
  ...
}

PTransform proto

A PTransform is a function from PCollection to PCollection. This is represented in the proto using a FunctionSpec.

message PTransform {
  FunctionSpec spec;
  repeated string subtransforms;

  // Maps from local string names to PCollection ids
  map<string, string> inputs;
  map<string, string> outputs;
  ...
}

A PTransform may have subtransforms if it is a composite, in which case the FunctionSpec may be omitted since the subtransforms define its behavior.

The input and output PCollections are unordered and referred to by a local name. The SDK decides what this name is, since it will likely be embedded in serialized UDFs.

A runner that understands the specification of a given PTransform (whether primitive or composite), as defined by its FunctionSpec, is free to substitute it with another PTransform (or set thereof) that has identical semantics. This is typically how CombinePerKey is handled, but many other substitutions can be done as well.

PCollection proto

A PCollection just stores a coder, windowing strategy, and whether or not it is bounded.

message PCollection {
  string coder_id;
  IsBounded is_bounded;
  string windowing_strategy_id;
  ...
}

Coder proto

This is a very interesting proto. A coder is a parameterized function that may only be understood by a particular SDK, hence a FunctionSpec, but it may also have component coders that fully define it. For example, a ListCoder is only a meta-format, while ListCoder(VarIntCoder) is a fully specified format.

message Coder {
  FunctionSpec spec;
  repeated string component_coder_ids;
}

There are a large number of standard coders understood by most, if not all, SDKs. Using these allows for cross-language transforms.

The Jobs API RPCs

While your language’s SDK may insulate you from touching the Runner API protos directly, you may need to implement adapters for your runner in order to expose it to another language. This allows a Python SDK to invoke a Java runner or vice versa. A typical implementation of this can be found in local_job_service.py, which is used directly to front several Python-implemented runners.

The RPCs themselves will necessarily follow the existing APIs of PipelineRunner and PipelineResult, but altered to be the minimal backend channel, versus a rich and convenient API.

A key piece of this is the Artifacts API, which allows a runner to fetch and deploy binary artifacts (such as jars, pypi packages, etc.) that are listed as dependencies in the various environments, and which may have various representations. This is invoked after a pipeline is submitted, but before it is executed. The SDK submitting a pipeline acts as an artifact server to the runner receiving the request, and in turn the runner then acts as an artifact server to the workers (environments) hosting the users’ UDFs.