Beam SQL Walkthrough
This page illustrates the usage of Beam SQL with example code.
Beam Schemas and Rows
A SQL query can only be applied to a PCollection<T>
where T has a schema registered, or a PCollection<Row>. See the
schema documentation in
the Beam Programming Guide for details on registering a schema for a type T.
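For a plain Java class, one common way to register a schema is the DefaultSchema annotation, which asks Beam to infer the schema from the class's fields. The sketch below assumes the Beam Java SDK's JavaFieldSchema provider; the class and field names are just illustrative:

```java
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.joda.time.Instant;

// Beam infers a schema from the public fields of this class,
// so a PCollection<AppPojo> can be passed to SqlTransform directly.
@DefaultSchema(JavaFieldSchema.class)
public class AppPojo {
  public Integer appId;
  public String description;
  public Instant timestamp; // Beam schema DATETIME fields map to Joda Instant
}
```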
If you don’t have an existing type T, a PCollection<Row> can be obtained
in multiple ways, for example:
- From in-memory data (typically for unit testing). Note: you have to
  explicitly specify the Row coder. In this example we’re doing it by calling
  Create.of(..):

```java
// Define the schema for the records.
Schema appSchema =
    Schema
      .builder()
      .addInt32Field("appId")
      .addStringField("description")
      .addDateTimeField("rowtime")
      .build();

// Create a concrete row with that type.
Row row =
    Row
      .withSchema(appSchema)
      .addValues(1, "Some cool app", new Date())
      .build();

// Create a source PCollection containing only that row
PCollection<Row> testApps =
    PBegin
      .in(p)
      .apply(Create
                .of(row)
                .withCoder(RowCoder.of(appSchema)));
```

- From a PCollection<T> of records of some other type (i.e. T is not already a
  Row), by applying a ParDo that converts input records to Row format:

```java
// An example POJO class.
class AppPojo {
  Integer appId;
  String description;
  Date timestamp;
}

// Acquire a collection of POJOs somehow.
PCollection<AppPojo> pojos = ...

// Convert them to Rows with the same schema as defined above via a DoFn.
PCollection<Row> apps = pojos
    .apply(
        ParDo.of(new DoFn<AppPojo, Row>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            // Get the current POJO instance
            AppPojo pojo = c.element();

            // Create a Row with the appSchema schema
            // and values from the current POJO
            Row appRow = Row
                .withSchema(appSchema)
                .addValues(
                    pojo.appId,
                    pojo.description,
                    pojo.timestamp)
                .build();

            // Output the Row representing the current POJO
            c.output(appRow);
          }
        }))
    .setRowSchema(appSchema);
```

- As an output of another SqlTransform. Details in the next section.
Once you have a PCollection<Row> in hand, you may use SqlTransform to apply SQL queries to it.
SqlTransform
The SqlTransform.query(queryString) method is the only API to create a PTransform
from a string representation of the SQL query. You can apply this PTransform
to either a single PCollection or a PCollectionTuple which holds multiple
PCollections:
- when applying to a single PCollection, it can be referenced via the table
  name PCOLLECTION in the query;
- when applying to a PCollectionTuple, the tuple tag for each PCollection in
  the tuple defines the table name that may be used to query it. Note that
  table names are bound to the specific PCollectionTuple, and thus are only
  valid in the context of queries applied to it.

For example, you can join two PCollections:

```java
// Create the schema for reviews
Schema reviewSchema =
    Schema
      .builder()
      .addInt32Field("appId")
      .addInt32Field("reviewerId")
      .addFloatField("rating")
      .addDateTimeField("rowtime")
      .build();

// Obtain the reviews records with this schema
PCollection<Row> reviewsRows = ...

// Create a PCollectionTuple containing both PCollections.
// TupleTags IDs will be used as table names in the SQL query
PCollectionTuple namesAndFoods = PCollectionTuple
    .of(new TupleTag<>("Apps"), appsRows) // appsRows from the previous example
    .and(new TupleTag<>("Reviews"), reviewsRows);

// Compute the total number of reviews
// and average rating per app
// by joining two PCollections
PCollection<Row> output = namesAndFoods.apply(
    SqlTransform.query(
        "SELECT Apps.appId, COUNT(Reviews.rating), AVG(Reviews.rating) "
            + "FROM Apps INNER JOIN Reviews ON Apps.appId = Reviews.appId "
            + "GROUP BY Apps.appId"));
```
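For the single-PCollection case, a query against the testApps collection from the earlier example might look like the sketch below; the filteredApps variable name is just illustrative:

```java
// Query a single PCollection: inside the query it is always
// referred to by the fixed table name PCOLLECTION.
PCollection<Row> filteredApps =
    testApps.apply(
        SqlTransform.query(
            "SELECT appId, description, rowtime FROM PCOLLECTION WHERE appId = 1"));
```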
BeamSqlExample in the code repository shows basic usage of both APIs.

