#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Timestamp utilities.
For internal use only; no backwards-compatibility guarantees.
"""
# pytype: skip-file
# mypy: disallow-untyped-defs
import datetime
import time
from typing import Union
from typing import overload
import dateutil.parser
import pytz
from google.protobuf import duration_pb2
from google.protobuf import timestamp_pb2
from apache_beam.portability import common_urns
# types compatible with Timestamp.of()
TimestampTypes = Union[int, float, 'Timestamp']
# types compatible with Duration.of()
DurationTypes = Union[int, float, 'Duration']
TimestampDurationTypes = Union[int, float, 'Duration', 'Timestamp']
[docs]
class Timestamp(object):
"""Represents a Unix second timestamp with microsecond granularity.
Can be treated in common timestamp arithmetic operations as a numeric type.
Internally stores a time interval as an int of microseconds. This strategy
is necessary since floating point values lose precision when storing values,
especially after arithmetic operations (for example, 10000000 % 0.1 evaluates
to 0.0999999994448885).
"""
def __init__(
self,
seconds: Union[int, float] = 0,
micros: Union[int, float] = 0) -> None:
if not isinstance(seconds, (int, float)):
raise TypeError(
'Cannot interpret %s %s as seconds.' % (seconds, type(seconds)))
if not isinstance(micros, (int, float)):
raise TypeError(
'Cannot interpret %s %s as micros.' % (micros, type(micros)))
self.micros = int(seconds * 1000000) + int(micros)
[docs]
@staticmethod
def of(seconds: TimestampTypes) -> 'Timestamp':
"""Return the Timestamp for the given number of seconds.
If the input is already a Timestamp, the input itself will be returned.
Args:
seconds: Number of seconds as int, float, long, or Timestamp.
Returns:
Corresponding Timestamp object.
"""
if isinstance(seconds, Timestamp):
return seconds
elif isinstance(seconds, (int, float)):
return Timestamp(seconds)
elif isinstance(seconds, datetime.datetime):
return Timestamp.from_utc_datetime(seconds)
else:
raise TypeError(
'Cannot interpret %s %s as Timestamp.' % (seconds, type(seconds)))
[docs]
@staticmethod
def now() -> 'Timestamp':
return Timestamp(seconds=time.time())
@staticmethod
def _epoch_datetime_utc() -> datetime.datetime:
return datetime.datetime.fromtimestamp(0, pytz.utc)
[docs]
@classmethod
def from_utc_datetime(cls, dt: datetime.datetime) -> 'Timestamp':
"""Create a ``Timestamp`` instance from a ``datetime.datetime`` object.
Args:
dt: A ``datetime.datetime`` object in UTC (offset-aware).
"""
if dt.tzinfo is None:
raise ValueError(
"dt has no timezone info " +
"(https://docs.python.org/3/library/datetime.html" +
"#aware-and-naive-objects): %s" % dt)
if dt.tzinfo != pytz.utc and dt.tzinfo != datetime.timezone.utc:
raise ValueError('dt not in UTC: %s' % dt)
duration = dt - cls._epoch_datetime_utc()
return Timestamp(duration.total_seconds())
[docs]
@classmethod
def from_rfc3339(cls, rfc3339: str) -> 'Timestamp':
"""Create a ``Timestamp`` instance from an RFC 3339 compliant string.
.. note::
All timezones are implicitly converted to UTC.
Args:
rfc3339: String in RFC 3339 form.
"""
try:
dt = dateutil.parser.isoparse(rfc3339).astimezone(pytz.UTC)
except ValueError as e:
raise ValueError(
"Could not parse RFC 3339 string '{}' due to error: '{}'.".format(
rfc3339, e))
return cls.from_utc_datetime(dt)
[docs]
def seconds(self) -> int:
"""Returns the timestamp in seconds."""
return self.micros // 1000000
[docs]
def predecessor(self) -> 'Timestamp':
"""Returns the largest timestamp smaller than self."""
return Timestamp(micros=self.micros - 1)
[docs]
def successor(self) -> 'Timestamp':
"""Returns the smallest timestamp larger than self."""
return Timestamp(micros=self.micros + 1)
def __repr__(self) -> str:
micros = self.micros
sign = ''
if micros < 0:
sign = '-'
micros = -micros
int_part = micros // 1000000
frac_part = micros % 1000000
if frac_part:
return 'Timestamp(%s%d.%06d)' % (sign, int_part, frac_part)
return 'Timestamp(%s%d)' % (sign, int_part)
[docs]
def to_utc_datetime(self, has_tz: bool = False) -> datetime.datetime:
"""Returns a ``datetime.datetime`` object of UTC for this Timestamp.
Note that this method returns a ``datetime.datetime`` object without a
timezone info by default, as builtin `datetime.datetime.utcnow` method. If
this is used as part of the processed data, one should set has_tz=True to
avoid offset due to default timezone mismatch.
Args:
has_tz: whether the timezone info is attached, default to False.
Returns:
a ``datetime.datetime`` object of UTC for this Timestamp.
"""
# We can't easily construct a datetime object from microseconds, so we
# create one at the epoch and add an appropriate timedelta interval.
epoch = self._epoch_datetime_utc()
if not has_tz:
epoch = epoch.replace(tzinfo=None)
return epoch + datetime.timedelta(microseconds=self.micros)
[docs]
def to_rfc3339(self) -> str:
# Append 'Z' for UTC timezone.
return self.to_utc_datetime().isoformat() + 'Z'
[docs]
def to_proto(self) -> timestamp_pb2.Timestamp:
"""Returns the `google.protobuf.timestamp_pb2` representation."""
secs = self.micros // 1000000
nanos = (self.micros % 1000000) * 1000
return timestamp_pb2.Timestamp(seconds=secs, nanos=nanos)
[docs]
@staticmethod
def from_proto(timestamp_proto: timestamp_pb2.Timestamp) -> 'Timestamp':
"""Creates a Timestamp from a `google.protobuf.timestamp_pb2`.
Note that the google has a sub-second resolution of nanoseconds whereas this
class has a resolution of microsends. This class will truncate the
nanosecond resolution down to the microsecond.
"""
if timestamp_proto.nanos % 1000 != 0:
# TODO(https://github.com/apache/beam/issues/19922): Better define
# timestamps.
raise ValueError(
"Cannot convert from nanoseconds to microseconds " +
"because this loses precision. Please make sure that " +
"this is the correct behavior you want and manually " +
"truncate the precision to the nearest microseconds. " +
"See [https://github.com/apache/beam/issues/19922] for " +
"more information.")
return Timestamp(
seconds=timestamp_proto.seconds, micros=timestamp_proto.nanos // 1000)
def __float__(self) -> float:
# Note that the returned value may have lost precision.
return self.micros / 1000000
def __int__(self) -> int:
# Note that the returned value may have lost precision.
return self.micros // 1000000
def __eq__(self, other: object) -> bool:
# Allow comparisons between Duration and Timestamp values.
if isinstance(other, (Duration, Timestamp)):
return self.micros == other.micros
elif isinstance(other, (int, float)):
return self.micros == Timestamp.of(other).micros
else:
# Support equality with other types
return NotImplemented
def __lt__(self, other: TimestampDurationTypes) -> bool:
# Allow comparisons between Duration and Timestamp values.
if not isinstance(other, Duration):
other = Timestamp.of(other)
return self.micros < other.micros
def __gt__(self, other: TimestampDurationTypes) -> bool:
return not (self < other or self == other)
def __le__(self, other: TimestampDurationTypes) -> bool:
return self < other or self == other
def __ge__(self, other: TimestampDurationTypes) -> bool:
return not self < other
def __hash__(self) -> int:
return hash(self.micros)
def __add__(self, other: DurationTypes) -> 'Timestamp':
other = Duration.of(other)
return Timestamp(micros=self.micros + other.micros)
def __radd__(self, other: DurationTypes) -> 'Timestamp':
return self + other
@overload
def __sub__(self, other: DurationTypes) -> 'Timestamp':
pass
@overload
def __sub__(self, other: 'Timestamp') -> 'Duration':
pass
def __sub__(
self, other: Union[DurationTypes,
'Timestamp']) -> Union['Timestamp', 'Duration']:
if isinstance(other, Timestamp):
return Duration(micros=self.micros - other.micros)
other = Duration.of(other)
return Timestamp(micros=self.micros - other.micros)
def __mod__(self, other: DurationTypes) -> 'Duration':
other = Duration.of(other)
return Duration(micros=self.micros % other.micros)
MIN_TIMESTAMP = Timestamp(
micros=int(common_urns.constants.MIN_TIMESTAMP_MILLIS.constant) * 1000)
MAX_TIMESTAMP = Timestamp(
micros=int(common_urns.constants.MAX_TIMESTAMP_MILLIS.constant) * 1000)
[docs]
class Duration(object):
"""Represents a second duration with microsecond granularity.
Can be treated in common arithmetic operations as a numeric type.
Internally stores a time interval as an int of microseconds. This strategy
is necessary since floating point values lose precision when storing values,
especially after arithmetic operations (for example, 10000000 % 0.1 evaluates
to 0.0999999994448885).
"""
def __init__(
self,
seconds: Union[int, float] = 0,
micros: Union[int, float] = 0) -> None:
self.micros = int(seconds * 1000000) + int(micros)
[docs]
@staticmethod
def of(seconds: DurationTypes) -> 'Duration':
"""Return the Duration for the given number of seconds since Unix epoch.
If the input is already a Duration, the input itself will be returned.
Args:
seconds: Number of seconds as int, float or Duration.
Returns:
Corresponding Duration object.
"""
if isinstance(seconds, Timestamp):
raise TypeError('Cannot interpret %s as Duration.' % seconds)
if isinstance(seconds, Duration):
return seconds
return Duration(seconds)
[docs]
def to_proto(self) -> duration_pb2.Duration:
"""Returns the `google.protobuf.duration_pb2` representation."""
secs = self.micros // 1000000
nanos = (self.micros % 1000000) * 1000
return duration_pb2.Duration(seconds=secs, nanos=nanos)
[docs]
@staticmethod
def from_proto(duration_proto: duration_pb2.Duration) -> 'Duration':
"""Creates a Duration from a `google.protobuf.duration_pb2`.
Note that the google has a sub-second resolution of nanoseconds whereas this
class has a resolution of microsends. This class will truncate the
nanosecond resolution down to the microsecond.
"""
if duration_proto.nanos % 1000 != 0:
# TODO(https://github.com/apache/beam/issues/19922): Better define
# durations.
raise ValueError(
"Cannot convert from nanoseconds to microseconds " +
"because this loses precision. Please make sure that " +
"this is the correct behavior you want and manually " +
"truncate the precision to the nearest microseconds. " +
"See [https://github.com/apache/beam/issues/19922] for " +
"more information.")
return Duration(
seconds=duration_proto.seconds, micros=duration_proto.nanos // 1000)
def __repr__(self) -> str:
micros = self.micros
sign = ''
if micros < 0:
sign = '-'
micros = -micros
int_part = micros // 1000000
frac_part = micros % 1000000
if frac_part:
return 'Duration(%s%d.%06d)' % (sign, int_part, frac_part)
return 'Duration(%s%d)' % (sign, int_part)
def __float__(self) -> float:
# Note that the returned value may have lost precision.
return self.micros / 1000000
def __eq__(self, other: object) -> bool:
# Allow comparisons between Duration and Timestamp values.
if isinstance(other, (Duration, Timestamp)):
return self.micros == other.micros
elif isinstance(other, (int, float)):
return self.micros == Duration.of(other).micros
else:
# Support equality with other types
return NotImplemented
def __lt__(self, other: TimestampDurationTypes) -> bool:
# Allow comparisons between Duration and Timestamp values.
if not isinstance(other, Timestamp):
other = Duration.of(other)
return self.micros < other.micros
def __gt__(self, other: TimestampDurationTypes) -> bool:
return not (self < other or self == other)
def __le__(self, other: TimestampDurationTypes) -> bool:
return self < other or self == other
def __ge__(self, other: TimestampDurationTypes) -> bool:
return not self < other
def __hash__(self) -> int:
return hash(self.micros)
def __neg__(self) -> 'Duration':
return Duration(micros=-self.micros)
def __add__(self, other: DurationTypes) -> 'Duration':
if isinstance(other, Timestamp):
# defer to Timestamp.__add__
return NotImplemented
other = Duration.of(other)
return Duration(micros=self.micros + other.micros)
def __radd__(self, other: DurationTypes) -> 'Duration':
return self + other
def __sub__(self, other: DurationTypes) -> 'Duration':
other = Duration.of(other)
return Duration(micros=self.micros - other.micros)
def __rsub__(self, other: DurationTypes) -> 'Duration':
return -(self - other)
def __mul__(self, other: DurationTypes) -> 'Duration':
other = Duration.of(other)
return Duration(micros=self.micros * other.micros // 1000000)
def __rmul__(self, other: DurationTypes) -> 'Duration':
return self * other
def __mod__(self, other: DurationTypes) -> 'Duration':
other = Duration.of(other)
return Duration(micros=self.micros % other.micros)
# The minimum granularity / interval expressible in a Timestamp / Duration
# object.
TIME_GRANULARITY = Duration(micros=1)