#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import operator
import re
from inspect import cleandoc
from inspect import getfullargspec
from inspect import isclass
from inspect import ismodule
from inspect import unwrap
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union
import pandas as pd
from apache_beam.dataframe import expressions
from apache_beam.dataframe import partitionings
[docs]
class DeferredBase(object):
_pandas_type_map: Dict[Union[type, None], type] = {}
def __init__(self, expr):
self._expr = expr
@classmethod
def _register_for(cls, pandas_type):
def wrapper(deferred_type):
cls._pandas_type_map[pandas_type] = deferred_type
return deferred_type
return wrapper
[docs]
@classmethod
def wrap(cls, expr, split_tuples=True):
proxy_type = type(expr.proxy())
if proxy_type is tuple and split_tuples:
def get(ix):
return expressions.ComputedExpression(
# yapf: disable
'get_%d' % ix,
lambda t: t[ix],
[expr],
requires_partition_by=partitionings.Arbitrary(),
preserves_partition_by=partitionings.Singleton())
return tuple(cls.wrap(get(ix)) for ix in range(len(expr.proxy())))
elif proxy_type in cls._pandas_type_map:
wrapper_type = cls._pandas_type_map[proxy_type]
else:
if expr.requires_partition_by() != partitionings.Singleton():
raise ValueError(
'Scalar expression %s of type %s partitoned by non-singleton %s' %
(expr, proxy_type, expr.requires_partition_by()))
wrapper_type = _DeferredScalar
return wrapper_type(expr)
def _elementwise(
self, func, name=None, other_args=(), other_kwargs=None, inplace=False):
other_kwargs = other_kwargs or {}
return _elementwise_function(
func, name, inplace=inplace)(self, *other_args, **other_kwargs)
def __reduce__(self):
return UnusableUnpickledDeferredBase, (str(self), )
[docs]
class UnusableUnpickledDeferredBase(object):
"""Placeholder object used to break the transitive pickling chain in case a
DeferredBase accidentially gets pickled (e.g. as part of globals).
Trying to use this object after unpickling is a bug and will result in an
error.
"""
def __init__(self, name):
self._name = name
def __repr__(self):
return 'UnusablePickledDeferredBase(%r)' % self.name
[docs]
class DeferredFrame(DeferredBase):
pass
class _DeferredScalar(DeferredBase):
def apply(self, func, name=None, args=()):
if name is None:
name = func.__name__
with expressions.allow_non_parallel_operations(
all(isinstance(arg, _DeferredScalar) for arg in args) or None):
return DeferredFrame.wrap(
expressions.ComputedExpression(
name,
func, [self._expr] + [arg._expr for arg in args],
requires_partition_by=partitionings.Singleton()))
def __neg__(self):
return self.apply(operator.neg)
def __pos__(self):
return self.apply(operator.pos)
def __invert__(self):
return self.apply(operator.invert)
def __repr__(self):
return f"DeferredScalar[type={type(self._expr.proxy())}]"
def __bool__(self):
# TODO(BEAM-11951): Link to documentation
raise TypeError(
"Testing the truth value of a deferred scalar is not "
"allowed. It's not possible to branch on the result of "
"deferred operations.")
def _scalar_binop(op):
def binop(self, other):
if not isinstance(other, DeferredBase):
return self.apply(lambda left: getattr(left, op)(other), name=op)
elif isinstance(other, _DeferredScalar):
return self.apply(
lambda left, right: getattr(left, op)(right), name=op, args=[other])
else:
return NotImplemented
return binop
for op in ['__add__',
'__sub__',
'__mul__',
'__div__',
'__truediv__',
'__floordiv__',
'__mod__',
'__divmod__',
'__pow__',
'__and__',
'__or__']:
setattr(_DeferredScalar, op, _scalar_binop(op))
DeferredBase._pandas_type_map[None] = _DeferredScalar
[docs]
def name_and_func(method: Union[str, Callable]) -> Tuple[str, Callable]:
"""For the given method name or method, return the method name and the method
itself.
For internal use only. No backwards compatibility guarantees."""
if isinstance(method, str):
method_str = method
func = lambda df, *args, **kwargs: getattr(df, method_str)(*args, **kwargs)
return method, func
else:
return method.__name__, method
def _elementwise_method(
func, name=None, restrictions=None, inplace=False, base=None):
return _proxy_method(
func,
name,
restrictions,
inplace,
base,
requires_partition_by=partitionings.Arbitrary(),
preserves_partition_by=partitionings.Arbitrary())
def _proxy_method(
func,
name=None,
restrictions=None,
inplace=False,
base=None,
*,
requires_partition_by: partitionings.Partitioning,
preserves_partition_by: partitionings.Partitioning,
):
if name is None:
name, func = name_and_func(func)
if base is None:
raise ValueError("base is required for _proxy_method")
return _proxy_function(
func,
name,
restrictions,
inplace,
base,
requires_partition_by=requires_partition_by,
preserves_partition_by=preserves_partition_by)
def _elementwise_function(
func, name=None, restrictions=None, inplace=False, base=None):
return _proxy_function(
func,
name,
restrictions,
inplace,
base,
requires_partition_by=partitionings.Arbitrary(),
preserves_partition_by=partitionings.Arbitrary())
def _proxy_function(
func: Union[Callable, str],
name: Optional[str] = None,
restrictions: Optional[Dict[str, Union[Any, List[Any]]]] = None,
inplace: bool = False,
base: Optional[type] = None,
*,
requires_partition_by: partitionings.Partitioning,
preserves_partition_by: partitionings.Partitioning,
):
if name is None:
if isinstance(func, str):
name = func
else:
name = func.__name__
if restrictions is None:
restrictions = {}
def wrapper(*args, **kwargs):
for key, values in restrictions.items():
if key in kwargs:
value = kwargs[key]
else:
try:
ix = getfullargspec(func).args.index(key)
except ValueError:
# TODO: fix for delegation?
continue
if len(args) <= ix:
continue
value = args[ix]
if callable(values):
check = values
elif isinstance(values, list):
check = lambda x, values=values: x in values
else:
check = lambda x, value=value: x == value
if not check(value):
raise NotImplementedError(
'%s=%s not supported for %s' % (key, value, name))
deferred_arg_indices = []
deferred_arg_exprs = []
constant_args = [None] * len(args)
from apache_beam.dataframe.frames import _DeferredIndex
for ix, arg in enumerate(args):
if isinstance(arg, DeferredBase):
deferred_arg_indices.append(ix)
deferred_arg_exprs.append(arg._expr)
elif isinstance(arg, _DeferredIndex):
# TODO(robertwb): Consider letting indices pass through as indices.
# This would require updating the partitioning code, as indices don't
# have indices.
deferred_arg_indices.append(ix)
deferred_arg_exprs.append(
expressions.ComputedExpression(
'index_as_series',
lambda ix: ix.index.to_series(), # yapf break
[arg._frame._expr],
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=partitionings.Arbitrary()))
elif isinstance(arg, pd.core.generic.NDFrame):
deferred_arg_indices.append(ix)
deferred_arg_exprs.append(expressions.ConstantExpression(arg, arg[0:0]))
else:
constant_args[ix] = arg
deferred_kwarg_keys = []
deferred_kwarg_exprs = []
constant_kwargs = {key: None for key in kwargs}
for key, arg in kwargs.items():
if isinstance(arg, DeferredBase):
deferred_kwarg_keys.append(key)
deferred_kwarg_exprs.append(arg._expr)
elif isinstance(arg, pd.core.generic.NDFrame):
deferred_kwarg_keys.append(key)
deferred_kwarg_exprs.append(
expressions.ConstantExpression(arg, arg[0:0]))
else:
constant_kwargs[key] = arg
deferred_exprs = deferred_arg_exprs + deferred_kwarg_exprs
if inplace:
actual_func = _copy_and_mutate(func)
else:
actual_func = func
def apply(*actual_args):
actual_args, actual_kwargs = (actual_args[:len(deferred_arg_exprs)],
actual_args[len(deferred_arg_exprs):])
full_args = list(constant_args)
for ix, arg in zip(deferred_arg_indices, actual_args):
full_args[ix] = arg
full_kwargs = dict(constant_kwargs)
for key, arg in zip(deferred_kwarg_keys, actual_kwargs):
full_kwargs[key] = arg
return actual_func(*full_args, **full_kwargs)
if (requires_partition_by.is_subpartitioning_of(partitionings.Index()) and
sum(isinstance(arg.proxy(), pd.core.generic.NDFrame)
for arg in deferred_exprs) > 1):
# Implicit join on index if there is more than one indexed input.
actual_requires_partition_by = partitionings.JoinIndex()
else:
actual_requires_partition_by = requires_partition_by
result_expr = expressions.ComputedExpression(
name,
apply,
deferred_exprs,
requires_partition_by=actual_requires_partition_by,
preserves_partition_by=preserves_partition_by)
if inplace:
args[0]._expr = result_expr
else:
return DeferredFrame.wrap(result_expr)
wrapper.__name__ = name
if restrictions:
wrapper.__doc__ = "\n".join(
f"Only {kw}={value!r} is supported"
for (kw, value) in restrictions.items())
if base is not None:
return with_docs_from(base)(wrapper)
else:
return wrapper
def _prettify_pandas_type(pandas_type):
if pandas_type in (pd.DataFrame, pd.Series):
return f'pandas.{pandas_type.__name__}'
elif isclass(pandas_type):
return f'{pandas_type.__module__}.{pandas_type.__name__}'
elif ismodule(pandas_type):
return pandas_type.__name__
else:
raise TypeError(pandas_type)
[docs]
def wont_implement_method(base_type, name, reason=None, explanation=None):
"""Generate a stub method that raises WontImplementError.
Note either reason or explanation must be specified. If both are specified,
explanation is ignored.
Args:
base_type: The pandas type of the method that this is trying to replicate.
name: The name of the method that this is aiming to replicate.
reason: If specified, use data from the corresponding entry in
``_WONT_IMPLEMENT_REASONS`` to generate a helpful exception message
and docstring for the method.
explanation: If specified, use this string as an explanation for why
this operation is not supported when generating an exception message
and docstring.
"""
if reason is not None:
if reason not in _WONT_IMPLEMENT_REASONS:
raise AssertionError(
f"reason must be one of {list(_WONT_IMPLEMENT_REASONS.keys())}, "
f"got {reason!r}")
reason_data = _WONT_IMPLEMENT_REASONS[reason]
elif explanation is not None:
reason_data = {'explanation': explanation}
else:
raise ValueError("One of (reason, explanation) must be specified")
def wrapper(*args, **kwargs):
raise WontImplementError(
f"'{name}' is not yet supported {reason_data['explanation']}",
reason=reason)
wrapper.__name__ = name
wrapper.__doc__ = (
f":meth:`{_prettify_pandas_type(base_type)}.{name}` is not yet supported "
f"in the Beam DataFrame API {reason_data['explanation']}")
if 'url' in reason_data:
wrapper.__doc__ += f"\n\n For more information see {reason_data['url']}."
return wrapper
[docs]
def not_implemented_method(op, issue='20318', base_type=None):
"""Generate a stub method for ``op`` that simply raises a NotImplementedError.
For internal use only. No backwards compatibility guarantees."""
assert base_type is not None, "base_type must be specified"
issue_url = f"https://issues.apache.org/jira/{issue}." if issue.startswith(
"BEAM-") else f"https://github.com/apache/beam/issues/{issue}"
def wrapper(*args, **kwargs):
raise NotImplementedError(
f"{op!r} is not implemented yet. "
f"If support for {op!r} is important to you, please let the Beam "
"community know by writing to user@beam.apache.org "
"(see https://beam.apache.org/community/contact-us/) or commenting on "
f"{issue_url}")
wrapper.__name__ = op
wrapper.__doc__ = (
f":meth:`{_prettify_pandas_type(base_type)}.{op}` is not implemented yet "
"in the Beam DataFrame API.\n\n"
f"If support for {op!r} is important to you, please let the Beam "
"community know by `writing to user@beam.apache.org "
"<https://beam.apache.org/community/contact-us/>`_ or commenting on "
f"`{issue} <{issue_url}>`_.")
return wrapper
def _copy_and_mutate(func):
def wrapper(self, *args, **kwargs):
copy = self.copy()
func(copy, *args, **kwargs)
return copy
return wrapper
[docs]
def maybe_inplace(func):
"""Handles the inplace= kwarg available in many pandas operations.
This decorator produces a new function handles the inplace kwarg. When
`inplace=False`, the new function simply yields the result of `func`
directly.
When `inplace=True`, the output of `func` is used to replace this instances
expression. The result is that any operations applied to this instance after
the inplace operation will refernce the updated expression.
For internal use only. No backwards compatibility guarantees."""
@functools.wraps(func)
def wrapper(self, inplace=False, **kwargs):
result = func(self, **kwargs)
if inplace:
self._expr = result._expr
else:
return result
return wrapper
[docs]
def args_to_kwargs(base_type, removed_method=False, removed_args=None):
"""Convert all args to kwargs before calling the decorated function.
When applied to a function, this decorator creates a new function
that always calls the wrapped function with *only* keyword arguments. It
inspects the argspec for the identically-named method on `base_type` to
determine the name to use for arguments that are converted to keyword
arguments.
For internal use only. No backwards compatibility guarantees.
Args:
base_type: The pandas type of the method that this is trying to replicate.
removed_method: Whether this method has been removed in the running
Pandas version.
removed_args: If not empty, which arguments have been dropped in the
running Pandas version.
"""
def wrap(func):
if removed_method:
# Do no processing, let Beam function itself raise the error if called.
return func
removed_arg_names = removed_args if removed_args is not None else []
# We would need to add position only arguments if they ever become a thing
# in Pandas (as of 2.1 currently they aren't).
base_arg_spec = getfullargspec(unwrap(getattr(base_type, func.__name__)))
base_arg_names = base_arg_spec.args
# Some arguments are keyword only and we still want to check against those.
all_possible_base_arg_names = base_arg_names + base_arg_spec.kwonlyargs
beam_arg_names = getfullargspec(func).args
if not_found := (set(beam_arg_names) - set(all_possible_base_arg_names) -
set(removed_arg_names)):
raise TypeError(
f"Beam definition of {func.__name__} has arguments that are not found"
f" in the base version of the function: {not_found}")
@functools.wraps(func)
def wrapper(*args, **kwargs):
if len(args) > len(base_arg_names):
raise TypeError(f"{func.__name__} got too many positioned arguments.")
for name, value in zip(base_arg_names, args):
if name in kwargs:
raise TypeError(
"%s() got multiple values for argument '%s'" %
(func.__name__, name))
kwargs[name] = value
# Still have to populate these for the Beam function signature.
if removed_args:
for name in removed_args:
if name not in kwargs:
kwargs[name] = None
return func(**kwargs)
return wrapper
return wrap
BEAM_SPECIFIC = "Differences from pandas"
SECTION_ORDER = [
'Parameters',
'Returns',
'Raises',
BEAM_SPECIFIC,
'See Also',
'Notes',
'Examples'
]
EXAMPLES_DISCLAIMER = (
"**NOTE:** These examples are pulled directly from the pandas "
"documentation for convenience. Usage of the Beam DataFrame API will look "
"different because it is a deferred API.")
EXAMPLES_DIFFERENCES = EXAMPLES_DISCLAIMER + (
" In addition, some arguments shown here may not be supported, see "
f"**{BEAM_SPECIFIC!r}** for details.")
[docs]
def with_docs_from(base_type, name=None, removed_method=False):
"""Decorator that updates the documentation from the wrapped function to
duplicate the documentation from the identically-named method in `base_type`.
Any docstring on the original function will be included in the new function
under a "Differences from pandas" heading.
removed_method used in cases where a method has been removed in a later
version of Pandas.
"""
def wrap(func):
if removed_method:
func.__doc__ = (
"This method has been removed in the current version of Pandas.")
return func
fn_name = name or func.__name__
orig_doc = getattr(base_type, fn_name).__doc__
if orig_doc is None:
return func
orig_doc = cleandoc(orig_doc)
section_splits = re.split(r'^(.*)$\n^-+$\n', orig_doc, flags=re.MULTILINE)
intro = section_splits[0].strip()
sections = dict(zip(section_splits[1::2], section_splits[2::2]))
beam_has_differences = bool(func.__doc__)
for header, content in sections.items():
content = content.strip()
# Replace references to version numbers so its clear they reference
# *pandas* versions
content = re.sub(r'([Vv]ersion\s+[\d\.]+)', r'pandas \1', content)
if header == "Examples":
content = '\n\n'.join([
(
EXAMPLES_DIFFERENCES
if beam_has_differences else EXAMPLES_DISCLAIMER),
# Indent the examples under a doctest heading,
# add skipif option. This makes sure our doctest
# framework doesn't run these pandas tests.
(".. doctest::\n"
" :skipif: True"),
re.sub(r"^", " ", content, flags=re.MULTILINE),
])
elif "Examples" in content and ">>>" in content:
# some new examples don't have the correct heading
# this catches those examples
split_content = content.split("Examples")
content = '\n\n'.join([
split_content[0],
"Examples\n",
# Indent the code snippet under a doctest heading,
# add skipif option. This makes sure our doctest
# framework doesn't run these pandas tests.
(".. doctest::\n"
" :skipif: True"),
re.sub(r"^", " ", content, flags=re.MULTILINE),
split_content[1]
])
else:
content = content.replace('DataFrame', 'DeferredDataFrame').replace(
'Series', 'DeferredSeries')
sections[header] = content
if beam_has_differences:
sections[BEAM_SPECIFIC] = cleandoc(func.__doc__)
else:
sections[BEAM_SPECIFIC] = (
"This operation has no known divergences from the "
"pandas API.")
def format_section(header):
return '\n'.join([header, ''.join('-' for _ in header), sections[header]])
func.__doc__ = '\n\n'.join([intro] + [
format_section(header) for header in SECTION_ORDER if header in sections
])
return func
return wrap
[docs]
def populate_defaults(base_type, removed_method=False, removed_args=None):
"""Populate default values for keyword arguments in decorated function.
When applied to a function, this decorator creates a new function
with default values for all keyword arguments, based on the default values
for the identically-named method on `base_type`.
For internal use only. No backwards compatibility guarantees.
Args:
base_type: The pandas type of the method that this is trying to replicate.
removed_method: Whether this method has been removed in the running
Pandas version.
removed_args: If not empty, which arguments have been dropped in the
running Pandas version.
"""
def wrap(func):
if removed_method:
return func
base_argspec = getfullargspec(unwrap(getattr(base_type, func.__name__)))
if not base_argspec.defaults and not base_argspec.kwonlydefaults:
return func
arg_to_default = {}
if base_argspec.defaults:
arg_to_default.update(
zip(
base_argspec.args[-len(base_argspec.defaults):],
base_argspec.defaults))
if base_argspec.kwonlydefaults:
arg_to_default.update(base_argspec.kwonlydefaults)
unwrapped_func = unwrap(func)
# args that do not have defaults in func, but do have defaults in base
func_argspec = getfullargspec(unwrapped_func)
num_non_defaults = len(func_argspec.args) - len(func_argspec.defaults or ())
defaults_to_populate = set(
func_argspec.args[:num_non_defaults]).intersection(
arg_to_default.keys())
if removed_args:
defaults_to_populate -= set(removed_args)
# In pandas 2, many methods rely on the default copy=None
# to mean that copy is the value of copy_on_write. Since
# copy_on_write will always be true for Beam, just fill it
# in here. In pandas 1, the default was True anyway.
if 'copy' in arg_to_default and arg_to_default['copy'] is None:
arg_to_default['copy'] = True
@functools.wraps(func)
def wrapper(**kwargs):
for name in defaults_to_populate:
if name not in kwargs:
kwargs[name] = arg_to_default[name]
return func(**kwargs)
return wrapper
return wrap
_WONT_IMPLEMENT_REASONS = {
'order-sensitive': {
'explanation': "because it is sensitive to the order of the data.",
'url': 'https://s.apache.org/dataframe-order-sensitive-operations',
},
'non-deferred-columns': {
'explanation': (
"because the columns in the output DataFrame depend "
"on the data."),
'url': 'https://s.apache.org/dataframe-non-deferred-columns',
},
'non-deferred-result': {
'explanation': (
"because it produces an output type that is not "
"deferred."),
'url': 'https://s.apache.org/dataframe-non-deferred-result',
},
'plotting-tools': {
'explanation': "because it is a plotting tool.",
'url': 'https://s.apache.org/dataframe-plotting-tools',
},
'event-time-semantics': {
'explanation': (
"because implementing it would require integrating with Beam "
"event-time semantics"),
'url': 'https://s.apache.org/dataframe-event-time-semantics',
},
'deprecated': {
'explanation': "because it is deprecated in pandas.",
},
'experimental': {
'explanation': "because it is experimental in pandas.",
},
}
[docs]
class WontImplementError(NotImplementedError):
"""An subclass of NotImplementedError to raise indicating that implementing
the given method is not planned.
Raising this error will also prevent this doctests from being validated
when run with the beam dataframe validation doctest runner.
"""
def __init__(self, msg, reason=None):
if reason is not None:
if reason not in _WONT_IMPLEMENT_REASONS:
raise AssertionError(
f"reason must be one of {list(_WONT_IMPLEMENT_REASONS.keys())}, "
f"got {reason!r}")
reason_data = _WONT_IMPLEMENT_REASONS[reason]
if 'url' in reason_data:
msg = f"{msg}\nFor more information see {reason_data['url']}."
super().__init__(msg)