Pipeline option patterns
The samples on this page show you common pipeline configurations. For more information about pipeline configuration options, see Creating a pipeline and Configuring pipeline options.
Retroactively logging runtime parameters
Use the ValueProvider interface to access runtime parameters after completing a pipeline job.
You can use the ValueProvider interface to pass runtime parameters to your pipeline, but you can only log the parameters from within the Beam DAG. A solution is to add a pipeline branch with a DoFn that processes a placeholder value and then logs the runtime parameters:
/** Sample of PipelineOptions with a ValueProvider option argument. */
public interface MyOptions extends PipelineOptions {
  @Description("My option")
  @Default.String("Hello world!")
  ValueProvider<String> getStringValue();

  void setStringValue(ValueProvider<String> value);
}

public static void accessingValueProviderInfoAfterRunSnip1(String[] args) {
  MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

  // Create pipeline.
  Pipeline p = Pipeline.create(options);

  // Add a branch for logging the ValueProvider value.
  p.apply(Create.of(1))
      .apply(
          ParDo.of(
              new DoFn<Integer, Integer>() {
                // Define the DoFn that logs the ValueProvider value.
                @ProcessElement
                public void process(ProcessContext c) {
                  MyOptions ops = c.getPipelineOptions().as(MyOptions.class);
                  // This example logs the ValueProvider value, but you could store it by
                  // pushing it to an external database.
                  LOG.info("Option StringValue was {}", ops.getStringValue());
                }
              }));

  // The main pipeline.
  p.apply(Create.of(1, 2, 3, 4)).apply(Sum.integersGlobally());

  p.run();
}
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.value_provider import RuntimeValueProvider


class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_value_provider_argument('--string_value', type=str)


class LogValueProvidersFn(beam.DoFn):
  def __init__(self, string_vp):
    self.string_vp = string_vp

  # Define the DoFn that logs the ValueProvider value.
  # The DoFn is called when creating the pipeline branch.
  # This example logs the ValueProvider value, but
  # you could store it by pushing it to an external database.
  def process(self, an_int):
    logging.info('The string_value is %s' % self.string_vp.get())
    # Another option (where you don't need to pass the value at all) is:
    logging.info(
        'The string value is %s' %
        RuntimeValueProvider.get_value('string_value', str, ''))


beam_options = PipelineOptions()
args = beam_options.view_as(MyOptions)

# Create pipeline.
with beam.Pipeline(options=beam_options) as pipeline:

  # Add a branch for logging the ValueProvider value.
  _ = (
      pipeline
      | beam.Create([None])
      | 'LogValueProvs' >> beam.ParDo(LogValueProvidersFn(args.string_value)))

  # The main pipeline.
  result_pc = (
      pipeline
      | "main_pc" >> beam.Create([1, 2, 3])
      | beam.combiners.Sum.Globally())
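To see why the logging has to happen inside a DoFn, it can help to model a runtime parameter as a value that is unknown while the pipeline is being built and only becomes readable once the job runs. The following is a toy sketch using only the Python standard library. It is not Beam's implementation, and the names DeferredValue, build_steps, and run are hypothetical, chosen only for illustration:

```python
class DeferredValue:
    """Toy stand-in for a runtime-provided option (hypothetical, not Beam's API)."""

    def __init__(self, name):
        self.name = name
        self._value = None
        self._resolved = False

    def resolve(self, value):
        # Called by the toy runner once the job starts and the value is known.
        self._value = value
        self._resolved = True

    def get(self):
        if not self._resolved:
            raise RuntimeError(
                '%s is not accessible before the job runs' % self.name)
        return self._value


def build_steps(string_vp, log):
    """Construction time: capture the provider, but do not call get() yet."""
    def log_step(_placeholder):
        # Executed at run time, after the value has been resolved.
        log.append('The string_value is %s' % string_vp.get())
    return [log_step]


def run(steps, string_vp, runtime_value):
    """Toy runner: resolve the runtime value, then run each step once."""
    string_vp.resolve(runtime_value)
    for step in steps:
        step(None)  # The placeholder element, as in beam.Create([None]).


string_vp = DeferredValue('string_value')
messages = []
steps = build_steps(string_vp, messages)  # Value is still unknown here.

try:
    string_vp.get()
    read_before_run = True
except RuntimeError:
    read_before_run = False

run(steps, string_vp, 'Hello world!')  # Value is known during execution.
```

In this model, reading the value during construction fails, while the step that reads it during execution succeeds. That mirrors the pattern above: the DoFn receives the provider object at construction time and calls get() only when it processes the placeholder element.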
Last updated on 2024/12/20