DataFrame API Preview now Available!

We’re excited to announce that a preview of the Beam Python SDK’s new DataFrame API is now available in Beam 2.26.0. Much like SqlTransform (Java, Python), the DataFrame API gives Beam users a way to express complex relational logic much more concisely than previously possible.

A more expressive API

Beam’s new DataFrame API aims to be compatible with the well known Pandas DataFrame API, with a few caveats detailed below. With this new API a simple pipeline that reads NYC taxiride data from a CSV, performs a grouped aggregation, and writes the output to CSV, can be expressed very concisely:

from apache_beam.dataframe.io import read_csv

with beam.Pipeline() as p:
  df = p | read_csv("gs://apache-beam-samples/nyc_taxi/2019/*.csv",
                    usecols=['passenger_count' , 'DOLocationID'])
  # Count the number of passengers dropped off per LocationID
  agg = df.groupby('DOLocationID').sum()
  agg.to_csv(output)

Compare this to the same logic implemented as a conventional Beam python pipeline with a CombinePerKey:

with beam.Pipeline() as p:
  (p | beam.io.ReadFromText("gs://apache-beam-samples/nyc_taxi/2019/*.csv",
                            skip_header_lines=1)
     | beam.Map(lambda line: line.split(','))
     # Parse CSV, create key - value pairs
     | beam.Map(lambda splits: (int(splits[8] or 0),  # DOLocationID
                                int(splits[3] or 0))) # passenger_count
     # Sum values per key
     | beam.CombinePerKey(sum)
     | beam.MapTuple(lambda loc_id, pc: f'{loc_id},{pc}')
     | beam.io.WriteToText(known_args.output))

The DataFrame example is much easier to quickly inspect and understand, as it allows you to concisely express grouped aggregations without using the low-level CombinePerKey.

In addition to being more expressive, a pipeline written with the DataFrame API can often be more efficient than a conventional Beam pipeline. This is because the DataFrame API defers to the very efficient, columnar Pandas implementation as much as possible.

DataFrames as a DSL

You may already be aware of Beam SQL, which is a Domain-Specific Language (DSL) built with Beam’s Java SDK. SQL is considered a DSL because it’s possible to express a full pipeline, including IOs and complex operations, entirely with SQL. 

Similarly, the DataFrame API is a DSL built with the Python SDK. You can see that the above example is written without traditional Beam constructs like IOs, ParDo, or CombinePerKey. In fact the only traditional Beam type is the Pipeline instance! Otherwise this pipeline is written completely using the DataFrame API. This is possible because the DataFrame API doesn’t just implement Pandas' computation operations, it also includes IOs based on the Pandas native implementations (pd.read_{csv,parquet,...} and pd.DataFrame.to_{csv,parquet,...}).

Like SQL, it’s also possible to embed the DataFrame API into a larger pipeline by using schemas. A schema-aware PCollection can be converted to a DataFrame, processed, and the result converted back to another schema-aware PCollection. For example, if you wanted to use traditional Beam IOs rather than one of the DataFrame IOs you could rewrite the above pipeline like this:

from apache_beam.dataframe.convert import to_dataframe
from apache_beam.dataframe.convert import to_pcollection

with beam.Pipeline() as p:
  ...
  schema_pc = (p | beam.ReadFromText(..)
                 # Use beam.Select to assign a schema
                 | beam.Select(DOLocationID=lambda line: int(...),
                               passenger_count=lambda line: int(...)))
  df = to_dataframe(schema_pc)
  agg = df.groupby('DOLocationID').sum()
  agg_pc = to_pcollection(pc)

  # agg_pc has a schema based on the structure of agg
  (agg_pc | beam.Map(lambda row: f'{row.DOLocationID},{row.passenger_count}')
          | beam.WriteToText(..))

It’s also possible to use the DataFrame API by passing a function to DataframeTransform:

from apache_beam.dataframe.transforms import DataframeTransform

with beam.Pipeline() as p:
  ...
  | beam.Select(DOLocationID=lambda line: int(..),
                passenger_count=lambda line: int(..))
  | DataframeTransform(lambda df: df.groupby('DOLocationID').sum())
  | beam.Map(lambda row: f'{row.DOLocationID},{row.passenger_count}')
  ...

Caveats

As hinted above, there are some differences between Beam’s DataFrame API and the Pandas API. The most significant difference is that the Beam DataFrame API is deferred, just like the rest of the Beam API. This means that you can’t print() a DataFrame instance in order to inspect the data, because we haven’t computed the data yet! The computation doesn’t take place until the pipeline is run(). Before that, we only know about the shape/schema of the result (i.e. the names and types of the columns), and not the result itself.

There are a few common exceptions you will likely see when attempting to use certain Pandas operations:

  • NotImplementedError: Indicates this is an operation or argument that we haven’t had time to look at yet. We’ve tried to make as many Pandas operations as possible available in the Preview offering of this new API, but there’s still a long tail of operations to go.
  • WontImplementError: Indicates this is an operation or argument we do not intend to support in the near-term because it’s incompatible with the Beam model. The largest class of operations that raise this error are those that are order sensitive (e.g. shift, cummax, cummin, head, tail, etc..). These cannot be trivially mapped to Beam because PCollections, representing distributed datasets, are unordered. Note that even some of these operations may get implemented in the future - we actually have some ideas for how we might support order sensitive operations - but it’s a ways off.

Finally, it’s important to note that this is a preview of a new feature that will get hardened over the next few Beam releases. We would love for you to try it out now and give us some feedback, but we do not yet recommend it for use in production workloads.

How to get involved

The easiest way to get involved with this effort is to try out DataFrames and let us know what you think! You can send questions to user@beam.apache.org, or file bug reports and feature requests in jira. In particular, it would be really helpful to know if there’s an operation we haven’t implemented yet that you’d find useful, so that we can prioritize it.

If you’d like to learn more about how the DataFrame API works under the hood and get involved with the development we recommend you take a look at the design doc and our Beam summit presentation. From there the best way to help is to knock out some of those not implemented operations. We’re coordinating that work in BEAM-9547.