This guide shows you, in under twenty minutes, how to cap how often each user can call your AI endpoints so that one client cannot run up your bill or overload your service. You will build an in-memory limiter first, return a proper HTTP 429 from FastAPI, upgrade the limiter to a token bucket, and finally move it to Redis so the limit holds across many processes.
Every AI call costs real money and takes real time. Without a per-user cap, a single buggy script in a retry loop, or one customer who decides to scrape, can hand you a four-figure provider bill overnight and slow every other user down. A rate limit is the guardrail that makes the cost of any one account predictable. This guide is part of the SaaS MVP with Python and AI section, which builds the surrounding auth and billing pieces.
Prerequisites
You need Python 3.10 or newer (python --version to check) and the FastAPI service from the SaaS MVP with Python and AI guide, or any FastAPI app where you can identify the caller. Identifying the caller is the job of authentication, covered in Add User Authentication to a Python AI App; a rate limit is per user, so you must know who is asking before you can count them.
Install everything the guide uses in one go. The redis client package is included here for convenience, but nothing imports it until step 4:
python -m venv .venv
source .venv/bin/activate # Windows: .venv\Scripts\activate
pip install "fastapi>=0.110" "uvicorn[standard]>=0.29" "redis>=5.0"
There are no secret keys in this guide itself, but the service you attach it to loads an OPENAI_API_KEY from a .env file. If you keep one, add .env to your .gitignore immediately so the file with your key is never committed:
echo ".env" >> .gitignore
Step 1: Build an in-memory fixed-window limiter
The simplest limiter is a fixed window: pick a window length (say 60 seconds) and a limit (say 30 requests), and count each user's calls inside the current window. When the clock ticks into the next window, the count resets. It is a handful of lines and needs no extra services, which makes it perfect for a single-process MVP.
# limiter.py
import time

# user_id -> (window_start, request_count_in_window)
# window_start is a time.monotonic() reading, not a wall-clock epoch.
_windows: dict[str, tuple[float, int]] = {}


def check_fixed_window(user_id: str, limit: int = 30, window: int = 60) -> int:
    """Return seconds to wait if over limit, else 0 (the call is allowed)."""
    now = time.monotonic()
    entry = _windows.get(user_id)
    # First call from this user, or the window expired: start a fresh one.
    # (Checking for None avoids relying on a fake start of 0.0, because the
    # reference point of time.monotonic() is undefined.)
    if entry is None or now - entry[0] >= window:
        _windows[user_id] = (now, 1)
        return 0
    start, count = entry
    if count >= limit:  # over the limit inside this window
        return int(window - (now - start)) + 1
    _windows[user_id] = (start, count + 1)
    return 0
The function returns 0 when the call is allowed and the number of seconds to wait when it is not. Using time.monotonic() rather than time.time() matters: a monotonic clock never jumps backward when the system time is adjusted, so your windows cannot be skewed by a clock change. The downside of a fixed window is the boundary burst: a user can make 30 calls in the last second of one window and 30 more in the first second of the next, which is 60 calls in about two seconds, double the intended limit. Step 3 fixes that.
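To make the boundary burst concrete, here is a minimal sketch that repeats the same fixed-window logic but takes the current time as an argument, so the clock can be simulated. The FixedWindow class and its now parameter are illustrative assumptions and are not part of limiter.py.

```python
class FixedWindow:
    """Same counting logic as check_fixed_window, with a caller-supplied clock."""

    def __init__(self, limit: int = 30, window: int = 60) -> None:
        self.limit = limit
        self.window = window
        self.state: dict[str, tuple[float, int]] = {}  # user_id -> (start, count)

    def allow(self, user_id: str, now: float) -> bool:
        entry = self.state.get(user_id)
        if entry is None or now - entry[0] >= self.window:
            self.state[user_id] = (now, 1)  # open a fresh window
            return True
        start, count = entry
        if count >= self.limit:
            return False
        self.state[user_id] = (start, count + 1)
        return True


fw = FixedWindow(limit=30, window=60)
allowed = [fw.allow("u_1", 0.0)]                       # one call opens the window
allowed += [fw.allow("u_1", 59.5) for _ in range(29)]  # rest of the quota, late
allowed += [fw.allow("u_1", 60.0) for _ in range(30)]  # new window, full quota

burst = sum(allowed[1:])       # calls accepted between t=59.5 and t=60.0
print(burst)                   # 59
print(fw.allow("u_1", 60.0))   # False: the new window is now exhausted
```

In this simulation 59 calls are accepted within half a second even though the nominal limit is 30 per minute, which is the behaviour the token bucket in step 3 is meant to smooth out.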
Step 2: Return HTTP 429 with Retry-After in FastAPI
A limiter is only useful when your endpoint enforces it. Wrap the check in a FastAPI dependency so any protected route runs it automatically. When the user is over the limit, raise an HTTPException with status 429 (the standard "too many requests" code) and a Retry-After header telling the client how many seconds to back off.
# app.py
from fastapi import Depends, FastAPI, Header, HTTPException

from limiter import check_fixed_window

app = FastAPI(title="Rate-limited AI SaaS")

# Stand-in for a real user table; resolve the API key to a user id.
USERS = {"key_pro_xyz": "u_2", "key_free_abc": "u_1"}


def rate_limited_user(x_api_key: str = Header(...)) -> str:
    user_id = USERS.get(x_api_key)
    if user_id is None:
        raise HTTPException(status_code=401, detail="Invalid API key")
    retry_after = check_fixed_window(user_id, limit=30, window=60)
    if retry_after:
        raise HTTPException(
            status_code=429,
            detail="Rate limit exceeded",
            headers={"Retry-After": str(retry_after)},
        )
    return user_id


@app.post("/v1/generate")
def generate(user_id: str = Depends(rate_limited_user)) -> dict:
    # Your paid AI work goes here; it only runs if the limit check passed.
    return {"ok": True, "user": user_id}
Run it with uvicorn app:app --reload and send more than 30 POST requests to /v1/generate within a minute using the header X-API-Key: key_pro_xyz. The 31st gets a 429 with a Retry-After header. The check runs as a dependency, so the limit is enforced before the expensive model call ever starts, which is the whole point: you reject over-limit traffic before spending a cent on it. Note that the provider can also return its own 429 from inside the AI call; that is a different limit, and handling it is covered in Fix the 429 Rate-Limit Error in Python.
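On the other side of the connection, a well-behaved client should read Retry-After and wait that long before retrying. The sketch below shows one possible way to turn a response's status and headers into a wait time using only the standard library. The retry_delay function, its default argument, and the plain-dict headers are assumptions made for illustration; a real HTTP client library will expose headers in its own (usually case-insensitive) type.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime


def retry_delay(status: int, headers: dict[str, str], default: float = 1.0) -> float:
    """Seconds a client should wait before retrying; 0.0 if no wait is needed."""
    if status != 429:
        return 0.0
    value = headers.get("Retry-After")
    if value is None:
        return default                 # server gave no hint, use a fallback
    value = value.strip()
    if value.isdigit():                # delay-seconds form, as sent by the app above
        return float(value)
    try:                               # HTTP-date form, also permitted by the HTTP spec
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return default                 # unparseable header, use the fallback
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    return max((when - datetime.now(timezone.utc)).total_seconds(), 0.0)


print(retry_delay(429, {"Retry-After": "17"}))  # 17.0
print(retry_delay(200, {}))                     # 0.0
```

A client loop would sleep for the returned number of seconds and then resend the request, rather than retrying immediately and burning through the next window as well.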
Step 3: Switch to a token bucket for smoother bursts
A token bucket avoids the boundary-burst problem and feels fairer to users. Picture a bucket that holds up to capacity tokens and refills steadily at rate tokens per second. Each request spends one token; if the bucket is empty, the request is refused. A user who has been quiet builds up a small reserve and can spend it in a short burst, but their sustained rate can never exceed the refill rate.
# token_bucket.py
import time

# user_id -> (tokens_available, last_refill_timestamp)
_buckets: dict[str, tuple[float, float]] = {}


def check_token_bucket(user_id: str, rate: float = 0.5, capacity: int = 30) -> int:
    """rate = tokens added per second; capacity = max burst. 0 means allowed."""
    now = time.monotonic()
    tokens, last = _buckets.get(user_id, (float(capacity), now))
    tokens = min(capacity, tokens + (now - last) * rate)  # refill since last call
    if tokens < 1:
        _buckets[user_id] = (tokens, now)
        return int((1 - tokens) / rate) + 1  # seconds until 1 token
    _buckets[user_id] = (tokens - 1, now)  # spend one token
    return 0
With rate=0.5 and capacity=30, a user gets one new request every two seconds on average but can fire up to 30 in quick succession after a quiet spell. To use it, swap check_fixed_window for check_token_bucket inside rate_limited_user, and replace the limit and window arguments with rate and capacity, since check_token_bucket does not accept the former. The refill is computed lazily on each call from the elapsed time, so there is no background timer to run and the bucket state stays tiny. This smooth behaviour is why token buckets are the common choice for public APIs.
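The burst-then-steady behaviour can be checked with a small simulation. This is a sketch under the assumption that the clock is passed in by the caller; the TokenBucket class is illustrative and mirrors the refill arithmetic of check_token_bucket for a single user.

```python
class TokenBucket:
    """Single-user token bucket with a caller-supplied clock, for simulation."""

    def __init__(self, rate: float = 0.5, capacity: int = 30) -> None:
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)  # start full, like a user who has been quiet
        self.last = 0.0

    def allow(self, now: float) -> bool:
        # Lazy refill: add tokens for the time elapsed since the last call.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens < 1:
            return False
        self.tokens -= 1
        return True


bucket = TokenBucket(rate=0.5, capacity=30)

# 40 calls at the same instant: only the stored capacity gets through.
burst = sum(bucket.allow(0.0) for _ in range(40))

# Then one call per second for a minute: the refill rate sets the pace.
steady = sum(bucket.allow(float(t)) for t in range(1, 61))

print(burst, steady)  # 30 30
```

The first 30 calls drain the reserve at once, and after that only every second call succeeds, which matches the refill rate of one token per two seconds.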
Step 4: Move the limiter to Redis for multiple workers
Both versions above keep their state in a Python dict, which lives inside one process. The moment you run multiple uvicorn workers, or more than one server, each has its own dict and its own count, so the effective limit is multiplied by the number of workers and the cap you configured is no longer the cap users actually get. The fix is a shared store. Redis is the standard choice because it is fast, lives outside your app processes, and can expire keys for you, which makes the window reset free.
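To illustrate how the limit multiplies, the following sketch simulates several workers that each hold a private counter, with requests spread round-robin across them. The worker count, the handle function, and the round-robin load balancing are assumptions chosen for the demonstration, and the window reset is left out because every request arrives inside one window.

```python
from itertools import cycle

LIMIT = 30     # configured per-user limit
WORKERS = 4    # simulated uvicorn worker processes

# One independent counter dict per worker, as with in-process state.
worker_counts: list[dict[str, int]] = [{} for _ in range(WORKERS)]


def handle(counts: dict[str, int], user_id: str) -> bool:
    """Check the request against this worker's private counter only."""
    count = counts.get(user_id, 0)
    if count >= LIMIT:
        return False
    counts[user_id] = count + 1
    return True


# A load balancer hands 200 requests from one user to the workers in turn.
targets = cycle(worker_counts)
allowed = sum(handle(next(targets), "u_1") for _ in range(200))

print(allowed)  # 120, i.e. LIMIT * WORKERS
```

Each worker faithfully enforces 30, yet the user gets 120 calls through, which is why the counter has to live somewhere all workers can see.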
Start a Redis instance (docker run -p 6379:6379 redis is the quickest), then use an atomic increment so two simultaneous requests cannot both read a stale count:
# redis_limiter.py
import redis
from fastapi import HTTPException

r = redis.Redis(host="localhost", port=6379, decode_responses=True)


def check_redis_window(user_id: str, limit: int = 30, window: int = 60) -> None:
    key = f"ratelimit:{user_id}"
    count = r.incr(key)  # atomic: increments and returns new value
    if count == 1:
        r.expire(key, window)  # first hit starts the window's countdown
    if count > limit:
        ttl = r.ttl(key)  # seconds left until Redis clears the key
        raise HTTPException(
            status_code=429,
            detail="Rate limit exceeded",
            headers={"Retry-After": str(max(ttl, 1))},
        )
r.incr is atomic, so even under concurrent load each request gets a unique, correct count. The first request to create the key also sets its expiry, and Redis deletes the key automatically when the window ends, giving you a self-resetting fixed window with no cleanup code. The ttl (time to live) is the number of whole seconds left in the window, which is a natural value for Retry-After; the max(ttl, 1) guard covers the negative values Redis returns when the key has no expiry or has just been deleted. Unlike the in-memory checks, check_redis_window raises the 429 itself instead of returning a wait time, so call check_redis_window(user_id) in your FastAPI dependency in place of the in-memory check and drop the if retry_after block. The same limit now holds across every worker and server you run.
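One gap remains in the version above: INCR and EXPIRE are two separate commands, so a process that dies between them leaves a key with no expiry. The sketch below shows one possible way to narrow that gap. It reads the TTL in the same transaction as the increment and sets the expiry whenever it is missing, not only when the count is 1. The function name check_redis_window_safe and the choice to pass the client in as an argument are assumptions for illustration; it returns a wait time like the in-memory checks rather than raising.

```python
def check_redis_window_safe(client, user_id: str, limit: int = 30, window: int = 60) -> int:
    """Return seconds to wait if over the limit, else 0.

    `client` is expected to be a redis.Redis instance (or anything with the
    same pipeline()/expire() methods).
    """
    key = f"ratelimit:{user_id}"

    pipe = client.pipeline()   # redis-py pipelines run as MULTI/EXEC by default
    pipe.incr(key)             # bump the counter
    pipe.ttl(key)              # read the remaining lifetime in the same transaction
    count, ttl = pipe.execute()

    if ttl < 0:
        # -1 means the key exists without an expiry: either this is the first
        # hit of a window, or an earlier request failed before setting one.
        client.expire(key, window)
        ttl = window

    return max(ttl, 1) if count > limit else 0
```

With a real connection this would be called as check_redis_window_safe(r, user_id), where r is the redis.Redis client from redis_limiter.py. Because any request that finds the key without a TTL repairs it, a counter cannot stay stuck for longer than one extra request.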
Parameter quick reference
These are the three knobs you tune for any of the limiters above.
| Parameter | Effect on call count | Effect on timing | Scope |
|---|---|---|---|
| limit / capacity | Max calls allowed before a 429 | n/a | Per user, per window |
| window | n/a | Length of the counting interval in seconds | Resets per window |
| rate | Sustained calls per second (token bucket) | Continuous refill | Per user, ongoing |
A common starting point for an AI SaaS is limit=30, window=60 per user on a paid plan, and something far tighter, such as limit=5, window=60, on a free tier to blunt abuse before anyone has paid you anything.
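One way to wire those starting values into the limiter is a small lookup from plan to limits. This is a sketch only: the PlanLimit dataclass, the plan names, and the USER_PLANS mapping are hypothetical stand-ins for whatever your billing or user table actually stores.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PlanLimit:
    limit: int    # max calls per window
    window: int   # window length in seconds


# Starting values from the text above; tune them for your own costs.
PLAN_LIMITS = {
    "free": PlanLimit(limit=5, window=60),
    "pro": PlanLimit(limit=30, window=60),
}

# Hypothetical stand-in for a lookup against your user or billing table.
USER_PLANS = {"u_1": "free", "u_2": "pro"}


def limits_for(user_id: str) -> PlanLimit:
    """Return the limits for a user, defaulting to the tightest tier."""
    plan = USER_PLANS.get(user_id, "free")
    return PLAN_LIMITS[plan]


print(limits_for("u_2"))  # PlanLimit(limit=30, window=60)
```

Inside rate_limited_user you could then fetch plan = limits_for(user_id) and pass limit=plan.limit and window=plan.window to whichever check function you are using. Defaulting unknown users to the free tier keeps a lookup miss from granting the generous limit by accident.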
Troubleshooting
- Every worker enforces its own limit, so users get roughly N times the cap: Each process has its own in-memory dict. Move the counter to Redis (step 4) so all workers share one count.
- The Retry-After header is missing from the 429 response: You raised the HTTPException without the headers argument. Pass headers={"Retry-After": str(seconds)} so clients know when to retry instead of hammering you.
- Two concurrent requests both slip past the limit: A read-then-write check has a race between reading the count and writing the new one. Use Redis incr, which increments atomically in a single operation, so no two requests can read the same stale value.
- Redis keys never expire and the limit jams permanently: You called incr but never set an expiry, so the count climbs forever. Call r.expire(key, window) when the count is 1, and confirm with redis-cli ttl ratelimit:<user_id>.
When to use this vs. alternatives
- In-memory limiter: Use it for a single-process MVP, local development, or a demo. It is zero extra infrastructure and a few lines of code, but it does not survive a restart and is not shared across workers, so it cannot enforce a true limit once you scale out.
- Redis limiter: Use it the moment you run more than one worker or server, which is most real deployments. It enforces one shared, atomic count everywhere and resets windows for free via key expiry, at the cost of running and connecting to a Redis instance.
- Gateway or platform rate limiting: Tools like an API gateway, a reverse proxy, or your cloud's edge can rate-limit before traffic even reaches your code. Reach for this when you want to shed abusive load early or limit by IP, but keep an application-level limit too, because only your code knows the per-user plan and can return a precise Retry-After.
Next steps
With per-user limits in place, wire them into the rest of your service: identify callers reliably with Add User Authentication to a Python AI App, then turn the requests you do allow into revenue with Add Stripe Billing to an AI SaaS with Python. When the provider's own throttle trips inside your AI call, handle it cleanly with Fix the 429 Rate-Limit Error in Python.