# Beam SQL Walkthrough
This page illustrates the usage of Beam SQL with example code.
## Beam Schemas and Rows
A SQL query can only be applied to a `PCollection<T>` where `T` has a schema registered, or to a `PCollection<Row>`. See the schema documentation in the Beam Programming Guide for details on registering a schema for a type `T`.

If you don't have an existing type `T`, a `PCollection<Row>` can be obtained in multiple ways, for example:
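As a sketch of the schema-registration option, Beam can infer a schema from the public fields of a class annotated with `@DefaultSchema`. The `AppInfo` class and its fields below are illustrative, not part of this walkthrough's examples:

```java
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

// Beam infers a schema from the public fields of this class, so a
// PCollection<AppInfo> can be used with SqlTransform directly,
// without a manual conversion to Row.
@DefaultSchema(JavaFieldSchema.class)
public class AppInfo {
  public Integer appId;
  public String description;
}
```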
- From in-memory data (typically for unit testing). Note: you have to explicitly specify the `Row` coder. In this example we're doing it by calling `Create.of(..)`:

  ```java
  // Define the schema for the records.
  Schema appSchema =
      Schema
        .builder()
        .addInt32Field("appId")
        .addStringField("description")
        .addDateTimeField("rowtime")
        .build();

  // Create a concrete row with that type. Note that DATETIME fields
  // expect Joda-Time values, so we use org.joda.time.DateTime here.
  Row row =
      Row
        .withSchema(appSchema)
        .addValues(1, "Some cool app", new DateTime())
        .build();

  // Create a source PCollection containing only that row.
  PCollection<Row> testApps =
      PBegin
        .in(p)
        .apply(Create
                  .of(row)
                  .withCoder(RowCoder.of(appSchema)));
  ```
- From a `PCollection<T>` of records of some other type (i.e. `T` is not already a `Row`), by applying a `ParDo` that converts input records to `Row` format:

  ```java
  // An example POJO class.
  class AppPojo {
    Integer appId;
    String description;
    DateTime timestamp; // Joda-Time, to match the DATETIME field.
  }

  // Acquire a collection of POJOs somehow.
  PCollection<AppPojo> pojos = ...

  // Convert them to Rows with the same schema as defined above via a DoFn.
  PCollection<Row> apps =
      pojos
        .apply(
            ParDo.of(new DoFn<AppPojo, Row>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                // Get the current POJO instance.
                AppPojo pojo = c.element();

                // Create a Row with the appSchema schema
                // and values from the current POJO.
                Row appRow =
                    Row
                      .withSchema(appSchema)
                      .addValues(
                        pojo.appId,
                        pojo.description,
                        pojo.timestamp)
                      .build();

                // Output the Row representing the current POJO.
                c.output(appRow);
              }
            })).setRowSchema(appSchema);
  ```
- As the output of another `SqlTransform`. Details in the next section.
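The third option works because the output of `SqlTransform` is itself a `PCollection<Row>` with its own schema, so queries can be chained. A minimal sketch, reusing the `apps` collection from the previous example (the query strings here are illustrative):

```java
// The output of one SqlTransform is a PCollection<Row>,
// so it can be fed directly into another SqlTransform.
PCollection<Row> filtered =
    apps.apply(
        SqlTransform.query("SELECT appId, description FROM PCOLLECTION"));

// Query the intermediate result again; it is referenced
// via the table name PCOLLECTION, as described below.
PCollection<Row> counted =
    filtered.apply(
        SqlTransform.query("SELECT COUNT(*) AS appCount FROM PCOLLECTION"));
```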
Once you have a `PCollection<Row>` in hand, you may use `SqlTransform` to apply SQL queries to it.
## SqlTransform
The `SqlTransform.query(queryString)` method is the only API to create a `PTransform` from a string representation of the SQL query. You can apply this `PTransform` to either a single `PCollection` or a `PCollectionTuple` which holds multiple `PCollections`:
- When applying it to a single `PCollection`, the collection can be referenced via the table name `PCOLLECTION` in the query.

- When applying it to a `PCollectionTuple`, the tuple tag for each `PCollection` in the tuple defines the table name that may be used to query it. Note that table names are bound to the specific `PCollectionTuple`, and thus are only valid in the context of queries applied to it.

  For example, you can join two `PCollections`:

  ```java
  // Create the schema for reviews.
  Schema reviewSchema =
      Schema
        .builder()
        .addInt32Field("appId")
        .addInt32Field("reviewerId")
        .addFloatField("rating")
        .addDateTimeField("rowtime")
        .build();

  // Obtain the reviews records with this schema.
  PCollection<Row> reviewsRows = ...

  // Create a PCollectionTuple containing both PCollections.
  // The TupleTag IDs will be used as table names in the SQL query.
  PCollectionTuple appsAndReviews =
      PCollectionTuple
        .of(new TupleTag<>("Apps"), apps) // "apps" from the previous example
        .and(new TupleTag<>("Reviews"), reviewsRows);

  // Compute the total number of reviews
  // and average rating per app
  // by joining the two PCollections.
  PCollection<Row> output =
      appsAndReviews.apply(
          SqlTransform.query(
              "SELECT Apps.appId, COUNT(Reviews.rating), AVG(Reviews.rating) "
                  + "FROM Apps INNER JOIN Reviews ON Apps.appId = Reviews.appId "
                  + "GROUP BY Apps.appId"));
  ```
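For the single-`PCollection` case in the first bullet above, a minimal sketch, reusing the `testApps` collection from the in-memory example (the query string and `filteredApps` name are illustrative):

```java
// Run a query against a single input PCollection. Inside the query
// it is referred to by the fixed table name PCOLLECTION.
PCollection<Row> filteredApps =
    testApps.apply(
        SqlTransform.query(
            "SELECT appId, description, rowtime "
                + "FROM PCOLLECTION "
                + "WHERE appId = 1"));
```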
The `BeamSqlExample` in the code repository shows basic usage of both APIs.