Python Streaming Pipelines

Python streaming pipeline execution became available (with some limitations) starting with Beam SDK version 2.5.0.

Why use streaming execution?

Beam creates an unbounded PCollection if your pipeline reads from a streaming or continuously updating data source (such as Cloud Pub/Sub). A runner must process an unbounded PCollection using a streaming job that runs continuously, because the entire collection is never available for processing at any one time. For more information about bounded and unbounded collections, see Size and boundedness.

Modifying a pipeline to use stream processing

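To modify a batch pipeline to support streaming, you must make the following code changes: use an I/O connector that reads from an unbounded source, use an I/O connector that writes to an unbounded source, and choose a windowing strategy.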

The Beam SDK for Python includes two I/O connectors that support unbounded PCollections: Google Cloud Pub/Sub (reading and writing) and Google BigQuery (writing).

The following snippets show the necessary code changes to modify the batch WordCount example to support streaming:

These batch WordCount snippets are from wordcount.py. This code uses the TextIO I/O connector to read from and write to a bounded collection.

  lines = p | 'read' >> ReadFromText(known_args.input)
  ...

  counts = (lines
            | 'split' >> (beam.ParDo(WordExtractingDoFn())
                          .with_output_types(six.text_type))
            | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
            | 'group' >> beam.GroupByKey()
            | 'count' >> beam.Map(count_ones))
  ...

  output = counts | 'format' >> beam.Map(format_result)

  # Write the output using a "Write" transform that has side effects.
  output | 'write' >> WriteToText(known_args.output)
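
The snippets above reference helpers (WordExtractingDoFn, count_ones, format_result) whose bodies are not shown. As a rough illustration of what each labeled stage contributes, the following plain-Python sketch mimics the split, pair_with_one, group, count, and format steps on an in-memory list. It does not use Beam; the word-splitting regular expression and the output format are assumptions made for this sketch, and the real helpers in wordcount.py may differ.

```python
import re
from collections import defaultdict


def extract_words(line):
    # Assumed stand-in for WordExtractingDoFn: split a line into words.
    return re.findall(r"[A-Za-z']+", line)


def count_ones(word_ones):
    # Sum the list of ones that the grouping step collected for one word.
    word, ones = word_ones
    return word, sum(ones)


def format_result(word_count_pair):
    # Assumed output format; the real format_result may differ.
    word, count = word_count_pair
    return "%s: %d" % (word, count)


def word_count(lines):
    # 'split' and 'pair_with_one': emit (word, 1) for every word.
    pairs = [(word, 1) for line in lines for word in extract_words(line)]
    # 'group': collect all values that share a key, as GroupByKey does.
    grouped = defaultdict(list)
    for word, one in pairs:
        grouped[word].append(one)
    # 'count' and 'format', sorted by word so the result is deterministic.
    return [format_result(count_ones(item)) for item in sorted(grouped.items())]


print(word_count(["to be or not to be", "be quick"]))
```

Because the grouping step needs every value for a key before it can emit a result, this shape only works directly on a bounded input, which is why the streaming version adds a windowing step before the group.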

These streaming WordCount snippets are from streaming_wordcount.py. This code uses an I/O connector that reads from and writes to an unbounded source (Cloud Pub/Sub) and specifies a fixed windowing strategy.

  lines = p | beam.io.ReadFromPubSub(topic=known_args.input_topic)
  ...

  counts = (lines
            | 'split' >> (beam.ParDo(WordExtractingDoFn())
                          .with_output_types(six.text_type))
            | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
            | beam.WindowInto(window.FixedWindows(15, 0))
            | 'group' >> beam.GroupByKey()
            | 'count' >> beam.Map(count_ones))

  ...

  output = counts | 'format' >> beam.Map(format_result)

  # Write to Pub/Sub
  output | beam.io.WriteStringsToPubSub(known_args.output_topic)
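
The WindowInto step above uses FixedWindows(15, 0), that is, 15-second windows with an offset of 0. The following plain-Python sketch shows the arithmetic behind that kind of window assignment and how grouping then happens per window rather than over the whole stream. It does not use Beam; the function names are hypothetical, and timestamps are assumed to be plain numbers of seconds.

```python
from collections import Counter


def fixed_window_start(timestamp, size=15, offset=0):
    # Start of the fixed window [start, start + size) that contains timestamp.
    return timestamp - (timestamp - offset) % size


def windowed_counts(timestamped_words, size=15, offset=0):
    # Count each word separately within each window, keyed by
    # (window start, word).
    counts = Counter()
    for timestamp, word in timestamped_words:
        key = (fixed_window_start(timestamp, size, offset), word)
        counts[key] += 1
    return dict(counts)


# Hypothetical (timestamp in seconds, word) events.
events = [(0, "beam"), (14, "beam"), (15, "beam"), (31, "flink")]
print(windowed_counts(events))
```

In this sketch the events at 0 and 14 seconds share the window that starts at 0, while the event at 15 seconds falls into the next window, so "beam" is counted twice in one window and once in the other.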

Running a streaming pipeline

To run the example streaming WordCount pipeline, you must have a Cloud Pub/Sub input topic and output topic. To create, subscribe to, and pull from a topic for testing purposes, you can use the commands in the Cloud Pub/Sub quickstart.

The following simple bash script feeds lines of an input text file to your input topic:

cat <YOUR_LOCAL_TEXT_FILE> | while IFS= read -r line; do gcloud pubsub topics publish <YOUR_INPUT_TOPIC_NAME> --message "$line"; done
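
If you prefer to drive the same gcloud command from Python, a minimal sketch might look like the following. It only wraps the gcloud pubsub topics publish command shown above; the function names are hypothetical, and skipping blank lines is a choice made for this sketch rather than something the bash script does.

```python
import subprocess


def build_publish_commands(lines, topic):
    # Build one gcloud argument list per non-empty input line.
    commands = []
    for line in lines:
        message = line.rstrip("\n")
        if not message:
            # Assumption: blank lines carry no words worth publishing.
            continue
        commands.append(
            ["gcloud", "pubsub", "topics", "publish", topic,
             "--message", message])
    return commands


def publish_file(path, topic):
    # Publish each line of a local text file; requires gcloud on the PATH.
    with open(path, encoding="utf-8") as handle:
        for command in build_publish_commands(handle, topic):
            subprocess.run(command, check=True)
```

Passing the message as a separate list element avoids shell quoting problems with lines that contain quotes or backslashes.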

Alternatively, you can read from a publicly available Cloud Pub/Sub stream, such as projects/pubsub-public-data/topics/taxirides-realtime. However, you must create your own output topic to test writes.

The following commands run the streaming_wordcount.py example streaming pipeline. Specify your Cloud Pub/Sub project and input topic (--input_topic) and your output Cloud Pub/Sub project and topic (--output_topic).

# DirectRunner requires the --streaming option
python -m apache_beam.examples.streaming_wordcount \
  --input_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_INPUT_TOPIC" \
  --output_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_OUTPUT_TOPIC" \
  --streaming

For the Spark runner, see /documentation/runners/spark/ for more information.

# As part of the initial setup, install Google Cloud Platform specific extra components.
pip install apache-beam[gcp]

# DataflowRunner requires the --streaming option
python -m apache_beam.examples.streaming_wordcount \
  --runner DataflowRunner \
  --project YOUR_GCP_PROJECT \
  --region YOUR_GCP_REGION \
  --temp_location gs://YOUR_GCS_BUCKET/tmp/ \
  --input_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_INPUT_TOPIC" \
  --output_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_OUTPUT_TOPIC" \
  --streaming
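
Both topic options in the commands above take a full path of the form projects/PROJECT/topics/TOPIC. A small helper for building and checking such paths before launching a job could look like the sketch below. The helper names and the regular expression are assumptions for illustration only; they are not Beam's own validation.

```python
import re

# Illustrative pattern for projects/<project>/topics/<topic>.
TOPIC_PATTERN = re.compile(r"^projects/([^/]+)/topics/([^/]+)$")


def topic_path(project, topic):
    # Build the full topic path from a project name and a topic name.
    return "projects/%s/topics/%s" % (project, topic)


def parse_topic_path(path):
    # Return (project, topic), or raise ValueError for a malformed path.
    match = TOPIC_PATTERN.match(path)
    if match is None:
        raise ValueError("expected projects/<project>/topics/<topic>: %r" % path)
    return match.group(1), match.group(2)


print(parse_topic_path(topic_path("pubsub-public-data", "taxirides-realtime")))
```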

Check your runner’s documentation for any additional runner-specific information about executing streaming pipelines: