Providers
Though we aim to offer a large suite of built-in transforms, it is inevitable that people will need to author their own. This is made possible through the notion of Providers, which leverage expansion services and vend catalogues of schema transforms.
Java
Exposing a transform in Java that can be used in a YAML pipeline consists of four main steps:
- Defining the transformation itself as a PTransform that consumes and produces zero or more schema’d PCollections.
- Exposing this transform via a SchemaTransformProvider which provides an identifier used to refer to this transform later as well as metadata like a human-readable description and its configuration parameters.
- Building a Jar that contains these classes and vends them via the Service Loader infrastructure.
- Writing a provider specification that tells Beam YAML where to find this jar and what it contains.
If the transform is already exposed as a cross-language transform or schema transform, then steps 1-3 have been done for you. One then uses this transform as follows:
```
pipeline:
  type: chain
  source:
    type: ReadFromCsv
    config:
      path: /path/to/input*.csv

  transforms:
    - type: MyCustomTransform
      config:
        arg: whatever

  sink:
    type: WriteToJson
    config:
      path: /path/to/output.json

providers:
  - type: javaJar
    config:
      jar: /path/or/url/to/myExpansionService.jar
    transforms:
      MyCustomTransform: "urn:registered:in:expansion:service"
```
We provide a full, cloneable example of how to build a Java provider that can be used to get started.
Python
Arbitrary Python transforms can be provided as well, using the syntax
```
providers:
  - type: pythonPackage
    config:
      packages:
        - my_pypi_package>=version
        - /path/to/local/package.zip
    transforms:
      MyCustomTransform: "pkg.module.PTransformClassOrCallable"
```
which can then be used as
```
- type: MyCustomTransform
  config:
    num: 3
    arg: whatever
```
This will cause the dependencies to be installed before the transform is imported (via its given fully qualified name) and instantiated with the config values passed as keyword arguments (e.g. in this case `pkg.module.PTransformClassOrCallable(num=3, arg="whatever")`).
We offer a Python provider starter project that serves as a complete example of how to do this.
YAML
New, re-usable transforms can be defined in YAML as well. This type of provider simply has a mapping of names to their YAML definitions. Jinja2 templatization of their string representations is used to parameterize them.
The `config_schema` section of the transform definition specifies what parameters are required (with their types), and the `body` section gives the implementation in terms of other YAML transforms.
```
- type: yaml
  transforms:
    # Define the first transform of type "RaiseElementToPower"
    RaiseElementToPower:
      config_schema:
        properties:
          n: {type: integer}
      body:
        type: MapToFields
        config:
          language: python
          append: true
          fields:
            power: "element ** {{n}}"

    # Define a second transform that produces consecutive integers.
    Range:
      config_schema:
        properties:
          end: {type: integer}
      # Setting this parameter lets this transform type be used as a source.
      requires_inputs: false
      body: |
        type: Create
        config:
          elements:
            {% for ix in range(end) %}
            - {{ix}}
            {% endfor %}
```
Note that in this second example the `body` of `Range` is defined as a block string literal to prevent any attempt by the system to parse the `{%` and `%}` pragmas used for control statements before a specialization with a concrete value for `end` is instantiated and the loop is expanded.
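For instance, instantiating `Range` with a concrete `end: 3` would render the templated body into (roughly) the plain YAML transform

```
type: Create
config:
  elements:
    - 0
    - 1
    - 2
```

at which point it can be parsed as ordinary YAML.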
These could then be used in a pipeline as
```
transforms:
  - type: Range
    config:
      end: 10
  - type: RaiseElementToPower
    input: Range
    config:
      n: 3
  ...
```
One can define composite transforms as well, e.g. in a provider listing one could have
```
- type: yaml
  transforms:
    ConsecutivePowers:
      # This takes two parameters.
      config_schema:
        properties:
          end: {type: integer}
          n: {type: integer}
      # It can be used as a source transform.
      requires_inputs: false
      # The body uses the transforms defined above linked together in a chain.
      body: |
        type: chain
        transforms:
          - type: Range
            config:
              end: {{end}}
          - type: RaiseElementToPower
            config:
              n: {{n}}
```
which allows one to use this whole fragment as
```
type: ConsecutivePowers
config:
  end: 10
  n: 3
```
Note that YAML-defined transforms work better in a listing file than directly in the `providers` block of a pipeline file, as pipeline files are always pre-processed with Jinja2 themselves, which would necessitate double escaping.
YAML Provider listing files
One can reference external listings of providers in the YAML pipeline file via the syntax
```
providers:
  - include: "file:///path/to/local/providers.yaml"
  - include: "gs://path/to/remote/providers.yaml"
  - include: "https://example.com/hosted/providers.yaml"
  ...
```
where `providers.yaml` is simply a YAML file containing a list of providers in the same format as those inlined in this `providers` block.
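As a sketch, a hypothetical `providers.yaml` combining the provider types described above might look something like the following (the jar URL, package name, and transform identifiers here are placeholders):

```
# A hypothetical listing; each entry uses the same format as a provider
# declared inline in a pipeline's providers block.
- type: javaJar
  config:
    jar: "https://example.com/path/to/myExpansionService.jar"
  transforms:
    MyCustomJavaTransform: "urn:registered:in:expansion:service"

- type: pythonPackage
  config:
    packages:
      - my_pypi_package>=version
  transforms:
    MyCustomPythonTransform: "pkg.module.PTransformClassOrCallable"
```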
See, for example, the provider listing here.
In fact, this is how many of the built-in transforms are declared; see, for example, the built-in IO listing file.
Hosting these listing files (together with their required artifacts) allows one to easily share catalogues of transforms that can be directly used by others in their YAML pipelines.