ParDo
A transform for generic parallel processing.
A ParDo transform considers each element in the input PCollection, performs some processing function (your user code) on that element, and emits zero or more elements to an output PCollection.
See more information in the Beam Programming Guide.
Examples
In the following examples, we explore how to create custom DoFns and access the timestamp and windowing information.
Example 1: ParDo with a simple DoFn
The following example defines a simple DoFn class called SplitWords which stores the delimiter as an object field. The process method is called once per element, and it can yield zero or more output elements.
Example 2: ParDo with timestamp and window information
In this example, we add new parameters to the process method to bind parameter values at runtime.
- beam.DoFn.TimestampParam binds the timestamp information as an apache_beam.utils.timestamp.Timestamp object.
- beam.DoFn.WindowParam binds the window information as the appropriate apache_beam.transforms.window.*Window object.
Example 3: ParDo with DoFn methods
A DoFn can be customized with a number of methods that can help create more complex behaviors. You can customize what a worker does when it starts and shuts down with setup and teardown. You can also customize what to do when a bundle of elements starts and finishes with start_bundle and finish_bundle.
- DoFn.setup(): Called whenever the DoFn instance is deserialized on the worker. This means it can be called more than once per worker because multiple instances of a given DoFn subclass may be created (e.g., due to parallelization, or due to garbage collection after a period of disuse). This is a good place to connect to database instances, open network connections or other resources. See also DoFn.SetupContextParam for a way to accomplish this via context managers.
- DoFn.start_bundle(): Called once per bundle of elements before calling process on the first element of the bundle. This is a good place to start keeping track of the bundle elements. See also DoFn.BundleContextParam for a way to accomplish this via context managers.
- DoFn.process(element, *args, **kwargs): Called once per element, can yield zero or more elements. Additional *args or **kwargs can be passed through beam.ParDo(). [required]
- DoFn.finish_bundle(): Called once per bundle of elements after calling process on the last element of the bundle, can yield zero or more elements. This is a good place to do batch calls on a bundle of elements, such as running a database query.
  For example, you can initialize a batch in start_bundle, add elements to the batch in process instead of yielding them, then run a batch query on those elements in finish_bundle and yield all the results.
  Note that yielded elements from finish_bundle must be of the type apache_beam.utils.windowed_value.WindowedValue. You need to provide a timestamp as a unix timestamp, which you can get from the relevant processed elements. You also need to provide a window, which you can likewise get from the relevant processed elements.
- DoFn.teardown(): Called once (as a best effort) per DoFn instance when the DoFn instance is shutting down. This is a good place to close database instances, close network connections or other resources.
  Note that teardown is called as a best effort and is not guaranteed. For example, if the worker crashes, teardown might not be called.
Known issues:
- [Issue 19394] DoFn.teardown() metrics are lost.
Related transforms
- Map behaves the same, but produces exactly one output for each input.
- FlatMap behaves the same as Map, but for each input it may produce zero or more outputs.
- Filter is useful if the function is just deciding whether to output an element or not.
Last updated on 2025/01/10