Built-in I/O Transforms

Google BigQuery I/O connector

The Beam SDKs include built-in transforms that can read data from and write data to Google BigQuery tables.

Before you start

To use BigQueryIO, add the Maven artifact dependency to your pom.xml file.

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>2.60.0</version>
</dependency>

Additional resources:

To use BigQueryIO, you must install the Google Cloud Platform dependencies by running pip install apache-beam[gcp].

Additional resources:

BigQuery basics

Table names

To read or write from a BigQuery table, you must provide a fully-qualified BigQuery table name (for example, bigquery-public-data:github_repos.sample_contents). A fully-qualified BigQuery table name consists of three parts:

A table name can also include a table decorator if you are using time-partitioned tables.

To specify a BigQuery table, you can use either the table’s fully-qualified name as a string, or use a TableReference TableReference object.

Using a string

To specify a table with a string, use the format [project_id]:[dataset_id].[table_id] to specify the fully-qualified BigQuery table name.

String tableSpec = "apache-beam-testing.samples.weather_stations";
# project-id:dataset_id.table_id
table_spec = 'apache-beam-testing.samples.weather_stations'

You can also omit project_id and use the [dataset_id].[table_id] format. If you omit the project ID, Beam uses the default project ID from your pipeline options. pipeline options.

String tableSpec = "samples.weather_stations";
# dataset_id.table_id
table_spec = 'samples.weather_stations'

Using a TableReference

To specify a table with a TableReference, create a new TableReference using the three parts of the BigQuery table name.

TableReference tableSpec =
    new TableReference()
        .setProjectId("clouddataflow-readonly")
        .setDatasetId("samples")
        .setTableId("weather_stations");
from apache_beam.io.gcp.internal.clients import bigquery

table_spec = bigquery.TableReference(
    projectId='clouddataflow-readonly',
    datasetId='samples',
    tableId='weather_stations')

The Beam SDK for Java also provides the parseTableSpec helper method, which constructs a TableReference object from a String that contains the fully-qualified BigQuery table name. However, the static factory methods for BigQueryIO transforms accept the table name as a String and construct a TableReference object for you.

Table rows

BigQueryIO read and write transforms produce and consume data as a PCollection of dictionaries, where each element in the PCollection represents a single row in the table.

Schemas

When writing to BigQuery, you must supply a table schema for the destination table that you want to write to, unless you specify a create disposition of CREATE_NEVER. Creating a table schema covers schemas in more detail.

Data types

BigQuery supports the following data types: STRING, BYTES, INTEGER, FLOAT, NUMERIC, BOOLEAN, TIMESTAMP, DATE, TIME, DATETIME and GEOGRAPHY. For an overview of Google Standard SQL data types, see Data types. BigQueryIO allows you to use all of these data types. The following example shows the correct format for data types used when reading from and writing to BigQuery:

import com.google.api.services.bigquery.model.TableRow;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.Base64;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class BigQueryTableRowCreate {
  public static TableRow createTableRow() {
    TableRow row =
        new TableRow()
            // To learn more about BigQuery data types:
            // https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
            .set("string_field", "UTF-8 strings are supported! 🌱🌳🌍")
            .set("int64_field", 432)
            .set("float64_field", 3.141592653589793)
            .set("numeric_field", new BigDecimal("1234.56").toString())
            .set("bool_field", true)
            .set(
                "bytes_field",
                Base64.getEncoder()
                    .encodeToString("UTF-8 byte string 🌱🌳🌍".getBytes(StandardCharsets.UTF_8)))

            // To learn more about date formatting:
            // https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/time/format/DateTimeFormatter.html
            .set("date_field", LocalDate.parse("2020-03-19").toString()) // ISO_LOCAL_DATE
            .set(
                "datetime_field",
                LocalDateTime.parse("2020-03-19T20:41:25.123").toString()) // ISO_LOCAL_DATE_TIME
            .set("time_field", LocalTime.parse("20:41:25.123").toString()) // ISO_LOCAL_TIME
            .set(
                "timestamp_field",
                Instant.parse("2020-03-20T03:41:42.123Z").toString()) // ISO_INSTANT

            // To learn more about the geography Well-Known Text (WKT) format:
            // https://en.wikipedia.org/wiki/Well-known_text_representation_of_geometry
            .set("geography_field", "POINT(30 10)")

            // An array has its mode set to REPEATED.
            .set("array_field", Arrays.asList(1, 2, 3, 4))

            // Any class can be written as a STRUCT as long as all the fields in the
            // schema are present and they are encoded correctly as BigQuery types.
            .set(
                "struct_field",
                Stream.of(
                        new SimpleEntry<>("string_value", "Text 🌱🌳🌍"),
                        new SimpleEntry<>("int64_value", "42"))
                    .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)));
    return row;
  }
}
bigquery_data = [{
    'string': 'abc',
    'bytes': base64.b64encode(b'\xab\xac'),
    'integer': 5,
    'float': 0.5,
    'numeric': Decimal('5'),
    'boolean': True,
    'timestamp': '2018-12-31 12:44:31.744957 UTC',
    'date': '2018-12-31',
    'time': '12:44:31',
    'datetime': '2018-12-31T12:44:31',
    'geography': 'POINT(30 10)'
}]

