apache_beam.transforms.trigger module

Support for Apache Beam triggers.

Triggers control when, in processing time, windows get emitted.

class apache_beam.transforms.trigger.AccumulationMode[source]

Bases: object

Controls what to do with data when a trigger fires multiple times.

DISCARDING = 1
ACCUMULATING = 2
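
As a rough illustration of the two modes, the pure-Python sketch below simulates the pane contents produced by successive firings. It does not use Beam: `simulate_panes` and its inputs are hypothetical names, and the two constants only mirror the values listed above.

```python
# Hypothetical stand-ins mirroring AccumulationMode.DISCARDING / ACCUMULATING.
DISCARDING = 1
ACCUMULATING = 2


def simulate_panes(batches, mode):
    """Return the pane emitted at each firing.

    `batches` holds the elements that arrived between consecutive firings.
    """
    panes = []
    buffered = []
    for batch in batches:
        buffered.extend(batch)
        panes.append(list(buffered))
        if mode == DISCARDING:
            # Discarding mode drops already-emitted elements after each firing.
            buffered = []
    return panes


batches = [[1, 2], [3], [4, 5]]
discarding_panes = simulate_panes(batches, DISCARDING)
accumulating_panes = simulate_panes(batches, ACCUMULATING)
print(discarding_panes)    # [[1, 2], [3], [4, 5]]
print(accumulating_panes)  # [[1, 2], [1, 2, 3], [1, 2, 3, 4, 5]]
```

In this sketch, discarding mode emits only the elements that arrived since the previous firing, while accumulating mode re-emits everything seen so far in the window.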
class apache_beam.transforms.trigger.TriggerFn[source]

Bases: object

A TriggerFn determines when window panes are emitted.

See https://beam.apache.org/documentation/programming-guide/#triggers

abstract on_element(element, window, context)[source]

Called when a new element arrives in a window.

Parameters:
  • element – the element being added

  • window – the window to which the element is being added

  • context – a context (e.g. a TriggerContext instance) for managing state and setting timers

abstract on_merge(to_be_merged, merge_result, context)[source]

Called when multiple windows are merged.

Parameters:
  • to_be_merged – the set of windows to be merged

  • merge_result – the window into which the windows are being merged

  • context – a context (e.g. a TriggerContext instance) for managing state and setting timers

abstract should_fire(time_domain, timestamp, window, context)[source]

Whether this trigger should cause the window to fire.

Parameters:
  • time_domain – WATERMARK for event-time timers and REAL_TIME for processing-time timers.

  • timestamp – for time_domain WATERMARK, (a lower bound on) the watermark of the system; for time_domain REAL_TIME, the timestamp of the processing-time timer.

  • window – the window whose trigger is being considered

  • context – a context (e.g. a TriggerContext instance) for managing state and setting timers

Returns:

whether this trigger should cause a firing

abstract has_ontime_pane()[source]

Whether this trigger creates an empty pane even if there are no elements.

Returns:

True if this trigger guarantees that there will always be an ON_TIME pane even if there are no elements in that pane.

abstract on_fire(watermark, window, context)[source]

Called when a trigger actually fires.

Parameters:
  • watermark – (a lower bound on) the watermark of the system

  • window – the window whose trigger is being fired

  • context – a context (e.g. a TriggerContext instance) for managing state and setting timers

Returns:

whether this trigger is finished

abstract reset(window, context)[source]

Clear any state and timers used by this TriggerFn.

may_lose_data(unused_windowing: Windowing) DataLossReason[source]

Returns whether or not this trigger could cause data loss.

A trigger can cause data loss in the following scenarios:

  • The trigger has a chance to finish. For instance, AfterWatermark() without a late trigger would cause all late data to be lost. This scenario is only accounted for if the windowing strategy allows late data. Otherwise, the trigger is not responsible for the data loss.

Note that this only returns the potential for loss. It does not mean that there will be data loss. It also only accounts for loss related to the trigger, not other potential causes.

Parameters:

windowing – The Windowing that this trigger belongs to. This trigger does not need to be the top-level trigger of that Windowing.

Returns:

The DataLossReason. If there is no potential loss, DataLossReason.NO_POTENTIAL_LOSS is returned. Otherwise, all the potential reasons are returned as a single value.

static from_runner_api(proto, context)[source]
abstract to_runner_api(unused_context)[source]
class apache_beam.transforms.trigger.DefaultTrigger[source]

Bases: TriggerFn

Semantically Repeatedly(AfterWatermark()), but more optimized.

on_element(element, window, context)[source]
on_merge(to_be_merged, merge_result, context)[source]
should_fire(time_domain, watermark, window, context)[source]
on_fire(watermark, window, context)[source]
reset(window, context)[source]
may_lose_data(unused_windowing)[source]
static from_runner_api(proto, context)[source]
to_runner_api(unused_context)[source]
has_ontime_pane()[source]
class apache_beam.transforms.trigger.AfterWatermark(early=None, late=None)[source]

Bases: TriggerFn

Fire exactly once when the watermark passes the end of the window.

Parameters:
  • early – if not None, a speculative trigger to repeatedly evaluate before the watermark passes the end of the window

  • late – if not None, a speculative trigger to repeatedly evaluate after the watermark passes the end of the window

LATE_TAG = CombiningValueStateTag(is_late, CallableWrapperCombineFn(<built-in function any>))
is_late(context)[source]
on_element(element, window, context)[source]
on_merge(to_be_merged, merge_result, context)[source]
should_fire(time_domain, watermark, window, context)[source]
on_fire(watermark, window, context)[source]
reset(window, context)[source]
may_lose_data(windowing)[source]

May cause data loss if lateness is allowed and no late trigger is set.

static from_runner_api(proto, context)[source]
to_runner_api(context)[source]
has_ontime_pane()[source]
class apache_beam.transforms.trigger.AfterProcessingTime(delay=0)[source]

Bases: TriggerFn

Fire exactly once after a specified delay from processing time.

Initialize a processing time trigger with a delay in seconds.

STATE_TAG = SetStateTag(has_timer)
on_element(element, window, context)[source]
on_merge(to_be_merged, merge_result, context)[source]
should_fire(time_domain, timestamp, window, context)[source]
on_fire(timestamp, window, context)[source]
reset(window, context)[source]
may_lose_data(unused_windowing)[source]

AfterProcessingTime may finish.

static from_runner_api(proto, context)[source]
to_runner_api(context)[source]
has_ontime_pane()[source]
class apache_beam.transforms.trigger.AfterCount(count)[source]

Bases: TriggerFn

Fire when there are at least count elements in this window pane.

COUNT_TAG = CombiningValueStateTag(count, <apache_beam.transforms.combiners.CountCombineFn object>)
on_element(element, window, context)[source]
on_merge(to_be_merged, merge_result, context)[source]
should_fire(time_domain, watermark, window, context)[source]
on_fire(watermark, window, context)[source]
reset(window, context)[source]
may_lose_data(unused_windowing)[source]

AfterCount may finish.

static from_runner_api(proto, unused_context)[source]
to_runner_api(unused_context)[source]
has_ontime_pane()[source]
class apache_beam.transforms.trigger.Repeatedly(underlying)[source]

Bases: TriggerFn

Repeatedly invoke the given trigger, never finishing.

on_element(element, window, context)[source]
on_merge(to_be_merged, merge_result, context)[source]
should_fire(time_domain, watermark, window, context)[source]
on_fire(watermark, window, context)[source]
reset(window, context)[source]
may_lose_data(windowing)[source]

Repeatedly will run in a loop and pick up whatever is left when the window is garbage-collected (GC).

static from_runner_api(proto, context)[source]
to_runner_api(context)[source]
has_ontime_pane()[source]
class apache_beam.transforms.trigger.AfterAny(*triggers)[source]

Bases: _ParallelTriggerFn

Fires when any subtrigger fires.

Also finishes when any subtrigger finishes.

combine_op()

Return True if bool(x) is True for any x in the iterable.

If the iterable is empty, return False.

class apache_beam.transforms.trigger.AfterAll(*triggers)[source]

Bases: _ParallelTriggerFn

Fires when all subtriggers have fired.

Also finishes when all subtriggers have finished.

combine_op()

Return True if bool(x) is True for all values x in the iterable.

If the iterable is empty, return True.
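
The combine_op descriptions for AfterAny and AfterAll match Python's built-in any and all. The pure-Python sketch below contrasts the two, assuming the operation is applied to per-subtrigger boolean results; the `subtrigger_ready` list is hypothetical data, not a Beam structure.

```python
# Hypothetical per-subtrigger "ready to fire" results for one window.
subtrigger_ready = [False, True, False]

after_any_fires = any(subtrigger_ready)  # one ready subtrigger is enough
after_all_fires = all(subtrigger_ready)  # every subtrigger must be ready

# Edge case from the descriptions above: an empty iterable.
any_on_empty = any([])
all_on_empty = all([])

print(after_any_fires, after_all_fires, any_on_empty, all_on_empty)
# True False False True
```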

class apache_beam.transforms.trigger.AfterEach(*triggers)[source]

Bases: TriggerFn

INDEX_TAG = CombiningValueStateTag(index, CallableWrapperCombineFn(<function AfterEach.<lambda>>))
on_element(element, window, context)[source]
on_merge(to_be_merged, merge_result, context)[source]
should_fire(time_domain, watermark, window, context)[source]
on_fire(watermark, window, context)[source]
reset(window, context)[source]
may_lose_data(windowing)[source]

If all sub-triggers may finish, this may finish.

static from_runner_api(proto, context)[source]
to_runner_api(context)[source]
has_ontime_pane()[source]
class apache_beam.transforms.trigger.OrFinally(*triggers)[source]

Bases: AfterAny

static from_runner_api(proto, context)[source]
to_runner_api(context)[source]