#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import argparse
import io
import itertools
import re
import yaml
from apache_beam.portability.api import schema_pb2
from apache_beam.utils import subprocess_server
from apache_beam.yaml import json_utils
from apache_beam.yaml import yaml_provider
def _singular(name):
# Simply removing an 's' (or 'es', or 'ies', ...) may result in surprising
# manglings. Better to play it safe and leave a correctly-spelled plural
# than a botched singular in our examples configs.
return {
'args': 'arg',
'attributes': 'attribute',
'elements': 'element',
'fields': 'field',
}.get(name, name)
def _fake_value(name, beam_type):
  """Build a placeholder example value for a field of the given schema type.

  Strings render as the quoted field name, booleans as "true|false", and
  container types recurse into their element/value types (via _fake_row for
  row types) so that nested example configs look plausible.

  Args:
    name: the field name, used to label the placeholder.
    beam_type: a schema_pb2.FieldType proto.

  Raises:
    ValueError: if the type_info oneof is unset or unrecognized.
  """
  kind = beam_type.WhichOneof("type_info")
  if kind == "atomic_type":
    if beam_type.atomic_type == schema_pb2.STRING:
      return f'"{name}"'
    if beam_type.atomic_type == schema_pb2.BOOLEAN:
      return "true|false"
    return name
  if kind in ("array_type", "iterable_type"):
    if kind == "array_type":
      element_type = beam_type.array_type.element_type
    else:
      element_type = beam_type.iterable_type.element_type
    # Two sample elements plus an ellipsis suggest "any number of these".
    item_name = _singular(name)
    return [
        _fake_value(item_name, element_type),
        _fake_value(item_name, element_type),
        '...',
    ]
  if kind == "map_type":
    value_type = beam_type.map_type.value_type
    if beam_type.map_type.key_type.atomic_type == schema_pb2.STRING:
      # String-keyed maps get a few letter keys to suggest arbitrary keys.
      return {
          'a': _fake_value(name + '_value_a', value_type),
          'b': _fake_value(name + '_value_b', value_type),
          'c': '...',
      }
    fake_key = _fake_value(name + '_key', beam_type.map_type.key_type)
    return {fake_key: _fake_value(name + '_value', value_type)}
  if kind == "row_type":
    return _fake_row(beam_type.row_type.schema)
  if kind == "logical_type":
    return name
  raise ValueError(f"Unrecognized type_info: {kind!r}")
def _fake_row(schema):
if schema is None:
return '...'
return {f.name: _fake_value(f.name, f.type) for f in schema.fields}
def pretty_example(provider, t, base_t=None):
  """Render an example YAML snippet for transform `t` from `provider`.

  Args:
    provider: provider used to look up input requirements and config schema.
    t: the transform type name to document.
    base_t: optional name to show as the `type:` value instead of `t`.

  Returns:
    A YAML string with single quotes stripped for readability.
  """
  example = {'type': base_t or t}
  # Best-effort query; some providers may fail to answer, in which case we
  # simply omit the input placeholder.
  try:
    needs_input = provider.requires_inputs(t, {})
  except Exception:
    needs_input = False
  if needs_input:
    example['input'] = '...'
  schema = provider.config_schema(t)
  # Show a config section unless the schema is known to have no fields.
  if schema is None or schema.fields:
    example['config'] = _fake_row(schema)
  rendered = yaml.dump(example, sort_keys=False)
  return rendered.replace("'", "")
def config_docs(schema):
  """Render markdown documentation for a transform's config schema.

  Produces a bulleted list with one entry per field: the field name, a
  pretty-printed type, an "(Optional)" marker for nullable fields, the
  field description, and — for row-typed fields — a nested field list
  rendered by calling config_docs recursively on the row's schema.

  Args:
    schema: a schema_pb2.Schema proto, or None when no schema is known.

  Returns:
    A markdown string ('' when schema is None).
  """
  if schema is None:
    return ''
  elif not schema.fields:
    return 'No configuration parameters.'

  def pretty_type(beam_type):
    # Human-readable rendering of a schema_pb2.FieldType, recursing into
    # container element/key/value types.
    type_info = beam_type.WhichOneof("type_info")
    if type_info == "atomic_type":
      return schema_pb2.AtomicType.Name(beam_type.atomic_type).lower()
    elif type_info == "array_type":
      return f'Array[{pretty_type(beam_type.array_type.element_type)}]'
    elif type_info == "iterable_type":
      return f'Iterable[{pretty_type(beam_type.iterable_type.element_type)}]'
    elif type_info == "map_type":
      return (
          f'Map[{pretty_type(beam_type.map_type.key_type)}, '
          f'{pretty_type(beam_type.map_type.value_type)}]')
    elif type_info == "row_type":
      return 'Row'
    else:
      # Logical and unknown types have no concise rendering.
      return '?'

  def maybe_row_parameters(t):
    # Row-typed fields get their own nested, indented field list.
    if t.WhichOneof("type_info") == "row_type":
      return indent('\n\nRow fields:\n\n' + config_docs(t.row_type.schema), 4)
    else:
      return ''

  def maybe_optional(t):
    return " (Optional)" if t.nullable else ""

  def lines():
    # One markdown fragment per field; pieces are concatenated without
    # separators, so each helper supplies its own leading whitespace.
    for f in schema.fields:
      yield ''.join([
          f'**{f.name}** `{pretty_type(f.type)}`',
          maybe_optional(f.type),
          indent(': ' + f.description if f.description else '', 2),
          maybe_row_parameters(f.type),
      ])

  return '\n\n'.join('*' + indent(line, 2) for line in lines()).strip()
def indent(lines, size):
  """Prefix every line of the newline-separated string with `size` spaces."""
  prefix = ' ' * size
  return '\n'.join(prefix + line for line in lines.split('\n'))
def longest(func, xs):
  """Return the longest string produced by applying `func` across `xs`.

  Falsy results from `func` (e.g. None) are treated as ''.

  Args:
    func: callable mapping each element of `xs` to a string (or None).
    xs: an iterable of elements.

  Returns:
    The longest resulting string, or '' when `xs` is empty (the previous
    implementation raised ValueError on an empty sequence).
  """
  # Generator avoids materializing an intermediate list; default='' makes
  # the empty case well-defined instead of raising.
  return max((func(x) or '' for x in xs), key=len, default='')
def io_grouping_key(transform_name):
  """Place reads and writes next to each other, after all other transforms."""
  # IO transforms sort into a trailing bucket keyed by the connector name,
  # with the read immediately before the matching write.
  for prefix, read_write_rank in (('ReadFrom', 0), ('WriteTo', 1)):
    if transform_name.startswith(prefix):
      return 1, transform_name[len(prefix):], read_write_rank
  return 0, transform_name
