Python multi-language pipelines quickstart

This page provides a high-level overview of creating multi-language pipelines with the Apache Beam SDK for Python. For a more comprehensive treatment of the topic, see Multi-language pipelines.

The code shown in this quickstart is available in a collection of runnable examples.

To build and run a multi-language Python pipeline, you need a Python environment with the Beam SDK installed. If you don’t have an environment set up, first complete the Apache Beam Python SDK Quickstart.

A multi-language pipeline is a pipeline that’s built in one Beam SDK language and uses one or more transforms from another Beam SDK language. These “other-language” transforms are called cross-language transforms. The idea is to make pipeline components easier to share across the Beam SDKs, and to grow the pool of available transforms for all the SDKs. In the examples below, the multi-language pipeline is built with the Beam Python SDK, and the cross-language transforms are built with the Beam Java SDK.

Create a cross-language transform

Here’s a simple Java transform, JavaPrefix, that adds a prefix to an input string:

public class JavaPrefix extends PTransform<PCollection<String>, PCollection<String>> {

  final String prefix;

  public JavaPrefix(String prefix) {
    this.prefix = prefix;
  }

  class AddPrefixDoFn extends DoFn<String, String> {

    @ProcessElement
    public void process(@Element String input, OutputReceiver<String> o) {
      o.output(prefix + input);
    }
  }

  @Override
  public PCollection<String> expand(PCollection<String> input) {
    return input
        .apply(
            "AddPrefix",
            ParDo.of(new AddPrefixDoFn()));
  }
}

To make this available as a cross-language transform, you have to add a config object and a builder.

Note: Starting with Beam 2.34.0, Python SDK users can use some Java transforms without writing additional Java code. To learn more, see Creating cross-language Java transforms.

The config object is a simple Java object (POJO) that has fields required by the transform. Here’s an example, JavaPrefixConfiguration:

public class JavaPrefixConfiguration {

  String prefix;

  public void setPrefix(String prefix) {
    this.prefix = prefix;
  }
}

The builder class, implemented below as JavaPrefixBuilder, must implement ExternalTransformBuilder and override buildExternal, which uses the config object.

public class JavaPrefixBuilder implements
    ExternalTransformBuilder<JavaPrefixConfiguration, PCollection<String>, PCollection<String>> {

    @Override
    public PTransform<PCollection<String>, PCollection<String>> buildExternal(
        JavaPrefixConfiguration configuration) {
      return new JavaPrefix(configuration.prefix);
    }
}

You also need to add a registrar class to register your transform with the expansion service.

@AutoService(ExternalTransformRegistrar.class)
public class JavaPrefixRegistrar implements ExternalTransformRegistrar {

  final String URN = "beam:transform:my.beam.test:javaprefix:v1";

  @Override
  public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
    return ImmutableMap.of(URN,new JavaPrefixBuilder());
  }
}

As shown here in JavaPrefixRegistrar, the registrar must implement ExternalTransformRegistrar, which has one method, knownBuilderInstances. This returns a map that maps a unique URN to an instance of your builder. You can use the AutoService annotation to register this class with the expansion service.

Choose an expansion service

When building a job for a multi-language pipeline, Beam uses an expansion service to expand composite transforms. You must have at least one expansion service per remote SDK.

In most cases, you can use the default Java ExpansionService. The service takes a single parameter, which specifies the port of the expansion service. The address is then provided by the Python pipeline.

Before running your multi-language pipeline, you need to build the Java cross-language transform and start the expansion service. When you start the expansion service, you need to add dependencies to the classpath. You can use more than one JAR, but it’s often easier to create a single shaded JAR. Both Python and Java dependencies will be staged for the runner by the Python SDK.

The steps for running the expansion service will vary depending on your build tooling. Assuming you’ve built a JAR named java-prefix-bundled-0.1.jar, you can start the service with a command like the following, where 12345 is the port on which the expansion service will run:

java -jar java-prefix-bundled-0.1.jar 12345

For instructions on running an example expansion service, see this README.

Create a Python pipeline

Your Python pipeline can now use the ExternalTransform API to configure your cross-language transform. Here’s an example from addprefix.py:

with beam.Pipeline(options=pipeline_options) as p:
  input = p | 'Read' >> ReadFromText(input_path).with_output_types(str)

  java_output = (
      input
      | 'JavaPrefix' >> beam.ExternalTransform(
            'beam:transform:my.beam.test:javaprefix:v1',
            ImplicitSchemaPayloadBuilder({'prefix': 'java:'}),
            "localhost:12345"))

  def python_prefix(record):
    return 'python:%s' % record

  output = java_output | 'PythonPrefix' >> beam.Map(python_prefix)
  output | 'Write' >> WriteToText(output_path)

ExternalTransform takes three parameters:

The URN is simply a unique Beam identifier for the transform, and the expansion service has already been discussed. The PayloadBuilder is a new concept, discussed next.

NOTE: To ensure that your URN doesn’t run into confilcts with URNs from other transforms, follow the URN conventions described at Selecting a URN for Cross-language Transforms.

Provide a payload builder

The Python pipeline example above provides an ImplicitSchemaPayloadBuilder as the second argument to ExternalTransform. The ImplicitSchemaPayloadBuilder builds a payload that generates a schema from the provided values. In this case, the provided values are contained in the following key-value pair: {'prefix': 'java:'}. The JavaPrefix transform expects a prefix argument, and the payload builder passes in the string java:, which will be prepended to each input element.

Payload builders help build the payload for the transform in the expansion request. Instead of the ImplicitSchemaPayloadBuilder, you could use a NamedTupleBasedPayloadBuilder, which builds a payload based on a named tuple schema, or an AnnotationBasedPayloadBuilder, which builds a schema based on type annotations. For a complete list of available payload builders, see the transforms.external API reference.

Use standard element types

At a multi-language boundary, you have to use element types that all the Beam SDKs understand. These are types represented by the Beam standard coders:

For arbitrary structured types (for example, an arbitrary Java object), use ROW (PCollection<Row>). You may have to develop a new Java composite transform that produces a PCollection<Row>. You can use SDK-specific coders within a composite cross-language transform, as long as these coders aren’t used by PCollections that are consumed by the other SDKs.

Run the pipeline

The exact commands for running the Python pipeline will vary based on your environment. Assuming that your pipeline is coded in a file named addprefix.py, the steps should be similar to those below. For more information, see the comments in addprefix.py.

Run with direct runner

In the following command, input1 is a file containing lines of text:

python addprefix.py --runner DirectRunner --environment_type=DOCKER --input input1 --output output

Run with Dataflow runner

The following script runs the multi-language pipeline on Dataflow, using example text from a Cloud Storage bucket. You’ll need to adapt the script to your environment.

#!/bin/bash
export GCP_PROJECT=<project>
export GCS_BUCKET=<bucket>
export TEMP_LOCATION=gs://$GCS_BUCKET/tmp
export GCP_REGION=<region>
export JOB_NAME="javaprefix-`date +%Y%m%d-%H%M%S`"
export NUM_WORKERS="1"

# other commands, e.g. changing into the appropriate directory

gsutil rm gs://$GCS_BUCKET/javaprefix/*

python addprefix.py \
    --runner DataflowRunner \
    --temp_location $TEMP_LOCATION \
    --project $GCP_PROJECT \
    --region $GCP_REGION \
    --job_name $JOB_NAME \
    --num_workers $NUM_WORKERS \
    --input "gs://dataflow-samples/shakespeare/kinglear.txt" \
    --output "gs://$GCS_BUCKET/javaprefix/output"