← все статьи
7 мин

Error handling в распределённых системах

В монолите ошибка — исключение. В распределённой системе ошибка — нормальное рабочее состояние. Сеть ненадёжна, сервисы падают, таймауты случаются, дата-центры теряют питание. Единственный вопрос — не «произойдёт ли ошибка», а «как система на неё реагирует».

Большинство команд обнаруживают это в момент первого серьёзного инцидента: один сервис упал, и вся система легла каскадом. Платёжный сервис не отвечает — API-gateway ждёт таймаута 30 секунд, накапливает потоки, кончаются коннекты к БД, падает весь бэкенд. Из-за одного сервиса.

Разберём паттерны, которые предотвращают каскадные отказы и делают систему устойчивой к частичным сбоям.

Circuit Breaker: автоматический выключатель

Circuit breaker — паттерн, заимствованный из электротехники. Автоматический выключатель размыкает цепь при перегрузке, защищая оборудование. В программировании он защищает систему от каскадного распространения сбоев.

Три состояния автоматического выключателя:

  • Closed (замкнут) — нормальная работа, запросы проходят. Ошибки считаются. Если ошибок становится больше порога за временное окно — переходим в Open.
  • Open (разомкнут) — запросы блокируются немедленно, без обращения к внешнему сервису. После timeout-периода переходим в Half-Open.
  • Half-Open (полуоткрыт) — пропускаем небольшое количество пробных запросов. Если они успешны — возвращаемся в Closed. Если нет — снова Open.

Реализация на Python с использованием библиотеки pybreaker:

import pybreaker
import httpx
from functools import wraps

# Создаём circuit breaker: открывается после 5 ошибок,
# остаётся открытым 30 секунд
payment_breaker = pybreaker.CircuitBreaker(
    fail_max=5,
    reset_timeout=30,
    name="payment-service",
)

# Listener для мониторинга переходов состояний
class BreakerListener(pybreaker.CircuitBreakerListener):
    def state_change(self, cb, old_state, new_state):
        # Отправляем метрику в Prometheus/Datadog
        print(f"[CircuitBreaker] {cb.name}: {old_state.name} → {new_state.name}")
    
    def failure(self, cb, exc):
        print(f"[CircuitBreaker] {cb.name} failure: {exc}")

payment_breaker.add_listeners(BreakerListener())


@payment_breaker
async def charge_payment(order_id: str, amount: float) -> dict:
    async with httpx.AsyncClient(timeout=5.0) as client:
        response = await client.post(
            "http://payment-svc/charge",
            json={"order_id": order_id, "amount": amount},
        )
        response.raise_for_status()
        return response.json()


async def process_order(order_id: str, amount: float):
    try:
        result = await charge_payment(order_id, amount)
        return {"status": "charged", "transaction_id": result["tx_id"]}
    except pybreaker.CircuitBreakerError:
        # Breaker открыт — fallback без обращения к сервису
        return {"status": "pending", "message": "Payment service temporarily unavailable"}
    except httpx.HTTPStatusError as e:
        # Ошибка от сервиса — засчитывается в статистику breaker'а
        raise

В экосистеме .NET аналогичный функционал даёт Polly — библиотека resilience-политик:

# Эквивалент на TypeScript (для справки архитектуры)
# Polly .NET: аналогичный паттерн
# services.AddHttpClient("PaymentService")
#     .AddPolicyHandler(GetCircuitBreakerPolicy());
#
# static IAsyncPolicy<HttpResponseMessage> GetCircuitBreakerPolicy() =>
#     HttpPolicyExtensions
#         .HandleTransientHttpError()
#         .CircuitBreakerAsync(5, TimeSpan.FromSeconds(30));
#
# В Java — resilience4j:
# CircuitBreakerConfig config = CircuitBreakerConfig.custom()
#     .failureRateThreshold(50)
#     .waitDurationInOpenState(Duration.ofSeconds(30))
#     .slidingWindowSize(10)
#     .build();

Ключевой момент: circuit breaker должен быть per-downstream-service. Один breaker на все внешние вызовы бесполезен — он не изолирует сбои. Для каждого зависимого сервиса — свой экземпляр с независимыми настройками.

Retry с экспоненциальным backoff и jitter

Наивный retry убивает упавший сервис. Представьте: 1000 клиентов получили ошибку, каждый немедленно повторяет запрос. Сервис, который едва поднимается после перегрузки, получает ещё 1000 одновременных запросов и падает снова. Это называется thundering herd problem.

