Combine
A user-defined CombineFn may be applied to combine all elements in a PCollection (global combine) or to combine all elements associated with each key.
While the result is similar to applying a GroupByKey followed by aggregating the values in each Iterable, the choice affects both the code you must write and the performance of the pipeline. Writing a ParDo that counts the number of elements in each value would be very straightforward. However, as described in the execution model, it would also require all values associated with each key to be processed by a single worker, which introduces a lot of communication overhead. Using a CombineFn requires the code to be structured as an associative and commutative operation, but it allows partial sums to be precomputed.
See more information in the Beam Programming Guide.
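To make the associative and commutative requirement concrete, here is a minimal plain-Java sketch of the accumulator pattern, with no Beam dependency. The method names mirror those of Beam's CombineFn (createAccumulator, addInput, mergeAccumulators, extractOutput), but the class MeanIntsSketch, its Accum type, and the combineBundle helper are hypothetical names introduced only for illustration. It assumes each worker pre-combines its own bundle of values and that the partial results are merged afterwards.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Plain-Java sketch (not Beam code) of the accumulator pattern behind a CombineFn.
 * All names in this class are hypothetical and exist only for illustration.
 */
final class MeanIntsSketch {

  /** Partial result: a running sum and count. Both merge by simple addition. */
  static final class Accum {
    long sum = 0;
    long count = 0;
  }

  /** Starts an empty partial result. */
  static Accum createAccumulator() {
    return new Accum();
  }

  /** Folds one input value into a partial result. */
  static Accum addInput(Accum acc, int input) {
    acc.sum += input;
    acc.count++;
    return acc;
  }

  /** Merges partial results into a new accumulator; the order of the inputs does not matter. */
  static Accum mergeAccumulators(Iterable<Accum> accums) {
    Accum merged = createAccumulator();
    for (Accum a : accums) {
      merged.sum += a.sum;
      merged.count += a.count;
    }
    return merged;
  }

  /** Converts the final partial result into the output value. */
  static double extractOutput(Accum acc) {
    return acc.count == 0 ? Double.NaN : (double) acc.sum / acc.count;
  }

  /** Simulates one worker pre-combining its own bundle of values. */
  static Accum combineBundle(List<Integer> bundle) {
    Accum acc = createAccumulator();
    for (int value : bundle) {
      acc = addInput(acc, value);
    }
    return acc;
  }

  public static void main(String[] args) {
    // Two "workers" each precompute a partial result for their bundle.
    Accum first = combineBundle(Arrays.asList(1, 2, 3));
    Accum second = combineBundle(Arrays.asList(4, 6));

    // Only the two small accumulators need to be brought together, not all five values.
    double mean = extractOutput(mergeAccumulators(Arrays.asList(first, second)));
    System.out.println(mean); // prints 3.2
  }
}
```

A mean is used here because averaging two means directly is not associative, whereas merging (sum, count) pairs is. Choosing an accumulator type that merges cleanly is usually the main design decision when writing a CombineFn.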
Examples
Example 1: Global combine
Use the global combine to combine all of the elements in a given PCollection into a single value, represented in your pipeline as a new PCollection containing one element. For example, passing the Beam-provided sum combine function Sum.ofIntegers() to Combine.globally produces a single sum value for a PCollection of integers.
Example 2: Keyed combine
Use a keyed combine to combine all of the values associated with each key into a single output value for each key. As with the global combine, the function passed to a keyed combine must be associative and commutative.
// The PCollection is grouped by key, and the Double values associated with each key are combined into a Double.
// Sum.ofDoubles() is the Beam-provided sum combine function for Double values.
PCollection<KV<String, Double>> salesRecords = ...;
PCollection<KV<String, Double>> totalSalesPerPerson =
    salesRecords.apply(Combine.<String, Double, Double>perKey(
        Sum.ofDoubles()));
// The combined value is of a different type than the original collection of values per key. The PCollection has
// keys of type String and values of type Integer, and the combined value is a Double.
// MeanInts is assumed to be a user-defined CombineFn; its definition is not shown here.
PCollection<KV<String, Integer>> playerAccuracy = ...;
PCollection<KV<String, Double>> avgAccuracyPerPlayer =
    playerAccuracy.apply(Combine.<String, Integer, Double>perKey(
        new MeanInts()));
Example 3:
Related transforms
Last updated on 2025/01/20