Кэширование: от HTTP-заголовков до distributed cache
«There are only two hard things in Computer Science: cache invalidation and naming things» — Фил Карлтон. Фраза уже стала мемом, но проблема не исчезла. Кэши ускоряют системы на порядки, но неправильная инвалидация превращает их в источник багов, которые воспроизводятся только на production и только у части пользователей.
Кэширование — это всегда компромисс между свежестью данных и производительностью. Понять, где именно поставить кэш и как его инвалидировать, важнее, чем знать API конкретной библиотеки. Начнём с самого близкого к пользователю уровня и пойдём вглубь стека.
HTTP caching: Cache-Control и условные запросы
HTTP-кэширование встроено в протокол. Правильно выставленные заголовки позволяют браузерам и CDN хранить ответы без изменений в коде приложения. Это самый дешёвый вид кэша — никакой инфраструктуры, просто заголовки.
Cache-Control — главный директивный заголовок:
# Статические ресурсы (JS, CSS, изображения с хэшем в имени):
# Кэшировать на год, immutable — браузер не делает revalidation
Cache-Control: public, max-age=31536000, immutable
# HTML-страницы (актуальность важна):
# Кэшировать 10 минут, потом revalidate
Cache-Control: public, max-age=600, stale-while-revalidate=60
# Приватные данные (пользовательский профиль, корзина):
# Только в браузерном кэше, не в CDN, 5 минут
Cache-Control: private, max-age=300
# Никогда не кэшировать (API с реал-тайм данными, health-эндпоинты):
Cache-Control: no-store
# Кэшировать, но всегда revalidate перед использованием:
Cache-Control: no-cache # misleading name: это НЕ "не кэшировать"
# stale-while-revalidate: отдавать устаревший кэш пока обновляем в фоне
Cache-Control: public, max-age=60, stale-while-revalidate=600
# stale-if-error: использовать устаревший кэш если origin недоступен
Cache-Control: public, max-age=300, stale-if-error=86400
ETag и условные запросы — механизм revalidation без передачи полного тела ответа:
# FastAPI: ETag-based caching
import hashlib
from fastapi import Request, Response
from fastapi.responses import JSONResponse
async def get_article(article_id: int, request: Request, response: Response):
article = await db.get_article(article_id)
if not article:
raise HTTPException(status_code=404)
# Вычисляем ETag из содержимого (или времени обновления)
content = article.model_dump_json()
etag = f'"{hashlib.md5(content.encode()).hexdigest()}"'
# Проверяем If-None-Match от клиента
if request.headers.get("If-None-Match") == etag:
# Контент не изменился — отвечаем 304 без тела
return Response(status_code=304, headers={"ETag": etag})
response.headers["ETag"] = etag
response.headers["Cache-Control"] = "public, max-age=300"
response.headers["Last-Modified"] = article.updated_at.strftime(
"%a, %d %b %Y %H:%M:%S GMT"
)
return JSONResponse(content=article.model_dump(), headers=response.headers)
# Middleware для автоматических ETag:
# (применять для GET-ответов с детерминированным контентом)
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response
import hashlib
class ETagMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
response = await call_next(request)
if request.method != "GET" or response.status_code != 200:
return response
body = b""
async for chunk in response.body_iterator:
body += chunk
etag = f'"{hashlib.md5(body).hexdigest()}"'
if request.headers.get("If-None-Match") == etag:
return Response(status_code=304, headers={"ETag": etag})
from starlette.responses import Response as SR
return SR(
content=body,
status_code=response.status_code,
headers={**dict(response.headers), "ETag": etag},
media_type=response.media_type,
)
CDN caching: edge caching и стратегии purge
CDN (Content Delivery Network) кэширует ответы на edge-серверах, географически близких к пользователям. Запрос из Алматы к origin-серверу в Лондоне — 200+ мс. Тот же запрос к CDN edge во Франкфурте — 20–30 мс. Но CDN добавляет слой сложности: теперь инвалидация означает очистку кэша не на одном сервере, а на десятках edge-точек по всему миру.
# nginx: управление CDN-кэшированием через заголовки
server {
location /api/articles/ {
proxy_pass http://backend;
# Vary: Authorization гарантирует раздельное кэширование
# для аутентифицированных и анонимных пользователей
add_header Vary "Accept-Encoding, Authorization";
# Surrogate-Control для CDN (Fastly, Varnish понимают этот заголовок)
# Браузер получает Cache-Control, CDN использует Surrogate-Control
add_header Surrogate-Control "max-age=3600";
add_header Cache-Control "public, max-age=60, s-maxage=3600";
}
location /api/articles/*/comments {
proxy_pass http://backend;
# Комментарии меняются чаще — короче TTL
add_header Cache-Control "public, max-age=30, stale-while-revalidate=60";
}
location /static/ {
alias /var/www/static/;
expires 1y;
add_header Cache-Control "public, max-age=31536000, immutable";
add_header Vary "Accept-Encoding";
}
}
Cache tagging и purge по тегам — ключевой инструмент точечной инвалидации в CDN:
# Fastly: Surrogate-Key header для тегирования
# При обновлении статьи — инвалидируем все связанные кэши
import httpx
async def get_article_with_cache_tags(article_id: int, response: Response):
article = await db.get_article(article_id)
category = article.category
# Тегируем ответ — CDN запомнит, что эта страница связана с этими тегами
response.headers["Surrogate-Key"] = (
f"article-{article_id} "
f"category-{category} "
f"author-{article.author_id}"
)
return article
async def on_article_updated(article_id: int, category: str):
"""При обновлении статьи — инвалидируем все связанные кэши на CDN."""
async with httpx.AsyncClient() as client:
# Fastly API: purge по Surrogate-Key
await client.post(
f"https://api.fastly.com/service/{FASTLY_SERVICE_ID}/purge",
headers={
"Fastly-Key": FASTLY_API_KEY,
"Surrogate-Key": f"article-{article_id}",
}
)
async def on_category_updated(category: str):
"""При изменении категории — инвалидируем все статьи категории."""
async with httpx.AsyncClient() as client:
await client.post(
f"https://api.fastly.com/service/{FASTLY_SERVICE_ID}/purge",
headers={
"Fastly-Key": FASTLY_API_KEY,
"Surrogate-Key": f"category-{category}",
}
)
stale-while-revalidate — элегантный паттерн для балансирования свежести и производительности. CDN отдаёт устаревший кэш немедленно, одновременно обновляя его в фоне. Пользователь не ждёт — максимум один запрос за TTL уходит на origin.
Application-level cache: in-memory, Redis, Memcached
In-memory кэш (LRU) — самый быстрый, живёт в памяти процесса. Подходит для небольших неизменных данных: конфигурации, lookup-таблицы, результаты дорогих вычислений.
# Python: functools.lru_cache и cachetools
from functools import lru_cache
from cachetools import TTLCache, cached
import threading
# lru_cache: простое кэширование без TTL (живёт до рестарта процесса)
@lru_cache(maxsize=1000)
def get_country_by_code(code: str) -> dict:
# Дорогой lookup, который делается редко
return db.query_sync(f"SELECT * FROM countries WHERE code = '{code}'")
# TTLCache: LRU + TTL (элементы вытесняются по времени)
_config_cache = TTLCache(maxsize=100, ttl=300) # 5 минут
_lock = threading.Lock()
@cached(_config_cache, lock=_lock)
def get_feature_flags() -> dict:
return db.query_sync("SELECT key, value FROM feature_flags")
# asyncio-совместимый кэш:
from asyncio import Lock as AsyncLock
_async_cache: dict[str, tuple] = {} # key → (value, expires_at)
_async_lock = AsyncLock()
async def get_cached_or_fetch(key: str, fetch_fn, ttl: int = 300):
import time
async with _async_lock:
if key in _async_cache:
value, expires_at = _async_cache[key]
if time.monotonic() < expires_at:
return value
value = await fetch_fn()
_async_cache[key] = (value, time.monotonic() + ttl)
return value
Ограничения in-memory: данные не переживают рестарт, не разделяются между инстансами (проблема при горизонтальном скейлинге). Для production-систем с несколькими инстансами — только Redis.
Redis паттерны: cache-aside, write-through, write-behind
Cache-Aside (Lazy Loading) — самый распространённый паттерн. Приложение сначала проверяет кэш, при промахе загружает из БД и кладёт в кэш.
# Python: Redis cache-aside с aiocache
import json
import redis.asyncio as aioredis
from typing import TypeVar, Callable, Any
redis = aioredis.from_url("redis://localhost:6379", decode_responses=True)
T = TypeVar("T")
async def cache_aside(
key: str,
fetch_fn: Callable[[], Any],
ttl: int = 3600,
serializer=json,
) -> Any:
"""Универсальная cache-aside функция."""
# 1. Пробуем кэш
cached = await redis.get(key)
if cached is not None:
return serializer.loads(cached)
# 2. Cache miss: идём в источник
value = await fetch_fn()
# 3. Кладём в кэш
if value is not None:
await redis.setex(key, ttl, serializer.dumps(value))
return value
# Использование:
async def get_user_profile(user_id: int) -> dict:
return await cache_aside(
key=f"user:{user_id}:profile",
fetch_fn=lambda: db.get_user(user_id),
ttl=1800, # 30 минут
)
async def invalidate_user_cache(user_id: int) -> None:
"""Инвалидация после обновления пользователя."""
keys = await redis.keys(f"user:{user_id}:*")
if keys:
await redis.delete(*keys)
Write-Through — при записи в БД одновременно обновляется кэш. Данные всегда актуальны, но каждая запись дороже.
async def update_user_profile(user_id: int, data: dict) -> dict:
# Обновляем БД
updated = await db.update_user(user_id, data)
# Сразу обновляем кэш (write-through)
await redis.setex(
f"user:{user_id}:profile",
1800,
json.dumps(updated)
)
return updated
Write-Behind (Write-Back) — запись идёт сначала в кэш, а в БД — асинхронно, с задержкой. Максимальная производительность записи, но риск потери данных при падении кэша. Подходит для некритичных данных: счётчики просмотров, аналитика, лайки.
# Write-behind для счётчика просмотров:
async def increment_view_count(article_id: int) -> None:
# Инкрементируем в Redis (fast, atomic)
count = await redis.incr(f"article:{article_id}:views")
# Каждые 100 просмотров — flush в PostgreSQL
if count % 100 == 0:
await db.execute(
"UPDATE articles SET view_count = view_count + 100 WHERE id = :id",
{"id": article_id}
)
# Или через периодическую задачу (ARQ worker раз в минуту):
# SELECT все ключи article:*:views, batch UPDATE в PostgreSQL, DEL ключи
Стратегии инвалидации: TTL, event-driven, versioned keys
Инвалидация — сложнейшая часть кэширования. Три основных подхода с разными трейдоффами:
TTL (Time-To-Live) — самый простой. Данные живут N секунд и вытесняются автоматически. Минус: может отдавать устаревшие данные до истечения TTL.
# Выбор TTL зависит от допустимой "устарелости":
TTL_STATIC_DATA = 86400 # 24 часа — справочники, которые меняются редко
TTL_CATALOG = 3600 # 1 час — каталог товаров
TTL_USER_PROFILE = 1800 # 30 минут — профиль пользователя
TTL_FEED = 300 # 5 минут — лента новостей
TTL_REALTIME = 30 # 30 секунд — курсы, цены
TTL_RATE_LIMIT_WINDOW = 60 # 1 минута — окно rate limiting
Event-driven invalidation — инвалидация при событии изменения данных. Самый точный подход: кэш устарел ровно в момент изменения.
# Event-driven через database triggers или application events:
# Паттерн: при любом обновлении модели — публикуем событие
from dataclasses import dataclass
from typing import Callable
@dataclass
class CacheInvalidationEvent:
entity: str
entity_id: int
action: str # created, updated, deleted
# Event bus (упрощённый):
_handlers: dict[str, list[Callable]] = {}
def on_invalidation(entity: str):
def decorator(fn):
_handlers.setdefault(entity, []).append(fn)
return fn
return decorator
async def emit_invalidation(event: CacheInvalidationEvent):
for handler in _handlers.get(event.entity, []):
await handler(event)
# Регистрируем обработчики:
@on_invalidation("article")
async def invalidate_article_caches(event: CacheInvalidationEvent):
article_id = event.entity_id
keys_to_delete = [
f"article:{article_id}",
f"article:{article_id}:comments",
f"article:{article_id}:related",
]
# Если статья в листинге категории:
article = await db.get_article(article_id)
if article:
keys_to_delete.append(f"category:{article.category_id}:listing")
await redis.delete(*keys_to_delete)
# В сервисе обновления:
async def update_article(article_id: int, data: dict) -> dict:
updated = await db.update_article(article_id, data)
await emit_invalidation(CacheInvalidationEvent("article", article_id, "updated"))
return updated
Versioned keys — вместо удаления ключа меняем его версию. Старые данные остаются в кэше до вытеснения по TTL, но больше не читаются.
# Versioned keys через Redis-атомарный инкремент версии:
async def get_version(entity: str, entity_id: int) -> int:
v = await redis.get(f"version:{entity}:{entity_id}")
return int(v) if v else 0
async def bump_version(entity: str, entity_id: int) -> int:
return await redis.incr(f"version:{entity}:{entity_id}")
async def get_article_cached(article_id: int) -> dict | None:
version = await get_version("article", article_id)
key = f"article:{article_id}:v{version}"
cached = await redis.get(key)
if cached:
return json.loads(cached)
article = await db.get_article(article_id)
if article:
await redis.setex(key, 3600, json.dumps(article))
return article
async def invalidate_article(article_id: int) -> None:
# Просто инкрементируем версию — старый ключ больше не читается
await bump_version("article", article_id)
# Старые данные вытеснятся по TTL автоматически
Distributed cache: consistent hashing и cache stampede
При горизонтальном скейлинге Redis возникают два класса проблем: как распределить ключи между нодами, и что происходит, когда кэш протухает одновременно под большой нагрузкой.
Consistent hashing — алгоритм распределения ключей, при котором добавление/удаление ноды требует перехэширования только ~1/N ключей (вместо всех). Redis Cluster использует hash slots (16384 слота), но концепция та же.
# Redis Cluster: автоматическое шардирование
# redis-py автоматически маршрутизирует команды на правильную ноду
from redis.cluster import RedisCluster
rc = RedisCluster(
startup_nodes=[
{"host": "redis-1", "port": 6379},
{"host": "redis-2", "port": 6379},
{"host": "redis-3", "port": 6379},
],
decode_responses=True,
skip_full_coverage_check=True,
)
# Ключи с одинаковым хэш-тегом попадают на одну ноду
# (нужно для multi-key операций):
# {user:123}:profile и {user:123}:sessions → одна нода
await rc.set("{user:123}:profile", json.dumps(profile))
await rc.set("{user:123}:sessions", json.dumps(sessions))
# Транзакции работают только в рамках одной ноды:
pipe = rc.pipeline()
pipe.get("{user:123}:profile")
pipe.get("{user:123}:sessions")
results = pipe.execute()
Cache stampede (thundering herd) — ситуация, когда кэш протухает под нагрузкой и сотни запросов одновременно идут в БД. Три паттерна защиты:
import asyncio
import time
# 1. Mutex (один запрос обновляет, остальные ждут):
_locks: dict[str, asyncio.Lock] = {}
async def get_with_mutex(key: str, fetch_fn, ttl: int = 300):
cached = await redis.get(key)
if cached:
return json.loads(cached)
lock = _locks.setdefault(key, asyncio.Lock())
async with lock:
# Повторная проверка после захвата блокировки
cached = await redis.get(key)
if cached:
return json.loads(cached)
value = await fetch_fn()
await redis.setex(key, ttl, json.dumps(value))
return value
# 2. Probabilistic early expiration (XFetch алгоритм):
# Вероятность обновления растёт по мере приближения к TTL
async def get_with_xfetch(key: str, fetch_fn, ttl: int = 300, beta: float = 1.0):
"""XFetch: стохастическая ранняя инвалидация."""
data = await redis.get(key)
if data:
entry = json.loads(data)
remaining_ttl = await redis.ttl(key)
delta = entry.get("_fetch_time", 0) # время последнего fetch
# Вероятность обновления = exp(-delta * remaining_ttl / beta)
import math
if remaining_ttl > 0 and (
-delta * math.log(remaining_ttl / ttl) < beta
or remaining_ttl <= 0
):
return entry["value"]
value = await fetch_fn()
await redis.setex(key, ttl, json.dumps({"value": value, "_fetch_time": time.time()}))
return value
# 3. Background refresh (stale-while-revalidate на уровне приложения):
_refresh_tasks: set[str] = set()
async def get_with_background_refresh(key: str, fetch_fn, ttl: int = 300, stale: int = 60):
data = await redis.get(key)
remaining = await redis.ttl(key)
if data and remaining > 0:
# Если близко к истечению — обновляем в фоне, отдаём стейл
if remaining <= stale and key not in _refresh_tasks:
_refresh_tasks.add(key)
asyncio.create_task(_refresh_cache(key, fetch_fn, ttl))
return json.loads(data)
# Cache miss — блокирующий fetch
value = await fetch_fn()
await redis.setex(key, ttl, json.dumps(value))
return value
async def _refresh_cache(key: str, fetch_fn, ttl: int):
try:
value = await fetch_fn()
await redis.setex(key, ttl, json.dumps(value))
finally:
_refresh_tasks.discard(key)
Выбор стратегии защиты от stampede зависит от контекста. Mutex — самый простой и корректный, но вводит задержку для ожидающих. Background refresh — лучший UX (пользователь никогда не ждёт), но требует принять staleness. XFetch — математически оптимален, но сложен в понимании.
Кэш — это ложь, которую система говорит сама себе ради производительности. Ваша задача — контролировать, насколько давно была сказана эта ложь и когда пора обновить правду.