apache_beam.transforms.util module

Simple utility PTransforms.

class apache_beam.transforms.util.BatchElements(min_batch_size=1, max_batch_size=10000, target_batch_overhead=0.05, target_batch_duration_secs=10, target_batch_duration_secs_including_fixed_cost=None, max_batch_duration_secs=None, *, element_size_fn=<function BatchElements.<lambda>>, variance=0.25, clock=<built-in function time>, record_metrics=True)[source]

Bases: PTransform

A Transform that batches elements for amortized processing.

This transform is designed to precede operations whose processing cost is of the form

time = fixed_cost + num_elements * per_element_cost

where the per element cost is (often significantly) smaller than the fixed cost and could be amortized over multiple elements. It consumes a PCollection of element type T and produces a PCollection of element type List[T].
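To make the amortization concrete, the following sketch evaluates the cost model for a few batch sizes. The cost values and the helper names (`batch_time`, `overhead_fraction`) are illustrative assumptions, not measurements and not part of the Beam API.

```python
def batch_time(fixed_cost, per_element_cost, num_elements):
    """Total time for one batch, per the cost model above."""
    return fixed_cost + num_elements * per_element_cost


def overhead_fraction(fixed_cost, per_element_cost, num_elements):
    """Share of the batch time spent on the fixed cost (fixed_cost / time)."""
    return fixed_cost / batch_time(fixed_cost, per_element_cost, num_elements)


# Hypothetical costs, in seconds: 1 s of setup per call, 10 ms per element.
FIXED, PER_ELEMENT = 1.0, 0.01

for n in (1, 10, 100, 1000):
    total = batch_time(FIXED, PER_ELEMENT, n)
    print(f"batch={n:5d}  time/element={total / n:.4f}s  "
          f"overhead={overhead_fraction(FIXED, PER_ELEMENT, n):.2%}")
```

As the batch grows, the time per element approaches the per-element cost and the share spent on the fixed cost shrinks.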

This transform attempts to find the best batch size between the minimum and maximum parameters by profiling the time taken by (fused) downstream operations. For a fixed batch size, set the min and max to be equal.

Elements are batched per-window, and each batch is emitted in the window corresponding to its contents, with a timestamp at the end of that window.

When the max_batch_duration_secs arg is provided, a stateful implementation of BatchElements is used to batch elements across bundles. This is most impactful in streaming applications where many bundles only contain one element. Larger max_batch_duration_secs values might reduce the throughput of the transform, while smaller values might improve the throughput but make it more likely that batches are smaller than the target batch size.

As a general recommendation, start with low values (e.g. 0.005, i.e. 5 ms) and increase as needed to get the desired tradeoff between target batch size and latency or throughput.

For more information on tuning parameters to this transform, see https://beam.apache.org/documentation/patterns/batch-elements

Parameters:
  • min_batch_size – (optional) the smallest size of a batch

  • max_batch_size – (optional) the largest size of a batch

  • target_batch_overhead – (optional) a target for fixed_cost / time, as used in the formula above

  • target_batch_duration_secs – (optional) a target for total time per bundle, in seconds, excluding fixed cost

  • target_batch_duration_secs_including_fixed_cost – (optional) a target for total time per bundle, in seconds, including fixed cost

  • max_batch_duration_secs – (optional) the maximum amount of time to buffer a batch before emitting. Setting this argument to be non-none uses the stateful implementation of BatchElements.

  • element_size_fn – (optional) A mapping of an element to its contribution to batch size, defaulting to every element having size 1. When provided, attempts to provide batches of optimal total size which may consist of a varying number of elements.

  • variance – (optional) the permitted (relative) amount of deviation from the (estimated) ideal batch size used to produce a wider base for linear interpolation

  • clock – (optional) an alternative to time.time for measuring the cost of downstream operations (mostly for testing)

  • record_metrics – (optional) whether or not to record beam metrics on distributions of the batch size. Defaults to True.
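The effect of element_size_fn can be pictured with a plain-Python sketch in which a batch is closed once its total size would exceed a target. This is a simplified model under stated assumptions: the target is fixed here, whereas the transform estimates it from profiling, and `batch_by_size` is a hypothetical helper, not a Beam function.

```python
def batch_by_size(elements, target_batch_size, element_size_fn=lambda x: 1):
    """Greedily group elements so each batch's total size stays within target.

    Simplified model only: the target is constant and there are no windows.
    """
    batch, running_size = [], 0
    for element in elements:
        size = element_size_fn(element)
        # Emit the current batch before this element would push it past target.
        if batch and running_size + size > target_batch_size:
            yield batch
            batch, running_size = [], 0
        batch.append(element)
        running_size += size
    if batch:
        yield batch


words = ["a", "bb", "ccc", "dddd", "ee", "f"]
# Default size function: every element counts as 1, so batches hold 2 elements.
by_count = list(batch_by_size(words, target_batch_size=2))
# Size by string length: batches hold a varying number of elements.
by_length = list(batch_by_size(words, target_batch_size=5, element_size_fn=len))
```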

expand(pcoll)[source]
class apache_beam.transforms.util.CoGroupByKey(*, pipeline=None)[source]

Bases: PTransform

Groups results across several PCollections by key.

Given an input dict of serializable keys (called “tags”) to 0 or more PCollections of (key, value) tuples, it creates a single output PCollection of (key, value) tuples whose keys are the unique input keys from all inputs, and whose values are dicts mapping each tag to an iterable of whatever values were under the key in the corresponding PCollection, in this manner:

('some key', {'tag1': ['value 1 under "some key" in pcoll1',
                       'value 2 under "some key" in pcoll1',
                       ...],
              'tag2': ... ,
              ... })

where [] refers to an iterable, not a list.

For example, given:

{'tag1': pc1, 'tag2': pc2, 333: pc3}

where:

pc1 = beam.Create([(k1, v1)])
pc2 = beam.Create([])
pc3 = beam.Create([(k1, v31), (k1, v32), (k2, v33)])

The output PCollection would consist of items:

[(k1, {'tag1': [v1], 'tag2': [], 333: [v31, v32]}),
 (k2, {'tag1': [], 'tag2': [], 333: [v33]})]

where [] refers to an iterable, not a list.
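The grouping described above can be modelled in plain Python, with lists of (key, value) pairs standing in for PCollections. The `cogroup` helper is hypothetical and only illustrates the shape of the output; it is not how Beam implements the transform.

```python
from collections import defaultdict


def cogroup(tagged_inputs):
    """Model of CoGroupByKey over a dict of tag -> list of (key, value) pairs."""
    # Every output key gets an entry for every tag, even if that entry is empty.
    grouped = defaultdict(lambda: {tag: [] for tag in tagged_inputs})
    for tag, pairs in tagged_inputs.items():
        for key, value in pairs:
            grouped[key][tag].append(value)
    return dict(grouped)


result = cogroup({
    "tag1": [("k1", "v1")],
    "tag2": [],
    333: [("k1", "v31"), ("k1", "v32"), ("k2", "v33")],
})
```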

CoGroupByKey also works for tuples, lists, or other flat iterables of PCollections, in which case the values of the resulting PCollections will be tuples whose nth value is the iterable of values from the nth PCollection—conceptually, the “tags” are the indices into the input. Thus, for this input:

(pc1, pc2, pc3)

the output would be:

[(k1, ([v1], [], [v31, v32])),
 (k2, ([], [], [v33]))]

where, again, [] refers to an iterable, not a list.
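The positional form can be sketched the same way, with the index of each input playing the role of the tag. Again, lists stand in for PCollections and `cogroup_positional` is a hypothetical helper used only to show the output shape.

```python
def cogroup_positional(*inputs):
    """Model of CoGroupByKey over a tuple of inputs of (key, value) pairs."""
    grouped = {}
    for index, pairs in enumerate(inputs):
        for key, value in pairs:
            # One (initially empty) list per input, in input order.
            slots = grouped.setdefault(key, tuple([] for _ in inputs))
            slots[index].append(value)
    return grouped


pc1 = [("k1", "v1")]
pc2 = []
pc3 = [("k1", "v31"), ("k1", "v32"), ("k2", "v33")]
positional = cogroup_positional(pc1, pc2, pc3)
```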

**kwargs

Accepts a single named argument “pipeline”, which specifies the pipeline that “owns” this PTransform. Ordinarily CoGroupByKey can obtain this information from one of the input PCollections, but if there are none (or if there’s a chance there may be none), this argument is the only way to provide pipeline information, and should be considered mandatory.

expand(pcolls)[source]
apache_beam.transforms.util.Distinct()[source]

Produces a PCollection containing distinct elements of a PCollection.

apache_beam.transforms.util.Keys(label='Keys')[source]

Produces a PCollection of first elements of 2-tuples in a PCollection.

apache_beam.transforms.util.KvSwap(label='KvSwap')[source]

Produces a PCollection reversing 2-tuples in a PCollection.

class apache_beam.transforms.util.LogElements(label=None, prefix='', with_timestamp=False, with_window=False, level=None)[source]

Bases: PTransform

PTransform for printing the elements of a PCollection.

Parameters:
  • label (str) – (optional) A custom label for the transform.

  • prefix (str) – (optional) A prefix string to prepend to each logged element.

  • with_timestamp (bool) – (optional) Whether to include element’s timestamp.

  • with_window (bool) – (optional) Whether to include element’s window.

  • level – (optional) The logging level for the output (e.g. logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR). If not specified, the log is printed to stdout.

expand(input)[source]
class apache_beam.transforms.util.Regex[source]

Bases: object

PTransform to use Regular Expression to process the elements in a PCollection.

ALL = '__regex_all_groups'
static matches(regex, group=0)[source]

Returns the matches (group 0 by default) if zero or more characters at the beginning of the string match the regular expression. To match the entire string, add “$” at the end of the regular expression.

Group can be integer value or a string value.

Parameters:
  • regex – the regular expression string or (re.compile) pattern.

  • group – (optional) name/number of the group, it can be integer or a string value. Defaults to 0, meaning the entire matched string will be returned.
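The matching rule described here reads like that of Python's `re.match`, so the behaviour for a single line can be sketched with the standard library. This is an assumption based on the description above, not a statement about the transform's internals; the `matches` helper is hypothetical, and `None` stands in for "no output".

```python
import re


def matches(regex, line, group=0):
    """Return the requested group if the pattern matches at the start of line."""
    m = re.match(regex, line)
    return m.group(group) if m else None


# Only the beginning of the line has to match.
prefix_only = matches(r"\d+", "123abc")
# A trailing "$" requires the match to reach the end of the line.
whole_line = matches(r"\d+$", "123abc")
# Groups can be selected by name as well as by number.
named = matches(r"(?P<user>\w+)@(?P<host>\w+)", "ann@example", group="host")
```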

static all_matches(regex)[source]

Returns all matches (groups) if zero or more characters at the beginning of the string match the regular expression.

Parameters:

regex – the regular expression string or (re.compile) pattern.

static matches_kv(regex, keyGroup, valueGroup=0)[source]

Returns the KV pairs if the string matches the regular expression, deriving the key & value from the specified group of the regular expression.

Parameters:
  • regex – the regular expression string or (re.compile) pattern.

  • keyGroup – The Regex group to use as the key. Can be int or str.

  • valueGroup – (optional) Regex group to use as the value. Can be int or str. The default value “0” returns entire matched string.
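A per-line sketch of the key/value extraction, assuming the same match-at-start rule as `re.match`. The `matches_kv` helper and its snake_case parameter names are hypothetical stand-ins for keyGroup and valueGroup.

```python
import re


def matches_kv(regex, line, key_group, value_group=0):
    """Return (key, value) taken from two groups of a match at the line start."""
    m = re.match(regex, line)
    return (m.group(key_group), m.group(value_group)) if m else None


pair = matches_kv(r"(\w+)=(\w+)", "color=red;", key_group=1, value_group=2)
# With the default value group 0, the value is the entire matched string.
default_value = matches_kv(r"(\w+)=(\w+)", "color=red;", key_group=1)
```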

static find(regex, group=0)[source]

Returns the matches if a portion of the line matches the Regex. Returns the entire group (group 0 by default). Group can be integer value or a string value.

Parameters:
  • regex – the regular expression string or (re.compile) pattern.

  • group – (optional) name or number of the group; it can be an integer or a string.
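Matching "a portion of the line" corresponds to `re.search` rather than `re.match` in the standard library. The sketch below assumes that correspondence; `find` is a hypothetical per-line helper and `None` stands in for "no output".

```python
import re


def find(regex, line, group=0):
    """Return the requested group of the first match anywhere in the line."""
    m = re.search(regex, line)
    return m.group(group) if m else None


line = "order 42 shipped"
found = find(r"\d+", line)
# For contrast, matching at the start of the line finds nothing here.
at_start = re.match(r"\d+", line)
```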

static find_all(regex, group=0, outputEmpty=True)[source]

Returns the matches if a portion of the line matches the Regex. By default, returns the list of group 0 matches, including empty items. To get all groups, pass the Regex.ALL flag as the group parameter, which returns all the groups in tuple format.

Parameters:
  • regex – the regular expression string or (re.compile) pattern.

  • group – (optional) name or number of the group; it can be an integer or a string.

  • outputEmpty – (optional) whether to output empty items: True to output them, False to omit them.
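The effect of outputEmpty can be sketched with `re.finditer`. This models a single group only and leaves out the Regex.ALL case; `find_all` and `output_empty` are hypothetical names for illustration.

```python
import re


def find_all(regex, line, group=0, output_empty=True):
    """Return the chosen group of every match, optionally dropping empty ones."""
    return [
        m.group(group)
        for m in re.finditer(regex, line)
        if output_empty or m.group(group)
    ]


line = "k=1 k= k=22"
# Group 1 is empty for the middle match ("k=" with no digits after it).
with_empties = find_all(r"k=(\d*)", line, group=1)
without_empties = find_all(r"k=(\d*)", line, group=1, output_empty=False)
```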

static find_kv(regex, keyGroup, valueGroup=0)[source]

Returns the matches if a portion of the line matches the Regex. Returns the specified groups as the key and value pair.

Parameters:
  • regex – the regular expression string or (re.compile) pattern.

  • keyGroup – The Regex group to use as the key. Can be int or str.

  • valueGroup – (optional) Regex group to use as the value. Can be int or str. The default value “0” returns entire matched string.

static replace_all(regex, replacement)[source]

Returns each line with all portions that match the regex replaced by the replacement string.

Parameters:
  • regex – the regular expression string or (re.compile) pattern.

  • replacement – the string to be substituted for each match.

static replace_first(regex, replacement)[source]

Returns each line with the first portion that matches the regex replaced by the replacement string.

Parameters:
  • regex – the regular expression string or (re.compile) pattern.

  • replacement – the string to be substituted for the first match.
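The difference between the two replace variants can be sketched per line with `re.sub`, where `count=1` limits the substitution to the first match. The helper names mirror the static methods but are plain functions written for illustration.

```python
import re


def replace_all(regex, replacement, line):
    """Replace every match of regex in line."""
    return re.sub(regex, replacement, line)


def replace_first(regex, replacement, line):
    """Replace only the first match of regex in line."""
    return re.sub(regex, replacement, line, count=1)


line = "a1b22c333"
all_replaced = replace_all(r"\d+", "#", line)
first_replaced = replace_first(r"\d+", "#", line)
```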

static split(regex, outputEmpty=False)[source]

Returns the list of strings obtained by splitting the line on the regular expression. Empty items are not output by default.

Parameters:
  • regex – the regular expression string or (re.compile) pattern.

  • outputEmpty – (optional) whether to output empty items: True to output them, False to omit them.
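A per-line sketch of the splitting behaviour using `re.split`, with empty items filtered out unless requested. The `split` helper and its `output_empty` parameter are hypothetical names for illustration.

```python
import re


def split(regex, line, output_empty=False):
    """Split line on regex, dropping empty items unless output_empty is True."""
    parts = re.split(regex, line)
    return parts if output_empty else [part for part in parts if part]


line = "a,,b,"
default_split = split(r",", line)
split_with_empties = split(r",", line, output_empty=True)
```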

class apache_beam.transforms.util.Reify[source]

Bases: object

PTransforms for converting between explicit and implicit form of various Beam values.

class Timestamp(label: str | None = None)[source]

Bases: PTransform

PTransform to wrap a value in a TimestampedValue with its associated timestamp.

static add_timestamp_info(element, timestamp=TimestampParam)[source]
expand(pcoll)[source]
class Window(label: str | None = None)[source]

Bases: PTransform

PTransform to convert an element in a PCollection into a tuple of (element, timestamp, window), wrapped in a TimestampedValue with its associated timestamp.

static add_window_info(element, timestamp=TimestampParam, window=WindowParam)[source]
expand(pcoll)[source]
class TimestampInValue(label: str | None = None)[source]

Bases: PTransform

PTransform to wrap the Value in a KV pair in a TimestampedValue with the element’s associated timestamp.

static add_timestamp_info(element, timestamp=TimestampParam)[source]
expand(pcoll)[source]
class WindowInValue(label: str | None = None)[source]

Bases: PTransform

PTransform to convert the Value in a KV pair into a tuple of (value, timestamp, window), with the whole element being wrapped inside a TimestampedValue.

static add_window_info(element, timestamp=TimestampParam, window=WindowParam)[source]
expand(pcoll)[source]
apache_beam.transforms.util.RemoveDuplicates()[source]

Produces a PCollection containing distinct elements of a PCollection.

class apache_beam.transforms.util.Reshuffle(num_buckets=None)[source]

Bases: PTransform

PTransform that returns a PCollection equivalent to its input, but operationally provides some of the side effects of a GroupByKey, in particular checkpointing, and preventing fusion of the surrounding transforms.

Reshuffle adds a temporary random key to each element, performs a ReshufflePerKey, and finally removes the temporary key.

Parameters:

num_buckets – If set, specifies the maximum number of distinct random keys that would be generated.
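The three steps described above (add a random key, group by it, drop the key) can be sketched in plain Python. This only models the data flow, not the checkpointing or fusion effects; the `reshuffle` helper, its `seed` parameter, and the key space used when num_buckets is unset are assumptions made for illustration.

```python
import random
from collections import defaultdict


def reshuffle(elements, num_buckets=None, seed=None):
    """Model of Reshuffle: same elements out, possibly in a different order."""
    rng = random.Random(seed)
    # Assumed key space when num_buckets is not given.
    key_space = num_buckets if num_buckets is not None else 2**32
    # 1. Pair each element with a temporary random key.
    keyed = [(rng.randrange(key_space), element) for element in elements]
    # 2. Group by the temporary key (stands in for the per-key reshuffle).
    groups = defaultdict(list)
    for key, element in keyed:
        groups[key].append(element)
    # 3. Drop the temporary key again.
    return [element for values in groups.values() for element in values]


shuffled = reshuffle(range(100), num_buckets=4, seed=0)
```

The output contains exactly the input elements; only their order and grouping may differ.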

expand(pcoll: PValue) PCollection[source]
to_runner_api_parameter(unused_context: PipelineContext) Tuple[str, None][source]
static from_runner_api_parameter(unused_ptransform, unused_parameter, unused_context)[source]
class apache_beam.transforms.util.ToString[source]

Bases: object

PTransform for converting a PCollection element, KV or PCollection Iterable to string.

static Element()[source]

Transforms each element of the PCollection to a string.

static Iterables(delimiter=None)[source]

Transforms each item in the iterable of the input of PCollection to a string. There is no trailing delimiter.
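For a single input element, the conversion amounts to stringifying each item and joining the results. In the sketch below, `iterable_to_string` is a hypothetical helper, and the comma default is an assumption made for illustration (the signature above only shows `delimiter=None`).

```python
def iterable_to_string(items, delimiter=","):
    """Join the string form of each item; no delimiter is added at the end."""
    return delimiter.join(str(item) for item in items)


numbers = iterable_to_string([1, 2, 3])
pair = iterable_to_string(("k", 5), delimiter=": ")
```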

static Kvs(delimiter=None)

Transforms each item in the iterable of the input of PCollection to a string. There is no trailing delimiter.

apache_beam.transforms.util.Values(label='Values')[source]

Produces a PCollection of second elements of 2-tuples in a PCollection.

apache_beam.transforms.util.WithKeys(k, *args, **kwargs)[source]

PTransform that takes a PCollection, and either a constant key or a callable, and returns a PCollection of (K, V), where each of the values in the input PCollection has been paired with either the constant key or a key computed from the value. The callable may optionally accept positional or keyword arguments, which should be passed to WithKeys directly. These may be either SideInputs or static (non-PCollection) values, such as ints.
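The two modes (constant key versus callable) can be modelled in plain Python with a list standing in for the PCollection. The `with_keys` helper is hypothetical, and the extra argument here is a static value rather than a side input.

```python
def with_keys(values, k, *args, **kwargs):
    """Pair each value with a constant key or with a key computed from it."""
    if callable(k):
        return [(k(value, *args, **kwargs), value) for value in values]
    return [(k, value) for value in values]


fruits = ["apple", "kiwi"]
constant_key = with_keys(fruits, "fruit")
computed_key = with_keys(fruits, len)
# Extra positional arguments are forwarded to the callable.
prefix_key = with_keys(fruits, lambda value, n: value[:n], 2)
```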

class apache_beam.transforms.util.GroupIntoBatches(batch_size, max_buffering_duration_secs=None, clock=<built-in function time>)[source]

Bases: PTransform

PTransform that batches the input into the desired batch size. Elements are buffered until their count reaches the batch size given in the argument, at which point they are output to the output PCollection.

Windows are preserved (batches will contain elements from the same window)

Create a new GroupIntoBatches.

Parameters:
  • batch_size – (required) How many elements should be in a batch

  • max_buffering_duration_secs – (optional) The maximum time, in seconds, that an incomplete batch of elements may be buffered in state. The duration must be a positive number of seconds, given as an int or float. Setting this parameter to zero effectively means no buffering limit.

  • clock – (optional) an alternative to time.time (mostly for testing)
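The buffering rule can be sketched in plain Python for keyed input, i.e. (key, value) pairs, within a single window. The `group_into_batches` helper is hypothetical; it ignores max_buffering_duration_secs and simply flushes incomplete batches when the input ends.

```python
from collections import defaultdict


def group_into_batches(keyed_elements, batch_size):
    """Yield (key, batch) once batch_size values are buffered for a key."""
    buffers = defaultdict(list)
    for key, value in keyed_elements:
        buffers[key].append(value)
        if len(buffers[key]) == batch_size:
            yield key, buffers.pop(key)
    # End of input (stands in for the end of the window): flush leftovers.
    for key, values in buffers.items():
        yield key, values


events = [("a", 1), ("b", 2), ("a", 3), ("a", 4), ("b", 5), ("a", 6), ("a", 7)]
batches = list(group_into_batches(events, batch_size=2))
```

Each key is buffered independently, so a full batch for one key is emitted without waiting for the others.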

expand(pcoll)[source]
to_runner_api_parameter(unused_context: PipelineContext) Tuple[str, beam_runner_api_pb2.GroupIntoBatchesPayload][source]
static from_runner_api_parameter(unused_ptransform, proto, unused_context)[source]
class WithShardedKey(batch_size, max_buffering_duration_secs=None, clock=<built-in function time>)[source]

Bases: PTransform

A GroupIntoBatches transform that outputs batched elements associated with sharded input keys.

By default, keys are sharded such that the input elements with the same key are spread across all available threads executing the transform. Runners may override the default sharding to achieve better load balancing at execution time.

Create a new GroupIntoBatches with sharded output. See GroupIntoBatches transform for a description of input parameters.

expand(pcoll)[source]
to_runner_api_parameter(unused_context: PipelineContext) Tuple[str, beam_runner_api_pb2.GroupIntoBatchesPayload][source]
static from_runner_api_parameter(unused_ptransform, proto, unused_context)[source]