As of Beam 2.7.0, the NUMERIC data type is supported. This data type supports high-precision decimal numbers (precision of 38 digits, scale of 9 digits). The GEOGRAPHY data type works with Well-Known Text (See https://en.wikipedia.org/wiki/Well-known_text format for reading and writing to BigQuery. BigQuery IO requires values of BYTES datatype to be encoded using base64 encoding when writing to BigQuery. When bytes are read from BigQuery they are returned as base64-encoded strings.

As of Beam 2.7.0, the NUMERIC data type is supported. This data type supports high-precision decimal numbers (precision of 38 digits, scale of 9 digits). The GEOGRAPHY data type works with Well-Known Text (See https://en.wikipedia.org/wiki/Well-known_text format for reading and writing to BigQuery. BigQuery IO requires values of BYTES datatype to be encoded using base64 encoding when writing to BigQuery. When bytes are read from BigQuery they are returned as base64-encoded bytes.

Reading from BigQuery

BigQueryIO allows you to read from a BigQuery table, or to execute a SQL query and read the results. By default, Beam invokes a BigQuery export request when you apply a BigQueryIO read transform. However, the Beam SDK for Java also supports using the BigQuery Storage Read API to read directly from BigQuery storage. See Using the Storage Read API for more information.

Beam’s use of BigQuery APIs is subject to BigQuery’s Quota and Pricing policies.

The Beam SDK for Java has two BigQueryIO read methods. Both of these methods allow you to read from a table, or read fields using a query string.

  1. read(SerializableFunction) reads Avro-formatted records and uses a specified parsing function to parse them into a PCollection of custom typed objects. Each element in the PCollection represents a single row in the table. The example code for reading with a query string shows how to use read(SerializableFunction).

  2. readTableRows returns a PCollection of BigQuery TableRow objects. Each element in the PCollection represents a single row in the table. Integer values in the TableRow objects are encoded as strings to match BigQuery’s exported JSON format. This method is convenient, but can be 2-3 times slower in performance compared to read(SerializableFunction). The example code for reading from a table shows how to use readTableRows.

Note: BigQueryIO.read() is deprecated as of Beam SDK 2.2.0. Instead, use read(SerializableFunction<SchemaAndRecord, T>) to parse BigQuery rows from Avro GenericRecord into your custom type, or use readTableRows() to parse them into JSON TableRow objects.

To read from a BigQuery table using the Beam SDK for Python, apply a ReadFromBigQuery transform. ReadFromBigQuery returns a PCollection of dictionaries, where each element in the PCollection represents a single row in the table. Integer values in the TableRow objects are encoded as strings to match BigQuery’s exported JSON format.

Note: BigQuerySource() is deprecated as of Beam SDK 2.25.0. Before 2.25.0, to read from a BigQuery table using the Beam SDK, apply a Read transform on a BigQuerySource. For example, beam.io.Read(beam.io.BigQuerySource(table_spec)).

Reading from a table

To read an entire BigQuery table, use the from method with a BigQuery table name. This example uses readTableRows.

To read an entire BigQuery table, use the table parameter with the BigQuery table name.

The following code reads an entire table that contains weather station data and then extracts the max_temperature column.

import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

class BigQueryReadFromTable {
  public static PCollection<MyData> readFromTable(
      String project, String dataset, String table, Pipeline pipeline) {

    // String project = "my-project-id";
    // String dataset = "my_bigquery_dataset_id";
    // String table = "my_bigquery_table_id";

    // Pipeline pipeline = Pipeline.create();

    PCollection<MyData> rows =
        pipeline
            .apply(
                "Read from BigQuery query",
                BigQueryIO.readTableRows().from(String.format("%s:%s.%s", project, dataset, table)))
            .apply(
                "TableRows to MyData",
                MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));

    return rows;
  }
}
max_temperatures = (
    pipeline
    | 'ReadTable' >> beam.io.ReadFromBigQuery(table=table_spec)
    # Each row is a dictionary where the keys are the BigQuery columns
    | beam.Map(lambda elem: elem['max_temperature']))

Reading with a query string

If you don’t want to read an entire table, you can supply a query string with the fromQuery method.

If you don’t want to read an entire table, you can supply a query string to ReadFromBigQuery by specifying the query parameter.

The following code uses a SQL query to only read the max_temperature column.

import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

class BigQueryReadFromQuery {
  public static PCollection<MyData> readFromQuery(
      String project, String dataset, String table, Pipeline pipeline) {

    // String project = "my-project-id";
    // String dataset = "my_bigquery_dataset_id";
    // String table = "my_bigquery_table_id";

    // Pipeline pipeline = Pipeline.create();

    PCollection<MyData> rows =
        pipeline
            .apply(
                "Read from BigQuery query",
                BigQueryIO.readTableRows()
                    .fromQuery(String.format("SELECT * FROM `%s.%s.%s`", project, dataset, table))
                    .usingStandardSql())
            .apply(
                "TableRows to MyData",
                MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));

    return rows;
  }
}
max_temperatures = (
    pipeline
    | 'QueryTable' >> beam.io.ReadFromBigQuery(
        query='SELECT max_temperature FROM '\
              '[apache-beam-testing.samples.weather_stations]')
    # Each row is a dictionary where the keys are the BigQuery columns
    | beam.Map(lambda elem: elem['max_temperature']))

You can also use BigQuery’s standard SQL dialect with a query string, as shown in the following example:

PCollection<Double> maxTemperatures =
    p.apply(
        BigQueryIO.read(
                (SchemaAndRecord elem) -> (Double) elem.getRecord().get("max_temperature"))
            .fromQuery(
                "SELECT max_temperature FROM `clouddataflow-readonly.samples.weather_stations`")
            .usingStandardSql()
            .withCoder(DoubleCoder.of()));
max_temperatures = (
    pipeline
    | 'QueryTableStdSQL' >> beam.io.ReadFromBigQuery(
        query='SELECT max_temperature FROM '\
              '`clouddataflow-readonly.samples.weather_stations`',
        use_standard_sql=True)
    # Each row is a dictionary where the keys are the BigQuery columns
    | beam.Map(lambda elem: elem['max_temperature']))

Query execution project

By default the pipeline executes the query in the Google Cloud project associated with the pipeline (in case of the Dataflow runner it’s the project where the pipeline runs). There are cases where the query execution project should be different from the pipeline project. If you use Java SDK, you can define the query execution project by setting the pipeline option “bigQueryProject” to the desired Google Cloud project id.

Using the Storage Read API

The BigQuery Storage API allows you to directly access tables in BigQuery storage, and supports features such as column selection and predicate filter push-down which can allow more efficient pipeline execution.

The Beam SDK for Java supports using the BigQuery Storage API when reading from BigQuery. SDK versions before 2.25.0 support the BigQuery Storage API as an experimental feature and use the pre-GA BigQuery Storage API surface. Callers should migrate pipelines which use the BigQuery Storage API to use SDK version 2.25.0 or later.

The Beam SDK for Python supports the BigQuery Storage API. Enable it by passing method=DIRECT_READ as a parameter to ReadFromBigQuery.

Updating your code

Use the following methods when you read from a table:

The following code snippet reads from a table. This example is from the BigQueryTornadoes example. When the example’s read method option is set to DIRECT_READ, the pipeline uses the BigQuery Storage API and column projection to read public samples of weather data from a BigQuery table. You can view the full source code on GitHub.

import java.util.Arrays;
import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

class BigQueryReadFromTableWithBigQueryStorageAPI {
  public static PCollection<MyData> readFromTableWithBigQueryStorageAPI(
      String project, String dataset, String table, Pipeline pipeline) {

    // String project = "my-project-id";
    // String dataset = "my_bigquery_dataset_id";
    // String table = "my_bigquery_table_id";

    // Pipeline pipeline = Pipeline.create();

    PCollection<MyData> rows =
        pipeline
            .apply(
                "Read from BigQuery table",
                BigQueryIO.readTableRows()
                    .from(String.format("%s:%s.%s", project, dataset, table))
                    .withMethod(Method.DIRECT_READ)
                    .withSelectedFields(
                        Arrays.asList(
                            "string_field",
                            "int64_field",
                            "float64_field",
                            "numeric_field",
                            "bool_field",
                            "bytes_field",
                            "date_field",
                            "datetime_field",
                            "time_field",
                            "timestamp_field",
                            "geography_field",
                            "array_field",
                            "struct_field")))
            .apply(
                "TableRows to MyData",
                MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));

    return rows;
  }
}
max_temperatures = (
    pipeline
    | 'ReadTableWithStorageAPI' >> beam.io.ReadFromBigQuery(
        table=table_spec, method=beam.io.ReadFromBigQuery.Method.DIRECT_READ)
    | beam.Map(lambda elem: elem['max_temperature']))

The following code snippet reads with a query string.

import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

class BigQueryReadFromQueryWithBigQueryStorageAPI {
  public static PCollection<MyData> readFromQueryWithBigQueryStorageAPI(
      String project, String dataset, String table, String query, Pipeline pipeline) {

    // String project = "my-project-id";
    // String dataset = "my_bigquery_dataset_id";
    // String table = "my_bigquery_table_id";

    // Pipeline pipeline = Pipeline.create();

    /*
    String query = String.format("SELECT\n" +
        "  string_field,\n" +
        "  int64_field,\n" +
        "  float64_field,\n" +
        "  numeric_field,\n" +
        "  bool_field,\n" +
        "  bytes_field,\n" +
        "  date_field,\n" +
        "  datetime_field,\n" +
        "  time_field,\n" +
        "  timestamp_field,\n" +
        "  geography_field,\n" +
        "  array_field,\n" +
        "  struct_field\n" +
        "FROM\n" +
        "  `%s:%s.%s`", project, dataset, table)
    */

    PCollection<MyData> rows =
        pipeline
            .apply(
                "Read from BigQuery table",
                BigQueryIO.readTableRows()
                    .fromQuery(query)
                    .usingStandardSql()
                    .withMethod(Method.DIRECT_READ))
            .apply(
                "TableRows to MyData",
                MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));

