Beam YAML Aggregations

Beam YAML has the ability to do aggregations to group and combine values across records. The is accomplished via the Combine transform type.

For example, one can write

- type: Combine
  config:
    group_by: col1
    combine:
      total:
        value: col2
        fn:
          type: sum

If the function has no configuration requirements, it can be provided directly as a string

- type: Combine
  config:
    group_by: col1
    combine:
      total:
        value: col2
        fn: sum

This can be simplified further if the output field name is the same as the input field name

- type: Combine
  config:
    group_by: col1
    combine:
      col2: sum

One can aggregate over many fields at once

- type: Combine
  config:
    group_by: col1
    combine:
      col2: sum
      col3: max

and/or group by more than one field

- type: Combine
  config:
    group_by: [col1, col2]
    combine:
      col3: sum

or none at all (which will result in a global combine with a single output)

- type: Combine
  config:
    group_by: []
    combine:
      col2: sum
      col3: max

Windowed aggregation

As with all transforms, Combine can take a windowing parameter

- type: Combine
  windowing:
    type: fixed
    size: 60s
  config:
    group_by: col1
    combine:
      col2: sum
      col3: max

If no windowing specification is provided, it inherits the windowing parameters from upstream, e.g.

- type: WindowInto
  windowing:
    type: fixed
    size: 60s
- type: Combine
  config:
    group_by: col1
    combine:
      col2: sum
      col3: max

is equivalent to the previous example.

Custom aggregation functions

One can use aggregation functions defined in Python by setting the language parameter.

- type: Combine
  config:
    language: python
    group_by: col1
    combine:
      biggest:
        value: "col2 + col2"
        fn:
          type: 'apache_beam.transforms.combiners.TopCombineFn'
          config:
            n: 10

SQL-style aggregations

By setting the language to SQL, one can provide full SQL snippets as the combine fn.

- type: Combine
  config:
    language: sql
    group_by: col1
    combine:
      num_values: "count(*)"
      total: "sum(col2)"

One can of course also use the Sql transform type and provide a query directly.