Решение — exponential backoff с jitter:

import asyncio
import random
import httpx
from typing import TypeVar, Callable, Awaitable

T = TypeVar("T")


async def retry_with_backoff(
    fn: Callable[[], Awaitable[T]],
    max_attempts: int = 4,
    base_delay: float = 0.5,   # секунды
    max_delay: float = 30.0,
    jitter: bool = True,
) -> T:
    """
    Retry с exponential backoff + full jitter.
    
    Delays (без jitter): 0.5s, 1s, 2s, 4s
    Delays (с jitter): случайное в [0, delay]
    """
    last_exc: Exception | None = None
    
    for attempt in range(max_attempts):
        try:
            return await fn()
        except (httpx.TimeoutException, httpx.NetworkError) as exc:
            last_exc = exc
            if attempt == max_attempts - 1:
                raise
            
            # Exponential backoff
            delay = min(base_delay * (2 ** attempt), max_delay)
            
            # Full jitter: равномерное распределение в [0, delay]
            # Лучше, чем "decorrelated jitter" для большинства случаев
            if jitter:
                delay = random.uniform(0, delay)
            
            print(f"Attempt {attempt + 1} failed: {exc}. Retrying in {delay:.2f}s...")
            await asyncio.sleep(delay)
    
    raise last_exc  # type: ignore


# Пример использования
async def get_user_profile(user_id: str) -> dict:
    async def _fetch():
        async with httpx.AsyncClient(timeout=3.0) as client:
            r = await client.get(f"http://user-svc/users/{user_id}")
            r.raise_for_status()
            return r.json()
    
    return await retry_with_backoff(
        _fetch,
        max_attempts=3,
        base_delay=0.3,
    )

Важные правила для retry:

  • Retry только идемпотентных операций. GET — безопасно. POST на /charge — нет. Если сервис получил запрос, но не вернул ответ, повторный POST может списать деньги дважды. Используйте idempotency key.
  • Retry только на транзиентные ошибки. 503 (Service Unavailable) — retry. 400 (Bad Request) или 404 — нет смысла, ошибка детерминированная.
  • Бюджет retry не безграничен. Если у вас цепочка сервисов A→B→C, и каждый делает 3 retry, итоговое количество попыток на листовом сервисе — 3³ = 27. Retry amplification реальна.
// TypeScript: retry с idempotency key
async function chargeWithIdempotency(
  orderId: string,
  amount: number,
): Promise {
  // Idempotency key — стабильный идентификатор для этой операции
  // Сервер должен его хранить и возвращать тот же ответ при повторе
  const idempotencyKey = `charge-${orderId}-${Date.now()}`;

  return retryWithBackoff(
    () =>
      fetch("http://payment-svc/charge", {
        method: "POST",
        headers: {
          "Content-Type": "application/json",
          "Idempotency-Key": idempotencyKey,
        },
        body: JSON.stringify({ orderId, amount }),
        signal: AbortSignal.timeout(5000),
      }).then((r) => {
        if (!r.ok) throw new Error(`HTTP ${r.status}`);
        return r.json();
      }),
    { maxAttempts: 3, baseDelay: 500, retryOn: [503, 429] },
  );
}

Dead Letter Queue: когда retry не помогает

Retry решает транзиентные ошибки. Но что если сообщение невалидно, сервис стабильно отвергает его, или обработка требует ручного вмешательства? Бесконечный retry заблокирует очередь и создаст backpressure на весь pipeline.

Dead Letter Queue (DLQ) — специальная очередь для сообщений, которые не удалось обработать после исчерпания попыток. Вместо потери сообщения или бесконечного retry — отправляем в DLQ для анализа и ручной обработки.

# RabbitMQ: настройка DLQ через dead-letter-exchange
# docker-compose.yml фрагмент
rabbitmq:
  image: rabbitmq:3.13-management
  environment:
    RABBITMQ_DEFAULT_USER: admin
    RABBITMQ_DEFAULT_PASS: secret

---
# Объявление очереди с DLQ через HTTP API / SDK:
# x-dead-letter-exchange: dlx (dead letter exchange)
# x-message-ttl: 60000 (TTL для retry)
# x-max-length: 10000 (защита от переполнения)

# Топология:
# main_queue → (после 3 nack) → dlx → dlq
# dlq → (ручная обработка или автоматический replay)
import aio_pika
import json
import logging

logger = logging.getLogger(__name__)