    return rows;
  }
}
# The SDK for Python does not support the BigQuery Storage API.

Writing to BigQuery

BigQueryIO lets you write to BigQuery tables. If you are using the Beam SDK for Java, you can write different rows to different tables. The Beam SDK for Java also supports using the BigQuery Storage Write API to write directly to BigQuery storage. For more information, see Using the Storage Write API.

BigQueryIO write transforms use APIs that are subject to BigQuery’s Quota and Pricing policies.

When you apply a write transform, you must provide the following information for the destination table(s):

In addition, if your write operation creates a new BigQuery table, you must also supply a table schema for the destination table.

Create disposition

The create disposition controls whether or not your BigQuery write operation should create a table if the destination table does not exist.

Use .withCreateDisposition to specify the create disposition. Valid enum values are:

  • Write.CreateDisposition.CREATE_IF_NEEDED: Specifies that the write operation should create a new table if one does not exist. If you use this value, you must provide a table schema with the withSchema method. CREATE_IF_NEEDED is the default behavior.

  • Write.CreateDisposition.CREATE_NEVER: Specifies that a table should never be created. If the destination table does not exist, the write operation fails.

Use the create_disposition parameter to specify the create disposition. Valid enum values are:

  • BigQueryDisposition.CREATE_IF_NEEDED: Specifies that the write operation should create a new table if one does not exist. If you use this value, you must provide a table schema. CREATE_IF_NEEDED is the default behavior.

  • BigQueryDisposition.CREATE_NEVER: Specifies that a table should never be created. If the destination table does not exist, the write operation fails.

If you specify CREATE_IF_NEEDED as the create disposition and you don’t supply a table schema, the transform might fail at runtime if the destination table does not exist.

Write disposition

The write disposition controls how your BigQuery write operation applies to an existing table.

Use .withWriteDisposition to specify the write disposition. Valid enum values are:

  • Write.WriteDisposition.WRITE_EMPTY: Specifies that the write operation should fail at runtime if the destination table is not empty. WRITE_EMPTY is the default behavior.

  • Write.WriteDisposition.WRITE_TRUNCATE: Specifies that the write operation should replace an existing table. Any existing rows in the destination table are removed, and the new rows are added to the table.

  • Write.WriteDisposition.WRITE_APPEND: Specifies that the write operation should append the rows to the end of the existing table.

Use the write_disposition parameter to specify the write disposition. Valid enum values are:

  • BigQueryDisposition.WRITE_EMPTY: Specifies that the write operation should fail at runtime if the destination table is not empty. WRITE_EMPTY is the default behavior.

  • BigQueryDisposition.WRITE_TRUNCATE: Specifies that the write operation should replace an existing table. Any existing rows in the destination table are removed, and the new rows are added to the table.

  • BigQueryDisposition.WRITE_APPEND: Specifies that the write operation should append the rows to the end of the existing table.

When you use WRITE_EMPTY, the check for whether or not the destination table is empty can occur before the actual write operation. This check doesn’t guarantee that your pipeline will have exclusive access to the table. Two concurrent pipelines that write to the same output table with a write disposition of WRITE_EMPTY might start successfully, but both pipelines can fail later when the write attempts happen.

Creating a table schema

If your BigQuery write operation creates a new table, you must provide schema information. The schema contains information about each field in the table. When updating a pipeline with a new schema, the existing schema fields must stay in the same order, or the pipeline will break, failing to write to BigQuery.

To create a table schema in Java, you can either use a TableSchema object, or use a string that contains a JSON-serialized TableSchema object.

To create a table schema in Python, you can either use a TableSchema object, or use a string that defines a list of fields. Single string based schemas do not support nested fields, repeated fields, or specifying a BigQuery mode for fields (the mode is always set to NULLABLE).

Using a TableSchema

To create and use a table schema as a TableSchema object, follow these steps.

  1. Create a list of TableFieldSchema objects. Each TableFieldSchema object represents a field in the table.

  2. Create a TableSchema object and use the setFields method to specify your list of fields.

  3. Use the withSchema method to provide your table schema when you apply a write transform.

  1. Create a TableSchema object.

  2. Create and append a TableFieldSchema object for each field in your table.

  3. Use the schema parameter to provide your table schema when you apply a write transform. Set the parameter’s value to the TableSchema object.

The following example code shows how to create a TableSchema for a table with two fields (source and quote) of type string.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;