# Transform names excluded from the generated index. Not referenced within
# this chunk — presumably consumed by transform_docs or another part of the
# file; TODO(review): confirm against the full module.
SKIP = [
    'Combine',
    'Filter',
    'MapToFields',
]
def main():
  """Generate documentation for the standard Beam YAML transforms.

  Command-line flags:
    --markdown_file: optional path for a markdown transform index.
    --html_file: optional path for a standalone HTML rendering of that
        markdown (requires the `markdown` and `pygments` packages).
    --schema_file: optional path for a JSON-schema document (written as
        YAML) that validates transform `config` blocks.
    --include / --exclude: regexes filtering transforms by base name.
  """
  parser = argparse.ArgumentParser()
  parser.add_argument('--markdown_file')
  parser.add_argument('--html_file')
  parser.add_argument('--schema_file')
  parser.add_argument('--include', default='.*')
  parser.add_argument('--exclude', default='')
  options = parser.parse_args()
  include = re.compile(options.include).match
  exclude = (
      re.compile(options.exclude).match if options.exclude else lambda _: False)

  # Keep expansion-service subprocesses cached for the whole run rather than
  # restarting them per transform.
  with subprocess_server.SubprocessServer.cache_subprocesses():
    json_config_schemas = []
    markdown_out = io.StringIO()
    providers = yaml_provider.standard_providers()
    # Group language variants such as 'MapToFields-python'/'MapToFields-java'
    # under the common base name before the '-'.
    # NOTE(review): the sort key (io_grouping_key) is not the groupby key;
    # this relies on the sort keeping same-base names adjacent — confirm.
    for transform_base, transforms in itertools.groupby(
        sorted(providers.keys(), key=io_grouping_key),
        key=lambda s: s.split('-')[0]):
      transforms = list(transforms)
      if include(transform_base) and not exclude(transform_base):
        print(transform_base)
        if options.markdown_file or options.html_file:
          if '-' in transforms[0]:
            # The suffix after '-' names the implementation language.
            extra_docs = 'Supported languages: ' + ', '.join(
                t.split('-')[-1] for t in sorted(transforms))
          else:
            extra_docs = ''
          # transform_docs is defined elsewhere in this file (not in view).
          markdown_out.write(
              transform_docs(transform_base, transforms, providers, extra_docs))
          markdown_out.write('\n\n')
        if options.schema_file:
          for transform in transforms:
            schema = providers[transform][0].config_schema(transform)
            if schema:
              # Conditional JSON-schema clause: when the `type` key equals
              # this transform, constrain `config` to exactly its fields
              # (plus the internal __line__/__uuid__ bookkeeping keys).
              json_config_schemas.append({
                  'if': {
                      'properties': {
                          'type': {
                              'const': transform
                          }
                      }
                  },
                  'then': {
                      'properties': {
                          'config': {
                              'type': 'object',
                              'properties': {
                                  '__line__': {
                                      'type': 'integer'
                                  },
                                  '__uuid__': {},
                                  **{
                                      f.name: #
                                      json_utils.beam_type_to_json_type(f.type)
                                      for f in schema.fields
                                  }
                              },
                              'additionalProperties': False,
                          }
                      }
                  }
              })
    if options.schema_file:
      with open(options.schema_file, 'w') as fout:
        yaml.dump(json_config_schemas, fout, sort_keys=False)
    if options.markdown_file:
      with open(options.markdown_file, 'w') as fout:
        fout.write(markdown_out.getvalue())
    if options.html_file:
      # Imported lazily so the markdown/pygments dependencies are only
      # required when HTML output is requested.
      import markdown
      import markdown.extensions.toc
      import pygments.formatters
      title = 'Beam YAML Transform Index'
      md = markdown.Markdown(
          extensions=[
              markdown.extensions.toc.TocExtension(toc_depth=2),
              'codehilite',
          ])
      html = md.convert(markdown_out.getvalue())
      pygments_style = pygments.formatters.HtmlFormatter().get_style_defs(
          '.codehilite')
      # Fixed left-hand navigation pane fed by the markdown TOC extension.
      extra_style = '''
        .nav {
          height: 100%;
          width: 12em;
          position: fixed;
          top: 0;
          left: 0;
          overflow-x: hidden;
        }

        .nav a {
          color: #333;
          padding: .2em;
          display: block;
          text-decoration: none;
        }

        .nav a:hover {
          color: #888;
        }

        .nav li {
          list-style-type: none;
          margin: 0;
          padding: 0;
        }

        .content {
          margin-left: 12em;
        }

        h2 {
          margin-top: 2em;
        }
      '''
      with open(options.html_file, 'w') as fout:
        fout.write(
            f'''
            <html>
              <head>
                <title>{title}</title>
                <style>
                {pygments_style}
                {extra_style}
                </style>
              </head>
              <body>
                <div class="nav">
                  {md.toc}
                </div>
                <div class="content">
                  <h1>{title}</h1>
                  {html}
                </div>
              </body>
            </html>
            ''')
# Allow invoking this generator directly as a script.
if __name__ == '__main__':
  main()