MAIN_QUEUE = "orders"
DLQ = "orders.dlq"
DLX = "orders.dlx"
MAX_RETRIES = 3


async def setup_queues(channel: aio_pika.Channel):
    """Настройка топологии очередей с DLQ."""
    # Dead letter exchange
    dlx = await channel.declare_exchange(
        DLX, aio_pika.ExchangeType.DIRECT, durable=True
    )
    
    # Dead letter queue
    dlq = await channel.declare_queue(
        DLQ,
        durable=True,
        arguments={"x-queue-type": "quorum"},  # RabbitMQ 3.8+
    )
    await dlq.bind(dlx, routing_key=MAIN_QUEUE)
    
    # Основная очередь с указанием DLX
    main_queue = await channel.declare_queue(
        MAIN_QUEUE,
        durable=True,
        arguments={
            "x-dead-letter-exchange": DLX,
            "x-dead-letter-routing-key": MAIN_QUEUE,
            "x-message-ttl": 60_000,  # 60 сек TTL для retry
        },
    )
    return main_queue, dlq


async def process_message(message: aio_pika.IncomingMessage):
    async with message.process(requeue=False):
        body = json.loads(message.body)
        retry_count = message.headers.get("x-retry-count", 0)
        
        try:
            await handle_order(body)
        except Exception as exc:
            logger.error(f"Failed to process order {body.get('id')}: {exc}")
            
            if retry_count < MAX_RETRIES:
                # Переотправляем с увеличенным счётчиком
                await message.channel.default_exchange.publish(
                    aio_pika.Message(
                        body=message.body,
                        headers={**message.headers, "x-retry-count": retry_count + 1},
                    ),
                    routing_key=MAIN_QUEUE,
                )
            else:
                # Исчерпали попытки — nack без requeue,
                # RabbitMQ отправит в DLX автоматически
                logger.warning(
                    f"Message dead-lettered after {MAX_RETRIES} retries: {body.get('id')}"
                )
                # Здесь — алерт в Slack/PagerDuty
                await send_dlq_alert(body, str(exc))

После отправки в DLQ необходим alerting. DLQ не должен молча накапливать сообщения неделями. Настройте мониторинг глубины DLQ: если очередь растёт — это сигнал о системной проблеме.

# Prometheus alert для глубины DLQ
# prometheus/rules/messaging.yml
groups:
  - name: messaging
    rules:
      - alert: DLQDepthCritical
        expr: rabbitmq_queue_messages{queue="orders.dlq"} > 100
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "DLQ depth exceeded threshold"
          description: "orders.dlq has {{ $value }} messages — manual intervention required"
      
      - alert: DLQGrowing
        expr: rate(rabbitmq_queue_messages{queue="orders.dlq"}[10m]) > 0
        for: 15m
        labels:
          severity: info
        annotations:
          summary: "DLQ is growing steadily"

Timeout budgets: управление таймаутами в цепочке

Таймауты — один из самых недооценённых аспектов distributed systems. Без таймаутов один медленный сервис держит поток бесконечно, ресурсы исчерпываются, система деградирует.

Концепция timeout budget: у каждого входящего запроса есть бюджет времени. При проксировании в downstream-сервисы передаётся оставшийся бюджет, а не фиксированный таймаут. Это предотвращает ситуацию, когда вся цепочка вместе превышает ожидаемое время ответа.

import time
import httpx
from contextvars import ContextVar

# Хранение deadline в контексте запроса
request_deadline: ContextVar[float | None] = ContextVar("request_deadline", default=None)


def set_request_deadline(timeout_seconds: float):
    """Устанавливаем deadline для текущего запроса."""
    request_deadline.set(time.monotonic() + timeout_seconds)


def get_remaining_budget() -> float:
    """Сколько секунд осталось в бюджете."""
    deadline = request_deadline.get()
    if deadline is None:
        return 5.0  # дефолт если нет deadline
    remaining = deadline - time.monotonic()
    if remaining <= 0:
        raise TimeoutError("Request deadline exceeded before call")
    return remaining


async def call_downstream(url: str, payload: dict) -> dict:
    """Вызов downstream сервиса с оставшимся бюджетом."""
    timeout = min(get_remaining_budget() * 0.9, 10.0)  # 90% бюджета, max 10s
    
    async with httpx.AsyncClient(timeout=timeout) as client:
        r = await client.post(url, json=payload)
        r.raise_for_status()
        return r.json()


