Managed I/O Connectors
Beam’s new Managed API streamlines how you use existing I/Os, offering both simplicity and powerful enhancements. I/Os are now configured through a lightweight, consistent interface: a simple configuration map with a unified API that spans multiple connectors.
With Managed I/O, runners gain deeper insight into each I/O’s structure and intent. This allows the runner to optimize performance, adjust behavior dynamically, or even replace the I/O with a more efficient or updated implementation behind the scenes.
For example, the DataflowRunner can seamlessly upgrade a Managed transform to its latest SDK version, automatically applying bug fixes and new features, with no manual updates or user intervention required.
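For instance, in the Python SDK a Managed read is just a transform name plus a configuration map. The sketch below is illustrative only: the table identifier, catalog name, and warehouse path are placeholders, and because Managed transforms are cross-language, constructing them typically requires a Java runtime on the machine that builds the pipeline.

```python
import apache_beam as beam
from apache_beam.transforms import managed

with beam.Pipeline() as p:
    _ = (
        p
        # Select the connector by name and configure it with a plain dict.
        | "ReadIceberg" >> managed.Read(
            managed.ICEBERG,
            config={
                "table": "db.events",          # placeholder table identifier
                "catalog_name": "my_catalog",  # placeholder catalog name
                "catalog_properties": {
                    "warehouse": "gs://my-bucket/warehouse",  # placeholder location
                },
            })
        | "Print" >> beam.Map(print))
```

Writes follow the same shape through managed.Write with a connector-specific configuration map, as shown in the per-connector sketches further below.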
Available Configurations
Note: required configuration fields are bolded.
Connector Name | Read Configuration | Write Configuration |
---|---|---|
ICEBERG_CDC | **table** (str), catalog_name (str), catalog_properties (map[str, str]), config_properties (map[str, str]), from_snapshot (int64), from_timestamp (int64), poll_interval_seconds (int32), starting_strategy (str), streaming (boolean), to_snapshot (int64), to_timestamp (int64) | Unavailable |
ICEBERG | **table** (str), catalog_name (str), catalog_properties (map[str, str]), config_properties (map[str, str]) | **table** (str), catalog_name (str), catalog_properties (map[str, str]), config_properties (map[str, str]), drop (list[str]), keep (list[str]), only (str), triggering_frequency_seconds (int32) |
KAFKA | **bootstrap_servers** (str), **topic** (str), confluent_schema_registry_subject (str), confluent_schema_registry_url (str), consumer_config_updates (map[str, str]), file_descriptor_path (str), format (str), message_name (str), schema (str) | **bootstrap_servers** (str), **format** (str), **topic** (str), file_descriptor_path (str), message_name (str), producer_config_updates (map[str, str]), schema (str) |
BIGQUERY | kms_key (str), query (str), row_restriction (str), fields (list[str]), table (str) | **table** (str), drop (list[str]), keep (list[str]), kms_key (str), only (str), triggering_frequency_seconds (int64) |
Configuration Details
ICEBERG_CDC
Read
Configuration | Type | Description |
---|---|---|
table | str | Identifier of the Iceberg table. |
catalog_name | str | Name of the catalog containing the table. |
catalog_properties | map[str, str] | Properties used to set up the Iceberg catalog. |
config_properties | map[str, str] | Properties passed to the Hadoop Configuration. |
from_snapshot | int64 | Starts reading from this snapshot ID (inclusive). |
from_timestamp | int64 | Starts reading from the first snapshot (inclusive) that was created after this timestamp (in milliseconds). |
poll_interval_seconds | int32 | The interval at which to poll for new snapshots. Defaults to 60 seconds. |
starting_strategy | str | The source's starting strategy. Valid options are: "earliest" or "latest". Can be overridden by setting a starting snapshot or timestamp. Defaults to "earliest" for batch and "latest" for streaming. |
streaming | boolean | Enables streaming reads, in which the source continuously polls for new snapshots. |
to_snapshot | int64 | Reads up to this snapshot ID (inclusive). |
to_timestamp | int64 | Reads up to the latest snapshot (inclusive) created before this timestamp (in milliseconds). |
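A minimal streaming CDC read might look like the following sketch. It assumes your SDK version exposes an ICEBERG_CDC identifier in apache_beam.transforms.managed; the table and catalog values are placeholders.

```python
import apache_beam as beam
from apache_beam.transforms import managed
from apache_beam.options.pipeline_options import PipelineOptions

with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
    _ = (
        p
        | "ReadIcebergCDC" >> managed.Read(
            managed.ICEBERG_CDC,  # assumed constant in recent SDK versions
            config={
                "table": "db.events",           # placeholder
                "catalog_name": "my_catalog",   # placeholder
                "streaming": True,              # keep polling for new snapshots
                "poll_interval_seconds": 30,    # default is 60
                "starting_strategy": "latest",  # or set from_snapshot / from_timestamp
            })
        | "Print" >> beam.Map(print))
```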
ICEBERG
Write
Configuration | Type | Description |
---|---|---|
table | str | Identifier of the Iceberg table. |
catalog_name | str | Name of the catalog containing the table. |
catalog_properties | map[str, str] | Properties used to set up the Iceberg catalog. |
config_properties | map[str, str] | Properties passed to the Hadoop Configuration. |
drop | list[str] | A list of field names to drop from the input record before writing. Mutually exclusive with 'keep' and 'only'. |
keep | list[str] | A list of field names to keep in the input record. All other fields are dropped before writing. Mutually exclusive with 'drop' and 'only'. |
only | str | The name of a single record field that should be written. Mutually exclusive with 'keep' and 'drop'. |
triggering_frequency_seconds | int32 | For a streaming pipeline, sets the frequency at which snapshots are produced. |
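A minimal sketch of a Managed Iceberg write. The input must be a schema-aware PCollection (built here from beam.Row); the table, catalog, and warehouse values are placeholders.

```python
import apache_beam as beam
from apache_beam.transforms import managed

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([
            beam.Row(id=1, name="alice", internal_note="x"),
            beam.Row(id=2, name="bob", internal_note="y"),
        ])
        | "WriteIceberg" >> managed.Write(
            managed.ICEBERG,
            config={
                "table": "db.users",           # placeholder
                "catalog_name": "my_catalog",  # placeholder
                "catalog_properties": {
                    "warehouse": "gs://my-bucket/warehouse",  # placeholder
                },
                "drop": ["internal_note"],     # remove this field before writing
            }))
```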
ICEBERG
Read
Configuration | Type | Description |
---|---|---|
table | str | Identifier of the Iceberg table. |
catalog_name | str | Name of the catalog containing the table. |
catalog_properties | map[str, str] | Properties used to set up the Iceberg catalog. |
config_properties | map[str, str] | Properties passed to the Hadoop Configuration. |
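A batch read sketch showing how catalog_properties (forwarded to the Iceberg catalog) and config_properties (forwarded to the Hadoop Configuration) are passed through; every property name and value below is an illustrative placeholder, not a required setting.

```python
import apache_beam as beam
from apache_beam.transforms import managed

with beam.Pipeline() as p:
    _ = (
        p
        | "ReadIceberg" >> managed.Read(
            managed.ICEBERG,
            config={
                "table": "db.events",          # placeholder
                "catalog_name": "my_catalog",  # placeholder
                "catalog_properties": {        # forwarded to the Iceberg catalog
                    "type": "rest",                        # placeholder catalog type
                    "uri": "https://catalog.example.com",  # placeholder
                },
                "config_properties": {         # forwarded to the Hadoop Configuration
                    "fs.defaultFS": "gs://my-bucket",      # placeholder
                },
            })
        | "Print" >> beam.Map(print))
```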
KAFKA
Read
Configuration | Type | Description |
---|---|---|
bootstrap_servers | str | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form `host1:port1,host2:port2,...` |
topic | str | n/a |
confluent_schema_registry_subject | str | n/a |
confluent_schema_registry_url | str | n/a |
consumer_config_updates | map[str, str] | A list of key-value pairs that act as configuration parameters for Kafka consumers. Most of these configurations will not be needed, but if you need to customize your Kafka consumer, you may use this. See a detailed list: https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html |
file_descriptor_path | str | The path to the Protocol Buffer File Descriptor Set file. This file is used for schema definition and message serialization. |
format | str | The encoding format for the data stored in Kafka. Valid options are: RAW, STRING, AVRO, JSON, PROTO |
message_name | str | The name of the Protocol Buffer message to be used for schema extraction and data conversion. |
schema | str | The schema in which the data is encoded in the Kafka topic. For AVRO data, this is a schema defined with AVRO schema syntax (https://avro.apache.org/docs/1.10.2/spec.html#schemas). For JSON data, this is a schema defined with JSON-schema syntax (https://json-schema.org/). If a URL to Confluent Schema Registry is provided, then this field is ignored, and the schema is fetched from Confluent Schema Registry. |
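A sketch of a streaming Kafka read of JSON-encoded records. The broker addresses, topic, schema, and consumer setting are placeholders; if a Confluent Schema Registry URL were supplied, the inline schema would be ignored.

```python
import apache_beam as beam
from apache_beam.transforms import managed
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder JSON-schema describing the record payload.
ORDER_SCHEMA = """
{
  "type": "object",
  "properties": {
    "id": {"type": "integer"},
    "amount": {"type": "number"}
  }
}
"""

with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
    _ = (
        p
        | "ReadKafka" >> managed.Read(
            managed.KAFKA,
            config={
                "bootstrap_servers": "broker-1:9092,broker-2:9092",  # placeholder
                "topic": "orders",                                   # placeholder
                "format": "JSON",
                "schema": ORDER_SCHEMA,
                "consumer_config_updates": {"auto.offset.reset": "earliest"},
            })
        | "Print" >> beam.Map(print))
```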
KAFKA
Write
Configuration | Type | Description |
---|---|---|
bootstrap_servers | str | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping; this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form `host1:port1,host2:port2,...` |
format | str | The encoding format for the data stored in Kafka. Valid options are: RAW, JSON, AVRO, PROTO |
topic | str | n/a |
file_descriptor_path | str | The path to the Protocol Buffer File Descriptor Set file. This file is used for schema definition and message serialization. |
message_name | str | The name of the Protocol Buffer message to be used for schema extraction and data conversion. |
producer_config_updates | map[str, str] | A list of key-value pairs that act as configuration parameters for Kafka producers. Most of these configurations will not be needed, but if you need to customize your Kafka producer, you may use this. See a detailed list: https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html |
schema | str | n/a |
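A sketch of a Kafka write in JSON format; per the table above, a schema string is optional for writes. The broker, topic, and producer setting are placeholders.

```python
import apache_beam as beam
from apache_beam.transforms import managed

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([beam.Row(id=1, amount=9.99)])
        | "WriteKafka" >> managed.Write(
            managed.KAFKA,
            config={
                "bootstrap_servers": "broker-1:9092",        # placeholder
                "topic": "orders",                           # placeholder
                "format": "JSON",
                "producer_config_updates": {"acks": "all"},  # example Kafka producer setting
            }))
```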
BIGQUERY
Write
Configuration | Type | Description |
---|---|---|
table | str | The BigQuery table to write to. Format: [${PROJECT}:]${DATASET}.${TABLE} |
drop | list[str] | A list of field names to drop from the input record before writing. Mutually exclusive with 'keep' and 'only'. |
keep | list[str] | A list of field names to keep in the input record. All other fields are dropped before writing. Mutually exclusive with 'drop' and 'only'. |
kms_key | str | Use this Cloud KMS key to encrypt your data. |
only | str | The name of a single record field that should be written. Mutually exclusive with 'keep' and 'drop'. |
triggering_frequency_seconds | int64 | Determines how often to 'commit' progress into BigQuery. Default is every 5 seconds. |
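A sketch of a BigQuery write, assuming your SDK version exposes a BIGQUERY identifier in apache_beam.transforms.managed. The table reference is a placeholder in [PROJECT:]DATASET.TABLE form.

```python
import apache_beam as beam
from apache_beam.transforms import managed

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([beam.Row(user_id=1, country="DE", debug_info="x")])
        | "WriteBigQuery" >> managed.Write(
            managed.BIGQUERY,
            config={
                "table": "my-project:my_dataset.events",  # placeholder
                "drop": ["debug_info"],                   # remove this field before writing
            }))
```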
BIGQUERY
Read
Configuration | Type | Description |
---|---|---|
kms_key | str | Use this Cloud KMS key to encrypt your data. |
query | str | The SQL query to be executed to read from the BigQuery table. |
row_restriction | str | Read only rows that match this filter, which must be compatible with Google standard SQL. This is not supported when reading via query. |
fields | list[str] | Read only the specified fields (columns) from a BigQuery table. Fields may not be returned in the order specified. If no value is specified, then all fields are returned. Example: "col1, col2, col3" |
table | str | The fully-qualified name of the BigQuery table to read from. Format: [${PROJECT}:]${DATASET}.${TABLE} |
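A sketch of a BigQuery read with column projection and a row filter, again assuming managed.BIGQUERY is available; the table name, fields, and filter are placeholders. As noted above, row_restriction cannot be combined with a query.

```python
import apache_beam as beam
from apache_beam.transforms import managed

with beam.Pipeline() as p:
    _ = (
        p
        | "ReadBigQuery" >> managed.Read(
            managed.BIGQUERY,
            config={
                "table": "my-project:my_dataset.events",  # placeholder
                "fields": ["user_id", "country"],         # optional column projection
                "row_restriction": "country = 'DE'",      # optional GoogleSQL-compatible filter
            })
        | "Print" >> beam.Map(print))
```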