← все статьи
8 мин

Кэширование: от HTTP-заголовков до distributed cache

«There are only two hard things in Computer Science: cache invalidation and naming things» — Фил Карлтон. Фраза уже стала мемом, но проблема не исчезла. Кэши ускоряют системы на порядки, но неправильная инвалидация превращает их в источник багов, которые воспроизводятся только на production и только у части пользователей.

Кэширование — это всегда компромисс между свежестью данных и производительностью. Понять, где именно поставить кэш и как его инвалидировать, важнее, чем знать API конкретной библиотеки. Начнём с самого близкого к пользователю уровня и пойдём вглубь стека.

HTTP caching: Cache-Control и условные запросы

HTTP-кэширование встроено в протокол. Правильно выставленные заголовки позволяют браузерам и CDN хранить ответы без изменений в коде приложения. Это самый дешёвый вид кэша — никакой инфраструктуры, просто заголовки.

Cache-Control — главный директивный заголовок:

# Статические ресурсы (JS, CSS, изображения с хэшем в имени):
# Кэшировать на год, immutable — браузер не делает revalidation
Cache-Control: public, max-age=31536000, immutable

# HTML-страницы (актуальность важна):
# Кэшировать 10 минут, потом revalidate
Cache-Control: public, max-age=600, stale-while-revalidate=60

# Приватные данные (пользовательский профиль, корзина):
# Только в браузерном кэше, не в CDN, 5 минут
Cache-Control: private, max-age=300

# Никогда не кэшировать (API с реал-тайм данными, health-эндпоинты):
Cache-Control: no-store

# Кэшировать, но всегда revalidate перед использованием:
Cache-Control: no-cache  # misleading name: это НЕ "не кэшировать"

# stale-while-revalidate: отдавать устаревший кэш пока обновляем в фоне
Cache-Control: public, max-age=60, stale-while-revalidate=600

# stale-if-error: использовать устаревший кэш если origin недоступен
Cache-Control: public, max-age=300, stale-if-error=86400

ETag и условные запросы — механизм revalidation без передачи полного тела ответа:

# FastAPI: ETag-based caching
import hashlib
from fastapi import Request, Response
from fastapi.responses import JSONResponse


async def get_article(article_id: int, request: Request, response: Response):
    article = await db.get_article(article_id)
    if not article:
        raise HTTPException(status_code=404)

    # Вычисляем ETag из содержимого (или времени обновления)
    content = article.model_dump_json()
    etag = f'"{hashlib.md5(content.encode()).hexdigest()}"'

    # Проверяем If-None-Match от клиента
    if request.headers.get("If-None-Match") == etag:
        # Контент не изменился — отвечаем 304 без тела
        return Response(status_code=304, headers={"ETag": etag})

    response.headers["ETag"] = etag
    response.headers["Cache-Control"] = "public, max-age=300"
    response.headers["Last-Modified"] = article.updated_at.strftime(
        "%a, %d %b %Y %H:%M:%S GMT"
    )
    return JSONResponse(content=article.model_dump(), headers=response.headers)
# Middleware для автоматических ETag:
# (применять для GET-ответов с детерминированным контентом)
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response
import hashlib


class ETagMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        response = await call_next(request)

        if request.method != "GET" or response.status_code != 200:
            return response

        body = b""
        async for chunk in response.body_iterator:
            body += chunk

        etag = f'"{hashlib.md5(body).hexdigest()}"'
        if request.headers.get("If-None-Match") == etag:
            return Response(status_code=304, headers={"ETag": etag})

        from starlette.responses import Response as SR
        return SR(
            content=body,
            status_code=response.status_code,
            headers={**dict(response.headers), "ETag": etag},
            media_type=response.media_type,
        )

CDN caching: edge caching и стратегии purge

CDN (Content Delivery Network) кэширует ответы на edge-серверах, географически близких к пользователям. Запрос из Алматы к origin-серверу в Лондоне — 200+ мс. Тот же запрос к CDN edge во Франкфурте — 20–30 мс. Но CDN добавляет слой сложности: теперь инвалидация означает очистку кэша не на одном сервере, а на десятках edge-точек по всему миру.

