API Rate Limiting Strategies

October 15, 2018

APIs without rate limiting are vulnerable. A single client can monopolize resources, intentionally or accidentally. Denial of service attacks become trivial. Misbehaving integrations can take down your entire service.

Rate limiting protects your API and ensures fair resource allocation. Here’s how to implement it effectively.

Why Rate Limit

Protection from Abuse

Without limits:

- A single buggy client stuck in a retry loop can flood your servers
- Scrapers can hammer expensive endpoints around the clock
- Attackers can brute-force authentication endpoints unchecked
- One tenant's traffic spike degrades service for everyone else

Fair Resource Allocation

Limited capacity should be shared fairly:

- Your infrastructure has finite throughput; without limits, the heaviest client wins
- Per-client limits guarantee every client a baseline share of capacity
- Paying customers shouldn't be starved by free-tier traffic

Cost Management

API calls cost money:

- Compute and bandwidth for every request served
- Database load from the queries behind each endpoint
- Fees for third-party services you call downstream

Rate limits prevent unexpected cost spikes.

Rate Limiting Algorithms

Fixed Window

Count requests in fixed time windows:

import time

class FixedWindowRateLimiter:
    def __init__(self, redis, limit, window_seconds):
        self.redis = redis
        self.limit = limit
        self.window_seconds = window_seconds

    def is_allowed(self, client_id):
        # Bucket requests into windows numbered by wall-clock time
        window = int(time.time() / self.window_seconds)
        key = f"rate:{client_id}:{window}"

        current = self.redis.incr(key)
        if current == 1:
            # First request in this window: set a TTL so stale keys expire
            self.redis.expire(key, self.window_seconds)

        return current <= self.limit
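
A minimal usage sketch (assuming a redis-py client; the same wiring applies to every limiter below):

import redis

r = redis.Redis(decode_responses=True)  # return str values instead of bytes
limiter = FixedWindowRateLimiter(r, limit=100, window_seconds=60)

allowed = limiter.is_allowed("user:42")
print("allowed" if allowed else "rejected")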

Pros:

- Simple to implement and cheap: one counter per client per window
- Easy to reason about and explain to API consumers

Cons:

- Bursts at window boundaries: with a limit of 100/minute, a client can send 100 requests at 0:59 and 100 more at 1:01, putting 200 requests through in about two seconds

Sliding Window Log

Track the timestamp of each request:

import time
import uuid

class SlidingWindowLogRateLimiter:
    def __init__(self, redis, limit, window_seconds):
        self.redis = redis
        self.limit = limit
        self.window_seconds = window_seconds

    def is_allowed(self, client_id):
        now = time.time()
        window_start = now - self.window_seconds
        key = f"rate:{client_id}"

        # Drop entries that have aged out of the window
        self.redis.zremrangebyscore(key, 0, window_start)

        # Count requests still inside the window
        count = self.redis.zcard(key)

        if count < self.limit:
            # Record this request: unique member, scored by timestamp
            self.redis.zadd(key, {str(uuid.uuid4()): now})
            self.redis.expire(key, self.window_seconds)
            return True

        return False

Pros:

- Exact: the window truly slides, so there are no boundary bursts
- The log doubles as an audit trail of request timestamps

Cons:

- Memory grows with traffic: one sorted-set entry per request
- The check-then-add sequence is not atomic without a Lua script

Sliding Window Counter

Weighted combination of current and previous windows:

import time

class SlidingWindowCounterRateLimiter:
    def __init__(self, redis, limit, window_seconds):
        self.redis = redis
        self.limit = limit
        self.window_seconds = window_seconds

    def is_allowed(self, client_id):
        now = time.time()
        current_window = int(now / self.window_seconds)
        previous_window = current_window - 1
        # Fraction of the current window elapsed so far (0.0 to 1.0)
        window_progress = (now % self.window_seconds) / self.window_seconds

        current_key = f"rate:{client_id}:{current_window}"
        previous_key = f"rate:{client_id}:{previous_window}"

        current_count = int(self.redis.get(current_key) or 0)
        previous_count = int(self.redis.get(previous_key) or 0)

        # Weight the previous window by how much of it still overlaps
        # the sliding window
        effective_count = previous_count * (1 - window_progress) + current_count

        if effective_count < self.limit:
            self.redis.incr(current_key)
            # Keep keys long enough to serve as the next window's "previous"
            self.redis.expire(current_key, self.window_seconds * 2)
            return True

        return False
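
For example, with a limit of 100, 80 requests in the previous window, and 40 requests so far at 30% through the current window, the effective count is 80 × 0.7 + 40 = 96, so the request is allowed.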

Pros:

- Smooths out the boundary bursts of fixed windows
- Memory cost stays low: two counters per client

Cons:

- An approximation: it assumes requests in the previous window were evenly distributed

Token Bucket

Tokens accumulate over time, consumed by requests:

import time

class TokenBucketRateLimiter:
    def __init__(self, redis, capacity, refill_rate):
        # Assumes a redis client created with decode_responses=True,
        # so hash values come back as strings rather than bytes
        self.redis = redis
        self.capacity = capacity
        self.refill_rate = refill_rate  # tokens per second

    def is_allowed(self, client_id, tokens=1):
        key = f"bucket:{client_id}"
        now = time.time()

        bucket = self.redis.hgetall(key)
        last_update = float(bucket.get('last_update', now))
        available = float(bucket.get('tokens', self.capacity))

        # Refill for the time elapsed since the last request,
        # capped at bucket capacity
        elapsed = now - last_update
        available = min(self.capacity, available + elapsed * self.refill_rate)

        if available >= tokens:
            available -= tokens
            # Note: this read-modify-write is not atomic; under concurrency,
            # use a Lua script (see "Centralized with Redis" below)
            self.redis.hset(key, mapping={
                'tokens': available,
                'last_update': now
            })
            self.redis.expire(key, int(self.capacity / self.refill_rate) + 60)
            return True

        return False
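
With capacity=10 and refill_rate=1, an idle client can burst 10 requests at once, then sustain one request per second: bursts are bounded by capacity while the long-run rate is bounded by the refill rate.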

Pros:

- Allows short bursts up to capacity while enforcing a long-run average rate
- Supports variable request costs naturally (spend more tokens for expensive calls)

Cons:

- More state to manage than simple counters
- The read-modify-write cycle needs a Lua script to be safe under concurrency

Leaky Bucket

Requests processed at constant rate; excess queued or rejected:

import time

class LeakyBucketRateLimiter:
    def __init__(self, redis, capacity, drain_rate):
        # Assumes a redis client created with decode_responses=True
        self.redis = redis
        self.capacity = capacity
        self.drain_rate = drain_rate  # requests per second

    def is_allowed(self, client_id):
        key = f"leaky:{client_id}"
        now = time.time()

        bucket = self.redis.hgetall(key)
        last_update = float(bucket.get('last_update', now))
        water_level = float(bucket.get('water_level', 0))

        # Drain water at a constant rate for the elapsed time
        elapsed = now - last_update
        water_level = max(0, water_level - elapsed * self.drain_rate)

        if water_level < self.capacity:
            # Room in the bucket: admit the request (one unit of "water")
            water_level += 1
            self.redis.hset(key, mapping={
                'water_level': water_level,
                'last_update': now
            })
            # Expire idle buckets once they would have fully drained
            self.redis.expire(key, int(self.capacity / self.drain_rate) + 60)
            return True

        return False

Pros:

- Constant output rate protects downstream systems from bursts
- Simple mental model: requests fill the bucket, processing drains it

Cons:

- No bursts allowed, even for clients that have been idle
- Strict pacing can feel sluggish for interactive clients

Implementation Strategies

Where to Implement

API Gateway:

- Enforces limits before requests reach your application (nginx, Kong, cloud gateways)
- Protects the entire stack, including framework and middleware
- Typically limited to coarse rules: per IP or per API key

Application Layer:

- Has full business context: user, plan, organization, endpoint cost
- Enables tiered, endpoint-specific, and cost-based limits
- Rejected requests still consume connection and parsing overhead

Both:

- A common pattern: coarse abuse protection at the gateway, fine-grained business rules in the application

What to Limit By

def get_rate_limit_key(request):
    # By IP (unauthenticated traffic)
    if not request.user:
        return f"ip:{request.remote_addr}"

    # By API key (server-to-server integrations)
    if request.api_key:
        return f"key:{request.api_key}"

    # By organization (pool the limit across a whole team)
    if request.user.organization_id:
        return f"org:{request.user.organization_id}"

    # By user (authenticated individuals)
    return f"user:{request.user.id}"

Consider:

- IP-based limits punish users behind shared NATs and corporate proxies
- Per-user limits keep a user with many API keys bounded
- Per-organization limits let teams pool a shared quota
- Layering keys (per IP plus per user) adds defense in depth

Response Headers

Communicate limits to clients:

def add_rate_limit_headers(response, limiter, client_id):
    limit_info = limiter.get_info(client_id)

    # Header values must be strings
    response.headers['X-RateLimit-Limit'] = str(limit_info.limit)
    response.headers['X-RateLimit-Remaining'] = str(limit_info.remaining)
    response.headers['X-RateLimit-Reset'] = str(limit_info.reset_timestamp)

    if limit_info.remaining <= 0:
        response.headers['Retry-After'] = str(limit_info.retry_after)
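
When a client is close to the limit, the response headers might look like this (illustrative values):

HTTP/1.1 200 OK
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 3
X-RateLimit-Reset: 1539628800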

Handling Limit Exceeded

@app.before_request
def check_rate_limit():
    client_id = get_rate_limit_key(request)

    if not rate_limiter.is_allowed(client_id):
        response = jsonify({
            'error': 'rate_limit_exceeded',
            'message': 'Too many requests. Please slow down.',
            'retry_after': rate_limiter.get_retry_after(client_id)
        })
        response.status_code = 429
        add_rate_limit_headers(response, rate_limiter, client_id)
        return response

Always return:

- HTTP 429 Too Many Requests
- A Retry-After header so clients know when to back off
- A machine-readable error code alongside the human-readable message

Tiered Limits

Different limits for different plans:

RATE_LIMITS = {
    'free': {'requests_per_minute': 60, 'requests_per_day': 1000},
    'pro': {'requests_per_minute': 600, 'requests_per_day': 50000},
    'enterprise': {'requests_per_minute': 6000, 'requests_per_day': None},  # None = no daily cap
}

def get_rate_limit(user):
    plan = user.subscription_plan
    return RATE_LIMITS.get(plan, RATE_LIMITS['free'])
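
Enforcing both windows takes one limiter per window. A minimal sketch using the FixedWindowRateLimiter from above (the key suffixes keep the two counters separate):

def is_request_allowed(user, redis_client):
    limits = get_rate_limit(user)

    minute = FixedWindowRateLimiter(redis_client, limits['requests_per_minute'], 60)
    if not minute.is_allowed(f"user:{user.id}:minute"):
        return False

    # None means the plan has no daily cap
    if limits['requests_per_day'] is not None:
        day = FixedWindowRateLimiter(redis_client, limits['requests_per_day'], 86400)
        if not day.is_allowed(f"user:{user.id}:day"):
            return False

    return True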

Endpoint-Specific Limits

Some endpoints need different limits:

ENDPOINT_LIMITS = {
    '/api/search': {'per_minute': 30},   # Expensive
    '/api/users': {'per_minute': 100},   # Standard
    '/api/health': {'per_minute': 1000}, # High limit
}

@app.before_request
def check_rate_limit():
    endpoint_limit = ENDPOINT_LIMITS.get(request.path)
    if endpoint_limit:
        # Scope the key by path so each endpoint gets its own counter
        client_id = get_rate_limit_key(request)
        limiter = FixedWindowRateLimiter(
            redis_client, endpoint_limit['per_minute'], 60)
        if not limiter.is_allowed(f"{client_id}:{request.path}"):
            abort(429)

Cost-Based Limiting

Weight requests by cost:

ENDPOINT_COSTS = {
    '/api/simple': 1,
    '/api/search': 10,
    '/api/report': 100,
}

def check_rate_limit(request):
    client_id = get_rate_limit_key(request)
    cost = ENDPOINT_COSTS.get(request.path, 1)
    # Expensive endpoints drain more tokens from the same bucket
    return token_bucket.is_allowed(client_id, tokens=cost)
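
With a token bucket of capacity 100 refilling at one token per second, a full bucket buys one report, 10 searches, or 100 simple calls; whatever the mix, sustained throughput is capped by the refill rate. Note that the most expensive endpoint's cost must not exceed bucket capacity, or it can never be called.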

Distributed Rate Limiting

Centralized with Redis

Redis can run Lua scripts atomically, which eliminates the read-modify-write races in the Python implementations above:

# Lua script for an atomic token bucket: refill and consume in one step
TOKEN_BUCKET_SCRIPT = """
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local tokens_requested = tonumber(ARGV[4])

local tokens = tonumber(redis.call('HGET', key, 'tokens') or capacity)
local last_update = tonumber(redis.call('HGET', key, 'last_update') or now)
-- Refill for the elapsed time, capped at capacity
tokens = math.min(capacity, tokens + (now - last_update) * refill_rate)

local allowed = 0
if tokens >= tokens_requested then
    tokens = tokens - tokens_requested
    allowed = 1
end

redis.call('HSET', key, 'tokens', tokens, 'last_update', now)
redis.call('EXPIRE', key, math.ceil(capacity / refill_rate) + 60)
return allowed
"""

Local + Sync

For ultra-low latency:

- Each instance answers checks from process memory, with no network round trip
- A background task periodically reconciles local counts with a central store
- Accuracy is bounded by the sync interval: the shorter the interval, the tighter the limit (see the sketch below)
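
A sketch of the idea (class and method names are illustrative, not a library API): the hot path reads process memory only, and a daemon thread pushes local counts to Redis, pulling back the cluster-wide total.

import threading
import time

class LocalSyncRateLimiter:
    def __init__(self, redis, limit, window_seconds, sync_interval=1.0):
        self.redis = redis
        self.limit = limit
        self.window_seconds = window_seconds
        self.lock = threading.Lock()
        self.current_window = int(time.time() / window_seconds)
        self.pending = {}        # client_id -> increments not yet pushed
        self.global_counts = {}  # client_id -> last known cluster-wide count
        threading.Thread(target=self._sync_loop, args=(sync_interval,),
                         daemon=True).start()

    def _roll_window(self):
        # Reset local state when a new fixed window begins
        window = int(time.time() / self.window_seconds)
        if window != self.current_window:
            self.current_window = window
            self.pending.clear()
            self.global_counts.clear()

    def is_allowed(self, client_id):
        # Hot path: no network round trip
        with self.lock:
            self._roll_window()
            seen = (self.global_counts.get(client_id, 0)
                    + self.pending.get(client_id, 0))
            if seen >= self.limit:
                return False
            self.pending[client_id] = self.pending.get(client_id, 0) + 1
            return True

    def _sync_loop(self, interval):
        while True:
            time.sleep(interval)
            with self.lock:
                to_push, self.pending = self.pending, {}
                window = self.current_window
            for client_id, count in to_push.items():
                key = f"rate:{client_id}:{window}"
                # INCRBY returns the new cluster-wide total, which tells
                # this instance about traffic hitting its peers
                total = self.redis.incrby(key, count)
                self.redis.expire(key, self.window_seconds)
                with self.lock:
                    self.global_counts[client_id] = total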

Approximate Distributed

Each instance enforces limit / num_instances:

- No coordination or shared store needed on the hot path
- Accurate when the load balancer spreads traffic evenly
- Skewed traffic breaks the approximation: a client pinned to one instance sees only a fraction of the global limit (see the sketch below)
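
A minimal sketch (GLOBAL_LIMIT and NUM_INSTANCES are illustrative configuration values, and local_redis stands for a counter store private to this node):

# Divide the global budget evenly across instances
GLOBAL_LIMIT = 600   # requests per minute, cluster-wide
NUM_INSTANCES = 6    # from deployment config or service discovery

local_limit = GLOBAL_LIMIT // NUM_INSTANCES  # 100 per instance
limiter = FixedWindowRateLimiter(local_redis, local_limit, window_seconds=60)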

Key Takeaways

- Fixed windows are simplest but burst at boundaries; sliding windows fix that; token buckets permit controlled bursts; leaky buckets smooth output
- Choose the limiting key deliberately: IP, user, API key, or organization
- Communicate limits through X-RateLimit-* headers, and return 429 with Retry-After when a client exceeds them
- In distributed setups, use atomic operations (Lua scripts in Redis) or accept approximation

Rate limiting is essential infrastructure. Implement it before you need it, not during an incident.