class BigQuerySchemaCreate {
  public static TableSchema createSchema() {
    // To learn more about BigQuery schemas:
    // https://cloud.google.com/bigquery/docs/schemas
    TableSchema schema =
        new TableSchema()
            .setFields(
                Arrays.asList(
                    new TableFieldSchema()
                        .setName("string_field")
                        .setType("STRING")
                        .setMode("REQUIRED"),
                    new TableFieldSchema()
                        .setName("int64_field")
                        .setType("INT64")
                        .setMode("NULLABLE"),
                    new TableFieldSchema()
                        .setName("float64_field")
                        .setType("FLOAT64"), // default mode is "NULLABLE"
                    new TableFieldSchema().setName("numeric_field").setType("NUMERIC"),
                    new TableFieldSchema().setName("bool_field").setType("BOOL"),
                    new TableFieldSchema().setName("bytes_field").setType("BYTES"),
                    new TableFieldSchema().setName("date_field").setType("DATE"),
                    new TableFieldSchema().setName("datetime_field").setType("DATETIME"),
                    new TableFieldSchema().setName("time_field").setType("TIME"),
                    new TableFieldSchema().setName("timestamp_field").setType("TIMESTAMP"),
                    new TableFieldSchema().setName("geography_field").setType("GEOGRAPHY"),
                    new TableFieldSchema()
                        .setName("array_field")
                        .setType("INT64")
                        .setMode("REPEATED")
                        .setDescription("Setting the mode to REPEATED makes this an ARRAY<INT64>."),
                    new TableFieldSchema()
                        .setName("struct_field")
                        .setType("STRUCT")
                        .setDescription(
                            "A STRUCT accepts a custom data class, the fields must match the custom class fields.")
                        .setFields(
                            Arrays.asList(
                                new TableFieldSchema().setName("string_value").setType("STRING"),
                                new TableFieldSchema().setName("int64_value").setType("INT64")))));
    return schema;
  }
}
table_schema = {
    'fields': [{
        'name': 'source', 'type': 'STRING', 'mode': 'NULLABLE'
    }, {
        'name': 'quote', 'type': 'STRING', 'mode': 'REQUIRED'
    }]
}

Using a string

To create and use a table schema as a string that contains JSON-serialized TableSchema object, follow these steps.

  1. Create a string that contains a JSON-serialized TableSchema object.

  2. Use the withJsonSchema method to provide your table schema when you apply a write transform.

To create and use a table schema as a string, follow these steps.

  1. Create a single comma separated string of the form “field1:type1,field2:type2,field3:type3” that defines a list of fields. The type should specify the field’s BigQuery type.

  2. Use the schema parameter to provide your table schema when you apply a write transform. Set the parameter’s value to the string.

The following example shows how to use a string to specify the same table schema as the previous example.

String tableSchemaJson =
    ""
        + "{"
        + "  \"fields\": ["
        + "    {"
        + "      \"name\": \"source\","
        + "      \"type\": \"STRING\","
        + "      \"mode\": \"NULLABLE\""
        + "    },"
        + "    {"
        + "      \"name\": \"quote\","
        + "      \"type\": \"STRING\","
        + "      \"mode\": \"REQUIRED\""
        + "    }"
        + "  ]"
        + "}";
# column_name:BIGQUERY_TYPE, ...
table_schema = 'source:STRING, quote:STRING'

Setting the insertion method

BigQueryIO supports two methods of inserting data into BigQuery: load jobs and streaming inserts. Each insertion method provides different tradeoffs of cost, quota, and data consistency. See the BigQuery documentation for different data ingestion options (specifically, load jobs and streaming inserts) for more information about these tradeoffs.

BigQueryIO chooses a default insertion method based on the input PCollection. You can use withMethod to specify the desired insertion method. See Write.Method for the list of the available methods and their restrictions.

BigQueryIO chooses a default insertion method based on the input PCollection. You can use method to specify the desired insertion method. See WriteToBigQuery for the list of the available methods and their restrictions.

BigQueryIO uses load jobs in the following situations:

  • When you apply a BigQueryIO write transform to a bounded PCollection.
  • When you specify load jobs as the insertion method using BigQueryIO.write().withMethod(FILE_LOADS).
  • When you apply a BigQueryIO write transform to a bounded PCollection.
  • When you specify load jobs as the insertion method using WriteToBigQuery(method='FILE_LOADS').

Note: If you use batch loads in a streaming pipeline:

You must use withTriggeringFrequency to specify a triggering frequency for initiating load jobs. Be careful about setting the frequency such that your pipeline doesn’t exceed the BigQuery load job quota limit.

You can either use withNumFileShards to explicitly set the number of file shards written, or use withAutoSharding to enable dynamic sharding (starting 2.29.0 release) and the number of shards may be determined and changed at runtime. The sharding behavior depends on the runners.

You must use triggering_frequency to specify a triggering frequency for initiating load jobs. Be careful about setting the frequency such that your pipeline doesn’t exceed the BigQuery load job quota limit.

You can set with_auto_sharding=True to enable dynamic sharding (starting 2.29.0 release). The number of shards may be determined and changed at runtime. The sharding behavior depends on the runners.

BigQueryIO uses streaming inserts in the following situations:

  • When you apply a BigQueryIO write transform to an unbounded PCollection.
  • When you specify streaming inserts as the insertion method using BigQueryIO.write().withMethod(STREAMING_INSERTS).
  • When you apply a BigQueryIO write transform to an unbounded PCollection.
  • When you specify streaming inserts as the insertion method using WriteToBigQuery(method='STREAMING_INSERTS').

