Source code for apache_beam.io.components.adaptive_throttler

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Utility functions & classes that are _not_ specific to the datastore client.
#
# For internal use only; no backwards-compatibility guarantees.

# pytype: skip-file

import random

from apache_beam.io.components import util


[docs] class AdaptiveThrottler(object): """Implements adaptive throttling. See https://landing.google.com/sre/book/chapters/handling-overload.html#client-side-throttling-a7sYUg for a full discussion of the use case and algorithm applied. """ # The target minimum number of requests per samplePeriodMs, even if no # requests succeed. Must be greater than 0, else we could throttle to zero. # Because every decision is probabilistic, there is no guarantee that the # request rate in any given interval will not be zero. (This is the +1 from # the formula in # https://landing.google.com/sre/book/chapters/handling-overload.html ) MIN_REQUESTS = 1 def __init__(self, window_ms, bucket_ms, overload_ratio): """Initializes AdaptiveThrottler. Args: window_ms: int, length of history to consider, in ms, to set throttling. bucket_ms: int, granularity of time buckets that we store data in, in ms. overload_ratio: float, the target ratio between requests sent and successful requests. This is "K" in the formula in https://landing.google.com/sre/book/chapters/handling-overload.html. """ self._all_requests = util.MovingSum(window_ms, bucket_ms) self._successful_requests = util.MovingSum(window_ms, bucket_ms) self._overload_ratio = float(overload_ratio) self._random = random.Random() def _throttling_probability(self, now): if not self._all_requests.has_data(now): return 0 all_requests = self._all_requests.sum(now) successful_requests = self._successful_requests.sum(now) return max( 0, (all_requests - self._overload_ratio * successful_requests) / (all_requests + AdaptiveThrottler.MIN_REQUESTS))
[docs] def throttle_request(self, now): """Determines whether one RPC attempt should be throttled. This should be called once each time the caller intends to send an RPC; if it returns true, drop or delay that request (calling this function again after the delay). Args: now: int, time in ms since the epoch Returns: bool, True if the caller should throttle or delay the request. """ throttling_probability = self._throttling_probability(now) self._all_requests.add(now, 1) return self._random.uniform(0, 1) < throttling_probability
[docs] def successful_request(self, now): """Notifies the throttler of a successful request. Must be called once for each request (for which throttle_request was previously called) that succeeded. Args: now: int, time in ms since the epoch """ self._successful_requests.add(now, 1)