# FastAPI middleware для установки deadline
from fastapi import Request
import asyncio

async def deadline_middleware(request: Request, call_next):
    # Читаем deadline из заголовка (gRPC-style)
    deadline_header = request.headers.get("X-Request-Deadline")
    if deadline_header:
        try:
            deadline = float(deadline_header)  # unix timestamp
            budget = deadline - time.time()
            if budget > 0:
                set_request_deadline(budget)
        except ValueError:
            pass
    
    # Если нет заголовка — ставим дефолт
    if request_deadline.get() is None:
        set_request_deadline(30.0)  # 30 секунд максимум
    
    return await call_next(request)

Bulkhead: изоляция failure domains

Bulkhead (переборка) — морской термин: водонепроницаемые перегородки не дают одному пробою затопить весь корабль. В программировании — изоляция ресурсов между пулами запросов.

Без bulkhead: один медленный downstream захватывает весь пул потоков, другие операции не могут выполняться даже если их downstream работает нормально.

import asyncio
from asyncio import Semaphore

# Отдельные semaphore для каждого downstream сервиса
# Лимитируем одновременные запросы независимо

class BulkheadedClient:
    def __init__(self):
        # Изолированные пулы для каждого сервиса
        self._semaphores = {
            "payment": Semaphore(10),    # max 10 concurrent calls to payment
            "inventory": Semaphore(20),  # max 20 concurrent calls to inventory
            "notification": Semaphore(5), # max 5 concurrent calls to notifications
        }
        self._client = httpx.AsyncClient(timeout=5.0)
    
    async def call(self, service: str, url: str, **kwargs) -> dict:
        semaphore = self._semaphores.get(service)
        if semaphore is None:
            raise ValueError(f"Unknown service: {service}")
        
        # Пробуем взять ресурс из пула — без блокировки
        acquired = await asyncio.wait_for(
            semaphore.acquire(),
            timeout=1.0,  # 1 секунда на получение слота
        )
        try:
            r = await self._client.post(url, **kwargs)
            r.raise_for_status()
            return r.json()
        finally:
            semaphore.release()
    
    async def call_payment(self, payload: dict) -> dict:
        try:
            return await self.call("payment", "http://payment-svc/charge", json=payload)
        except asyncio.TimeoutError:
            # Пул заполнен — немедленный отказ
            raise RuntimeError("Payment bulkhead full: too many concurrent requests")

В Java-экосистеме bulkhead реализуется через resilience4j с типами Semaphore и ThreadPool. ThreadPool bulkhead более мощный: изолирует не только количество запросов, но и потоки выполнения.

Graceful degradation: система, которая работает при частичных сбоях

Graceful degradation — способность системы продолжать работу в ограниченном режиме при сбое части компонентов. Вместо полного отказа — частичные результаты, кеш, дефолтные значения.

from dataclasses import dataclass
from typing import Optional
import asyncio

@dataclass
class ProductPage:
    product: dict
    recommendations: list = None
    user_reviews: list = None
    inventory_status: str = "unknown"


async def get_product_page(product_id: str, user_id: str) -> ProductPage:
    """
    Загружаем страницу продукта с graceful degradation.
    Если рекомендации или отзывы недоступны — показываем без них.
    """
    # Критический путь: продукт обязателен
    product = await product_service.get(product_id)  # без fallback
    
    # Некритичные сервисы — parallel с timeout
    async def safe_recommendations():
        try:
            return await asyncio.wait_for(
                recommendation_svc.get(user_id, product_id),
                timeout=1.5,
            )
        except Exception:
            return []  # fallback: пустой список
    
    async def safe_reviews():
        try:
            return await asyncio.wait_for(
                review_svc.get(product_id, limit=5),
                timeout=2.0,
            )
        except Exception:
            return []
    
    async def safe_inventory():
        try:
            inv = await asyncio.wait_for(
                inventory_svc.get_status(product_id),
                timeout=1.0,
            )
            return inv["status"]
        except Exception:
            # Если инвентарь недоступен — не блокируем покупку,
            # проверяем при оформлении заказа
            return "unknown"
    
    recs, reviews, inventory = await asyncio.gather(
        safe_recommendations(),
        safe_reviews(),
        safe_inventory(),
        return_exceptions=False,
    )
    
    return ProductPage(
        product=product,
        recommendations=recs,
        user_reviews=reviews,
        inventory_status=inventory,
    )

