Side input patterns

The samples on this page show you common Beam side input patterns. A side input is an additional input that your DoFn can access each time it processes an element in the input PCollection. For more information, see the programming guide section on side inputs.

If you want to enrich your data with a key-value lookup against a remote service, first consider the Enrichment transform, which abstracts away some of the details of side inputs and provides additional benefits such as client-side throttling.

Slowly updating global window side inputs

You can retrieve side inputs from global windows to use them in a pipeline job with non-global windows, like a FixedWindow.

To slowly update global window side inputs in pipelines with non-global windows:

  1. Write a DoFn that periodically pulls data from a bounded source into a global window.

    a. Use the GenerateSequence source transform to periodically emit a value.

    b. Instantiate a data-driven trigger that activates on each element and pulls data from a bounded source.

    c. Fire the trigger to pass the data into the global window.

  2. Create the side input for downstream transforms. The side input should fit into memory.

The global window side input triggers on processing time, so the main pipeline non-deterministically matches the side input to elements in event time.
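The effect of a processing-time refresh can be modeled in plain Python, without Beam. The sketch below is an illustration only: `latest_snapshot`, `refresh_times`, and `snapshots` are hypothetical names, and it assumes the side input simply exposes whichever snapshot was published most recently when an element is processed.

```python
import bisect


def latest_snapshot(refresh_times, snapshots, processing_time):
    """Return the snapshot published most recently at or before processing_time.

    refresh_times must be sorted ascending and aligned with snapshots.
    """
    idx = bisect.bisect_right(refresh_times, processing_time) - 1
    if idx < 0:
        raise LookupError("side input has not been published yet")
    return snapshots[idx]


# Hypothetical refresh schedule: one snapshot every 5 seconds of processing time.
refresh_times = [0, 5, 10]
snapshots = [{"Key_A": "v0"}, {"Key_A": "v1"}, {"Key_A": "v2"}]

# The same element (same event time) can see different side input data
# depending on when the runner happens to process it.
seen_if_processed_early = latest_snapshot(refresh_times, snapshots, 4)
seen_if_processed_late = latest_snapshot(refresh_times, snapshots, 7)
```

Because the lookup depends on processing time rather than on the element's own timestamp, rerunning the same input can pair an element with a different snapshot.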

For instance, in the following code sample a DoFn emits a Map on each counter tick. Latest.globally keeps the most recent Map, which is exposed as a View.asIterable side input. The side input updates every 5 seconds to demonstrate the workflow. In a real-world scenario, the side input would typically update every few hours or once per day.

  public static void sideInputPatterns() {
    // This pipeline builds a side input from a placeholder external service.
    // Run in debug mode to see the output.
    Pipeline p = Pipeline.create();

    // Create a side input that updates every 5 seconds.
    // View as an iterable, not singleton, so that if we happen to trigger more
    // than once before Latest.globally is computed we can handle both elements.
    PCollectionView<Iterable<Map<String, String>>> mapIterable =
        p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))
            .apply(
                ParDo.of(
                    new DoFn<Long, Map<String, String>>() {

                      @ProcessElement
                      public void process(
                          @Element Long input,
                          @Timestamp Instant timestamp,
                          OutputReceiver<Map<String, String>> o) {
                        // Replace map with test data from the placeholder external service.
                        // Add external reads here.
                        o.output(PlaceholderExternalService.readTestData(timestamp));
                      }
                    }))
            .apply(
                Window.<Map<String, String>>into(new GlobalWindows())
                    .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                    .discardingFiredPanes())
            .apply(Latest.globally())
            .apply(View.asIterable());

    // Consume side input. GenerateSequence generates test data.
    // Use a real source (like PubSubIO or KafkaIO) in production.
    p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L)))
        .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
        .apply(Sum.longsGlobally().withoutDefaults())
        .apply(
            ParDo.of(
                    new DoFn<Long, KV<Long, Long>>() {

                      @ProcessElement
                      public void process(ProcessContext c, @Timestamp Instant timestamp) {
                        Iterable<Map<String, String>> si = c.sideInput(mapIterable);
                        // Take an element from the side input iterable (likely length 1)
                        Map<String, String> keyMap = si.iterator().next();
                        c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now());

                        LOG.info(
                            "Value is {} with timestamp {}, using key A from side input with time {}.",
                            c.element(),
                            timestamp.toString(DateTimeFormat.forPattern("HH:mm:ss")),
                            keyMap.get("Key_A"));
                      }
                    })
                .withSideInputs(mapIterable));

    p.run();
  }

  /** Placeholder class that represents an external service generating test data. */
  public static class PlaceholderExternalService {

    public static Map<String, String> readTestData(Instant timestamp) {

      Map<String, String> map = new HashMap<>();

      map.put("Key_A", timestamp.toString(DateTimeFormat.forPattern("HH:mm:ss")));

      return map;
    }
  }

Slowly updating side input using windowing

You can read side input data periodically into distinct PCollection windows. When you apply the side input to your main input, each main input window is automatically matched to a single side input window. This guarantees consistency for the duration of a single window: each main input window is matched to exactly one version of the side input data.
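As a rough illustration of the window matching, here is a plain-Python model that does not use Beam. It assumes fixed windows on both inputs, non-negative timestamps in milliseconds, and that a main input window maps to the side input window containing its last timestamp. The helper names `fixed_window` and `side_window_for` are hypothetical.

```python
def fixed_window(timestamp_ms, size_ms):
    """Return the [start, end) fixed window containing timestamp_ms."""
    start = timestamp_ms - (timestamp_ms % size_ms)
    return (start, start + size_ms)


def side_window_for(main_window, side_size_ms):
    """Map a main window to the side window holding its last timestamp.

    Assumption: the last timestamp inside [start, end) is end - 1 ms.
    """
    _, main_end = main_window
    return fixed_window(main_end - 1, side_size_ms)


SIDE_SIZE_MS = 10_000  # side input refreshed every 10 seconds
MAIN_SIZE_MS = 2_000   # main input windowed every 2 seconds

main_a = fixed_window(3_500, MAIN_SIZE_MS)
main_b = fixed_window(11_200, MAIN_SIZE_MS)

side_a = side_window_for(main_a, SIDE_SIZE_MS)
side_b = side_window_for(main_b, SIDE_SIZE_MS)
```

In this model every element of a given main window resolves to the same side window, which is the consistency property described above.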

To read side input data periodically into distinct PCollection windows:

  1. Use the PeriodicImpulse or PeriodicSequence PTransform to:
    • Generate an infinite sequence of elements at the required processing-time intervals.
    • Assign them to separate windows.
  2. Fetch the data using an SDF Read or ReadAll PTransform, triggered by the arrival of each PCollection element.
  3. Apply the side input.
PCollectionView<List<Long>> sideInput =
    p.apply(
            "SIImpulse",
            PeriodicImpulse.create()
                .startAt(startAt)
                .stopAt(stopAt)
                .withInterval(interval1)
                .applyWindowing())
        .apply(
            "FileToRead",
            ParDo.of(
                new DoFn<Instant, String>() {
                  @DoFn.ProcessElement
                  public void process(@Element Instant notUsed, OutputReceiver<String> o) {
                    o.output(fileToRead);
                  }
                }))
        .apply(FileIO.matchAll())
        .apply(FileIO.readMatches())
        .apply(TextIO.readFiles())
        .apply(
            ParDo.of(
                new DoFn<String, String>() {
                  @ProcessElement
                  public void process(@Element String src, OutputReceiver<String> o) {
                    o.output(src);
                  }
                }))
        .apply(Combine.globally(Count.<String>combineFn()).withoutDefaults())
        .apply(View.asList());

PCollection<Instant> mainInput =
    p.apply(
        "MIImpulse",
        PeriodicImpulse.create()
            .startAt(startAt.minus(Duration.standardSeconds(1)))
            .stopAt(stopAt.minus(Duration.standardSeconds(1)))
            .withInterval(interval2)
            .applyWindowing());

// Consume the side input. The main input above uses PeriodicImpulse as test data.
// Use a real source (like PubSubIO or KafkaIO) in production.
PCollection<Long> result =
    mainInput.apply(
        "generateOutput",
        ParDo.of(
                new DoFn<Instant, Long>() {
                  @ProcessElement
                  public void process(ProcessContext c) {
                    c.output((long) c.sideInput(sideInput).size());
                  }
                })
            .withSideInputs(sideInput));
import apache_beam as beam
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.transforms.window import TimestampedValue
from apache_beam.transforms import window

# from apache_beam.utils.timestamp import MAX_TIMESTAMP
# last_timestamp = MAX_TIMESTAMP to go on indefinitely

# Any user-defined function.
# cross join is used as an example.
def cross_join(left, rights):
  for x in rights:
    yield (left, x)

# Create pipeline.
pipeline = beam.Pipeline()
side_input = (
    pipeline
    | 'PeriodicImpulse' >> PeriodicImpulse(
        first_timestamp, last_timestamp, interval, True)
    | 'MapToFileName' >> beam.Map(lambda x: src_file_pattern + str(x))
    | 'ReadFromFile' >> beam.io.ReadAllFromText())

main_input = (
    pipeline
    | 'MpImpulse' >> beam.Create(sample_main_input_elements)
    | 'MapMpToTimestamped' >> beam.Map(lambda src: TimestampedValue(src, src))
    | 'WindowMpInto' >> beam.WindowInto(
        window.FixedWindows(main_input_windowing_interval)))

result = (
    main_input
    | 'ApplyCrossJoin' >> beam.FlatMap(
        cross_join, rights=beam.pvalue.AsIter(side_input)))
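
To see what the `cross_join` callback produces in isolation, it can be exercised outside a pipeline. This is a minimal sketch: the list `side_values` stands in for the iterable that `AsIter` would supply for one window, and the sample values are made up.

```python
def cross_join(left, rights):
    # Pair one main input element with every side input element.
    for x in rights:
        yield (left, x)


# Hypothetical stand-ins for one window of main and side input data.
main_values = [1, 2]
side_values = ["a", "b"]

# FlatMap flattens the generator output, so each main element
# contributes one pair per side input element.
pairs = [pair for left in main_values for pair in cross_join(left, side_values)]
```

With two main elements and two side elements, `pairs` holds four tuples, one per combination.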