#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Helper functions for easier logging.
This module provides a few convenient logging methods, some of which
were adopted from
https://github.com/abseil/abseil-py/blob/master/absl/logging/__init__.py
via
https://github.com/facebookresearch/detectron2/blob/main/detectron2/utils/logger.py
"""
import logging
import os
import sys
import time
from collections import Counter
from types import FrameType
from typing import Optional
from typing import Union
def _find_caller() -> tuple[str, tuple]:
    """Locate the nearest stack frame that lies outside this logger module.

    The walk starts two frames above this function and follows ``f_back``
    links outward, skipping every frame whose source path contains
    ``utils/logger.`` (joined with the platform's path separator).

    Returns:
        str: ``__name__`` of the module owning the frame that was found,
            with ``"__main__"`` reported as ``"apache_beam"``.
        tuple: a hashable ``(filename, line number, function name)`` key
            identifying the call site.
    """
    own_file_marker = os.path.join("utils", "logger.")
    current: Optional[FrameType] = sys._getframe(2)
    while current is not None:
        code_obj = current.f_code
        if own_file_marker in code_obj.co_filename:
            # Still inside the logging helpers; look one frame further out.
            current = current.f_back
            continue
        module_name = current.f_globals["__name__"]
        call_site = (code_obj.co_filename, current.f_lineno, code_obj.co_name)
        if module_name == "__main__":
            return "apache_beam", call_site
        return module_name, call_site
    # To appease mypy. Code returns earlier in practice.
    return "unknown", ("unknown", 0, "unknown")
# Number of calls observed per de-duplication key. Shared by log_first_n and
# log_every_n; keys are tuples built from the caller's
# (filename, line number, function name) and/or the message string.
_LOG_COUNTER: Counter[tuple] = Counter()
# Time of the most recently emitted record per call site, written by
# log_every_n_seconds. Keys are the call-site tuples from _find_caller.
_LOG_TIMER: dict[tuple, float] = {}
[docs]
def log_first_n(
    lvl: int,
    msg: str,
    *args,
    n: int = 1,
    name: Optional[str] = None,
    key: Union[str, tuple[str, ...]] = "caller") -> None:
    """Log only for the first n times.

    Args:
        lvl (int): the logging level.
        msg (str): the message (format string) to log. When ``"message"``
            is part of ``key``, this unformatted string is what is compared
            to detect duplicates.
        *args: formatting arguments forwarded to ``Logger.log``.
        n (int): maximum number of times to emit the record per key.
        name (str): name of the logger to use. Will use the caller's module
            by default.
        key (str or tuple[str, ...]): the string(s) can be one of "caller"
            or "message", which defines how to identify duplicated logs.
            For example, if called with `n=1, key="caller"`, this function
            will only log the first call from the same caller, regardless
            of the message content.
            If called with `n=1, key="message"`, this function will log the
            same content only once, even if they are called from different
            places. If called with `n=1, key=("caller", "message")`, this
            function will skip logging only if the same caller has already
            logged the same message.
            Strings other than "caller" and "message" contribute nothing to
            the key, so calls using only such strings all share one counter.

    Raises:
        AssertionError: if ``key`` is an empty tuple.
    """
    key_tuple = (key, ) if isinstance(key, str) else key
    assert len(key_tuple) > 0, "key must contain 'caller' and/or 'message'"
    caller_module, caller_key = _find_caller()
    # Build the de-duplication key from the requested components.
    hash_key: tuple = ()
    if "caller" in key_tuple:
        hash_key = hash_key + caller_key
    if "message" in key_tuple:
        hash_key = hash_key + (msg, )
    # Every call is counted, including those that are suppressed.
    _LOG_COUNTER[hash_key] += 1
    if _LOG_COUNTER[hash_key] <= n:
        logging.getLogger(name or caller_module).log(lvl, msg, *args)
[docs]
def log_every_n(
    lvl: int, msg: str, *args, n: int = 1, name: Optional[str] = None) -> None:
    """Emit a record on one out of every n calls from the same call site.

    Every call increments the counter for the calling site. With ``n == 1``
    each call logs; otherwise a record is emitted whenever the running count
    modulo ``n`` equals 1 (the 1st, (n+1)th, (2n+1)th, ... call).

    Args:
        lvl (int): the logging level.
        msg (str): the message (format string) to log.
        *args: formatting arguments forwarded to ``Logger.log``.
        n (int): how many calls make up one logging period.
        name (str): name of the logger to use. Will use the caller's module
            by default.
    """
    module_name, call_site = _find_caller()
    _LOG_COUNTER[call_site] += 1
    calls_so_far = _LOG_COUNTER[call_site]
    is_due = n == 1 or calls_so_far % n == 1
    if not is_due:
        return
    target_logger = logging.getLogger(name or module_name)
    target_logger.log(lvl, msg, *args)
[docs]
def log_every_n_seconds(
    lvl: int, msg: str, *args, n: int = 1, name: Optional[str] = None) -> None:
    """Log no more than once per n seconds.

    The first call from a given call site always logs. Later calls from the
    same site log only when at least ``n`` seconds have passed since that
    site last emitted a record.

    Args:
        lvl (int): the logging level.
        msg (str): the message (format string) to log.
        *args: formatting arguments forwarded to ``Logger.log``.
        n (int): minimum number of seconds between two emitted records.
        name (str): name of the logger to use. Will use the caller's module
            by default.
    """
    caller_module, key = _find_caller()
    last_logged = _LOG_TIMER.get(key)
    # Use the monotonic clock: the wall clock can be stepped backwards
    # (e.g. by NTP), which would suppress logging for an arbitrary period.
    current_time = time.monotonic()
    if last_logged is None or current_time - last_logged >= n:
        logging.getLogger(name or caller_module).log(lvl, msg, *args)
        # Only emitted records reset the timer; suppressed calls do not.
        _LOG_TIMER[key] = current_time