Feature flags — мощный инструмент для управляемой деградации. Вместо кода, который "пробует и падает", — явное отключение фичи через конфиг:

// Feature flags для управляемой деградации
interface FeatureFlags {
  recommendations_enabled: boolean;
  real_time_inventory: boolean;
  ai_search: boolean;
}

async function getFlags(): Promise {
  try {
    // Загружаем из LaunchDarkly / Unleash / своего хранилища
    return await flagsService.getAll();
  } catch {
    // Если флаговый сервис недоступен — safe defaults
    return {
      recommendations_enabled: false,
      real_time_inventory: false,
      ai_search: false,
    };
  }
}

async function buildHomePage(userId: string): Promise {
  const flags = await getFlags();

  const [feed, recs, searchConfig] = await Promise.all([
    getMainFeed(),
    flags.recommendations_enabled
      ? getRecommendations(userId)
      : Promise.resolve([]), // skip if disabled
    flags.ai_search
      ? getAISearchConfig()
      : Promise.resolve({ mode: "basic" }),
  ]);

  return { feed, recommendations: recs, searchConfig };
}

Observability: видеть то, что происходит

Все описанные паттерны бессмысленны без наблюдаемости. Если circuit breaker открылся — вы должны узнать об этом немедленно, понять почему, и восстановить сервис.

Структурированное логирование ошибок — фундамент. Логи должны содержать контекст, а не просто текст исключения:

import logging
import json
import traceback
from datetime import datetime, timezone

class StructuredLogger:
    def __init__(self, service: str):
        self.service = service
        self._logger = logging.getLogger(service)
    
    def error(self, event: str, exc: Exception = None, **context):
        record = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": "error",
            "service": self.service,
            "event": event,
            **context,
        }
        
        if exc is not None:
            record["error"] = {
                "type": type(exc).__name__,
                "message": str(exc),
                "traceback": traceback.format_exc(),
            }
        
        # JSON в stdout — для Datadog/ELK/CloudWatch
        print(json.dumps(record, ensure_ascii=False))
    
    def circuit_open(self, downstream: str, failure_rate: float, **ctx):
        self.error(
            "circuit_breaker_opened",
            downstream=downstream,
            failure_rate=failure_rate,
            **ctx,
        )


# Использование
logger = StructuredLogger("order-service")

async def process_payment(order_id: str, amount: float):
    try:
        return await charge_payment(order_id, amount)
    except pybreaker.CircuitBreakerError as exc:
        logger.error(
            "payment_circuit_open",
            exc,
            order_id=order_id,
            amount=amount,
            action="fallback_to_pending",
        )
        return {"status": "pending"}

Error budgets (SRE-подход): вместо целевого "нулевого количества ошибок" определяем допустимый уровень ошибок — error budget. Если SLO = 99.9% успешных запросов, error budget = 0.1% = ~43 минуты простоя в месяц. Это реалистично и позволяет команде двигаться.

# Prometheus: SLO recording rules
# prometheus/rules/slo.yml
groups:
  - name: slo_order_service
    interval: 30s
    rules:
      # Доля успешных запросов за 5-минутное окно
      - record: job:http_requests:success_rate5m
        expr: |
          sum(rate(http_requests_total{job="order-service",status!~"5.."}[5m]))
          /
          sum(rate(http_requests_total{job="order-service"}[5m]))
      
      # Алерт: SLO нарушено за последний час
      - alert: SLOBudgetBurning
        expr: |
          (1 - job:http_requests:success_rate5m{job="order-service"}) > 0.01
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Order service error rate > 1% — burning error budget"
          description: "Current error rate: {{ printf \"%.2f\" (mul $value 100) }}%"

Паттерны устойчивости не добавляются постфактум — они проектируются заранее. Circuit breaker без observability бесполезен. Retry без idempotency создаёт новые проблемы. Graceful degradation требует, чтобы продукт определил: что является критическим путём, а что — optional enhancement.

Начните с малого: добавьте таймауты везде, где их нет. Это одно изменение предотвращает большинство каскадных отказов. Потом — circuit breaker для самых ненадёжных зависимостей. Потом — DLQ для асинхронных операций. Итеративно, слой за слоем.

В распределённой системе вопрос не в том, упадёт ли что-то — а в том, деградирует ли система изолированно или тянет за собой всё остальное. Цель всех паттернов — первое.

← ПредыдущаяКэширование: от HTTP-заголовков... Следующая →Infrastructure as Code: Terraform vs Pulumi