Rate limiting patterns

Apache Beam is built to maximize throughput by scaling workloads across thousands of workers. However, this massive parallelism requires coordination when pipelines interact with external systems that enforce strict quotas, such as third-party REST APIs, databases, or internal microservices. Without a centralized rate-limiting mechanism, independent workers can exceed the capacity of these systems, resulting in service degradation or broad IP blocking.

Centralized Rate Limit Service

The recommended approach for global rate limiting in Beam is to use a centralized Rate Limit Service (RLS).

A production-ready Terraform module to deploy this service on GKE is available in the Beam repository: envoy-ratelimiter

To deploy the rate-limiting infrastructure on GKE:

  1. Update terraform.tfvars with your project variables and adjust the rate-limit rules and domains.
  2. Run the helper deploy script: ./deploy.sh

This script automates the deployment and, upon completion, prints the Internal Load Balancer IP address that you will use in your pipeline.
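For reference, a minimal terraform.tfvars might look like the following. The variable names here are illustrative assumptions; consult the envoy-ratelimiter module's variables.tf for the actual inputs it expects.

```hcl
# Illustrative terraform.tfvars -- variable names are assumptions;
# check the envoy-ratelimiter module's variables.tf for the real inputs.
project_id = "my-gcp-project"
region     = "us-central1"
```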


Using RateLimiter

To rate limit requests in your pipeline, you can create a RateLimiter client in your DoFn’s setup phase and acquire permits before making calls in the process phase.

In Java, use the RateLimiter interface and EnvoyRateLimiterFactory implementation to coordinate with the Envoy service. Create RateLimiterOptions with your service address, initialize the client in @Setup using EnvoyRateLimiterFactory, and call rateLimiter.allow(batchSize) in @ProcessElement to acquire a batch of permits.

static class CallExternalServiceFn extends DoFn<String, String> {
  private final String rlsAddress;
  private final String rlsDomain;
  private transient @Nullable RateLimiter rateLimiter;
  private static final Logger LOG = LoggerFactory.getLogger(CallExternalServiceFn.class);

  public CallExternalServiceFn(String rlsAddress, String rlsDomain) {
    this.rlsAddress = rlsAddress;
    this.rlsDomain = rlsDomain;
  }

  @Setup
  public void setup() {
    // Create the RateLimiterOptions.
    RateLimiterOptions options = RateLimiterOptions.builder().setAddress(rlsAddress).build();

    // Static RateLimiter with pre-configured domain and descriptors
    RateLimiterFactory factory = new EnvoyRateLimiterFactory(options);
    RateLimiterContext context =
        EnvoyRateLimiterContext.builder()
            .setDomain(rlsDomain)
            .addDescriptor("database", "users")
            .build();
    this.rateLimiter = factory.getLimiter(context);
  }

  @Teardown
  public void teardown() {
    if (rateLimiter != null) {
      try {
        rateLimiter.close();
      } catch (Exception e) {
        LOG.warn("Failed to close RateLimiter", e);
      }
    }
  }

  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    String element = c.element();
    try {
      Preconditions.checkNotNull(rateLimiter).allow(1);
    } catch (Exception e) {
      throw new RuntimeException("Failed to acquire rate limit token", e);
    }

    // Simulate external API call
    LOG.info("Processing: {}", element);
    Thread.sleep(100);
    c.output("Processed: " + element);
  }
}

In Python, use the EnvoyRateLimiter and Shared to coordinate a single client instance shared across threads. Initialize the client in setup() using shared.Shared, and call self.rate_limiter.allow() in process() to acquire rate-limiting permits before executing API calls.

class SampleApiDoFn(beam.DoFn):
  """A DoFn that simulates calling an external API with rate limiting."""
  def __init__(self, rls_address, domain, descriptors):
    self.rls_address = rls_address
    self.domain = domain
    self.descriptors = descriptors
    self._shared = shared.Shared()
    self.rate_limiter = None

  def setup(self):
    # Initialize the rate limiter in setup()
    # We use shared.Shared() to ensure only one RateLimiter instance is created
    # per worker and shared across threads.
    def init_limiter():
      logging.info("Connecting to Envoy RLS at %s", self.rls_address)
      return EnvoyRateLimiter(
          service_address=self.rls_address,
          domain=self.domain,
          descriptors=self.descriptors,
          namespace='example_pipeline')

    self.rate_limiter = self._shared.acquire(init_limiter)

  def process(self, element):
    self.rate_limiter.allow()

    # Process the element (mock API call)
    logging.info("Processing element: %s", element)
    time.sleep(0.1)
    yield element
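In both SDKs, allow() blocks the calling thread until the requested permits are granted. As a purely local illustration of those blocking-permit semantics (a stdlib sketch, not the Envoy-backed client), a token bucket behaves similarly:

```python
import threading
import time


class LocalTokenBucket:
  """Minimal local stand-in illustrating RateLimiter.allow() semantics.

  The real EnvoyRateLimiter consults a shared Envoy Rate Limit Service so
  the quota is enforced globally; this sketch only limits one process.
  """

  def __init__(self, permits_per_second, burst=1):
    self._rate = float(permits_per_second)
    self._capacity = float(burst)
    self._tokens = float(burst)
    self._last = time.monotonic()
    self._lock = threading.Lock()

  def allow(self, permits=1):
    """Block until `permits` tokens are available, then consume them."""
    while True:
      with self._lock:
        now = time.monotonic()
        # Refill tokens based on elapsed time, capped at the burst size.
        self._tokens = min(
            self._capacity, self._tokens + (now - self._last) * self._rate)
        self._last = now
        if self._tokens >= permits:
          self._tokens -= permits
          return
        # Estimate how long until enough tokens accrue, then retry.
        wait = (permits - self._tokens) / self._rate
      time.sleep(wait)
```

Unlike this sketch, the Envoy-backed RateLimiter enforces the quota across all workers, which is what makes it safe for autoscaled pipelines.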

If you are using RunInference for remote model inference (e.g., Vertex AI), you can pass the EnvoyRateLimiter directly to the ModelHandler. The model handler coordinates the rate limit internally across your distributed workers.

# Initialize the EnvoyRateLimiter
rate_limiter = EnvoyRateLimiter(
    service_address=known_args.rls_address,
    domain="mongo_cps",
    descriptors=[{
        "database": "users"
    }],
    namespace='example_pipeline')

# Initialize the VertexAIModelHandler with the rate limiter
model_handler = VertexAIModelHandlerJSON(
    endpoint_id=known_args.endpoint_id,
    project=known_args.project,
    location=known_args.location,
    rate_limiter=rate_limiter)

Running Example Pipelines with RateLimiter

Once your Rate Limit Service is deployed and has an internal IP address, you can run your pipeline pointing to that address.

Java:

# Get the IP from your RLS deployment
export RLS_ADDRESS="<INTERNAL_IP>:8081"

./gradlew :examples:java:exec -DmainClass=org.apache.beam.examples.RateLimiterSimple \
  -Dexec.args="--runner=<RUNNER> \
  --rateLimiterAddress=${RLS_ADDRESS} \
  --rateLimiterDomain=mongo_cps"

Python:

# Get the IP from your RLS deployment
export RLS_ADDRESS="<INTERNAL_IP>:8081"

python -m apache_beam.examples.rate_limiter_simple \
  --runner=<RUNNER> \
  --rls_address=${RLS_ADDRESS}

AutoScaler Integration

The throttling time and signals from the RateLimiter have to be picked up by the autoscaler. This allows the autoscaler to scale down workers when the pipeline is being throttled by the external service, preventing unnecessary resource usage.

Dataflow currently supports this AutoScaler integration for Batch pipelines on Runner V2. AutoScaler integration for Streaming mode is a known limitation.