Beam SQL DDL
Beam SQL Data Definition Language (DDL) provides a standard three-level hierarchy to manage metadata across external data sources, making it easy to explore available data structures and query data across different systems.
- Catalog: The top-level container representing an external metadata provider. For example, a Hive Metastore, AWS Glue, or a Lakehouse (formerly BigLake) Catalog.
- Database: A logical grouping within a Catalog. This typically maps to a “Schema” in traditional RDBMS or a “Namespace” in systems like Apache Iceberg.
- Table: The leaf node containing the schema definition and the underlying data.
Beam can resolve multiple Catalogs simultaneously. This structure enables Federated Querying, meaning you can execute complex pipelines that bridge disparate environments within a single SQL statement. For example, you could jointly query a production BigQuery table and a development Iceberg dataset in Cloud Storage.
By using fully qualified names (for example, catalog.database.table), you can perform cross-catalog joins or
migrate data between cloud providers without manual schema mapping or intermediate storage.
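As a rough sketch of what a federated query might look like, the statements below join tables from two Catalogs and copy rows between them. Every Catalog, Database, Table, and column name here is hypothetical and chosen only for illustration. The sketch assumes both Catalogs are already registered and that the target table already exists with a compatible schema.

```sql
-- Hypothetical names throughout; both Catalogs are assumed to be registered.
-- Cross-catalog join using fully qualified names.
SELECT o.order_id, o.amount, c.customer_name
FROM prod_catalog.sales.orders AS o
JOIN dev_iceberg.staging.customers AS c
  ON o.customer_id = c.customer_id;

-- Copy rows from one Catalog to another.
-- Assumes dev_iceberg.staging.orders_copy already exists.
INSERT INTO dev_iceberg.staging.orders_copy
SELECT order_id, amount
FROM prod_catalog.sales.orders;
```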
Below are details about metadata management at each level:
Catalogs
The Catalog is the entry point for external metadata. When you initialize Beam SQL, you start off with a default Catalog that contains a default Database.
You can register new Catalogs, switch between them, and modify their configurations.
CREATE CATALOG registers a new Catalog instance.
Note: Creating a Catalog does not automatically switch to it. Remember
to run USE CATALOG afterwards to set it.
Example: Creating a Hadoop Catalog (Local Storage)
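A minimal sketch of a Hadoop-type Iceberg Catalog backed by local storage, modeled on the Lakehouse example in this section. The Catalog name local_catalog and the warehouse path are assumptions. The 'type' and 'warehouse' keys are standard Iceberg catalog properties, but check which properties your Beam version accepts.

```sql
-- Hypothetical Catalog name and warehouse path.
CREATE CATALOG local_catalog
TYPE iceberg
PROPERTIES (
  'type' = 'hadoop',
  'warehouse' = 'file:///tmp/iceberg-warehouse'
);
```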
Example: Registering a Lakehouse Catalog (GCS)
CREATE CATALOG prod_iceberg
TYPE iceberg
PROPERTIES (
'type' = 'rest',
'uri' = 'https://biglake.googleapis.com/iceberg/v1/restcatalog',
'warehouse' = 'gs://my-company-bucket/warehouse',
'header.x-goog-user-project' = 'my_prod_project',
'rest.auth.type' = 'google',
'io-impl' = 'org.apache.iceberg.gcp.gcs.GCSFileIO',
'header.X-Iceberg-Access-Delegation' = 'vended-credentials'
);
USE CATALOG sets the active Catalog for the current session. This simplifies queries by allowing you to reference Databases directly without their fully qualified names. For example, you can use my_db instead of my_catalog.my_db.
Tip: Run SHOW CURRENT CATALOG to view the currently active Catalog.
Note: All subsequent DATABASE and TABLE commands will be executed under this Catalog, unless fully qualified.
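For illustration, switching to the prod_iceberg Catalog registered earlier and confirming the switch might look like the following. The statement forms are inferred from the command names used in this section.

```sql
-- Make prod_iceberg the active Catalog for this session.
USE CATALOG prod_iceberg;

-- Confirm which Catalog is now active.
SHOW CURRENT CATALOG;
```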
To modify the configuration of a registered Catalog, the following clauses are available:
- SET: Adds new properties or updates existing ones.
- RESET / UNSET: Removes properties.
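The sketch below shows how these clauses might be used. The ALTER CATALOG statement form and the parenthesized property lists are assumptions that this section does not confirm, so verify the exact syntax against your Beam version. The property values are hypothetical.

```sql
-- Assumed syntax: add or update a property on a registered Catalog.
ALTER CATALOG prod_iceberg
SET ( 'warehouse' = 'gs://my-company-bucket/new-warehouse' );

-- Assumed syntax: remove a property by key.
ALTER CATALOG prod_iceberg
RESET ( 'header.x-goog-user-project' );
```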
The SHOW commands for Catalogs can be used to either:
- List Catalogs registered in this Beam SQL session.
- View the currently active Catalog.
Example: List all Catalogs
Example: List Catalogs matching a pattern
Example: Verify which Catalog is currently active
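The three cases above might be written as follows. SHOW CURRENT CATALOG appears earlier in this section. The SHOW CATALOGS form and the LIKE pattern syntax are assumptions, and the pattern 'prod%' is hypothetical.

```sql
-- List all Catalogs registered in this session (assumed statement form).
SHOW CATALOGS;

-- List Catalogs whose names match a pattern (assumed LIKE syntax).
SHOW CATALOGS LIKE 'prod%';

-- Verify which Catalog is currently active.
SHOW CURRENT CATALOG;
```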
Dropping a Catalog unregisters it from the current Beam SQL session. This does not destroy external data.
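As a sketch, removing a Catalog from the session might look like this. The DROP CATALOG statement form is an assumption, and local_catalog is a hypothetical name.

```sql
-- Unregister the Catalog from this session; external data is left untouched.
DROP CATALOG local_catalog;
```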
Databases
A Database lives inside a Catalog and may contain a number of Tables.
CREATE DATABASE creates a new Database within the current Catalog (default), or within the Catalog you specify.
Note: Creating a Database does not automatically switch to it. Remember
to run USE DATABASE afterwards to set it.
Example: Create a Database in the current active Catalog
Example: Create a Database in a specified registered Catalog
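The two cases above might look like the following sketch. The CREATE DATABASE statement form is inferred from this section, and the Database name my_db is hypothetical.

```sql
-- Create a Database in the currently active Catalog.
CREATE DATABASE my_db;

-- Create a Database in a specific registered Catalog.
CREATE DATABASE prod_iceberg.my_db;
```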
USE DATABASE sets the active Database for the current session. This simplifies queries by allowing you to reference Tables directly without their fully qualified names (for example, my_table instead of my_db.my_table).
Note: All subsequent TABLE commands will be executed under this Database, unless fully qualified.
You can also switch to a Database in a specified Catalog.
Note: This also switches the active Catalog to that Catalog.
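A short sketch of both forms, using the hypothetical Database name my_db. The qualified form is assumed to follow the catalog.database naming used throughout this section.

```sql
-- Switch to a Database in the currently active Catalog.
USE DATABASE my_db;

-- Switch to a Database in a specified Catalog.
-- Per the note above, this also makes prod_iceberg the active Catalog.
USE DATABASE prod_iceberg.my_db;
```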
The SHOW commands for Databases can be used to either:
- List Databases within the currently active Catalog, or a specified Catalog.
- View the currently active Database.
Example: List Databases in the currently active Catalog
Example: List Databases in a specified Catalog
Example: List Databases matching a pattern
Example: Verify which Database is currently active
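The four cases above might be written as follows. The SHOW DATABASES and SHOW CURRENT DATABASE forms are assumed by analogy with the Catalog commands. The FROM keyword, the LIKE pattern syntax, and the pattern 'sales%' are assumptions to verify against your Beam version.

```sql
-- List Databases in the currently active Catalog (assumed statement form).
SHOW DATABASES;

-- List Databases in a specified Catalog (assumed FROM clause).
SHOW DATABASES FROM prod_iceberg;

-- List Databases whose names match a pattern (assumed LIKE syntax).
SHOW DATABASES LIKE 'sales%';

-- Verify which Database is currently active.
SHOW CURRENT DATABASE;
```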
Dropping a Database unregisters it from the current session. For some connectors, this also deletes the Database from the external data source.
- RESTRICT (Default): Fails if the Database is not empty.
- CASCADE: Drops the Database and all tables contained within it. Use with caution.
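As a sketch, the two modes might be used as follows. The DROP DATABASE statement form and the trailing keyword position are assumptions, and my_db is a hypothetical name.

```sql
-- Default behavior (RESTRICT): fails if my_db still contains tables.
DROP DATABASE my_db;

-- Drops my_db together with every table in it. Use with caution.
DROP DATABASE my_db CASCADE;
```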
Tables
A Table is the actual entity containing data and is described by a schema. Some data sources also let you apply a partition spec or attach table-specific properties.
CREATE EXTERNAL TABLE creates a new Table within the current Catalog and Database (default), or within the Catalog and Database you specify.
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] [ catalog. ][ db. ]table_name (
col_name col_type [ NOT NULL ] [ COMMENT 'col_comment' ],
...
)
TYPE 'type_name'
[ PARTITIONED BY ( 'partition_field' [, ... ] ) ]
[ COMMENT 'table_comment' ]
[ LOCATION 'location_uri' ]
[ TBLPROPERTIES 'properties_json_string' ];
- TYPE: the table type (for example, 'iceberg', 'text', 'kafka').
- PARTITIONED BY: an ordered list of fields describing the partition spec.
- LOCATION: explicitly sets the location of the table. This overrides the inferred catalog.db.table_name location.
- TBLPROPERTIES: configuration properties used when creating the table or setting up its IO connection.
Example: Creating an Iceberg Table
CREATE EXTERNAL TABLE prod_iceberg.sales_data.orders (
order_id BIGINT NOT NULL COMMENT 'Unique order identifier',
amount DECIMAL(10, 2),
order_date TIMESTAMP,
region_id VARCHAR
)
TYPE 'iceberg'
PARTITIONED BY ( 'region_id', 'day(order_date)' )
COMMENT 'Daily sales transactions'
TBLPROPERTIES '{
"write.format.default": "parquet",
"read.split.target-size": "268435456",
"beam.write.triggering_frequency_seconds": "60"
}';
- This creates an Iceberg table named orders under the namespace sales_data, within the prod_iceberg catalog.
- The table is partitioned by region_id, then by the day value of order_date (using Iceberg's hidden partitioning).
- The table is created with the Iceberg table properties "write.format.default" and "read.split.target-size". The Beam property "beam.write.triggering_frequency_seconds" configures the Iceberg sink.
- Beam sink and source configuration properties are prefixed with "beam.write." and "beam.read.", respectively.
Example: Add or remove columns
Example: Modify partition spec
Example: Modify table properties
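The sketch below illustrates what the three kinds of modification above might look like for the orders table created earlier. The ALTER TABLE clause names (ADD COLUMNS, DROP COLUMNS, ADD PARTITIONS, DROP PARTITIONS, SET, RESET) and their parenthesized argument forms are assumptions that this section does not confirm, and the column name discount is hypothetical. Verify the exact syntax against your Beam version before relying on it.

```sql
-- Assumed syntax: add a column, then remove one.
ALTER TABLE prod_iceberg.sales_data.orders
ADD COLUMNS ( discount DECIMAL(10, 2) COMMENT 'Applied discount' );

ALTER TABLE prod_iceberg.sales_data.orders
DROP COLUMNS ( discount );

-- Assumed syntax: change the partition spec.
ALTER TABLE prod_iceberg.sales_data.orders
ADD PARTITIONS ( 'month(order_date)' );

ALTER TABLE prod_iceberg.sales_data.orders
DROP PARTITIONS ( 'day(order_date)' );

-- Assumed syntax: update a table property, then remove one.
ALTER TABLE prod_iceberg.sales_data.orders
SET ( 'write.format.default' = 'orc' );

ALTER TABLE prod_iceberg.sales_data.orders
RESET ( 'read.split.target-size' );
```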
SHOW TABLES lists Tables under the currently active Database, or under a Database you specify.
Example: List tables in the currently active database and catalog
Example: List tables in a specified database
Example: List tables matching a pattern
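The three cases above might be written as follows. The SHOW TABLES form is assumed by analogy with the other SHOW commands in this section. The FROM keyword, the LIKE pattern syntax, and the pattern 'order%' are assumptions.

```sql
-- List tables in the currently active Database and Catalog (assumed statement form).
SHOW TABLES;

-- List tables in a specified Database (assumed FROM clause).
SHOW TABLES FROM prod_iceberg.sales_data;

-- List tables whose names match a pattern (assumed LIKE syntax).
SHOW TABLES LIKE 'order%';
```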

