Developing I/O connectors for Java

IMPORTANT: Use Splittable DoFn to develop your new I/O. For more details, read the new I/O connector overview.

To connect to a data store that isn’t supported by Beam’s existing I/O connectors, you must create a custom I/O connector, which usually consists of a source and a sink. All Beam sources and sinks are composite transforms; however, the implementation of your custom I/O depends on your use case. Before you start, read the new I/O connector overview to learn about developing a new I/O connector, the available implementation options, and how to choose the right option for your use case.

This guide covers using the Source and FileBasedSink interfaces in Java. The Python SDK offers the same functionality, but uses a slightly different API. See Developing I/O connectors for Python for information specific to the Python SDK.

Basic code requirements

Beam runners use the classes you provide to read and/or write data using multiple worker instances in parallel. As such, the code you provide for Source and FileBasedSink subclasses must meet some basic requirements:

  1. Serializability: Your Source or FileBasedSink subclass, whether bounded or unbounded, must be Serializable. A runner might create multiple instances of your Source or FileBasedSink subclass to be sent to multiple remote workers to facilitate reading or writing in parallel.

  2. Immutability: Your Source or FileBasedSink subclass must be effectively immutable. All private fields must be declared final, and all private variables of collection type must be effectively immutable. If your class has setter methods, those methods must return an independent copy of the object with the relevant field modified.

    You should only use mutable state in your Source or FileBasedSink subclass if you are using lazy evaluation of expensive computations that you need to implement the source or sink; in that case, you must declare all mutable instance variables transient.

  3. Thread-Safety: Your code must be thread-safe. If you build your source to work with dynamic work rebalancing, it is critical that you make your code thread-safe. The Beam SDK provides a helper class to make this easier. See Using Your BoundedSource with dynamic work rebalancing for more details.

  4. Testability: It is critical to exhaustively unit test all of your Source and FileBasedSink subclasses, especially if you build your classes to work with advanced features such as dynamic work rebalancing. A minor implementation error can lead to data corruption or data loss (such as skipping or duplicating records) that can be hard to detect.

    To assist in testing BoundedSource implementations, you can use the SourceTestUtils class. SourceTestUtils contains utilities for automatically verifying some of the properties of your BoundedSource implementation. You can use SourceTestUtils to increase your implementation’s test coverage using a wide range of inputs with relatively few lines of code. For examples that use SourceTestUtils, see the AvroSourceTest and TextIOReadTest source code.
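
For example, a test might read a hypothetical MyBoundedSource directly and then verify that splitting it produces the same records. This is a minimal sketch; MyBoundedSource and MyRecord are illustrative names, and the bundle size is arbitrary:

```java
import static org.junit.Assert.assertFalse;

import java.util.List;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.junit.Test;

public class MyBoundedSourceTest {
  @Test
  public void testSplittingProducesSameRecords() throws Exception {
    PipelineOptions options = PipelineOptionsFactory.create();
    MyBoundedSource source = new MyBoundedSource(/* source-specific configuration */);

    // Read everything directly from the unsplit source.
    List<MyRecord> directRead = SourceTestUtils.readFromSource(source, options);
    assertFalse(directRead.isEmpty());

    // Verify that the records read from the split sources, taken together,
    // match the records read directly from the unsplit source.
    List<? extends BoundedSource<MyRecord>> splits = source.split(1024, options);
    SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
  }
}
```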

In addition, see the PTransform style guide for Beam’s transform style guidance.

Implementing the Source interface

To create a data source for your pipeline, you must provide the format-specific logic that tells a runner how to read data from your input source, and how to split your data source into multiple parts so that multiple worker instances can read your data in parallel. If you’re creating a data source that reads unbounded data, you must provide additional logic for managing your source’s watermark and optional checkpointing.

Supply the logic for your source by creating the following classes:

  * A subclass of BoundedSource (for a finite data set) or UnboundedSource (for an infinite data stream), which describes the data you want to read and how to split it for parallel reading. See Implementing the Source subclass below.
  * A subclass of BoundedReader or UnboundedReader, which performs the actual reading of your data. See Implementing the Reader subclass below.

Implementing the Source subclass

You must create a subclass of either BoundedSource or UnboundedSource, depending on whether your data is a finite batch or an infinite stream. In either case, your Source subclass must override the abstract methods in the superclass. A runner might call these methods when using your data source. For example, when reading from a bounded source, a runner uses these methods to estimate the size of your data set and to split it up for parallel reading.

Your Source subclass should also manage basic information about your data source, such as the location. For example, the example Source implementation in Beam’s DatastoreIO class takes host, datasetID, and query as arguments. The connector uses these values to obtain data from Cloud Datastore.

BoundedSource

BoundedSource represents a finite data set from which a Beam runner may read, possibly in parallel. BoundedSource contains a set of abstract methods that the runner uses to split the data set for reading by multiple workers.

To implement a BoundedSource, your subclass must override the following abstract methods:

  * split: Splits the data set into bundles of roughly the given size, returning a list of BoundedSource objects that workers can read independently.
  * getEstimatedSizeBytes: Returns an estimate of the size of the data this source will read, in bytes, which runners can use to decide how to parallelize the read.
  * createReader: Creates the BoundedReader that performs the actual reading for this source.

You can see a model of how to implement BoundedSource and the required abstract methods in Beam’s implementations for Cloud Bigtable (BigtableIO.java) and BigQuery (BigQuerySourceBase.java).
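
As a rough illustration of what a BoundedSource subclass looks like, the sketch below reads from a fixed in-memory list of strings. The class names are made up for this example; a real source would describe and read from an external system:

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;

/** A toy bounded source over a fixed list of lines. All fields are final (effectively immutable). */
public class InMemoryLineSource extends BoundedSource<String> {
  private final List<String> lines;

  public InMemoryLineSource(List<String> lines) {
    this.lines = new ArrayList<>(lines);
  }

  List<String> getLines() {
    return lines;
  }

  @Override
  public List<? extends BoundedSource<String>> split(
      long desiredBundleSizeBytes, PipelineOptions options) {
    // A real source would honor desiredBundleSizeBytes; here we simply split the list in half.
    List<BoundedSource<String>> splits = new ArrayList<>();
    int mid = lines.size() / 2;
    splits.add(new InMemoryLineSource(lines.subList(0, mid)));
    splits.add(new InMemoryLineSource(lines.subList(mid, lines.size())));
    return splits;
  }

  @Override
  public long getEstimatedSizeBytes(PipelineOptions options) {
    long size = 0;
    for (String line : lines) {
      size += line.length();
    }
    return size;
  }

  @Override
  public BoundedReader<String> createReader(PipelineOptions options) {
    // The reader is sketched in the "Implementing the Reader subclass" section below.
    return new InMemoryLineReader(this);
  }

  @Override
  public Coder<String> getOutputCoder() {
    // Tells the runner how to encode the elements this source produces.
    return StringUtf8Coder.of();
  }
}
```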

UnboundedSource

UnboundedSource represents an infinite data stream from which the runner may read, possibly in parallel. UnboundedSource contains a set of abstract methods that the runner uses to support streaming reads in parallel; these include checkpointing for failure recovery, record IDs to prevent data duplication, and watermarking for estimating data completeness in downstream parts of your pipeline.

To implement an UnboundedSource, your subclass must override the following abstract methods:

  * split: Returns a list of UnboundedSource objects (for example, one per partition or shard) that the runner can read in parallel.
  * createReader: Creates the UnboundedReader that performs the actual reading, optionally resuming from a previously stored CheckpointMark.
  * getCheckpointMarkCoder: Returns the Coder the runner uses to encode your source’s CheckpointMark for failure recovery.
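
The sketch below shows the overall shape of an UnboundedSource subclass. The names, and the idea of reading numbered messages from partitions of some external queue, are made up for illustration:

```java
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;

/** A toy unbounded source that reads string messages from one partition of a hypothetical queue. */
public class QueueSource extends UnboundedSource<String, QueueSource.QueueCheckpoint> {
  private final String queueAddress; // final fields keep the source effectively immutable
  private final int partition;

  public QueueSource(String queueAddress, int partition) {
    this.queueAddress = queueAddress;
    this.partition = partition;
  }

  @Override
  public List<QueueSource> split(int desiredNumSplits, PipelineOptions options) {
    // One source per partition; a real implementation would ask the external system
    // how many partitions exist instead of trusting desiredNumSplits.
    List<QueueSource> splits = new ArrayList<>();
    for (int i = 0; i < desiredNumSplits; i++) {
      splits.add(new QueueSource(queueAddress, i));
    }
    return splits;
  }

  @Override
  public UnboundedReader<String> createReader(
      PipelineOptions options, QueueCheckpoint checkpoint) throws IOException {
    // Resume from the checkpointed offset, or start from the beginning if there is no checkpoint.
    long startOffset = (checkpoint == null) ? 0L : checkpoint.offset;
    return new QueueReader(this, startOffset); // the reader is sketched later in this guide
  }

  @Override
  public Coder<QueueCheckpoint> getCheckpointMarkCoder() {
    return SerializableCoder.of(QueueCheckpoint.class);
  }

  @Override
  public Coder<String> getOutputCoder() {
    return StringUtf8Coder.of();
  }

  /** Records how far a reader has gotten, so that reading can resume after a failure. */
  public static class QueueCheckpoint implements UnboundedSource.CheckpointMark, Serializable {
    final long offset;

    public QueueCheckpoint(long offset) {
      this.offset = offset;
    }

    @Override
    public void finalizeCheckpoint() {
      // A real implementation might acknowledge messages up to `offset` here.
    }
  }
}
```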

Implementing the Reader subclass

You must create a subclass of either BoundedReader or UnboundedReader to be returned by your source subclass’s createReader method. The runner uses the methods in your Reader (whether bounded or unbounded) to do the actual reading of your dataset.

BoundedReader and UnboundedReader have similar basic interfaces, which you’ll need to define. UnboundedReader has some additional methods you’ll need to implement for working with unbounded data, and BoundedReader has an optional method you can implement if you want it to take advantage of dynamic work rebalancing. There are also minor differences in the semantics of the start() and advance() methods when using UnboundedReader.

Reader methods common to both BoundedReader and UnboundedReader

A runner uses the following methods to read data using BoundedReader or UnboundedReader:

  * start: Initializes the reader and advances to the first record, returning true if there is a record to read.
  * advance: Advances the reader to the next record, returning false if no record is currently available.
  * getCurrent: Returns the record at the reader’s current position.
  * getCurrentTimestamp: Returns the timestamp associated with the current record.
  * close: Closes the reader and releases any resources it holds, such as connections to your data store.
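
To make the contract concrete, here is a minimal reader for the hypothetical InMemoryLineSource sketched earlier. It walks the in-memory list and relies on BoundedReader’s default getCurrentTimestamp:

```java
import java.util.NoSuchElementException;
import org.apache.beam.sdk.io.BoundedSource;

/** A toy reader for InMemoryLineSource. It does not support dynamic work rebalancing. */
public class InMemoryLineReader extends BoundedSource.BoundedReader<String> {
  private final InMemoryLineSource source;
  private int currentIndex = -1;

  public InMemoryLineReader(InMemoryLineSource source) {
    this.source = source;
  }

  @Override
  public boolean start() {
    // Advance to the first record; return false if the source is empty.
    return advance();
  }

  @Override
  public boolean advance() {
    currentIndex++;
    return currentIndex < source.getLines().size();
  }

  @Override
  public String getCurrent() throws NoSuchElementException {
    if (currentIndex < 0 || currentIndex >= source.getLines().size()) {
      throw new NoSuchElementException();
    }
    return source.getLines().get(currentIndex);
  }

  @Override
  public void close() {
    // Nothing to release for an in-memory reader; a real reader would close connections here.
  }

  @Override
  public BoundedSource<String> getCurrentSource() {
    return source;
  }
}
```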

Reader methods unique to UnboundedReader

In addition to the basic Reader interface, UnboundedReader has some additional methods for managing reads from an unbounded data source:

  * getCurrentRecordId: Returns a unique, stable ID for the current record, which the runner can use to deduplicate records that are read again after a failure.
  * getWatermark: Returns a watermark, a timestamp before which your source guarantees it will not emit any further records.
  * getCheckpointMark: Returns a CheckpointMark describing the reader’s progress, so that reading can resume from this point after a failure.
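
Continuing the hypothetical QueueSource sketch from earlier, a matching reader might implement these methods roughly as follows. The readNextMessage helper is a stand-in for whatever client call your storage system provides:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

/** A toy reader for the hypothetical QueueSource. */
public class QueueReader extends UnboundedSource.UnboundedReader<String> {
  private final QueueSource source;
  private long nextOffset;
  private String current;
  private Instant currentTimestamp;

  public QueueReader(QueueSource source, long startOffset) {
    this.source = source;
    this.nextOffset = startOffset;
  }

  @Override
  public boolean start() throws IOException {
    // A real reader would connect to the external system here before reading the first record.
    return advance();
  }

  @Override
  public boolean advance() throws IOException {
    // Return false if no message is available yet; the runner will call advance() again later.
    String message = readNextMessage(source, nextOffset);
    if (message == null) {
      return false;
    }
    current = message;
    currentTimestamp = Instant.now(); // ideally the event time carried by the message itself
    nextOffset++;
    return true;
  }

  @Override
  public String getCurrent() throws NoSuchElementException {
    if (current == null) {
      throw new NoSuchElementException();
    }
    return current;
  }

  @Override
  public Instant getCurrentTimestamp() throws NoSuchElementException {
    if (current == null) {
      throw new NoSuchElementException();
    }
    return currentTimestamp;
  }

  @Override
  public Instant getWatermark() {
    // Promise that no future record will carry a timestamp earlier than this value.
    return currentTimestamp == null ? BoundedWindow.TIMESTAMP_MIN_VALUE : currentTimestamp;
  }

  @Override
  public UnboundedSource.CheckpointMark getCheckpointMark() {
    // Everything before nextOffset has been emitted; resume from here after a failure.
    return new QueueSource.QueueCheckpoint(nextOffset);
  }

  @Override
  public byte[] getCurrentRecordId() {
    // Optional: stable record IDs let the runner deduplicate records replayed after a failure
    // (the source must also override requiresDeduping() to return true).
    return Long.toString(nextOffset - 1).getBytes(StandardCharsets.UTF_8);
  }

  @Override
  public UnboundedSource<String, ?> getCurrentSource() {
    return source;
  }

  @Override
  public void close() {
    // Release connections to the external system here.
  }

  private static String readNextMessage(QueueSource source, long offset) {
    // Hypothetical helper: fetch the message at `offset` from the queue, or null if none yet.
    return null;
  }
}
```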

You can read a bounded PCollection from an UnboundedSource by specifying either .withMaxNumRecords or .withMaxReadTime when you read from your source. .withMaxNumRecords reads a fixed maximum number of records from your unbounded source, while .withMaxReadTime reads from your unbounded source for a fixed maximum time duration.
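
For example, assuming the hypothetical QueueSource above and an existing Pipeline named pipeline, either of the following yields a bounded PCollection (Read is org.apache.beam.sdk.io.Read; Duration is org.joda.time.Duration):

```java
// Read at most 1,000 records from the unbounded source...
PCollection<String> sample =
    pipeline.apply(Read.from(new QueueSource("queue.example.com", 0)).withMaxNumRecords(1000));

// ...or read from it for at most one minute.
PCollection<String> timed =
    pipeline.apply(
        Read.from(new QueueSource("queue.example.com", 0))
            .withMaxReadTime(Duration.standardMinutes(1)));
```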

Using your BoundedSource with dynamic work rebalancing

If your source provides bounded data, you can have your BoundedReader work with dynamic work rebalancing by implementing the method splitAtFraction. The runner may call splitAtFraction concurrently with start or advance on a given reader so that the remaining data in your Source can be split and redistributed to other workers.

When you implement splitAtFraction, your code must produce a set of mutually exclusive splits whose union matches the total data set.

If you implement splitAtFraction, you must implement both splitAtFraction and getFractionConsumed in a thread-safe manner, or data loss is possible. You should also unit-test your implementation exhaustively to avoid data duplication or data loss.

To ensure that your code is thread-safe, use the RangeTracker thread-safe helper object to manage positions in your data source when implementing splitAtFraction and getFractionConsumed.
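
Sticking with the toy in-memory example, the reader below delegates all position bookkeeping to an OffsetRangeTracker. For this sketch, assume InMemoryLineSource also has a hypothetical forSubRange(startIndex, endIndex) factory method (not shown earlier) that returns a copy of the source restricted to that index range:

```java
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.range.OffsetRangeTracker;

/** A toy reader over a range of line indices that supports dynamic work rebalancing. */
public class RebalancingLineReader extends BoundedSource.BoundedReader<String> {
  private final List<String> lines;
  private final OffsetRangeTracker tracker; // thread-safe position bookkeeping
  private InMemoryLineSource currentSource;
  private long currentIndex = -1;

  public RebalancingLineReader(InMemoryLineSource source, long startIndex, long endIndex) {
    this.currentSource = source;
    this.lines = source.getLines();
    this.tracker = new OffsetRangeTracker(startIndex, endIndex);
  }

  @Override
  public boolean start() {
    currentIndex = tracker.getStartPosition();
    return tryClaim(currentIndex);
  }

  @Override
  public boolean advance() {
    currentIndex++;
    return tryClaim(currentIndex);
  }

  // The tracker refuses records at or beyond a position already handed off by splitAtFraction.
  private boolean tryClaim(long index) {
    return index < lines.size() && tracker.tryReturnRecordAt(true, index);
  }

  @Override
  public String getCurrent() throws NoSuchElementException {
    if (currentIndex < 0 || currentIndex >= lines.size()) {
      throw new NoSuchElementException();
    }
    return lines.get((int) currentIndex);
  }

  @Override
  public Double getFractionConsumed() {
    return tracker.getFractionConsumed();
  }

  @Override
  public BoundedSource<String> splitAtFraction(double fraction) {
    long oldStopIndex = tracker.getStopPosition();
    long splitIndex = tracker.getPositionForFractionConsumed(fraction);
    if (!tracker.trySplitAtPosition(splitIndex)) {
      return null; // Too late to split there; the runner keeps the remaining work as-is.
    }
    // This reader now owns [start, splitIndex); hand the rest back to the runner.
    InMemoryLineSource primary = currentSource.forSubRange(tracker.getStartPosition(), splitIndex);
    InMemoryLineSource residual = currentSource.forSubRange(splitIndex, oldStopIndex);
    currentSource = primary;
    return residual;
  }

  @Override
  public BoundedSource<String> getCurrentSource() {
    return currentSource;
  }

  @Override
  public void close() {}
}
```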

We highly recommend that you unit test your implementations of splitAtFraction using the SourceTestUtils class. SourceTestUtils contains a number of methods for testing your implementation of splitAtFraction, including exhaustive automatic testing.
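
For example, a single SourceTestUtils call can exercise splitting at every fraction after reading every possible number of records (here, source is any BoundedSource whose reader implements splitAtFraction):

```java
PipelineOptions options = PipelineOptionsFactory.create();
// Reads the source, splits it at every fraction after every possible number of records,
// and verifies that the primary and residual parts together match an unsplit read.
SourceTestUtils.assertSplitAtFractionExhaustive(source, options);
```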

Convenience Source and Reader base classes

The Beam SDK contains some convenient abstract base classes to help you create Source and Reader classes that work with common data storage formats, like files.

FileBasedSource

If your data source uses files, you can derive your Source and Reader classes from the FileBasedSource and FileBasedReader abstract base classes. FileBasedSource is a bounded source subclass that implements code common to Beam sources that interact with files, such as expanding file patterns (globs) into individual files and splitting individual files into offset ranges that can be read in parallel.

Using the FileBasedSink abstraction

If your data source uses files, you can implement the FileBasedSink abstraction to create a file-based sink. For other sinks, use ParDo, GroupByKey, and other transforms offered by the Beam SDK for Java. See the developing I/O connectors overview for more details.

When using the FileBasedSink interface, you must provide the format-specific logic that tells the runner how to write bounded data from your pipeline’s PCollections to an output sink. The runner writes bundles of data in parallel using multiple workers.

Supply the logic for your file-based sink by implementing the following classes:

  * A subclass of FileBasedSink, which configures the sink, such as the output location and file naming.
  * A subclass of FileBasedSink.WriteOperation, which manages a single parallel write operation, including finalizing the output files once all bundles have been written.
  * A subclass of FileBasedSink.Writer, which writes a single bundle to a single output file.

The FileBasedSink abstract base class implements code that is common to Beam sinks that interact with files, such as writing bundles to temporary files and then finalizing them (copying or renaming them to the final output location) once the write succeeds.

FileBasedSink and its subclasses support writing files to any Beam-supported FileSystem implementation. For examples, see the FileBasedSink implementations used by Beam’s TextIO and AvroIO connectors.

PTransform wrappers

When you create a source or sink that end users will use, avoid exposing your source or sink code directly: make your new classes protected or private, and implement a user-facing wrapper PTransform instead. By exposing your source or sink as a transform, your implementation is hidden and can be arbitrarily complex or simple. The greatest benefit of not exposing implementation details is that you can later add additional functionality without breaking the existing implementation for users.

For example, if your users’ pipelines read from your source using read and you later want to insert a reshard into the pipeline, all users would need to add the reshard themselves (using the GroupByKey transform). To solve this, we recommend that you expose the source as a composite PTransform that performs both the read operation and the reshard.
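
A wrapper for the toy source from the earlier sketches might look roughly like this; the source and reader classes themselves stay hidden from users:

```java
import java.util.List;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;

/** User-facing entry point; the Source and Reader classes can stay package-private. */
public class LineIO {
  public static ReadLines readLines(List<String> lines) {
    return new ReadLines(lines);
  }

  public static class ReadLines extends PTransform<PBegin, PCollection<String>> {
    private final List<String> lines;

    private ReadLines(List<String> lines) {
      this.lines = lines;
    }

    @Override
    public PCollection<String> expand(PBegin input) {
      // Implementation details stay behind this transform; a reshard (for example, a
      // GroupByKey-based step) could be added here later without changing users' pipelines.
      return input.apply(Read.from(new InMemoryLineSource(lines)));
    }
  }
}
```

Users then read with pipeline.apply(LineIO.readLines(...)) and never depend on the source class directly.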

See Beam’s PTransform style guide for additional information about wrapping with a PTransform.