Beam SQL extensions: CREATE EXTERNAL TABLE

Beam SQL’s CREATE EXTERNAL TABLE statement registers a virtual table that maps to an external storage system. For some storage systems, CREATE EXTERNAL TABLE does not create a physical table until a write occurs. After the physical table exists, you can access the table with the SELECT, JOIN, and INSERT INTO statements.

The CREATE EXTERNAL TABLE statement includes a schema and extended clauses.

Syntax

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
TYPE type
[LOCATION location]
[TBLPROPERTIES tblProperties]

simpleType: TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | DECIMAL | BOOLEAN | DATE | TIME | TIMESTAMP | CHAR | VARCHAR

fieldType: simpleType | MAP<simpleType, fieldType> | ARRAY<fieldType> | ROW<tableElement [, tableElement ]*>

tableElement: columnName fieldType [ NOT NULL ]

BigQuery

Syntax

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
TYPE bigquery
LOCATION '[PROJECT_ID]:[DATASET].[TABLE]'
TBLPROPERTIES '{"method": "DIRECT_READ"}'

Read Mode

Beam SQL supports reading columns with simple types (simpleType) and arrays of simple types (ARRAY<simpleType>).

When reading using the EXPORT method, the following pipeline options should be set:

project: ID of the Google Cloud Project.
tempLocation: Bucket to store intermediate data in.

When reading using the DIRECT_READ method, an optimizer will attempt to perform project and predicate push-down, potentially reducing the time required to read the data from BigQuery.

More information about the BigQuery Storage API can be found in the Google Cloud documentation.

Write Mode

If the table does not exist, Beam creates the table specified in LOCATION when the first record is written. If the table does exist, the specified columns must match the existing table.

Schema

Schema-related errors will cause the pipeline to crash. The Map type is not supported. Beam SQL types map to BigQuery Standard SQL types as follows:

Beam SQL Type                      | BigQuery Standard SQL Type
TINYINT, SMALLINT, INTEGER, BIGINT | INT64
FLOAT, DOUBLE, DECIMAL             | FLOAT64
BOOLEAN                            | BOOL
DATE                               | DATE
TIME                               | TIME
TIMESTAMP                          | TIMESTAMP
CHAR, VARCHAR                      | STRING
MAP                                | (not supported)
ARRAY                              | ARRAY
ROW                                | STRUCT

Example

CREATE EXTERNAL TABLE users (id INTEGER, username VARCHAR)
TYPE bigquery
LOCATION 'testing-integration:apache.users'

Cloud Bigtable

Syntax

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (
    key VARCHAR NOT NULL,
    family ROW<qualifier cells [, qualifier cells ]* >
    [, family ROW< qualifier cells [, qualifier cells ]* > ]*
)
TYPE bigtable
LOCATION 'googleapis.com/bigtable/projects/[PROJECT_ID]/instances/[INSTANCE_ID]/tables/[TABLE]'

An alternative syntax with a flat schema:

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (
    key VARCHAR NOT NULL,
    qualifier SIMPLE_TYPE
    [, qualifier SIMPLE_TYPE ]*
)
TYPE bigtable
LOCATION 'googleapis.com/bigtable/projects/[PROJECT_ID]/instances/[INSTANCE_ID]/tables/[TABLE]'
TBLPROPERTIES '{
  "columnsMapping": "family:qualifier[,family:qualifier]*"
}'
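The columnsMapping value is a comma-separated list of family:qualifier pairs. As a quick illustration of how such a value decomposes (plain Python, not a Beam API; parse_columns_mapping is a hypothetical helper):

```python
def parse_columns_mapping(mapping):
    # Sketch only: split a columnsMapping string of the form
    # "family:qualifier[,family:qualifier]*" into (family, qualifier) pairs.
    pairs = []
    for entry in mapping.split(","):
        family, qualifier = entry.split(":", 1)
        pairs.append((family, qualifier))
    return pairs

mapping = "f:boolColumn,f:longColumn,f2:doubleColumn"
print(parse_columns_mapping(mapping))
# [('f', 'boolColumn'), ('f', 'longColumn'), ('f2', 'doubleColumn')]
```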

Read Mode

Beam SQL supports reading rows with a mandatory key field and at least one family with at least one qualifier. Cells are represented as simple types (SIMPLE_TYPE) or as a ROW type with a mandatory val field, an optional timestampMicros field, and an optional labels field. Both forms read the latest cell in the column. Cells specified as arrays of simple types (ARRAY<simpleType>) allow reading all of the column's values.

For a flat schema, only SIMPLE_TYPE values are allowed. Every field except key must correspond to a key-value pair specified in columnsMapping.

Not all existing column families and qualifiers have to be provided to the schema.

Filtering is supported only on the key field, with a single LIKE statement using an RE2 Syntax regex, e.g. SELECT * FROM table WHERE key LIKE '^key[012]{1}'
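To preview which row keys such a filter would select, the pattern can be tested with Python's re module, which accepts this simple RE2-style expression. This is an illustration only, not how Beam evaluates the filter:

```python
import re

# Sketch only: preview which row keys the filter
#   key LIKE '^key[012]{1}'
# would select. The ^ anchor restricts matches to the start of the key.
pattern = re.compile(r"^key[012]{1}")

row_keys = ["key0-a", "key1", "key2-z", "key3", "other-key1"]
selected = [k for k in row_keys if pattern.search(k)]
print(selected)  # ['key0-a', 'key1', 'key2-z']
```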

Write Mode

Supported for flat schema only.

Example

CREATE EXTERNAL TABLE beamTable(
  key VARCHAR NOT NULL,
  beamFamily ROW<
     boolLatest BOOLEAN NOT NULL,
     longLatestWithTs ROW<
        val BIGINT NOT NULL,
        timestampMicros BIGINT NOT NULL
      > NOT NULL,
      allStrings ARRAY<VARCHAR> NOT NULL,
      doubleLatestWithTsAndLabels ROW<
        val DOUBLE NOT NULL,
        timestampMicros BIGINT NOT NULL,
        labels ARRAY<VARCHAR> NOT NULL
      > NOT NULL,
      binaryLatestWithLabels ROW<
         val BINARY NOT NULL,
         labels ARRAY<VARCHAR> NOT NULL
      > NOT NULL
    > NOT NULL
  )
TYPE bigtable
LOCATION 'googleapis.com/bigtable/projects/beam/instances/beamInstance/tables/beamTable'

Flat schema example:

CREATE EXTERNAL TABLE flatTable(
  key VARCHAR NOT NULL,
  boolColumn BOOLEAN NOT NULL,
  longColumn BIGINT NOT NULL,
  stringColumn VARCHAR NOT NULL,
  doubleColumn DOUBLE NOT NULL,
  binaryColumn BINARY NOT NULL
)
TYPE bigtable
LOCATION 'googleapis.com/bigtable/projects/beam/instances/beamInstance/tables/flatTable'
TBLPROPERTIES '{
  "columnsMapping": "f:boolColumn,f:longColumn,f:stringColumn,f2:doubleColumn,f2:binaryColumn"
}'

Write example:

INSERT INTO writeTable(key, boolColumn, longColumn, stringColumn, doubleColumn)
  VALUES ('key', TRUE, 10, 'stringValue', 5.5)

Pub/Sub

Syntax

Nested mode

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName(
    event_timestamp TIMESTAMP,
    attributes [MAP<VARCHAR, VARCHAR>, ARRAY<ROW<VARCHAR key, VARCHAR value>>],
    payload [BYTES, ROW<tableElement [, tableElement ]*>]
)
TYPE pubsub
LOCATION 'projects/[PROJECT]/topics/[TOPIC]'

Flattened mode

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName(tableElement [, tableElement ]*)
TYPE pubsub
LOCATION 'projects/[PROJECT]/topics/[TOPIC]'

In nested mode, the following fields hold topic metadata. The presence of the attributes field triggers nested mode usage.

Field           | Description
event_timestamp | The event timestamp associated with the message by PubsubIO; by default, the message publish time.
attributes      | The user-provided map of message attributes.
payload         | The message payload, either unstructured (BYTES) or a ROW matching the declared schema.

Read Mode

PubsubIO supports reading from topics by creating a new subscription.

Write Mode

PubsubIO supports writing to topics.

Schema

Pub/Sub messages have metadata associated with them, and you can reference this metadata in your queries. For each message, Pub/Sub exposes its publish time and a map of user-provided attributes in addition to the payload (unstructured in the general case). This information must be preserved and accessible from the SQL statements. Currently, this means that PubsubIO tables require you to declare a special set of columns, as shown in the nested mode syntax above.

Supported Payload

Example

CREATE EXTERNAL TABLE locations (event_timestamp TIMESTAMP, attributes MAP<VARCHAR, VARCHAR>, payload ROW<id INTEGER, location VARCHAR>)
TYPE pubsub
LOCATION 'projects/testing-integration/topics/user-location'

Pub/Sub Lite

Syntax

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName(
    publish_timestamp DATETIME,
    event_timestamp DATETIME,
    message_key BYTES,
    attributes ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>,
    payload [BYTES, ROW<tableElement [, tableElement ]*>]
)
TYPE pubsublite
// For writing
LOCATION 'projects/[PROJECT]/locations/[GCP-LOCATION]/topics/[TOPIC]'
// For reading
LOCATION 'projects/[PROJECT]/locations/[GCP-LOCATION]/subscriptions/[SUBSCRIPTION]'

Read Mode

PubsubLiteIO supports reading from subscriptions.

Write Mode

PubsubLiteIO supports writing to topics.

Supported Payload

Example

CREATE EXTERNAL TABLE locations (event_timestamp TIMESTAMP, attributes ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>, payload ROW<id INTEGER, location VARCHAR>)
TYPE pubsublite
LOCATION 'projects/testing-integration/locations/us-central1-a/topics/user-location'

Kafka

KafkaIO is experimental in Beam SQL.

Syntax

Flattened mode

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
TYPE kafka
LOCATION 'my.company.url.com:2181/topic1'
TBLPROPERTIES '{
    "bootstrap_servers": ["localhost:9092", "PLAINTEXT://192.168.1.200:2181"],
    "topics": ["topic2", "topic3"],
    "format": "json"
}'

Nested mode

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (
  event_timestamp DATETIME,
  message_key BYTES,
  headers ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>,
  payload [BYTES, ROW<tableElement [, tableElement ]*>]
)
TYPE kafka
LOCATION 'my.company.url.com:2181/topic1'
TBLPROPERTIES '{
    "bootstrap_servers": ["localhost:9092", "PLAINTEXT://192.168.1.200:2181"],
    "topics": ["topic2", "topic3"],
    "format": "json"
}'

The presence of the headers field triggers nested mode usage.

Read Mode

Read Mode supports reading from a topic.

Write Mode

Write Mode supports writing to a topic.

Supported Formats

Schema

For CSV only simple types are supported.

Parquet

Syntax

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
TYPE parquet
LOCATION '/path/to/files/'

Read Mode

Supports reading from Parquet files specified by the LOCATION. Predicate and projection push-down are supported to improve performance.

Write Mode

Supports writing to a set of sharded Parquet files in a specified directory.

Schema

The specified schema is used to read and write Parquet files. The schema is converted to an Avro schema internally for ParquetIO. Beam SQL types map to Avro types as follows:

Beam SQL Type                      | Avro Type
TINYINT, SMALLINT, INTEGER, BIGINT | long
FLOAT, DOUBLE                      | double
DECIMAL                            | bytes (with logical type)
BOOLEAN                            | boolean
DATE, TIME, TIMESTAMP              | long (with logical type)
CHAR, VARCHAR                      | string
ARRAY                              | array
ROW                                | record

Example

CREATE EXTERNAL TABLE daily_orders (
  order_id BIGINT,
  product_name VARCHAR,
  purchase_ts TIMESTAMP
)
TYPE parquet
LOCATION '/gcs/my-data/orders/2025-07-14/*'

MongoDB

Syntax

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
TYPE mongodb
LOCATION 'mongodb://[HOST]:[PORT]/[DATABASE]/[COLLECTION]'

Read Mode

Read Mode supports reading from a collection.

Write Mode

Write Mode supports writing to a collection.

Schema

Only simple types are supported. MongoDB documents are mapped to Beam SQL types via the JsonToRow transform.

Example

CREATE EXTERNAL TABLE users (id INTEGER, username VARCHAR)
TYPE mongodb
LOCATION 'mongodb://localhost:27017/apache/users'

Text

TextIO is experimental in Beam SQL. Read Mode and Write Mode do not currently access the same underlying data.

Syntax

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
TYPE text
LOCATION '/home/admin/orders'
TBLPROPERTIES '{"format": "Excel"}'

Value for format | Field delimiter | Quote | Record separator | Ignore empty lines? | Allow missing column names?
default          | ,               | "     | \r\n             | Yes                 | No
rfc4180          | ,               | "     | \r\n             | No                  | No
excel            | ,               | "     | \r\n             | No                  | Yes
tdf              | \t              | "     | \r\n             | Yes                 | No
mysql            | \t              | none  | \n               | No                  | No
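As an aside, the excel row above matches Python's default csv dialect (delimiter ',', quote '"', record separator '\r\n'), which makes it easy to preview how such a file splits into columns. This is a plain-Python illustration, not part of Beam:

```python
import csv
import io

# Sketch only: Python's default csv dialect mirrors the "excel" format
# row in the table above, so it shows how quoted fields containing the
# delimiter are kept as a single column.
data = io.StringIO('id,price\r\n1,"2,50"\r\n')
rows = list(csv.reader(data))
print(rows)  # [['id', 'price'], ['1', '2,50']]
```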

Read Mode

Read Mode supports reading from a file.

Write Mode

Write Mode supports writing to a set of files. TextIO creates the files on write.

Supported Payload

Schema

Only simple types are supported.

Example

CREATE EXTERNAL TABLE orders (id INTEGER, price INTEGER)
TYPE text
LOCATION '/home/admin/orders'

DataGen

The DataGen connector allows for creating tables based on in-memory data generation. This is useful for developing and testing queries locally without requiring access to external systems. The DataGen connector is built in; no additional dependencies are required. It is available in Beam 2.67.0 and later.

Tables can be either bounded (generating a fixed number of rows) or unbounded (generating a stream of rows at a specific rate). The connector provides fine-grained controls to customize the generated values for each field, including support for event-time windowing.

Syntax

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
TYPE datagen
[TBLPROPERTIES tblProperties]

Table Properties (TBLPROPERTIES)

The TBLPROPERTIES JSON object is used to configure the generator’s behavior.

General Options

Key             | Required                 | Description
number-of-rows  | Yes (or rows-per-second) | Creates a bounded table with a specified total number of rows.
rows-per-second | Yes (or number-of-rows)  | Creates an unbounded table that generates rows at the specified rate.

Event-Time and Watermark Configuration

Key                             | Required                                  | Description
timestamp.behavior              | No                                        | Specifies the time handling. Can be 'processing-time' (default) or 'event-time'.
event-time.timestamp-column     | Yes, if timestamp.behavior is event-time  | The name of the column that will be used to drive the event-time watermark for the stream.
event-time.max-out-of-orderness | No                                        | When using event-time, this sets the maximum out-of-orderness in milliseconds for generated timestamps (e.g., '5000' for 5 seconds). Defaults to 0.
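A rough sketch of what max-out-of-orderness means for generated timestamps: each row's event timestamp may lag the generation time by up to that many milliseconds. This is plain Python, not the connector's implementation; event_timestamp_ms is a hypothetical helper:

```python
import random
import time

def event_timestamp_ms(max_out_of_orderness_ms, now_ms=None):
    # Sketch only: the generated event timestamp lags "now" by a random
    # amount bounded by max-out-of-orderness, in milliseconds.
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return now_ms - random.randint(0, max_out_of_orderness_ms)

now = 1_700_000_000_000
ts = event_timestamp_ms(5000, now_ms=now)
print(now - ts)  # a lag between 0 and 5000 milliseconds
```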

Field-Specific Options

You can customize the generation logic for each column by providing properties with the prefix fields.<columnName>.*.

Key        | Description
kind       | The type of generator to use. Can be 'random' (default) or 'sequence'.
null-rate  | A double between 0.0 and 1.0 indicating the probability that the generated value for this field will be NULL. Defaults to 0.0.
length     | For VARCHAR fields with kind: 'random', specifies the exact length of the generated string. Defaults to 10.
min, max   | For numeric types (BIGINT, INTEGER, DOUBLE, etc.) with kind: 'random', specifies the inclusive minimum and maximum values for the generated number.
start, end | For BIGINT fields with kind: 'sequence', specifies the inclusive start and end values of the sequence. The sequence will cycle from start to end.
max-past   | For TIMESTAMP fields, specifies the maximum duration in milliseconds in the past to generate a random timestamp from. If not set, timestamps are generated for the current time.
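To make the sequence semantics concrete, here is a minimal sketch of a generator that cycles through [start, end] inclusive, as described above. This is plain Python, not the connector's code; sequence_values is a hypothetical helper:

```python
def sequence_values(start, end, count):
    # Sketch only: produce 'count' values cycling through the inclusive
    # range [start, end], mirroring the documented kind: 'sequence' behavior.
    span = end - start + 1
    return [start + (i % span) for i in range(count)]

print(sequence_values(1, 3, 7))  # [1, 2, 3, 1, 2, 3, 1]
```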

Data Type Behavior

Examples

Bounded Table with Random Data

This example creates a bounded table with 1000 rows. The id will be a random BIGINT and product_name will be a random VARCHAR of length 10.

CREATE EXTERNAL TABLE Orders (
    id BIGINT,
    product_name VARCHAR
)
TYPE datagen
TBLPROPERTIES '{
  "number-of-rows": "1000"
}'

Unbounded Streaming Table

This example creates a streaming table that generates 10 rows per second.

CREATE EXTERNAL TABLE user_impressions (
    user_id VARCHAR,
    impression_time TIMESTAMP
)
TYPE datagen
TBLPROPERTIES '{
  "rows-per-second": "10"
}'

Bounded Table with Custom Field Generation

This is a comprehensive example demonstrating various field-level customizations. The table is bounded because number-of-rows is specified.

CREATE EXTERNAL TABLE user_clicks (
    event_id BIGINT,
    user_id VARCHAR,
    click_timestamp TIMESTAMP,
    score DOUBLE
)
TYPE datagen
TBLPROPERTIES '{
  "number-of-rows": "1000000",
  "fields.event_id.kind": "sequence",
  "fields.event_id.start": "1",
  "fields.event_id.end": "1000000",
  "fields.user_id.kind": "random",
  "fields.user_id.length": "12",
  "fields.click_timestamp.kind": "random",
  "fields.click_timestamp.max-past": "60000",
  "fields.score.kind": "random",
  "fields.score.min": "0.0",
  "fields.score.max": "1.0",
  "fields.score.null-rate": "0.1"
}'

Unbounded Streaming Table with Event Time

This example creates a streaming table that generates 10 rows per second. It uses the click_timestamp column to drive the event-time watermark, allowing for up to 5 seconds of out-of-order data. The ingestion_timestamp column is populated separately with the processing time.

CREATE EXTERNAL TABLE user_clicks (
    event_id BIGINT,
    user_id VARCHAR,
    click_timestamp TIMESTAMP,
    ingestion_timestamp TIMESTAMP
)
TYPE datagen
TBLPROPERTIES '{
  "rows-per-second": "10",
  "timestamp.behavior": "event-time",
  "event-time.timestamp-column": "click_timestamp",
  "event-time.max-out-of-orderness": "5000",
  "fields.event_id.kind": "sequence",
  "fields.event_id.start": "1",
  "fields.event_id.end": "1000000",
  "fields.user_id.kind": "random",
  "fields.user_id.length": "12",
  "fields.ingestion_timestamp.kind": "timestamp"
}'

Generic Payload Handling

Certain data sources and sinks support generic payload handling. This handling parses a byte array payload field into a table schema. The following schemas are supported by this handling. All require at least setting "format": "<type>", and may require other properties.

Generic DLQ Handling

Sources and sinks which support generic DLQ handling specify a parameter with the format "<dlqParamName>": "[DLQ_KIND]:[DLQ_ID]". The following types of DLQ handling are supported: