← все статьи
7 мин

Кейс: Data pipeline для health-tech стартапа: от хаоса к real-time аналитике

Один из наших клиентов — health-tech стартап, производящий IoT-устройства для непрерывного мониторинга состояния пациентов — столкнулся с ситуацией, которую в индустрии называют «data swamp»: данные формально собирались, но были практически бесполезны для клинических решений в реальном времени. Задержка в 4 часа между измерением и отображением в интерфейсе врача — не задержка. Это провал.

К моменту начала нашей работы у клиента было около 5 000 подключённых устройств в нескольких клиниках-партнёрах. Планы — масштабироваться до 50K устройств в течение двух лет. Регулятор тем временем запросил полный audit trail всех измерений для сертификации. Горело со всех сторон.

// Исходная архитектура: почему сломалось

Исходная система собиралась быстро, под MVP, — это нормально. Проблема в том, что она продолжала работать в production значительно дольше, чем планировалось. Типичная история.

Данные с устройств поступали по трём протоколам одновременно. Часть устройств использовала MQTT — лёгкий протокол для IoT, оптимальный по батарее. Часть отправляла данные через HTTP-вебхуки — более новые модели. Клиники требовали поддержки HL7 FHIR — отраслевого стандарта для медицинских данных, который нужен для интеграции с госпитальными системами.

Обработка этих трёх потоков выполнялась набором cron-скриптов на Python, которые запускались каждые 15 минут. Скрипты читали данные из очередей, трансформировали их в единый формат и писали в PostgreSQL. При нормальной нагрузке — работало. При пиках (утренние обходы, когда медперсонал подключал множество устройств одновременно) — скрипты не успевали, очереди переполнялись, данные терялись без какого-либо алерта.

Потеря медицинских данных — это не просто техническая проблема. Для регулятора это вопрос сертификации. Для клиники — потенциальная ответственность. Для пациента — риск пропущенного критического показателя.

Real-time алертов не было вообще. Если у пациента критически падало насыщение крови кислородом, система об этом узнавала через 15 минут — следующим запуском cron. На практике — ещё позже, с учётом задержек очередей.

// Проектирование: от требований к архитектуре

Мы начали с формализации требований — особенно в части, где они пересекались с регуляторными. Три ключевых ограничения определили архитектуру:

  • Zero data loss. Потеря любого измерения недопустима. Это означает guaranteed delivery на уровне messaging layer.
  • Audit trail. Каждое измерение должно иметь неизменяемую запись: когда получено, от какого устройства, как обработано, кто видел. Immutable log — не feature, а требование регулятора.
  • Latency < 30 секунд для критических алертов. Мы взяли цель 12 секунд как рабочую — с запасом относительно регуляторного требования.

Эти три требования вместе практически однозначно определяют стек: event streaming + stream processing + time-series storage. Мы выбрали Apache Kafka, Apache Flink и TimescaleDB (PostgreSQL extension для временных рядов).

// Unified data model: прежде чем строить pipeline

Самая важная работа была сделана до написания кода — проектирование единой модели данных. MQTT, HTTP и HL7 FHIR дают принципиально разные форматы: бинарные пакеты, JSON и XML с медицинской семантикой соответственно. Гетерогенность на входе — нормально. Гетерогенность внутри pipeline — катастрофа.

-- Unified measurement model (TimescaleDB / PostgreSQL)
CREATE TABLE measurements (
    id              UUID        NOT NULL DEFAULT gen_random_uuid(),
    device_id       UUID        NOT NULL,
    patient_id      UUID,                              -- nullable: устройство может быть не назначено
    measurement_type VARCHAR(64) NOT NULL,             -- 'spo2', 'heart_rate', 'blood_pressure', etc.
    value_numeric   DOUBLE PRECISION,
    value_json      JSONB,                             -- для составных измерений (BP: systolic/diastolic)
    unit            VARCHAR(32) NOT NULL,
    quality_score   SMALLINT    CHECK (quality_score BETWEEN 0 AND 100),
    source_protocol VARCHAR(16) NOT NULL,              -- 'mqtt', 'http', 'fhir'
    received_at     TIMESTAMPTZ NOT NULL,              -- время получения нашей системой
    measured_at     TIMESTAMPTZ NOT NULL,              -- время измерения на устройстве
    raw_payload     BYTEA,                             -- оригинальный пакет для audit
    ingestion_id    UUID        NOT NULL,              -- трассировка через pipeline
    PRIMARY KEY (id, measured_at)
) PARTITION BY RANGE (measured_at);

-- TimescaleDB гипертаблица: автоматическое партиционирование по времени
SELECT create_hypertable('measurements', 'measured_at',
    chunk_time_interval => INTERVAL '1 day',
    if_not_exists => TRUE
);

-- Индекс для real-time запросов по устройству
CREATE INDEX ON measurements (device_id, measured_at DESC);

-- Индекс для запросов по пациенту (клинический dashboard)
CREATE INDEX ON measurements (patient_id, measurement_type, measured_at DESC)
    WHERE patient_id IS NOT NULL;

Поле raw_payload — ключ для audit trail. Оригинальный пакет хранится без изменений. Регулятор может в любой момент запросить данные для конкретного устройства за конкретный период — и получить как обработанные значения, так и исходные байты.

// Flink: topology обработки потока

Apache Flink обрабатывает поток в несколько стадий. Каждая стадия — отдельный оператор с собственным parallelism и back-pressure handling.

// Упрощённая Flink topology для обработки измерений
// Реальный код на Java, здесь — схематичное представление

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 1. Источник: Kafka consumer с exactly-once семантикой
KafkaSource<RawMeasurement> kafkaSource = KafkaSource.<RawMeasurement>builder()
    .setBootstrapServers(config.kafkaBrokers)
    .setTopics("raw.measurements.mqtt", "raw.measurements.http", "raw.measurements.fhir")
    .setGroupId("flink-measurements-processor")
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    .setValueOnlyDeserializer(new RawMeasurementDeserializer())
    .build();

DataStream<RawMeasurement> rawStream = env
    .fromSource(kafkaSource, WatermarkStrategy
        .forBoundedOutOfOrderness(Duration.ofSeconds(10))
        .withIdleness(Duration.ofMinutes(1)),
        "Kafka Measurements Source");

// 2. Нормализация: унификация форматов (MQTT/HTTP/FHIR → единая модель)
DataStream<NormalizedMeasurement> normalized = rawStream
    .process(new MeasurementNormalizerFunction())  // роутинг по source_protocol
    .name("Protocol Normalizer");

// 3. Валидация: проверка диапазонов, качества сигнала, дубликатов
DataStream<ValidatedMeasurement> validated = normalized
    .keyBy(m -> m.deviceId)
    .process(new MeasurementValidatorFunction(config.validationRules))
    .name("Quality Validator");

// 4. Alert detection: окно 30 секунд, детекция критических показателей
validated
    .keyBy(m -> m.patientId)
    .window(TumblingEventTimeWindows.of(Time.seconds(30)))
    .process(new CriticalAlertDetector(config.alertThresholds))
    .addSink(new KafkaSink<>("alerts.critical"))
    .name("Critical Alert Detector");

// 5. Sink: запись в TimescaleDB с exactly-once через транзакции
validated
    .addSink(JdbcSink.exactlyOnceSink(
        "INSERT INTO measurements (...) VALUES (?) ON CONFLICT (id, measured_at) DO NOTHING",
        new MeasurementRowMapper(),
        JdbcExactlyOnceOptions.defaults(),
        () -> config.timescaleDataSource()
    ))
    .name("TimescaleDB Sink");

env.execute("Measurements Pipeline v2");

Exactly-once семантика — на уровне и Kafka (транзакционный продюсер), и Flink (checkpointing + JDBC exactly-once sink). Именно это обеспечивает гарантию zero data loss: при сбое Flink восстанавливается из checkpoint и продолжает с последней успешной позиции, не создавая дубликатов.

// Alerting pipeline: правила на основе медицинских порогов

Пороги алертов — не произвольные числа. Они основаны на клинических протоколах и были согласованы с медицинским советником клиента. Система поддерживает три уровня: критический (немедленное уведомление дежурного), предупреждение (уведомление лечащего врача) и информационный (запись в лог).

# Конфигурация порогов (хранится в PostgreSQL, горячее обновление без рестарта)
ALERT_THRESHOLDS = {
    "spo2": {
        "critical": {"below": 90, "duration_sec": 10},   # SpO2 < 90% в течение 10 сек
        "warning":  {"below": 94, "duration_sec": 30},
    },
    "heart_rate": {
        "critical_high": {"above": 150, "duration_sec": 15},
        "critical_low":  {"below": 40,  "duration_sec": 10},
        "warning_high":  {"above": 120, "duration_sec": 60},
    },
    "blood_pressure_systolic": {
        "critical_high": {"above": 180, "duration_sec": 0},  # немедленно
        "critical_low":  {"below": 80,  "duration_sec": 0},
    }
}

# Алерт-событие в Kafka (упрощённо)
@dataclass
class CriticalAlert:
    alert_id:       str          # UUID
    patient_id:     str
    device_id:      str
    measurement_type: str
    trigger_value:  float
    threshold:      float
    severity:       Literal["critical", "warning", "info"]
    detected_at:    datetime
    window_start:   datetime
    window_end:     datetime
    notified_staff: list[str]    # заполняется notification-service после отправки

// Результаты

Через три месяца после запуска нового pipeline в production цифры следующие:

  • Задержка данных: 4 часа → 12 секунд (медиана). P99 — 28 секунд, что укладывается в клиническое требование.
  • Zero data loss — ни одного потерянного измерения с момента запуска. Система пережила три пиковых нагрузки во время крупных клинических мероприятий без деградации.
  • Real-time алерты — критические уведомления доходят до дежурного персонала в среднем за 18 секунд с момента события на устройстве.
  • Аудит регулятора пройден — полный audit trail, неизменяемые записи, воспроизводимость — все требования выполнены. Клиент получил необходимую сертификацию.
  • Масштабируемость до 50K устройств подтверждена нагрузочным тестированием. Архитектура горизонтально масштабируется без изменений: добавление Kafka-партиций и Flink task managers.

Самый важный результат — не технический. Врачи начали доверять данным из системы. Это произошло, когда они перестали видеть расхождения между показаниями устройства и тем, что отображается на экране.

// Уроки и антипаттерны

Несколько вещей, которые мы сделали правильно с первого раза — и которые рекомендуем закладывать в любой медицинский data pipeline с самого начала.

Timestamp discipline. У каждого измерения два timestamp: measured_at (время на устройстве) и received_at (время получения системой). Это не косметика — это принципиальное различие. Устройства работают офлайн, синхронизируются пакетами. Без measured_at нельзя правильно реконструировать временной ряд.

Schema evolution. Форматы данных медицинских устройств меняются с каждой прошивкой. Kafka Schema Registry с backward compatibility policy — не опция, а требование. Мы потратили неделю на его настройку в самом начале и не пожалели ни разу.

Идемпотентность на всех уровнях. Устройства иногда отправляют одно измерение несколько раз (retry после потери связи). Система должна это обрабатывать корректно. ON CONFLICT DO NOTHING в SQL — это только последний рубеж; дедупликация должна быть и в Flink-операторах.

Если вы строите data infrastructure для медицинских или регулируемых данных — приходите поговорить до начала разработки. Дешевле заложить правильную архитектуру сразу, чем переделывать под требования регулятора за месяц до сертификации.


← все статьи Следующая →TypeScript: паттерны, которые реально спасают