Note: Streaming inserts by default enables BigQuery best-effort deduplication mechanism. You can disable that by setting ignoreInsertIds. The quota limitations are different when deduplication is enabled vs. disabled.

Streaming inserts applies a default sharding for each table destination. You can use withAutoSharding (starting 2.28.0 release) to enable dynamic sharding and the number of shards may be determined and changed at runtime. The sharding behavior depends on the runners.

Note: Streaming inserts by default enables BigQuery best-effort deduplication mechanism. You can disable that by setting ignore_insert_ids=True. The quota limitations are different when deduplication is enabled vs. disabled.

Streaming inserts applies a default sharding for each table destination. You can set with_auto_sharding=True (starting 2.29.0 release) to enable dynamic sharding. The number of shards may be determined and changed at runtime. The sharding behavior depends on the runners.

Writing to a table

To write to a BigQuery table, apply either a writeTableRows or write transform.

To write to a BigQuery table, apply the WriteToBigQuery transform. WriteToBigQuery supports both batch mode and streaming mode. You must apply the transform to a PCollection of dictionaries. In general, you’ll need to use another transform, such as ParDo, to format your output data into a collection.

The following examples use this PCollection that contains quotes.

The writeTableRows method writes a PCollection of BigQuery TableRow objects to a BigQuery table. Each element in the PCollection represents a single row in the table. This example uses writeTableRows to write elements to a PCollection<TableRow>. The write operation creates a table if needed. If the table already exists, it is replaced.

import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.values.PCollection;

class BigQueryWriteToTable {
  public static void writeToTable(
      String project,
      String dataset,
      String table,
      TableSchema schema,
      PCollection<TableRow> rows) {

    // String project = "my-project-id";
    // String dataset = "my_bigquery_dataset_id";
    // String table = "my_bigquery_table_id";

    // TableSchema schema = new TableSchema().setFields(Arrays.asList(...));

    // Pipeline pipeline = Pipeline.create();
    // PCollection<TableRow> rows = ...

    rows.apply(
        "Write to BigQuery",
        BigQueryIO.writeTableRows()
            .to(String.format("%s:%s.%s", project, dataset, table))
            .withSchema(schema)
            // For CreateDisposition:
            // - CREATE_IF_NEEDED (default): creates the table if it doesn't exist, a schema is
            // required
            // - CREATE_NEVER: raises an error if the table doesn't exist, a schema is not needed
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            // For WriteDisposition:
            // - WRITE_EMPTY (default): raises an error if the table is not empty
            // - WRITE_APPEND: appends new rows to existing rows
            // - WRITE_TRUNCATE: deletes the existing rows before writing
            .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));

    // pipeline.run().waitUntilFinish();
  }
}
quotes = pipeline | beam.Create([
    {
        'source': 'Mahatma Gandhi', 'quote': 'My life is my message.'
    },
    {
        'source': 'Yoda', 'quote': "Do, or do not. There is no 'try'."
    },
])

The following example code shows how to apply a WriteToBigQuery transform to write a PCollection of dictionaries to a BigQuery table. The write operation creates a table if needed. If the table already exists, it is replaced.

quotes | beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)

The write transform writes a PCollection of custom typed objects to a BigQuery table. Use .withFormatFunction(SerializableFunction) to provide a formatting function that converts each input element in the PCollection into a TableRow. This example uses write to write a PCollection<String>. The write operation creates a table if needed. If the table already exists, it is replaced.

quotes.apply(
    BigQueryIO.<Quote>write()
        .to(tableSpec)
        .withSchema(tableSchema)
        .withFormatFunction(
            (Quote elem) ->
                new TableRow().set("source", elem.source).set("quote", elem.quote))
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));

When you use streaming inserts, you can decide what to do with failed records. You can either keep retrying, or return the failed records in a separate PCollection using the WriteResult.getFailedInserts() method.

Using the Storage Write API

Starting with version 2.36.0 of the Beam SDK for Java, you can use the BigQuery Storage Write API from the BigQueryIO connector.

Also after version 2.47.0 of Beam SDK for Python, SDK supports BigQuery Storage Write API.

BigQuery Storage Write API for Python SDK currently has some limitations on supported data types. As this method makes use of cross-language transforms, we are limited to the types supported at the cross-language boundary. For example, apache_beam.utils.timestamp.Timestamp is needed to write a TIMESTAMP BigQuery type. Also, some types (e.g. DATETIME) are not supported yet. For more details, please refer to the full type mapping.

Note: If you want to run WriteToBigQuery with Storage Write API from the source code, you need to run ./gradlew :sdks:java:io:google-cloud-platform:expansion-service:build to build the expansion-service jar. If you are running from a released Beam SDK, the jar is already included.

Exactly-once semantics

To write to BigQuery using the Storage Write API, set withMethod to Method.STORAGE_WRITE_API. Here’s an example transform that writes to BigQuery using the Storage Write API and exactly-once semantics:

WriteResult writeResult = rows.apply("Save Rows to BigQuery",
BigQueryIO.writeTableRows()
        .to(options.getFullyQualifiedTableName())
        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
        .withMethod(Method.STORAGE_WRITE_API)
);
quotes | "WriteTableWithStorageAPI" >> beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API)

If you want to change the behavior of BigQueryIO so that all the BigQuery sinks for your pipeline use the Storage Write API by default, set the UseStorageWriteApi option.

If your pipeline needs to create the table (in case it doesn’t exist and you specified the create disposition as CREATE_IF_NEEDED), you must provide a table schema. The API uses the schema to validate data and convert it to a binary protocol.

TableSchema schema = new TableSchema().setFields(
        List.of(
            new TableFieldSchema()
                .setName("request_ts")
                .setType("TIMESTAMP")
                .setMode("REQUIRED"),
            new TableFieldSchema()
                .setName("user_name")
                .setType("STRING")
                .setMode("REQUIRED")));
table_schema = {
    'fields': [{
        'name': 'source', 'type': 'STRING', 'mode': 'NULLABLE'
    }, {
        'name': 'quote', 'type': 'STRING', 'mode': 'REQUIRED'
    }]
}

For streaming pipelines, you need to set two additional parameters: the number of streams and the triggering frequency.

BigQueryIO.writeTableRows()
        // ...
        .withTriggeringFrequency(Duration.standardSeconds(5))
        .withNumStorageWriteApiStreams(3)
);
# The Python SDK doesn't currently support setting the number of write streams
quotes | "StorageWriteAPIWithFrequency" >> beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
    triggering_frequency=5)

The number of streams defines the parallelism of the BigQueryIO Write transform and roughly corresponds to the number of Storage Write API streams that the pipeline uses. You can set it explicitly on the transform via withNumStorageWriteApiStreams or provide the numStorageWriteApiStreams option to the pipeline as defined in BigQueryOptions. Please note this is only supported for streaming pipelines.

Triggering frequency determines how soon the data is visible for querying in BigQuery. You can explicitly set it via withTriggeringFrequency or specify the number of seconds by setting the storageWriteApiTriggeringFrequencySec option.

The combination of these two parameters affects the size of the batches of rows that BigQueryIO creates before calling the Storage Write API. Setting the frequency too high can result in smaller batches, which can affect performance. As a general rule, a single stream should be able to handle throughput of at least 1Mb per second. Creating exclusive streams is an expensive operation for the BigQuery service, so you should use only as many streams as needed for your use case. Triggering frequency in single-digit seconds is a good choice for most pipelines.

Similar to streaming inserts, STORAGE_WRITE_API supports dynamically determining the number of parallel streams to write to BigQuery (starting 2.42.0). You can explicitly enable this using withAutoSharding.

STORAGE_WRITE_API defaults to dynamic sharding when numStorageWriteApiStreams is set to 0 or is unspecified.

When using STORAGE_WRITE_API, the PCollection returned by WriteResult.getFailedStorageApiInserts contains the rows that failed to be written to the Storage Write API sink.

At-least-once semantics

If your use case allows for potential duplicate records in the target table, you can use the STORAGE_API_AT_LEAST_ONCE method. This method doesn’t persist the records to be written to BigQuery into its shuffle storage, which is needed to provide the exactly-once semantics of the STORAGE_WRITE_API method. Therefore, for most pipelines, using this method is often less expensive and results in lower latency. If you use STORAGE_API_AT_LEAST_ONCE, you don’t need to specify the number of streams, and you can’t specify the triggering frequency.

Auto sharding is not applicable for STORAGE_API_AT_LEAST_ONCE.

When using STORAGE_API_AT_LEAST_ONCE, the PCollection returned by WriteResult.getFailedStorageApiInserts contains the rows that failed to be written to the Storage Write API sink.

Quotas

Before using the Storage Write API, be aware of the BigQuery Storage Write API quotas.

Using dynamic destinations

You can use the dynamic destinations feature to write elements in a PCollection to different BigQuery tables, possibly with different schemas.

The dynamic destinations feature groups your user type by a user-defined destination key, uses the key to compute a destination table and/or schema, and writes each group’s elements to the computed destination.

In addition, you can also write your own types that have a mapping function to TableRow, and you can use side inputs in all DynamicDestinations methods.

To use dynamic destinations, you must create a DynamicDestinations object and implement the following methods:

  • getDestination: Returns an object that getTable and getSchema can use as the destination key to compute the destination table and/or schema.

  • getTable: Returns the table (as a TableDestination object) for the destination key. This method must return a unique table for each unique destination.

  • getSchema: Returns the table schema (as a TableSchema object) for the destination key.

Then, use write().to with your DynamicDestinations object. This example uses a PCollection that contains weather data and writes the data into a different table for each year.

/*
@DefaultCoder(AvroCoder.class)
static class WeatherData {
  final long year;
  final long month;
  final long day;
  final double maxTemp;

  public WeatherData() {
    this.year = 0;
    this.month = 0;
    this.day = 0;
    this.maxTemp = 0.0f;
  }
  public WeatherData(long year, long month, long day, double maxTemp) {
    this.year = year;
    this.month = month;
    this.day = day;
    this.maxTemp = maxTemp;
  }
}
*/