# nginx: управление CDN-кэшированием через заголовки
server {
    location /api/articles/ {
        proxy_pass http://backend;

        # Vary: Authorization гарантирует раздельное кэширование
        # для аутентифицированных и анонимных пользователей
        add_header Vary "Accept-Encoding, Authorization";

        # Surrogate-Control для CDN (Fastly, Varnish понимают этот заголовок)
        # Браузер получает Cache-Control, CDN использует Surrogate-Control
        add_header Surrogate-Control "max-age=3600";
        add_header Cache-Control "public, max-age=60, s-maxage=3600";
    }

    location /api/articles/*/comments {
        proxy_pass http://backend;
        # Комментарии меняются чаще — короче TTL
        add_header Cache-Control "public, max-age=30, stale-while-revalidate=60";
    }

    location /static/ {
        alias /var/www/static/;
        expires 1y;
        add_header Cache-Control "public, max-age=31536000, immutable";
        add_header Vary "Accept-Encoding";
    }
}

Cache tagging и purge по тегам — ключевой инструмент точечной инвалидации в CDN:

# Fastly: Surrogate-Key header для тегирования
# При обновлении статьи — инвалидируем все связанные кэши

import httpx


async def get_article_with_cache_tags(article_id: int, response: Response):
    article = await db.get_article(article_id)
    category = article.category

    # Тегируем ответ — CDN запомнит, что эта страница связана с этими тегами
    response.headers["Surrogate-Key"] = (
        f"article-{article_id} "
        f"category-{category} "
        f"author-{article.author_id}"
    )
    return article


async def on_article_updated(article_id: int, category: str):
    """При обновлении статьи — инвалидируем все связанные кэши на CDN."""
    async with httpx.AsyncClient() as client:
        # Fastly API: purge по Surrogate-Key
        await client.post(
            f"https://api.fastly.com/service/{FASTLY_SERVICE_ID}/purge",
            headers={
                "Fastly-Key": FASTLY_API_KEY,
                "Surrogate-Key": f"article-{article_id}",
            }
        )


async def on_category_updated(category: str):
    """При изменении категории — инвалидируем все статьи категории."""
    async with httpx.AsyncClient() as client:
        await client.post(
            f"https://api.fastly.com/service/{FASTLY_SERVICE_ID}/purge",
            headers={
                "Fastly-Key": FASTLY_API_KEY,
                "Surrogate-Key": f"category-{category}",
            }
        )

stale-while-revalidate — элегантный паттерн для балансирования свежести и производительности. CDN отдаёт устаревший кэш немедленно, одновременно обновляя его в фоне. Пользователь не ждёт — максимум один запрос за TTL уходит на origin.

Application-level cache: in-memory, Redis, Memcached

In-memory кэш (LRU) — самый быстрый, живёт в памяти процесса. Подходит для небольших неизменных данных: конфигурации, lookup-таблицы, результаты дорогих вычислений.

# Python: functools.lru_cache и cachetools
from functools import lru_cache
from cachetools import TTLCache, cached
import threading


# lru_cache: простое кэширование без TTL (живёт до рестарта процесса)
@lru_cache(maxsize=1000)
def get_country_by_code(code: str) -> dict:
    # Дорогой lookup, который делается редко
    return db.query_sync(f"SELECT * FROM countries WHERE code = '{code}'")


# TTLCache: LRU + TTL (элементы вытесняются по времени)
_config_cache = TTLCache(maxsize=100, ttl=300)  # 5 минут
_lock = threading.Lock()


@cached(_config_cache, lock=_lock)
def get_feature_flags() -> dict:
    return db.query_sync("SELECT key, value FROM feature_flags")


# asyncio-совместимый кэш:
from asyncio import Lock as AsyncLock

_async_cache: dict[str, tuple] = {}  # key → (value, expires_at)
_async_lock = AsyncLock()


async def get_cached_or_fetch(key: str, fetch_fn, ttl: int = 300):
    import time
    async with _async_lock:
        if key in _async_cache:
            value, expires_at = _async_cache[key]
            if time.monotonic() < expires_at:
                return value

        value = await fetch_fn()
        _async_cache[key] = (value, time.monotonic() + ttl)
        return value

Ограничения in-memory: данные не переживают рестарт, не разделяются между инстансами (проблема при горизонтальном скейлинге). Для production-систем с несколькими инстансами — только Redis.

Redis паттерны: cache-aside, write-through, write-behind

Cache-Aside (Lazy Loading) — самый распространённый паттерн. Приложение сначала проверяет кэш, при промахе загружает из БД и кладёт в кэш.

# Python: Redis cache-aside с aiocache
import json
import redis.asyncio as aioredis
from typing import TypeVar, Callable, Any

redis = aioredis.from_url("redis://localhost:6379", decode_responses=True)
T = TypeVar("T")


async def cache_aside(
    key: str,
    fetch_fn: Callable[[], Any],
    ttl: int = 3600,
    serializer=json,
) -> Any:
    """Универсальная cache-aside функция."""
    # 1. Пробуем кэш
    cached = await redis.get(key)
    if cached is not None:
        return serializer.loads(cached)

    # 2. Cache miss: идём в источник
    value = await fetch_fn()

    # 3. Кладём в кэш
    if value is not None:
        await redis.setex(key, ttl, serializer.dumps(value))

    return value


# Использование:
async def get_user_profile(user_id: int) -> dict:
    return await cache_aside(
        key=f"user:{user_id}:profile",
        fetch_fn=lambda: db.get_user(user_id),
        ttl=1800,  # 30 минут
    )


async def invalidate_user_cache(user_id: int) -> None:
    """Инвалидация после обновления пользователя."""
    keys = await redis.keys(f"user:{user_id}:*")
    if keys:
        await redis.delete(*keys)

Write-Through — при записи в БД одновременно обновляется кэш. Данные всегда актуальны, но каждая запись дороже.

async def update_user_profile(user_id: int, data: dict) -> dict:
    # Обновляем БД
    updated = await db.update_user(user_id, data)

    # Сразу обновляем кэш (write-through)
    await redis.setex(
        f"user:{user_id}:profile",
        1800,
        json.dumps(updated)
    )
    return updated

Write-Behind (Write-Back) — запись идёт сначала в кэш, а в БД — асинхронно, с задержкой. Максимальная производительность записи, но риск потери данных при падении кэша. Подходит для некритичных данных: счётчики просмотров, аналитика, лайки.

# Write-behind для счётчика просмотров:
async def increment_view_count(article_id: int) -> None:
    # Инкрементируем в Redis (fast, atomic)
    count = await redis.incr(f"article:{article_id}:views")

    # Каждые 100 просмотров — flush в PostgreSQL
    if count % 100 == 0:
        await db.execute(
            "UPDATE articles SET view_count = view_count + 100 WHERE id = :id",
            {"id": article_id}
        )

    # Или через периодическую задачу (ARQ worker раз в минуту):
    # SELECT все ключи article:*:views, batch UPDATE в PostgreSQL, DEL ключи

Стратегии инвалидации: TTL, event-driven, versioned keys

Инвалидация — сложнейшая часть кэширования. Три основных подхода с разными трейдоффами:

TTL (Time-To-Live) — самый простой. Данные живут N секунд и вытесняются автоматически. Минус: может отдавать устаревшие данные до истечения TTL.

# Выбор TTL зависит от допустимой "устарелости":
TTL_STATIC_DATA = 86400        # 24 часа — справочники, которые меняются редко
TTL_CATALOG = 3600             # 1 час — каталог товаров
TTL_USER_PROFILE = 1800        # 30 минут — профиль пользователя
TTL_FEED = 300                 # 5 минут — лента новостей
TTL_REALTIME = 30              # 30 секунд — курсы, цены
TTL_RATE_LIMIT_WINDOW = 60     # 1 минута — окно rate limiting

Event-driven invalidation — инвалидация при событии изменения данных. Самый точный подход: кэш устарел ровно в момент изменения.

# Event-driven через database triggers или application events:

# Паттерн: при любом обновлении модели — публикуем событие
from dataclasses import dataclass
from typing import Callable


@dataclass
class CacheInvalidationEvent:
    entity: str
    entity_id: int
    action: str  # created, updated, deleted


# Event bus (упрощённый):
_handlers: dict[str, list[Callable]] = {}


def on_invalidation(entity: str):
    def decorator(fn):
        _handlers.setdefault(entity, []).append(fn)
        return fn
    return decorator


async def emit_invalidation(event: CacheInvalidationEvent):
    for handler in _handlers.get(event.entity, []):
        await handler(event)


# Регистрируем обработчики:
@on_invalidation("article")
async def invalidate_article_caches(event: CacheInvalidationEvent):
    article_id = event.entity_id
    keys_to_delete = [
        f"article:{article_id}",
        f"article:{article_id}:comments",
        f"article:{article_id}:related",
    ]
    # Если статья в листинге категории:
    article = await db.get_article(article_id)
    if article:
        keys_to_delete.append(f"category:{article.category_id}:listing")

    await redis.delete(*keys_to_delete)


# В сервисе обновления:
async def update_article(article_id: int, data: dict) -> dict:
    updated = await db.update_article(article_id, data)
    await emit_invalidation(CacheInvalidationEvent("article", article_id, "updated"))
    return updated

Versioned keys — вместо удаления ключа меняем его версию. Старые данные остаются в кэше до вытеснения по TTL, но больше не читаются.

# Versioned keys через Redis-атомарный инкремент версии:
async def get_version(entity: str, entity_id: int) -> int:
    v = await redis.get(f"version:{entity}:{entity_id}")
    return int(v) if v else 0


async def bump_version(entity: str, entity_id: int) -> int:
    return await redis.incr(f"version:{entity}:{entity_id}")


async def get_article_cached(article_id: int) -> dict | None:
    version = await get_version("article", article_id)
    key = f"article:{article_id}:v{version}"
    cached = await redis.get(key)
    if cached:
        return json.loads(cached)

    article = await db.get_article(article_id)
    if article:
        await redis.setex(key, 3600, json.dumps(article))
    return article


async def invalidate_article(article_id: int) -> None:
    # Просто инкрементируем версию — старый ключ больше не читается
    await bump_version("article", article_id)
    # Старые данные вытеснятся по TTL автоматически

Distributed cache: consistent hashing и cache stampede

При горизонтальном скейлинге Redis возникают два класса проблем: как распределить ключи между нодами, и что происходит, когда кэш протухает одновременно под большой нагрузкой.

Consistent hashing — алгоритм распределения ключей, при котором добавление/удаление ноды требует перехэширования только ~1/N ключей (вместо всех). Redis Cluster использует hash slots (16384 слота), но концепция та же.

# Redis Cluster: автоматическое шардирование
# redis-py автоматически маршрутизирует команды на правильную ноду

from redis.cluster import RedisCluster

rc = RedisCluster(
    startup_nodes=[
        {"host": "redis-1", "port": 6379},
        {"host": "redis-2", "port": 6379},
        {"host": "redis-3", "port": 6379},
    ],
    decode_responses=True,
    skip_full_coverage_check=True,
)

# Ключи с одинаковым хэш-тегом попадают на одну ноду
# (нужно для multi-key операций):
# {user:123}:profile и {user:123}:sessions → одна нода
await rc.set("{user:123}:profile", json.dumps(profile))
await rc.set("{user:123}:sessions", json.dumps(sessions))

# Транзакции работают только в рамках одной ноды:
pipe = rc.pipeline()
pipe.get("{user:123}:profile")
pipe.get("{user:123}:sessions")
results = pipe.execute()

Cache stampede (thundering herd) — ситуация, когда кэш протухает под нагрузкой и сотни запросов одновременно идут в БД. Три паттерна защиты:

import asyncio
import time


# 1. Mutex (один запрос обновляет, остальные ждут):
_locks: dict[str, asyncio.Lock] = {}


async def get_with_mutex(key: str, fetch_fn, ttl: int = 300):
    cached = await redis.get(key)
    if cached:
        return json.loads(cached)

    lock = _locks.setdefault(key, asyncio.Lock())
    async with lock:
        # Повторная проверка после захвата блокировки
        cached = await redis.get(key)
        if cached:
            return json.loads(cached)

        value = await fetch_fn()
        await redis.setex(key, ttl, json.dumps(value))
        return value


# 2. Probabilistic early expiration (XFetch алгоритм):
# Вероятность обновления растёт по мере приближения к TTL
async def get_with_xfetch(key: str, fetch_fn, ttl: int = 300, beta: float = 1.0):
    """XFetch: стохастическая ранняя инвалидация."""
    data = await redis.get(key)
    if data:
        entry = json.loads(data)
        remaining_ttl = await redis.ttl(key)
        delta = entry.get("_fetch_time", 0)  # время последнего fetch

        # Вероятность обновления = exp(-delta * remaining_ttl / beta)
        import math
        if remaining_ttl > 0 and (
            -delta * math.log(remaining_ttl / ttl) < beta
            or remaining_ttl <= 0
        ):
            return entry["value"]

    value = await fetch_fn()
    await redis.setex(key, ttl, json.dumps({"value": value, "_fetch_time": time.time()}))
    return value


# 3. Background refresh (stale-while-revalidate на уровне приложения):
_refresh_tasks: set[str] = set()


async def get_with_background_refresh(key: str, fetch_fn, ttl: int = 300, stale: int = 60):
    data = await redis.get(key)
    remaining = await redis.ttl(key)

    if data and remaining > 0:
        # Если близко к истечению — обновляем в фоне, отдаём стейл
        if remaining <= stale and key not in _refresh_tasks:
            _refresh_tasks.add(key)
            asyncio.create_task(_refresh_cache(key, fetch_fn, ttl))
        return json.loads(data)

    # Cache miss — блокирующий fetch
    value = await fetch_fn()
    await redis.setex(key, ttl, json.dumps(value))
    return value


async def _refresh_cache(key: str, fetch_fn, ttl: int):
    try:
        value = await fetch_fn()
        await redis.setex(key, ttl, json.dumps(value))
    finally:
        _refresh_tasks.discard(key)

Выбор стратегии защиты от stampede зависит от контекста. Mutex — самый простой и корректный, но вводит задержку для ожидающих. Background refresh — лучший UX (пользователь никогда не ждёт), но требует принять staleness. XFetch — математически оптимален, но сложен в понимании.


Кэш — это ложь, которую система говорит сама себе ради производительности. Ваша задача — контролировать, насколько давно была сказана эта ложь и когда пора обновить правду.

← Предыдущая Тестирование backend... Следующая →Error handling в распределённых системах