Error handling в распределённых системах
В монолите ошибка — исключение. В распределённой системе ошибка — нормальное рабочее состояние. Сеть ненадёжна, сервисы падают, таймауты случаются, дата-центры теряют питание. Единственный вопрос — не «произойдёт ли ошибка», а «как система на неё реагирует».
Большинство команд обнаруживают это в момент первого серьёзного инцидента: один сервис упал, и вся система легла каскадом. Платёжный сервис не отвечает — API-gateway ждёт таймаута 30 секунд, накапливает потоки, кончаются коннекты к БД, падает весь бэкенд. Из-за одного сервиса.
Разберём паттерны, которые предотвращают каскадные отказы и делают систему устойчивой к частичным сбоям.
Circuit Breaker: автоматический выключатель
Circuit breaker — паттерн, заимствованный из электротехники. Автоматический выключатель размыкает цепь при перегрузке, защищая оборудование. В программировании он защищает систему от каскадного распространения сбоев.
Три состояния автоматического выключателя:
- Closed (замкнут) — нормальная работа, запросы проходят. Ошибки считаются. Если ошибок становится больше порога за временное окно — переходим в Open.
- Open (разомкнут) — запросы блокируются немедленно, без обращения к внешнему сервису. После timeout-периода переходим в Half-Open.
- Half-Open (полуоткрыт) — пропускаем небольшое количество пробных запросов. Если они успешны — возвращаемся в Closed. Если нет — снова Open.
Реализация на Python с использованием библиотеки pybreaker:
import pybreaker
import httpx
from functools import wraps
# Создаём circuit breaker: открывается после 5 ошибок,
# остаётся открытым 30 секунд
payment_breaker = pybreaker.CircuitBreaker(
fail_max=5,
reset_timeout=30,
name="payment-service",
)
# Listener для мониторинга переходов состояний
class BreakerListener(pybreaker.CircuitBreakerListener):
def state_change(self, cb, old_state, new_state):
# Отправляем метрику в Prometheus/Datadog
print(f"[CircuitBreaker] {cb.name}: {old_state.name} → {new_state.name}")
def failure(self, cb, exc):
print(f"[CircuitBreaker] {cb.name} failure: {exc}")
payment_breaker.add_listeners(BreakerListener())
@payment_breaker
async def charge_payment(order_id: str, amount: float) -> dict:
async with httpx.AsyncClient(timeout=5.0) as client:
response = await client.post(
"http://payment-svc/charge",
json={"order_id": order_id, "amount": amount},
)
response.raise_for_status()
return response.json()
async def process_order(order_id: str, amount: float):
try:
result = await charge_payment(order_id, amount)
return {"status": "charged", "transaction_id": result["tx_id"]}
except pybreaker.CircuitBreakerError:
# Breaker открыт — fallback без обращения к сервису
return {"status": "pending", "message": "Payment service temporarily unavailable"}
except httpx.HTTPStatusError as e:
# Ошибка от сервиса — засчитывается в статистику breaker'а
raise
В экосистеме .NET аналогичный функционал даёт Polly — библиотека resilience-политик:
# Эквивалент на TypeScript (для справки архитектуры)
# Polly .NET: аналогичный паттерн
# services.AddHttpClient("PaymentService")
# .AddPolicyHandler(GetCircuitBreakerPolicy());
#
# static IAsyncPolicy<HttpResponseMessage> GetCircuitBreakerPolicy() =>
# HttpPolicyExtensions
# .HandleTransientHttpError()
# .CircuitBreakerAsync(5, TimeSpan.FromSeconds(30));
#
# В Java — resilience4j:
# CircuitBreakerConfig config = CircuitBreakerConfig.custom()
# .failureRateThreshold(50)
# .waitDurationInOpenState(Duration.ofSeconds(30))
# .slidingWindowSize(10)
# .build();
Ключевой момент: circuit breaker должен быть per-downstream-service. Один breaker на все внешние вызовы бесполезен — он не изолирует сбои. Для каждого зависимого сервиса — свой экземпляр с независимыми настройками.
Retry с экспоненциальным backoff и jitter
Наивный retry убивает упавший сервис. Представьте: 1000 клиентов получили ошибку, каждый немедленно повторяет запрос. Сервис, который едва поднимается после перегрузки, получает ещё 1000 одновременных запросов и падает снова. Это называется thundering herd problem.
Решение — exponential backoff с jitter:
import asyncio
import random
import httpx
from typing import TypeVar, Callable, Awaitable
T = TypeVar("T")
async def retry_with_backoff(
fn: Callable[[], Awaitable[T]],
max_attempts: int = 4,
base_delay: float = 0.5, # секунды
max_delay: float = 30.0,
jitter: bool = True,
) -> T:
"""
Retry с exponential backoff + full jitter.
Delays (без jitter): 0.5s, 1s, 2s, 4s
Delays (с jitter): случайное в [0, delay]
"""
last_exc: Exception | None = None
for attempt in range(max_attempts):
try:
return await fn()
except (httpx.TimeoutException, httpx.NetworkError) as exc:
last_exc = exc
if attempt == max_attempts - 1:
raise
# Exponential backoff
delay = min(base_delay * (2 ** attempt), max_delay)
# Full jitter: равномерное распределение в [0, delay]
# Лучше, чем "decorrelated jitter" для большинства случаев
if jitter:
delay = random.uniform(0, delay)
print(f"Attempt {attempt + 1} failed: {exc}. Retrying in {delay:.2f}s...")
await asyncio.sleep(delay)
raise last_exc # type: ignore
# Пример использования
async def get_user_profile(user_id: str) -> dict:
async def _fetch():
async with httpx.AsyncClient(timeout=3.0) as client:
r = await client.get(f"http://user-svc/users/{user_id}")
r.raise_for_status()
return r.json()
return await retry_with_backoff(
_fetch,
max_attempts=3,
base_delay=0.3,
)
Важные правила для retry:
- Retry только идемпотентных операций. GET — безопасно. POST на /charge — нет. Если сервис получил запрос, но не вернул ответ, повторный POST может списать деньги дважды. Используйте idempotency key.
- Retry только на транзиентные ошибки. 503 (Service Unavailable) — retry. 400 (Bad Request) или 404 — нет смысла, ошибка детерминированная.
- Бюджет retry не безграничен. Если у вас цепочка сервисов A→B→C, и каждый делает 3 retry, итоговое количество попыток на листовом сервисе — 3³ = 27. Retry amplification реальна.
// TypeScript: retry с idempotency key
async function chargeWithIdempotency(
orderId: string,
amount: number,
): Promise {
// Idempotency key — стабильный идентификатор для этой операции
// Сервер должен его хранить и возвращать тот же ответ при повторе
const idempotencyKey = `charge-${orderId}-${Date.now()}`;
return retryWithBackoff(
() =>
fetch("http://payment-svc/charge", {
method: "POST",
headers: {
"Content-Type": "application/json",
"Idempotency-Key": idempotencyKey,
},
body: JSON.stringify({ orderId, amount }),
signal: AbortSignal.timeout(5000),
}).then((r) => {
if (!r.ok) throw new Error(`HTTP ${r.status}`);
return r.json();
}),
{ maxAttempts: 3, baseDelay: 500, retryOn: [503, 429] },
);
}
Dead Letter Queue: когда retry не помогает
Retry решает транзиентные ошибки. Но что если сообщение невалидно, сервис стабильно отвергает его, или обработка требует ручного вмешательства? Бесконечный retry заблокирует очередь и создаст backpressure на весь pipeline.
Dead Letter Queue (DLQ) — специальная очередь для сообщений, которые не удалось обработать после исчерпания попыток. Вместо потери сообщения или бесконечного retry — отправляем в DLQ для анализа и ручной обработки.
# RabbitMQ: настройка DLQ через dead-letter-exchange
# docker-compose.yml фрагмент
rabbitmq:
image: rabbitmq:3.13-management
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: secret
---
# Объявление очереди с DLQ через HTTP API / SDK:
# x-dead-letter-exchange: dlx (dead letter exchange)
# x-message-ttl: 60000 (TTL для retry)
# x-max-length: 10000 (защита от переполнения)
# Топология:
# main_queue → (после 3 nack) → dlx → dlq
# dlq → (ручная обработка или автоматический replay)
import aio_pika
import json
import logging
logger = logging.getLogger(__name__)
MAIN_QUEUE = "orders"
DLQ = "orders.dlq"
DLX = "orders.dlx"
MAX_RETRIES = 3
async def setup_queues(channel: aio_pika.Channel):
"""Настройка топологии очередей с DLQ."""
# Dead letter exchange
dlx = await channel.declare_exchange(
DLX, aio_pika.ExchangeType.DIRECT, durable=True
)
# Dead letter queue
dlq = await channel.declare_queue(
DLQ,
durable=True,
arguments={"x-queue-type": "quorum"}, # RabbitMQ 3.8+
)
await dlq.bind(dlx, routing_key=MAIN_QUEUE)
# Основная очередь с указанием DLX
main_queue = await channel.declare_queue(
MAIN_QUEUE,
durable=True,
arguments={
"x-dead-letter-exchange": DLX,
"x-dead-letter-routing-key": MAIN_QUEUE,
"x-message-ttl": 60_000, # 60 сек TTL для retry
},
)
return main_queue, dlq
async def process_message(message: aio_pika.IncomingMessage):
async with message.process(requeue=False):
body = json.loads(message.body)
retry_count = message.headers.get("x-retry-count", 0)
try:
await handle_order(body)
except Exception as exc:
logger.error(f"Failed to process order {body.get('id')}: {exc}")
if retry_count < MAX_RETRIES:
# Переотправляем с увеличенным счётчиком
await message.channel.default_exchange.publish(
aio_pika.Message(
body=message.body,
headers={**message.headers, "x-retry-count": retry_count + 1},
),
routing_key=MAIN_QUEUE,
)
else:
# Исчерпали попытки — nack без requeue,
# RabbitMQ отправит в DLX автоматически
logger.warning(
f"Message dead-lettered after {MAX_RETRIES} retries: {body.get('id')}"
)
# Здесь — алерт в Slack/PagerDuty
await send_dlq_alert(body, str(exc))
После отправки в DLQ необходим alerting. DLQ не должен молча накапливать сообщения неделями. Настройте мониторинг глубины DLQ: если очередь растёт — это сигнал о системной проблеме.
# Prometheus alert для глубины DLQ
# prometheus/rules/messaging.yml
groups:
- name: messaging
rules:
- alert: DLQDepthCritical
expr: rabbitmq_queue_messages{queue="orders.dlq"} > 100
for: 5m
labels:
severity: warning
annotations:
summary: "DLQ depth exceeded threshold"
description: "orders.dlq has {{ $value }} messages — manual intervention required"
- alert: DLQGrowing
expr: rate(rabbitmq_queue_messages{queue="orders.dlq"}[10m]) > 0
for: 15m
labels:
severity: info
annotations:
summary: "DLQ is growing steadily"
Timeout budgets: управление таймаутами в цепочке
Таймауты — один из самых недооценённых аспектов distributed systems. Без таймаутов один медленный сервис держит поток бесконечно, ресурсы исчерпываются, система деградирует.
Концепция timeout budget: у каждого входящего запроса есть бюджет времени. При проксировании в downstream-сервисы передаётся оставшийся бюджет, а не фиксированный таймаут. Это предотвращает ситуацию, когда вся цепочка вместе превышает ожидаемое время ответа.
import time
import httpx
from contextvars import ContextVar
# Хранение deadline в контексте запроса
request_deadline: ContextVar[float | None] = ContextVar("request_deadline", default=None)
def set_request_deadline(timeout_seconds: float):
"""Устанавливаем deadline для текущего запроса."""
request_deadline.set(time.monotonic() + timeout_seconds)
def get_remaining_budget() -> float:
"""Сколько секунд осталось в бюджете."""
deadline = request_deadline.get()
if deadline is None:
return 5.0 # дефолт если нет deadline
remaining = deadline - time.monotonic()
if remaining <= 0:
raise TimeoutError("Request deadline exceeded before call")
return remaining
async def call_downstream(url: str, payload: dict) -> dict:
"""Вызов downstream сервиса с оставшимся бюджетом."""
timeout = min(get_remaining_budget() * 0.9, 10.0) # 90% бюджета, max 10s
async with httpx.AsyncClient(timeout=timeout) as client:
r = await client.post(url, json=payload)
r.raise_for_status()
return r.json()
# FastAPI middleware для установки deadline
from fastapi import Request
import asyncio
async def deadline_middleware(request: Request, call_next):
# Читаем deadline из заголовка (gRPC-style)
deadline_header = request.headers.get("X-Request-Deadline")
if deadline_header:
try:
deadline = float(deadline_header) # unix timestamp
budget = deadline - time.time()
if budget > 0:
set_request_deadline(budget)
except ValueError:
pass
# Если нет заголовка — ставим дефолт
if request_deadline.get() is None:
set_request_deadline(30.0) # 30 секунд максимум
return await call_next(request)
Bulkhead: изоляция failure domains
Bulkhead (переборка) — морской термин: водонепроницаемые перегородки не дают одному пробою затопить весь корабль. В программировании — изоляция ресурсов между пулами запросов.
Без bulkhead: один медленный downstream захватывает весь пул потоков, другие операции не могут выполняться даже если их downstream работает нормально.
import asyncio
from asyncio import Semaphore
# Отдельные semaphore для каждого downstream сервиса
# Лимитируем одновременные запросы независимо
class BulkheadedClient:
def __init__(self):
# Изолированные пулы для каждого сервиса
self._semaphores = {
"payment": Semaphore(10), # max 10 concurrent calls to payment
"inventory": Semaphore(20), # max 20 concurrent calls to inventory
"notification": Semaphore(5), # max 5 concurrent calls to notifications
}
self._client = httpx.AsyncClient(timeout=5.0)
async def call(self, service: str, url: str, **kwargs) -> dict:
semaphore = self._semaphores.get(service)
if semaphore is None:
raise ValueError(f"Unknown service: {service}")
# Пробуем взять ресурс из пула — без блокировки
acquired = await asyncio.wait_for(
semaphore.acquire(),
timeout=1.0, # 1 секунда на получение слота
)
try:
r = await self._client.post(url, **kwargs)
r.raise_for_status()
return r.json()
finally:
semaphore.release()
async def call_payment(self, payload: dict) -> dict:
try:
return await self.call("payment", "http://payment-svc/charge", json=payload)
except asyncio.TimeoutError:
# Пул заполнен — немедленный отказ
raise RuntimeError("Payment bulkhead full: too many concurrent requests")
В Java-экосистеме bulkhead реализуется через resilience4j с типами Semaphore и ThreadPool. ThreadPool bulkhead более мощный: изолирует не только количество запросов, но и потоки выполнения.
Graceful degradation: система, которая работает при частичных сбоях
Graceful degradation — способность системы продолжать работу в ограниченном режиме при сбое части компонентов. Вместо полного отказа — частичные результаты, кеш, дефолтные значения.
from dataclasses import dataclass
from typing import Optional
import asyncio
@dataclass
class ProductPage:
product: dict
recommendations: list = None
user_reviews: list = None
inventory_status: str = "unknown"
async def get_product_page(product_id: str, user_id: str) -> ProductPage:
"""
Загружаем страницу продукта с graceful degradation.
Если рекомендации или отзывы недоступны — показываем без них.
"""
# Критический путь: продукт обязателен
product = await product_service.get(product_id) # без fallback
# Некритичные сервисы — parallel с timeout
async def safe_recommendations():
try:
return await asyncio.wait_for(
recommendation_svc.get(user_id, product_id),
timeout=1.5,
)
except Exception:
return [] # fallback: пустой список
async def safe_reviews():
try:
return await asyncio.wait_for(
review_svc.get(product_id, limit=5),
timeout=2.0,
)
except Exception:
return []
async def safe_inventory():
try:
inv = await asyncio.wait_for(
inventory_svc.get_status(product_id),
timeout=1.0,
)
return inv["status"]
except Exception:
# Если инвентарь недоступен — не блокируем покупку,
# проверяем при оформлении заказа
return "unknown"
recs, reviews, inventory = await asyncio.gather(
safe_recommendations(),
safe_reviews(),
safe_inventory(),
return_exceptions=False,
)
return ProductPage(
product=product,
recommendations=recs,
user_reviews=reviews,
inventory_status=inventory,
)
Feature flags — мощный инструмент для управляемой деградации. Вместо кода, который "пробует и падает", — явное отключение фичи через конфиг:
// Feature flags для управляемой деградации
interface FeatureFlags {
recommendations_enabled: boolean;
real_time_inventory: boolean;
ai_search: boolean;
}
async function getFlags(): Promise {
try {
// Загружаем из LaunchDarkly / Unleash / своего хранилища
return await flagsService.getAll();
} catch {
// Если флаговый сервис недоступен — safe defaults
return {
recommendations_enabled: false,
real_time_inventory: false,
ai_search: false,
};
}
}
async function buildHomePage(userId: string): Promise {
const flags = await getFlags();
const [feed, recs, searchConfig] = await Promise.all([
getMainFeed(),
flags.recommendations_enabled
? getRecommendations(userId)
: Promise.resolve([]), // skip if disabled
flags.ai_search
? getAISearchConfig()
: Promise.resolve({ mode: "basic" }),
]);
return { feed, recommendations: recs, searchConfig };
}
Observability: видеть то, что происходит
Все описанные паттерны бессмысленны без наблюдаемости. Если circuit breaker открылся — вы должны узнать об этом немедленно, понять почему, и восстановить сервис.
Структурированное логирование ошибок — фундамент. Логи должны содержать контекст, а не просто текст исключения:
import logging
import json
import traceback
from datetime import datetime, timezone
class StructuredLogger:
def __init__(self, service: str):
self.service = service
self._logger = logging.getLogger(service)
def error(self, event: str, exc: Exception = None, **context):
record = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"level": "error",
"service": self.service,
"event": event,
**context,
}
if exc is not None:
record["error"] = {
"type": type(exc).__name__,
"message": str(exc),
"traceback": traceback.format_exc(),
}
# JSON в stdout — для Datadog/ELK/CloudWatch
print(json.dumps(record, ensure_ascii=False))
def circuit_open(self, downstream: str, failure_rate: float, **ctx):
self.error(
"circuit_breaker_opened",
downstream=downstream,
failure_rate=failure_rate,
**ctx,
)
# Использование
logger = StructuredLogger("order-service")
async def process_payment(order_id: str, amount: float):
try:
return await charge_payment(order_id, amount)
except pybreaker.CircuitBreakerError as exc:
logger.error(
"payment_circuit_open",
exc,
order_id=order_id,
amount=amount,
action="fallback_to_pending",
)
return {"status": "pending"}
Error budgets (SRE-подход): вместо целевого "нулевого количества ошибок" определяем допустимый уровень ошибок — error budget. Если SLO = 99.9% успешных запросов, error budget = 0.1% = ~43 минуты простоя в месяц. Это реалистично и позволяет команде двигаться.
# Prometheus: SLO recording rules
# prometheus/rules/slo.yml
groups:
- name: slo_order_service
interval: 30s
rules:
# Доля успешных запросов за 5-минутное окно
- record: job:http_requests:success_rate5m
expr: |
sum(rate(http_requests_total{job="order-service",status!~"5.."}[5m]))
/
sum(rate(http_requests_total{job="order-service"}[5m]))
# Алерт: SLO нарушено за последний час
- alert: SLOBudgetBurning
expr: |
(1 - job:http_requests:success_rate5m{job="order-service"}) > 0.01
for: 10m
labels:
severity: warning
annotations:
summary: "Order service error rate > 1% — burning error budget"
description: "Current error rate: {{ printf \"%.2f\" (mul $value 100) }}%"
Паттерны устойчивости не добавляются постфактум — они проектируются заранее. Circuit breaker без observability бесполезен. Retry без idempotency создаёт новые проблемы. Graceful degradation требует, чтобы продукт определил: что является критическим путём, а что — optional enhancement.
Начните с малого: добавьте таймауты везде, где их нет. Это одно изменение предотвращает большинство каскадных отказов. Потом — circuit breaker для самых ненадёжных зависимостей. Потом — DLQ для асинхронных операций. Итеративно, слой за слоем.
В распределённой системе вопрос не в том, упадёт ли что-то — а в том, деградирует ли система изолированно или тянет за собой всё остальное. Цель всех паттернов — первое.