PCollection<WeatherData> weatherData =
    p.apply(
        BigQueryIO.read(
                (SchemaAndRecord elem) -> {
                  GenericRecord record = elem.getRecord();
                  return new WeatherData(
                      (Long) record.get("year"),
                      (Long) record.get("month"),
                      (Long) record.get("day"),
                      (Double) record.get("max_temperature"));
                })
            .fromQuery(
                "SELECT year, month, day, max_temperature "
                    + "FROM [apache-beam-testing.samples.weather_stations] "
                    + "WHERE year BETWEEN 2007 AND 2009")
            .withCoder(AvroCoder.of(WeatherData.class)));

// We will send the weather data into different tables for every year.
weatherData.apply(
    BigQueryIO.<WeatherData>write()
        .to(
            new DynamicDestinations<WeatherData, Long>() {
              @Override
              public Long getDestination(ValueInSingleWindow<WeatherData> elem) {
                return elem.getValue().year;
              }

              @Override
              public TableDestination getTable(Long destination) {
                return new TableDestination(
                    new TableReference()
                        .setProjectId(writeProject)
                        .setDatasetId(writeDataset)
                        .setTableId(writeTable + "_" + destination),
                    "Table for year " + destination);
              }

              @Override
              public TableSchema getSchema(Long destination) {
                return new TableSchema()
                    .setFields(
                        ImmutableList.of(
                            new TableFieldSchema()
                                .setName("year")
                                .setType("INTEGER")
                                .setMode("REQUIRED"),
                            new TableFieldSchema()
                                .setName("month")
                                .setType("INTEGER")
                                .setMode("REQUIRED"),
                            new TableFieldSchema()
                                .setName("day")
                                .setType("INTEGER")
                                .setMode("REQUIRED"),
                            new TableFieldSchema()
                                .setName("maxTemp")
                                .setType("FLOAT")
                                .setMode("NULLABLE")));
              }
            })
        .withFormatFunction(
            (WeatherData elem) ->
                new TableRow()
                    .set("year", elem.year)
                    .set("month", elem.month)
                    .set("day", elem.day)
                    .set("maxTemp", elem.maxTemp))
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
fictional_characters_view = beam.pvalue.AsDict(
    pipeline | 'CreateCharacters' >> beam.Create([('Yoda', True),
                                                  ('Obi Wan Kenobi', True)]))

def table_fn(element, fictional_characters):
  if element in fictional_characters:
    return 'my_dataset.fictional_quotes'
  else:
    return 'my_dataset.real_quotes'

quotes | 'WriteWithDynamicDestination' >> beam.io.WriteToBigQuery(
    table_fn,
    schema=table_schema,
    table_side_inputs=(fictional_characters_view, ),
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)

Using time partitioning

BigQuery time partitioning divides your table into smaller partitions, which is called a partitioned table. Partitioned tables make it easier for you to manage and query your data.

To use BigQuery time partitioning, use one of these two methods:

  • withTimePartitioning: This method takes a TimePartitioning class, and is only usable if you are writing to a single table.

  • withJsonTimePartitioning: This method is the same as withTimePartitioning, but takes a JSON-serialized String object.

This example generates one partition per day.

weatherData.apply(
    BigQueryIO.<WeatherData>write()
        .to(tableSpec + "_partitioning")
        .withSchema(tableSchema)
        .withFormatFunction(
            (WeatherData elem) ->
                new TableRow()
                    .set("year", elem.year)
                    .set("month", elem.month)
                    .set("day", elem.day)
                    .set("maxTemp", elem.maxTemp))
        // NOTE: an existing table without time partitioning set up will not work
        .withTimePartitioning(new TimePartitioning().setType("DAY"))
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
quotes | 'WriteWithTimePartitioning' >> beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    additional_bq_parameters={'timePartitioning': {
        'type': 'HOUR'
    }})

Limitations

BigQueryIO currently has the following limitations.

  1. You can’t sequence the completion of a BigQuery write with other steps of your pipeline.

  2. If you are using the Beam SDK for Python, you might have import size quota issues if you write a very large dataset. As a workaround, you can partition the dataset (for example, using Beam’s Partition transform) and write to multiple BigQuery tables. The Beam SDK for Java does not have this limitation as it partitions your dataset for you.

  3. When you load data into BigQuery, these limits are applied. By default, BigQuery uses a shared pool of slots to load data. This means that the available capacity is not guaranteed, and your load may be queued until a slot becomes available. If a slot does not become available within 6 hours, the load will fail due to the limits set by BigQuery. To avoid this situation, it is highly recommended that you use BigQuery reservations, which ensure that your load does not get queued and fail due to capacity issues.

Additional examples

You can find additional examples that use BigQuery in Beam’s examples directories.

Java cookbook examples

These examples are from the Java cookbook examples directory.

Java complete examples

These examples are from the Java complete examples directory.

Python cookbook examples

These examples are from the Python cookbook examples directory.