File processing patterns

This page describes common file processing tasks. For more information on file-based I/O, see Pipeline I/O and File-based input and output data.

Processing files as they arrive

This section shows you how to process files as they arrive in your file system or object store (like Google Cloud Storage). You can continuously read files, or you can trigger stream or batch processing pipelines when a file arrives.

Continuous read mode

You can use FileIO or TextIO to continuously read the source for new files.

Use the FileIO class to continuously watch a single file pattern. The following example matches a file pattern every 30 seconds, continuously returns new matched files as an unbounded PCollection<Metadata>, and stops if no new files appear for one hour:

// This produces PCollection<MatchResult.Metadata>
p.apply(
    FileIO.match()
        .filepattern("...")
        .continuously(
            Duration.standardSeconds(30),
            Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))));
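
To consume the matched files, you can convert the metadata into readable files and read their contents downstream. The following sketch, which assumes the matched files are newline-delimited text and uses a placeholder filepattern, chains FileIO.readMatches and TextIO.readFiles onto the continuous match:

// A sketch: continuously match, then read each new file's lines as it appears.
// The filepattern is a placeholder.
PCollection<String> lines =
    p.apply(
            FileIO.match()
                .filepattern("gs://my-bucket/input/*.txt")
                .continuously(
                    Duration.standardSeconds(30),
                    Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))))
        .apply(FileIO.readMatches())
        .apply(TextIO.readFiles());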

The watchForNewFiles property of the TextIO class streams new file matches.

// This produces PCollection<String>
p.apply(
    TextIO.read()
        .from("<path-to-files>/*")
        .watchForNewFiles(
            // Check for new files every minute.
            Duration.standardMinutes(1),
            // Stop watching the file pattern if no new files appear for an hour.
            Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))));

Some runners may retain file lists during updates, but file lists don’t persist when you restart a pipeline. You can save file lists by:

  * Storing processed filenames in an external file and deduplicating the lists at the next transform (see the sketch after this list)
  * Adding timestamps to filenames, writing a glob pattern to pull in only new files, and matching the pattern when the pipeline restarts
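
For example, the first option might look like the following sketch. The processed-file list gs://my-bucket/processed-files.txt and the input pattern are hypothetical paths, and the list is assumed to contain one previously processed filename per line:

// A sketch of deduplicating against a saved file list. Paths are placeholders.
PCollectionView<List<String>> processedFiles =
    p.apply("ReadProcessedList", TextIO.read().from("gs://my-bucket/processed-files.txt"))
        .apply(View.asList());

p.apply(FileIO.match().filepattern("gs://my-bucket/input/*.csv"))
    .apply(
        "DropAlreadyProcessed",
        ParDo.of(
                new DoFn<MatchResult.Metadata, MatchResult.Metadata>() {
                  @ProcessElement
                  public void process(ProcessContext c) {
                    // Keep only files whose names are absent from the saved list.
                    if (!c.sideInput(processedFiles)
                        .contains(c.element().resourceId().toString())) {
                      c.output(c.element());
                    }
                  }
                })
            .withSideInputs(processedFiles));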

The continuous-read option is not available for Python.

Stream processing triggered from external source

A streaming pipeline can process data from an unbounded source. For example, to trigger stream processing with Google Cloud Pub/Sub (a sketch follows these steps):

  1. Use an external process to detect when new files arrive.
  2. Send a Google Cloud Pub/Sub message with a URI to the file.
  3. Access the URI from a DoFn that follows the Google Cloud Pub/Sub source.
  4. Process the file.
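
The following sketch outlines steps 2 through 4 in Java. It assumes each Pub/Sub message payload is a file URI or filepattern; the topic name is a placeholder.

// A sketch: read file URIs from Pub/Sub, match them, and process each file.
p.apply(PubsubIO.readStrings().fromTopic("projects/my-project/topics/new-files"))
    // Each message payload is treated as a filepattern to match.
    .apply(FileIO.matchAll())
    .apply(FileIO.readMatches())
    .apply(
        ParDo.of(
            new DoFn<FileIO.ReadableFile, String>() {
              @ProcessElement
              public void process(@Element FileIO.ReadableFile file, OutputReceiver<String> out)
                  throws IOException {
                // Process the file; here it is read fully as UTF-8 text.
                out.output(file.readFullyAsUTF8String());
              }
            }));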

Batch processing triggered from external source

To start or schedule a batch pipeline job when a file arrives, write the triggering event in the source file itself. This approach has the highest latency because the pipeline must initialize before processing. It’s best suited for low-frequency updates to large files.
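
One common arrangement, sketched below, is for the external scheduler to pass the newly arrived file’s path to the batch job as a pipeline option. The option and path names here are hypothetical, not a prescribed interface:

// A sketch: a batch job whose input file is supplied by whatever scheduled it.
public interface TriggeredOptions extends PipelineOptions {
  String getInputFile();

  void setInputFile(String value);
}

TriggeredOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(TriggeredOptions.class);
Pipeline p = Pipeline.create(options);
p.apply(TextIO.read().from(options.getInputFile()));
p.run();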

Accessing filenames

Use the FileIO class to read filenames in a pipeline job. FileIO returns a PCollection<ReadableFile> object, and the ReadableFile instance contains the filename.

To access filenames:

  1. Create a ReadableFile instance with FileIO. FileIO returns a PCollection<ReadableFile> object, and each ReadableFile contains the filename.
  2. Call the readFullyAsUTF8String() method to read the file into memory and return its contents as a String object. If memory is limited, you can use utility classes like FileSystems to work directly with the file, as in the sketch after this list.
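
The following sketch shows both options inside a DoFn; it assumes file is the FileIO.ReadableFile element being processed.

// A sketch, inside a DoFn that receives a FileIO.ReadableFile named `file`.
// Option 1: read the whole file into memory as a UTF-8 string.
String contents = file.readFullyAsUTF8String();

// Option 2: if memory is limited, stream the file through a channel instead.
// The filename is available from the file's metadata.
ResourceId resourceId = file.getMetadata().resourceId();
try (ReadableByteChannel channel = FileSystems.open(resourceId)) {
  // Read from the channel incrementally.
}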

To read filenames in a pipeline job:

  1. Collect the list of file URIs. You can use the FileSystems module to get a list of files that match a glob pattern.
  2. Pass the file URIs to a PCollection, as in the sketch after this list.
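
A construction-time sketch of these two steps in Java follows; the filepattern is a placeholder, and the FileSystems.match call expands the glob before the pipeline runs:

// A sketch: expand a glob with FileSystems, then turn the URIs into a PCollection.
List<String> fileUris = new ArrayList<>();
for (MatchResult.Metadata metadata :
    FileSystems.match("gs://my-bucket/input/*.csv").metadata()) {
  fileUris.add(metadata.resourceId().toString());
}
PCollection<String> uris = p.apply(Create.of(fileUris));

The examples below keep the matching inside the pipeline graph instead, using FileIO.match in Java and fileio.MatchFiles in Python.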

p.apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
    // The withCompression method is optional. By default, the Beam SDK detects compression from
    // the filename.
    .apply(FileIO.readMatches().withCompression(Compression.GZIP))
    .apply(
        ParDo.of(
            new DoFn<FileIO.ReadableFile, String>() {
              @ProcessElement
              public void process(@Element FileIO.ReadableFile file) {
                // We can now access the file and its metadata.
                LOG.info("File Metadata resourceId is {} ", file.getMetadata().resourceId());
              }
            }));

The following Python example matches the files and reads each file's path and contents:

with beam.Pipeline() as pipeline:
  readable_files = (
      pipeline
      | fileio.MatchFiles('hdfs://path/to/*.txt')
      | fileio.ReadMatches()
      | beam.Reshuffle())
  files_and_contents = (
      readable_files
      | beam.Map(lambda x: (x.metadata.path, x.read_utf8())))