Getting started from Apache Spark

If you already know Apache Spark, using Beam should be easy. The basic concepts are the same, and the APIs are similar as well.

Spark stores data in Spark DataFrames for structured data, and in Resilient Distributed Datasets (RDDs) for unstructured data. This guide uses RDDs.

A Spark RDD represents a collection of elements; the Beam equivalent is called a Parallel Collection (PCollection). A PCollection in Beam does not have any ordering guarantees.

Likewise, a transform in Beam is called a Parallel Transform (PTransform).

Here are some examples of common operations and their equivalents in PySpark and Beam.

Overview

Here’s a simple example of a PySpark pipeline that takes the numbers from one to four, multiplies them by two, adds all the values together, and prints the result.

import pyspark

sc = pyspark.SparkContext()
result = (
    sc.parallelize([1, 2, 3, 4])
    .map(lambda x: x * 2)
    .reduce(lambda x, y: x + y)
)
print(result)

In Beam you pipe your data through the pipeline using the pipe operator | like data | beam.Map(...) instead of chaining methods like data.map(...), but they’re doing the same thing.

Here’s what an equivalent pipeline looks like in Beam.

import apache_beam as beam

with beam.Pipeline() as pipeline:
    result = (
        pipeline
        | beam.Create([1, 2, 3, 4])
        | beam.Map(lambda x: x * 2)
        | beam.CombineGlobally(sum)
        | beam.Map(print)
    )

ℹ️ Note that we called print inside a Map transform. That’s because we can only access the elements of a PCollection from within a PTransform. To inspect the data locally, you can use the InteractiveRunner.

Another thing to note is that Beam pipelines are constructed lazily. When you pipe data with |, you are only declaring the transformations and the order in which they should happen; no computation takes place yet. The pipeline runs after the with beam.Pipeline() as pipeline context has closed.

ℹ️ When the with beam.Pipeline() as pipeline context closes, it implicitly calls pipeline.run() which triggers the computation to happen.

The pipeline is then sent to your runner of choice and it processes the data.

ℹ️ The pipeline can run locally with the DirectRunner, or in a distributed runner such as Flink, Spark, or Dataflow. The Spark runner is not related to PySpark.

You can optionally add a label to a transform using the right shift operator >>, like data | 'My description' >> beam.Map(...). Labels serve as comments and make your pipeline easier to debug.

This is how the pipeline looks after adding labels.

import apache_beam as beam

with beam.Pipeline() as pipeline:
    result = (
        pipeline
        | 'Create numbers' >> beam.Create([1, 2, 3, 4])
        | 'Multiply by two' >> beam.Map(lambda x: x * 2)
        | 'Sum everything' >> beam.CombineGlobally(sum)
        | 'Print results' >> beam.Map(print)
    )

Setup

Here’s a comparison of how to get started in both PySpark and Beam.

                           PySpark                                 Beam
Install                    $ pip install pyspark                   $ pip install apache-beam
Imports                    import pyspark                          import apache_beam as beam
Creating a local pipeline  with pyspark.SparkContext() as sc:      with beam.Pipeline() as pipeline:
                               # Your pipeline code here.              # Your pipeline code here.
Creating values            values = sc.parallelize([1, 2, 3, 4])   values = pipeline | beam.Create([1, 2, 3, 4])
Creating key-value pairs   pairs = sc.parallelize([                pairs = pipeline | beam.Create([
                               ('key1', 'value1'),                     ('key1', 'value1'),
                               ('key2', 'value2'),                     ('key2', 'value2'),
                               ('key3', 'value3'),                     ('key3', 'value3'),
                           ])                                      ])
Running a local pipeline   $ spark-submit spark_pipeline.py        $ python beam_pipeline.py

Transforms

Here are the equivalents of some common transforms in both PySpark and Beam.

               PySpark                              Beam
Map            values.map(lambda x: x * 2)          values | beam.Map(lambda x: x * 2)
Filter         values.filter(lambda x: x % 2 == 0)  values | beam.Filter(lambda x: x % 2 == 0)
FlatMap        values.flatMap(lambda x: range(x))   values | beam.FlatMap(lambda x: range(x))
Group by key   pairs.groupByKey()                   pairs | beam.GroupByKey()
Reduce         values.reduce(lambda x, y: x+y)      values | beam.CombineGlobally(sum)
Reduce by key  pairs.reduceByKey(lambda x, y: x+y)  pairs | beam.CombinePerKey(sum)
Distinct       values.distinct()                    values | beam.Distinct()
Count          values.count()                       values | beam.combiners.Count.Globally()
Count by key   pairs.countByKey()                   pairs | beam.combiners.Count.PerKey()
Take smallest  values.takeOrdered(3)                values | beam.combiners.Top.Smallest(3)
Take largest   values.takeOrdered(3, lambda x: -x)  values | beam.combiners.Top.Largest(3)
Random sample  values.takeSample(False, 3)          values | beam.combiners.Sample.FixedSizeGlobally(3)
Union          values.union(otherValues)            (values, otherValues) | beam.Flatten()
Co-group       pairs.cogroup(otherPairs)            {'Xs': pairs, 'Ys': otherPairs} | beam.CoGroupByKey()

ℹ️ To learn more about the transforms available in Beam, check the Python transform gallery.

Using calculated values

Since we are working in potentially distributed environments, we can’t guarantee that the results we’ve calculated are available at any given machine.

In PySpark, we can get a result from a collection of elements (RDD) by using data.collect(), or other aggregations such as reduce(), count(), and more.

Here’s an example to scale numbers into a range between zero and one.

import pyspark

sc = pyspark.SparkContext()
values = sc.parallelize([1, 2, 3, 4])
min_value = values.reduce(min)
max_value = values.reduce(max)

# We can simply use `min_value` and `max_value` since they are already Python `int` values from `reduce`.
scaled_values = values.map(lambda x: (x - min_value) / (max_value - min_value))

# But to access `scaled_values`, we need to call `collect`.
print(scaled_values.collect())

In Beam, every transform produces a PCollection. We use side inputs to feed a PCollection into a transform and access its values.

Any transform that accepts a function, like Map, can take side inputs. If we only need a single value, we can use beam.pvalue.AsSingleton and access it as a Python value. If we need multiple values, we can use beam.pvalue.AsIter and access them as an iterable.

import apache_beam as beam

with beam.Pipeline() as pipeline:
    values = pipeline | beam.Create([1, 2, 3, 4])
    min_value = values | beam.CombineGlobally(min)
    max_value = values | beam.CombineGlobally(max)

    # To access `min_value` and `max_value`, we need to pass them as a side input.
    scaled_values = values | beam.Map(
        lambda x, minimum, maximum: (x - minimum) / (maximum - minimum),
        minimum=beam.pvalue.AsSingleton(min_value),
        maximum=beam.pvalue.AsSingleton(max_value),
    )

    scaled_values | beam.Map(print)

ℹ️ In Beam we need to pass a side input explicitly, but we get the benefit that a reduction or aggregation does not have to fit into memory. Lazily computing side inputs also allows us to compute values only once, rather than for each distinct reduction (or requiring explicit caching of the RDD).

Next Steps

Please don’t hesitate to reach out if you encounter any issues!