File processing patterns
This page describes common file processing tasks. For more information on file-based I/O, see Pipeline I/O and File-based input and output data.
Processing files as they arrive
This section shows you how to process files as they arrive in your file system or object store (like Google Cloud Storage). You can continuously read files, or trigger stream or batch processing pipelines when a file arrives.
Continuous read mode
You can use FileIO or TextIO to continuously read the source for new files.
Use the FileIO class to continuously watch a single file pattern. The following example matches a file pattern repeatedly every 30 seconds, continuously returns new matched files as an unbounded PCollection<Metadata>, and stops if no new files appear for one hour:
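A minimal sketch of this continuous match, assuming an existing pipeline p and a placeholder path, as in the other examples on this page:

// This produces PCollection<MatchResult.Metadata>
p.apply(
    FileIO.match()
        .filepattern("<path-to-files>/*")
        .continuously(
            // Check for new files every 30 seconds.
            Duration.standardSeconds(30),
            // Stop watching the file pattern if no new files appear for an hour.
            Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))));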
The TextIO class's watchForNewFiles method streams new file matches:
// This produces PCollection<String>
p.apply(
    TextIO.read()
        .from("<path-to-files>/*")
        .watchForNewFiles(
            // Check for new files every minute.
            Duration.standardMinutes(1),
            // Stop watching the file pattern if no new files appear for an hour.
            Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))));
Some runners may retain file lists during updates, but file lists don't persist when you restart a pipeline. You can save file lists by:
- Storing processed filenames in an external file and deduplicating against that list in the next transform, as in the sketch after this list
- Adding timestamps to filenames, writing a glob pattern to pull in only new files, and matching the pattern when the pipeline restarts
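A minimal sketch of the first approach, assuming an existing pipeline p and a hypothetical processed-files.txt that stores one already-processed filename per line:

PCollectionView<List<String>> processedFiles =
    p.apply("ReadProcessedFileList", TextIO.read().from("<path>/processed-files.txt"))
        .apply(View.asList());

p.apply(FileIO.match().filepattern("<path-to-files>/*"))
    .apply(
        ParDo.of(
            new DoFn<MatchResult.Metadata, MatchResult.Metadata>() {
              @ProcessElement
              public void process(ProcessContext c) {
                // Drop any file whose name is already in the processed list.
                if (!c.sideInput(processedFiles)
                    .contains(c.element().resourceId().toString())) {
                  c.output(c.element());
                }
              }
            }).withSideInputs(processedFiles));

A List side input is fine for small lists; for large ones, consider a keyed side input or an external store instead.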
The continuous-read option is not available for Python.
Stream processing triggered from external source
A streaming pipeline can process data from an unbounded source. For example, to trigger stream processing with Google Cloud Pub/Sub:
- Use an external process to detect when new files arrive.
- Send a Google Cloud Pub/Sub message with a URI to the file.
- Access the URI from a DoFn that follows the Google Cloud Pub/Sub source.
- Process the file, as in the sketch after this list.
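A minimal sketch of this pattern, assuming an existing streaming pipeline p and a hypothetical topic whose messages carry a file URI as their payload:

p.apply(PubsubIO.readStrings().fromTopic("projects/<project>/topics/<new-files-topic>"))
    // Match each URI from the message to the file it names.
    .apply(FileIO.matchAll())
    .apply(FileIO.readMatches())
    .apply(
        ParDo.of(
            new DoFn<FileIO.ReadableFile, String>() {
              @ProcessElement
              public void process(@Element FileIO.ReadableFile file, OutputReceiver<String> out)
                  throws IOException {
                // Process the file, here by reading it fully into memory.
                out.output(file.readFullyAsUTF8String());
              }
            }));

Because FileIO.matchAll() treats each message payload as a file pattern, a message can also name a glob rather than a single file.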
Batch processing triggered from external source
To start or schedule a batch pipeline job when a file arrives, write the triggering event in the source file itself. This approach has the most latency because the pipeline must initialize before processing, so it's best suited for infrequent updates to large files.
Accessing filenames
Use the FileIO class to read filenames in a pipeline job. FileIO returns a PCollection<ReadableFile> object, and each ReadableFile instance contains the filename.
To access filenames in the Java SDK:
- Create a ReadableFile instance with FileIO. FileIO returns a PCollection<ReadableFile> object, and the ReadableFile class contains the filename.
- Call the readFullyAsUTF8String() method to read the file into memory and return its contents as a String object. If memory is limited, you can use utility classes like FileSystems to work directly with the file.
To read filenames in a pipeline job with the Python SDK:
- Collect the list of file URIs. You can use the FileSystems module to get a list of files that match a glob pattern.
- Pass the file URIs to a PCollection.

The following Java example matches a file pattern, reads the matched files, and logs each file's metadata:
p.apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
    // The withCompression method is optional. By default, the Beam SDK detects compression from
    // the filename.
    .apply(FileIO.readMatches().withCompression(Compression.GZIP))
    .apply(
        ParDo.of(
            new DoFn<FileIO.ReadableFile, String>() {
              @ProcessElement
              public void process(@Element FileIO.ReadableFile file) {
                // We can now access the file and its metadata.
                LOG.info("File Metadata resourceId is {} ", file.getMetadata().resourceId());
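                // To read the whole file into memory here instead, you could call
                // file.readFullyAsUTF8String() (and declare throws IOException).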
              }
            }));