Pipeline option patterns

The samples on this page show you common pipeline configurations. For more information about pipeline configuration options, see Creating a pipeline and Configuring pipeline options.

Retroactively logging runtime parameters

Use the ValueProvider interface to access runtime parameters after a pipeline job has completed.

You can use the ValueProvider interface to pass runtime parameters to your pipeline, but you can only log those parameters from within the Beam DAG. One solution is to add a pipeline branch with a DoFn that processes a placeholder value and then logs the runtime parameters:

  /**
   * Example {@code PipelineOptions} sub-interface that declares a single option whose value is
   * wrapped in a {@code ValueProvider}.
   *
   * <p>The option is exposed through the {@code getStringValue}/{@code setStringValue} pair and is
   * annotated with the description "My option" and the string default "Hello world!".
   */
  public interface MyOptions extends PipelineOptions {
    /** Returns the provider for the {@code stringValue} option. */
    @Default.String("Hello world!")
    @Description("My option")
    ValueProvider<String> getStringValue();

    /** Sets the provider for the {@code stringValue} option. */
    void setStringValue(ValueProvider<String> value);
  }

  /**
   * Builds and runs a pipeline that has an extra branch for logging the {@code stringValue}
   * option.
   *
   * <p>The logging branch feeds one placeholder element into a DoFn that reads the options from
   * its process context. The main branch sums the integers 1 through 4.
   *
   * @param args command-line arguments, parsed into {@link MyOptions}
   */
  public static void accessingValueProviderInfoAfterRunSnip1(String[] args) {
    MyOptions pipelineOptions =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
    Pipeline pipeline = Pipeline.create(pipelineOptions);

    // DoFn for the logging branch. It only logs the ValueProvider option here, but the same
    // spot could instead push the value to an external database.
    DoFn<Integer, Integer> logStringValueFn =
        new DoFn<Integer, Integer>() {
          @ProcessElement
          public void process(ProcessContext context) {
            MyOptions runtimeOptions = context.getPipelineOptions().as(MyOptions.class);
            LOG.info("Option StringValue was {}", runtimeOptions.getStringValue());
          }
        };

    // Logging branch: a single placeholder element drives the DoFn above.
    pipeline.apply(Create.of(1)).apply(ParDo.of(logStringValueFn));

    // Main branch of the pipeline.
    pipeline.apply(Create.of(1, 2, 3, 4)).apply(Sum.integersGlobally());

    pipeline.run();
  }
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.value_provider import RuntimeValueProvider

class MyOptions(PipelineOptions):
  """Pipeline options that expose ``--string_value`` as a value provider."""
  @classmethod
  def _add_argparse_args(cls, parser):
    """Register this class's command-line arguments on ``parser``.

    Adds ``--string_value`` via ``add_value_provider_argument`` with
    ``type=str``.
    """
    parser.add_value_provider_argument(
        '--string_value',
        type=str,
    )

class LogValueProvidersFn(beam.DoFn):
  """DoFn that logs the value of the ``string_value`` option.

  The provider is handed to the constructor and only resolved inside
  ``process``. This example just logs the value, but you could store it by
  pushing it to an external database instead.
  """
  def __init__(self, string_vp):
    """Keep the value provider so ``process`` can resolve it later.

    Args:
      string_vp: Provider object for the ``string_value`` option; its
        ``get()`` method is called in ``process``.
    """
    self.string_vp = string_vp

  def process(self, an_int):
    """Log the option value; emits no output elements.

    Args:
      an_int: The input element. It is not used; it only triggers the call.
    """
    # Pass the value as a logging argument rather than pre-formatting with
    # `%`: formatting is deferred to the logging framework and does not break
    # if the value happens to be a tuple.
    logging.info('The string_value is %s', self.string_vp.get())
    # Another option (where you don't need to pass the value at all) is to
    # look the option up by name:
    logging.info(
        'The string value is %s',
        RuntimeValueProvider.get_value('string_value', str, ''))

# Parse the pipeline options and view them through MyOptions so the
# string_value provider can be handed to the logging DoFn.
beam_options = PipelineOptions()
args = beam_options.view_as(MyOptions)

# Build the pipeline inside a context manager.
with beam.Pipeline(options=beam_options) as pipeline:

  # Logging branch: a single placeholder element drives the DoFn that logs
  # the ValueProvider value.
  log_branch_input = pipeline | beam.Create([None])
  _ = log_branch_input | 'LogValueProvs' >> beam.ParDo(
      LogValueProvidersFn(args.string_value))

  # Main branch of the pipeline.
  # NOTE(review): confirm that `beam.combiners` exposes `Sum` in the Beam
  # version in use.
  main_input = pipeline | "main_pc" >> beam.Create([1, 2, 3])
  result_pc = main_input | beam.combiners.Sum.Globally()