Java multi-language pipelines quickstart
This page provides a high-level overview of creating multi-language pipelines with the Apache Beam SDK for Java. For a more complete discussion of the topic, see Multi-language pipelines.
A multi-language pipeline is a pipeline that’s built in one Beam SDK language and uses one or more transforms from another Beam SDK language. These transforms from another SDK are called cross-language transforms. Multi-language support makes pipeline components easier to share across the Beam SDKs and grows the pool of available transforms for all the SDKs.
In the examples below, the multi-language pipeline is built with the Beam Java SDK, and the cross-language transform is built with the Beam Python SDK.
Prerequisites
This quickstart is based on a Java example pipeline, PythonDataframeWordCount, that counts words in a Shakespeare text. If you’d like to run the pipeline, you can clone or download the Beam repository and build the example from the source code.
To build and run the example, you need a Java environment with the Beam Java SDK version 2.41.0 or later installed, and a Python environment. If you don’t already have these environments set up, first complete the Apache Beam Java SDK Quickstart and the Apache Beam Python SDK Quickstart.
To run with the portable DirectRunner, you need Docker installed locally with the Docker daemon running. This is not needed for Dataflow.
For running on Dataflow, you need a Google Cloud project with billing enabled and a Google Cloud Storage bucket.
This example relies on the Python pandas package, version 1.4.0 or later, which is unavailable for Python versions earlier than 3.8. Make sure that the default Python version installed on your system is 3.8 or later.
Specify a cross-language transform
The Java example pipeline uses the Python DataframeTransform as a cross-language transform. The transform is part of the Beam Dataframe API for working with pandas-like DataFrame objects.
To apply a cross-language transform, your pipeline must specify it. Python transforms are identified by their fully qualified name. For example, DataframeTransform can be found in the apache_beam.dataframe.transforms package, so its fully qualified name is apache_beam.dataframe.transforms.DataframeTransform.
The example pipeline, PythonDataframeWordCount, passes this fully qualified name to PythonExternalTransform.
Note: The example pipeline is intended to demonstrate the development of Java multi-language pipelines that use arbitrary Python cross-language transforms. For production use cases of the Dataframe API in Java, you should use the higher-level DataframeTransform instead.
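For reference, here is a minimal sketch of what using that higher-level wrapper could look like. This snippet is not part of the example; it assumes the org.apache.beam.sdk.extensions.python.transforms.DataframeTransform class available in recent Beam releases, so check the Javadoc for your version.

// Hedged sketch: the higher-level Java wrapper for the Dataframe API.
// Assumes org.apache.beam.sdk.extensions.python.transforms.DataframeTransform;
// verify the exact method names against the Javadoc for your Beam version.
static PCollection<Row> countWords(PCollection<Row> words) {
  return words.apply(
      DataframeTransform.of("lambda df: df.groupby('word').sum()").withIndexes());
}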
Here’s the complete pipeline definition from the example:
static void runWordCount(WordCountOptions options) {
  Pipeline p = Pipeline.create(options);

  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
      .apply(ParDo.of(new ExtractWordsFn()))
      .setRowSchema(ExtractWordsFn.SCHEMA)
      .apply(
          PythonExternalTransform.<PCollection<Row>, PCollection<Row>>from(
                  "apache_beam.dataframe.transforms.DataframeTransform",
                  options.getExpansionService())
              .withKwarg("func", PythonCallableSource.of("lambda df: df.groupby('word').sum()"))
              .withKwarg("include_indexes", true))
      .apply(MapElements.via(new FormatAsTextFn()))
      .apply("WriteCounts", TextIO.write().to(options.getOutput()));

  p.run().waitUntilFinish();
}
PythonExternalTransform is a wrapper for invoking external Python transforms. The from method accepts two strings: 1) the fully qualified transform name; 2) an optional address and port number for the expansion service. The method returns a stub for the Python cross-language transform that can be used directly in a Java pipeline.
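As a minimal sketch (the words and result variable names are illustrative, not from the example), the returned stub plugs into a Java pipeline like any other transform:

// Hedged sketch: from() returns a transform stub that can be applied directly.
// When no expansion service address is given, PythonExternalTransform starts
// one automatically if a supported Python installation is available.
PTransform<PCollection<Row>, PCollection<Row>> pythonTransform =
    PythonExternalTransform.<PCollection<Row>, PCollection<Row>>from(
        "apache_beam.dataframe.transforms.DataframeTransform");
// Keyword arguments such as func would still be chained on via withKwarg,
// as in the full example above.
PCollection<Row> result = words.apply(pythonTransform);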
withKwarg specifies a keyword argument for instantiating the Python cross-language transform. In this case, withKwarg is invoked twice, to specify a func argument and an include_indexes argument, and these arguments are passed to DataframeTransform. PythonExternalTransform also provides other ways to specify args and kwargs for Python cross-language transforms.
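For instance, positional arguments and a map of keyword arguments can also be supplied. The following is a hedged sketch; the withArgs and withKwargs method names are taken from recent Beam releases, so verify them against the PythonExternalTransform Javadoc for your version.

// Hedged sketch of alternative ways to pass arguments (not from the example).
PythonExternalTransform.<PCollection<Row>, PCollection<Row>>from(
        "apache_beam.dataframe.transforms.DataframeTransform")
    // Positional constructor arguments for the Python transform.
    .withArgs(PythonCallableSource.of("lambda df: df.groupby('word').sum()"))
    // Several keyword arguments supplied at once as a java.util.Map.
    .withKwargs(Map.of("include_indexes", true));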
To understand how this pipeline works, it’s helpful to look more closely at the first withKwarg invocation:
.withKwarg("func", PythonCallableSource.of("lambda df: df.groupby('word').sum()"))
The argument to PythonCallableSource.of is a string representation of a Python lambda function. DataframeTransform takes as an argument a Python callable to apply to a PCollection as if it were a DataFrame. The withKwarg method lets you specify a Python callable in your Java pipeline. To learn more about passing a function to DataframeTransform, see Embedding DataFrames in a pipeline.
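Any callable that maps a deferred DataFrame to another DataFrame can be passed this way. As an illustrative variation (not part of the example), a different aggregation could be supplied as:

// Illustrative variation: any Python callable mapping a DataFrame to a
// DataFrame can be passed as the func keyword argument.
.withKwarg("func", PythonCallableSource.of("lambda df: df.groupby('word').count()"))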
Run the Java pipeline
If you want to customize the environment or use transforms not available in the default Beam SDK, you might need to run your own expansion service. In such cases, start the expansion service before running your pipeline.
Before running the pipeline, make sure to perform the runner specific setup for your selected Beam runner.
Run with Dataflow runner using a Maven Archetype (Beam 2.43.0 and later)
- Check out the Beam examples Maven archetype for the relevant Beam version.
export BEAM_VERSION=<Beam version>
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=$BEAM_VERSION \
-DgroupId=org.example \
-DartifactId=multi-language-beam \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=false
- Run the pipeline.
export GCP_PROJECT=<GCP project>
export GCP_BUCKET=<GCP bucket>
export GCP_REGION=<GCP region>
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.multilanguage.PythonDataframeWordCount \
-Dexec.args="--runner=DataflowRunner --project=$GCP_PROJECT \
--region=$GCP_REGION \
--gcpTempLocation=gs://$GCP_BUCKET/multi-language-beam/tmp \
--output=gs://$GCP_BUCKET/multi-language-beam/output" \
-Pdataflow-runner
Run with Dataflow runner at HEAD
The following script runs the example multi-language pipeline on Dataflow, using example text from a Cloud Storage bucket. You’ll need to adapt the script to your environment.
export GCP_PROJECT=<project>
export OUTPUT_BUCKET=<bucket>
export GCP_REGION=<region>
export TEMP_LOCATION=gs://$OUTPUT_BUCKET/tmp
./gradlew :examples:multi-language:pythonDataframeWordCount --args=" \
--runner=DataflowRunner \
--project=$GCP_PROJECT \
--output=gs://${OUTPUT_BUCKET}/count \
--region=${GCP_REGION}"
The pipeline outputs a file with the results to gs://$OUTPUT_BUCKET/count-00000-of-00001.
Run with DirectRunner
Note: Multi-language pipelines need to use portable runners. The portable DirectRunner is still experimental and does not support all Beam features.
- Create a Python virtual environment with the latest version of the Beam Python SDK installed. See the Apache Beam Python SDK Quickstart for instructions.
- Run the job server for portable DirectRunner (implemented in Python).
export JOB_SERVER_PORT=<port>
python -m apache_beam.runners.portability.local_job_service_main -p $JOB_SERVER_PORT
- In a different shell, go to a Beam HEAD Git clone.
- Build the Beam Java SDK container for a local pipeline execution (this guide requires that your JAVA_HOME is set to Java 11).
./gradlew :sdks:java:container:java11:docker -Pjava11Home=$JAVA_HOME
- Run the pipeline.
export JOB_SERVER_PORT=<port> # Same port as before
export OUTPUT_FILE=<local relative path>
./gradlew :examples:multi-language:pythonDataframeWordCount --args=" \
--runner=PortableRunner \
--jobEndpoint=localhost:$JOB_SERVER_PORT \
--output=$OUTPUT_FILE"
Note: This output gets written to the local file system of a Python Docker container. To verify the output by writing it to GCS instead, specify a publicly accessible GCS path for the output option, since the portable DirectRunner is currently unable to correctly forward local credentials for accessing GCS.
Advanced: Start an expansion service
When building a job for a multi-language pipeline, Beam uses an expansion service to expand composite transforms. You must have at least one expansion service per remote SDK.
In the general case, if you have a supported version of Python installed on your system, you can let PythonExternalTransform handle the details of creating and starting up the expansion service. But if you want to customize the environment or use transforms not available in the default Beam SDK, you might need to run your own expansion service.
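If you do run your own expansion service, point the pipeline at it through the optional second argument of from (in the example pipeline this value comes from the --expansionService pipeline option). A minimal sketch, with an illustrative port:

// Hedged sketch: connect to a manually started expansion service instead of
// letting PythonExternalTransform start one. The port is illustrative and must
// match the port you start the expansion service on (see the steps below).
PythonExternalTransform.<PCollection<Row>, PCollection<Row>>from(
    "apache_beam.dataframe.transforms.DataframeTransform", "localhost:12345");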
For example, to start the standard expansion service for a Python transform, ExpansionServiceServicer, follow these steps:
- Activate a new virtual environment following these instructions.
- Install Apache Beam with the gcp and dataframe packages.
pip install 'apache-beam[gcp,dataframe]'
- Run the following command.
python -m apache_beam.runners.portability.expansion_service_main -p <PORT> --fully_qualified_name_glob "*"
The command runs expansion_service_main.py, which starts the standard expansion service. When you use Gradle to run your Java pipeline, you can specify the expansion service with the expansionService option. For example: --expansionService=localhost:<PORT>.
Next steps
To learn more about Beam support for cross-language pipelines, see Multi-language pipelines. To learn more about the Beam DataFrame API, see Beam DataFrames overview.