Beam YAML mappings
Beam YAML has the ability to do simple transformations which can be used to
get data into the correct shape. The simplest of these is MapToFields
which creates records with new fields defined in terms of the input fields.
Field renames
To rename fields one can write
- type: MapToFields
  config:
    fields:
      new_col1: col1
      new_col2: col2
which will result in an output where each record has two fields, new_col1 and new_col2, whose values are those of col1 and col2 respectively (which are the names of two fields from the input schema).
One can specify the append parameter, which indicates that the original fields should be retained, similar to the use of * in a SQL SELECT statement. For example,
- type: MapToFields
  config:
    append: true
    fields:
      new_col1: col1
      new_col2: col2
will output records that have new_col1 and new_col2 as additional fields. When the append field is specified, one can drop fields as well, e.g.
- type: MapToFields
  config:
    append: true
    drop:
      - col3
    fields:
      new_col1: col1
      new_col2: col2
which includes all original fields except col3 in addition to outputting the two new ones.
Mapping functions
Of course one may want to do transformations beyond just dropping and renaming fields. Beam YAML has the ability to inline simple UDFs. This requires a language specification. For example, we can provide a Python expression referencing the input fields
- type: MapToFields
  config:
    language: python
    fields:
      new_col: "col1.upper()"
      another_col: "col2 + col3"
In addition, one can provide a full Python callable that takes the row as an argument to do more complex mappings (see PythonCallableSource for acceptable formats). Thus, one can write
- type: MapToFields
  config:
    language: python
    fields:
      new_col:
        callable: |
          import re
          def my_mapping(row):
            if re.match("[0-9]+", row.col1) and row.col2 > 0:
              return "good"
            else:
              return "bad"
Once one reaches a certain level of complexity, it may be preferable to package this up as a dependency and simply refer to it by fully qualified name, e.g.
- type: MapToFields
  config:
    language: python
    fields:
      new_col:
        callable: pkg.module.fn
It is also possible to store the function logic in a file and point to the function name, e.g.
- type: MapToFields
  config:
    language: python
    fields:
      new_col:
        path: /path/to/some/udf.py
        name: my_mapping
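In this case the referenced file just needs to define the named function at module level. A minimal sketch of what such a udf.py could contain, mirroring the inline callable shown earlier (the path and function body here are purely illustrative):
# udf.py -- illustrative contents for the mapping referenced above
import re

def my_mapping(row):
  # Fields of the incoming record are available as attributes on the row.
  if re.match("[0-9]+", row.col1) and row.col2 > 0:
    return "good"
  else:
    return "bad"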
In addition to Python, expressions in Java, SQL, and JavaScript (experimental) are currently supported as well.
Java
When using Java mappings, the UDF type must be declared, even for simple expressions, e.g.
- type: MapToFields
  config:
    language: java
    fields:
      new_col:
        expression: col1.toUpperCase()
For callable UDFs, Java requires that the function be declared as a class that implements java.util.function.Function, e.g.
- type: MapToFields
  config:
    language: java
    fields:
      new_col:
        callable: |
          import org.apache.beam.sdk.values.Row;
          import java.util.function.Function;

          public class MyFunction implements Function<Row, String> {
            public String apply(Row row) {
              return row.getString("col1").toUpperCase();
            }
          }
SQL
When SQL is used for a MapToFields UDF, it is essentially the SQL SELECT statement. For example, the query SELECT UPPER(col1) AS new_col, col2 + col3 AS another_col FROM PCOLLECTION would look like:
- type: MapToFields
  config:
    language: sql
    fields:
      new_col: "UPPER(col1)"
      another_col: "col2 + col3"
keeping in mind that any fields not intended to be included in the output should be added to the drop field.
If one wanted to select a field that collides with a reserved SQL keyword, the field(s) must be surrounded by backticks. For example, if the incoming PCollection has a field “timestamp”, one would have to write:
- type: MapToFields
  config:
    language: sql
    fields:
      new_col: "`timestamp`"
Note: the field mapping tags and fields defined in drop do not need to be escaped. Only the UDF itself needs to be a valid SQL statement.
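Putting these pieces together, a sketch of a mapping that references the reserved field inside the SQL expression but lists it (unescaped) in drop might look like the following; this assumes append and drop behave with SQL just as in the earlier examples, and the output field name is illustrative:
- type: MapToFields
  config:
    language: sql
    append: true
    drop:
      - timestamp                   # no backticks needed here
    fields:
      event_time: "`timestamp`"     # backticks are required inside the SQL expression itself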
Generic
If a language is not specified, the set of expressions is limited to pre-existing fields and integer, floating point, or string literals. For example,
- type: MapToFields
  config:
    fields:
      new_col: col1
      int_literal: 389
      float_literal: 1.90216
      str_literal: '"example"'  # note the double quoting
FlatMap
Sometimes it may be desirable to emit more (or fewer) than one record for each input record. This can be accomplished by mapping to an iterable type and following the mapping with an Explode operation, e.g.
- type: MapToFields
  config:
    language: python
    fields:
      new_col: "[col1.upper(), col1.lower(), col1.title()]"
      another_col: "col2 + col3"
- type: Explode
  config:
    fields: new_col
will result in three output records for every input record.
If more than one field is to be exploded, one must specify whether the cross product over all fields should be taken. For example,
- type: MapToFields
  config:
    language: python
    fields:
      new_col: "[col1.upper(), col1.lower(), col1.title()]"
      another_col: "[col2 - 1, col2, col2 + 1]"
- type: Explode
  config:
    fields: [new_col, another_col]
    cross_product: true
will emit nine records whereas
- type: MapToFields
  config:
    language: python
    fields:
      new_col: "[col1.upper(), col1.lower(), col1.title()]"
      another_col: "[col2 - 1, col2, col2 + 1]"
- type: Explode
  config:
    fields: [new_col, another_col]
    cross_product: false
will only emit three, pairing up the corresponding elements of the two iterables.
The Explode operation can be used on its own if the field in question is already an iterable type.
- type: Explode
  config:
    fields: [col1]
Filtering
Sometimes it can be desirable to only keep records that satisfy certain criteria. This can be accomplished with a Filter transform, e.g.
- type: Filter
  config:
    keep: "col2 > 0"
For anything more complicated than a simple comparison between existing fields and numeric literals, a language parameter must be provided, e.g.
- type: Filter
  config:
    language: python
    keep: "col2 + col3 > 0"
For more complicated filtering functions, one can provide a full Python callable that takes the row as an argument (see PythonCallableSource for acceptable formats). Thus, one can write
- type: Filter
  config:
    language: python
    keep:
      callable: |
        import re
        def my_filter(row):
          return re.match("[0-9]+", row.col1) and row.col2 > 0
Once one reaches a certain level of complexity, it may be preferable to package this up as a dependency and simply refer to it by fully qualified name, e.g.
- type: Filter
  config:
    language: python
    keep:
      callable: pkg.module.fn
It is also possible to store the function logic in a file and point to the function name, e.g.
- type: Filter
  config:
    language: python
    keep:
      path: /path/to/some/udf.py
      name: my_filter
In addition to Python, expressions in Java, SQL, and JavaScript (experimental) are currently supported as well.
Java
When using Java filtering, the UDF type must be declared, even for simple expressions, e.g.
- type: Filter
  config:
    language: java
    keep:
      expression: col2 > 0
For callable UDFs, Java requires that the function be declared as a class that implements java.util.function.Function, e.g.
- type: Filter
  config:
    language: java
    keep:
      callable: |
        import org.apache.beam.sdk.values.Row;
        import java.util.function.Function;
        import java.util.regex.Pattern;

        public class MyFunction implements Function<Row, Boolean> {
          public Boolean apply(Row row) {
            Pattern pattern = Pattern.compile("[0-9]+");
            return pattern.matcher(row.getString("col1")).matches() && row.getInt64("col2") > 0;
          }
        }
SQL
Similar to mapping functions, when SQL is used for a Filter UDF, it is essentially the SQL WHERE clause. For example, the query SELECT * FROM PCOLLECTION WHERE col2 > 0 would look like:
- type: Filter
  config:
    language: sql
    keep: "col2 > 0"
If one wanted to filter on a field that collides with a reserved SQL keyword, the field(s) must be surrounded by backticks. For example, if the incoming PCollection has a field “timestamp”, one would have to write:
- type: Filter
  config:
    language: sql
    keep: "`timestamp` > 0"
Partitioning
It can also be useful to send different elements to different places (similar to what is done with side outputs in other SDKs). While this can be done with a set of Filter operations, if every element has a single destination it can be more natural to use a Partition transform instead, which sends every element to a unique output.
For example, this will send all elements where col1 is equal to "a" to the output Partition.a.
- type: Partition
  input: input
  config:
    by: col1
    outputs: ['a', 'b', 'c']
- type: SomeTransform
  input: Partition.a
  config:
    param: ...
- type: AnotherTransform
  input: Partition.b
  config:
    param: ...
One can also specify the destination as a function, e.g.
- type: Partition
  input: input
  config:
    by: "'even' if col2 % 2 == 0 else 'odd'"
    language: python
    outputs: ['even', 'odd']
One can optionally provide a catch-all output which will capture all elements that are not in the named outputs (which would otherwise be an error):
- type: Partition
  input: input
  config:
    by: col1
    outputs: ['a', 'b', 'c']
    unknown_output: 'other'
Sometimes one wants to split a PCollection into multiple PCollections that aren’t necessarily disjoint. To send elements to multiple (or no) outputs, one could use an iterable column and precede the Partition with an Explode.
- type: Explode
  input: input
  config:
    fields: col1
- type: Partition
  input: Explode
  config:
    by: col1
    outputs: ['a', 'b', 'c']
Types
Beam will try to infer the types involved in the mappings, but sometimes this is not possible. In these cases one can explicitly denote the expected output type, e.g.
- type: MapToFields
  config:
    language: python
    fields:
      new_col:
        expression: "col1.upper()"
        output_type: string
The expected type is given in json schema notation, with the addition that top-level basic types may be given as a literal string rather than requiring a {type: 'basic_type_name'} nesting.
- type: MapToFields
  config:
    language: python
    fields:
      new_col:
        expression: "col1.upper()"
        output_type: string
      another_col:
        expression: "beam.Row(a=col1, b=[col2])"
        output_type:
          type: 'object'
          properties:
            a:
              type: 'string'
            b:
              type: 'array'
              items:
                type: 'number'
This can be especially useful to resolve errors involving the inability to handle the beam:logical:pythonsdk_any:v1 type.
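For instance, if a Python expression produces a list whose element type Beam cannot infer, explicitly declaring an array output type avoids falling back to the Any logical type. A sketch using the same json schema notation as above (the field name and expression are illustrative):
- type: MapToFields
  config:
    language: python
    fields:
      parts:
        expression: "col1.split(',')"
        output_type:
          type: 'array'
          items:
            type: 'string'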