Кейс: Data pipeline для health-tech стартапа: от хаоса к real-time аналитике
Один из наших клиентов — health-tech стартап, производящий IoT-устройства для непрерывного мониторинга состояния пациентов — столкнулся с ситуацией, которую в индустрии называют «data swamp»: данные формально собирались, но были практически бесполезны для клинических решений в реальном времени. Задержка в 4 часа между измерением и отображением в интерфейсе врача — не задержка. Это провал.
К моменту начала нашей работы у клиента было около 5 000 подключённых устройств в нескольких клиниках-партнёрах. Планы — масштабироваться до 50K устройств в течение двух лет. Регулятор тем временем запросил полный audit trail всех измерений для сертификации. Горело со всех сторон.
// Исходная архитектура: почему сломалось
Исходная система собиралась быстро, под MVP, — это нормально. Проблема в том, что она продолжала работать в production значительно дольше, чем планировалось. Типичная история.
Данные с устройств поступали по трём протоколам одновременно. Часть устройств использовала MQTT — лёгкий протокол для IoT, оптимальный по батарее. Часть отправляла данные через HTTP-вебхуки — более новые модели. Клиники требовали поддержки HL7 FHIR — отраслевого стандарта для медицинских данных, который нужен для интеграции с госпитальными системами.
Обработка этих трёх потоков выполнялась набором cron-скриптов на Python, которые запускались каждые 15 минут. Скрипты читали данные из очередей, трансформировали их в единый формат и писали в PostgreSQL. При нормальной нагрузке — работало. При пиках (утренние обходы, когда медперсонал подключал множество устройств одновременно) — скрипты не успевали, очереди переполнялись, данные терялись без какого-либо алерта.
Потеря медицинских данных — это не просто техническая проблема. Для регулятора это вопрос сертификации. Для клиники — потенциальная ответственность. Для пациента — риск пропущенного критического показателя.
Real-time алертов не было вообще. Если у пациента критически падало насыщение крови кислородом, система об этом узнавала через 15 минут — следующим запуском cron. На практике — ещё позже, с учётом задержек очередей.
// Проектирование: от требований к архитектуре
Мы начали с формализации требований — особенно в части, где они пересекались с регуляторными. Три ключевых ограничения определили архитектуру:
- Zero data loss. Потеря любого измерения недопустима. Это означает guaranteed delivery на уровне messaging layer.
- Audit trail. Каждое измерение должно иметь неизменяемую запись: когда получено, от какого устройства, как обработано, кто видел. Immutable log — не feature, а требование регулятора.
- Latency < 30 секунд для критических алертов. Мы взяли цель 12 секунд как рабочую — с запасом относительно регуляторного требования.
Эти три требования вместе практически однозначно определяют стек: event streaming + stream processing + time-series storage. Мы выбрали Apache Kafka, Apache Flink и TimescaleDB (PostgreSQL extension для временных рядов).
// Unified data model: прежде чем строить pipeline
Самая важная работа была сделана до написания кода — проектирование единой модели данных. MQTT, HTTP и HL7 FHIR дают принципиально разные форматы: бинарные пакеты, JSON и XML с медицинской семантикой соответственно. Гетерогенность на входе — нормально. Гетерогенность внутри pipeline — катастрофа.
-- Unified measurement model (TimescaleDB / PostgreSQL)
CREATE TABLE measurements (
id UUID NOT NULL DEFAULT gen_random_uuid(),
device_id UUID NOT NULL,
patient_id UUID, -- nullable: устройство может быть не назначено
measurement_type VARCHAR(64) NOT NULL, -- 'spo2', 'heart_rate', 'blood_pressure', etc.
value_numeric DOUBLE PRECISION,
value_json JSONB, -- для составных измерений (BP: systolic/diastolic)
unit VARCHAR(32) NOT NULL,
quality_score SMALLINT CHECK (quality_score BETWEEN 0 AND 100),
source_protocol VARCHAR(16) NOT NULL, -- 'mqtt', 'http', 'fhir'
received_at TIMESTAMPTZ NOT NULL, -- время получения нашей системой
measured_at TIMESTAMPTZ NOT NULL, -- время измерения на устройстве
raw_payload BYTEA, -- оригинальный пакет для audit
ingestion_id UUID NOT NULL, -- трассировка через pipeline
PRIMARY KEY (id, measured_at)
) PARTITION BY RANGE (measured_at);
-- TimescaleDB гипертаблица: автоматическое партиционирование по времени
SELECT create_hypertable('measurements', 'measured_at',
chunk_time_interval => INTERVAL '1 day',
if_not_exists => TRUE
);
-- Индекс для real-time запросов по устройству
CREATE INDEX ON measurements (device_id, measured_at DESC);
-- Индекс для запросов по пациенту (клинический dashboard)
CREATE INDEX ON measurements (patient_id, measurement_type, measured_at DESC)
WHERE patient_id IS NOT NULL;
Поле raw_payload — ключ для audit trail. Оригинальный пакет хранится без изменений. Регулятор может в любой момент запросить данные для конкретного устройства за конкретный период — и получить как обработанные значения, так и исходные байты.
// Flink: topology обработки потока
Apache Flink обрабатывает поток в несколько стадий. Каждая стадия — отдельный оператор с собственным parallelism и back-pressure handling.
// Упрощённая Flink topology для обработки измерений
// Реальный код на Java, здесь — схематичное представление
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 1. Источник: Kafka consumer с exactly-once семантикой
KafkaSource<RawMeasurement> kafkaSource = KafkaSource.<RawMeasurement>builder()
.setBootstrapServers(config.kafkaBrokers)
.setTopics("raw.measurements.mqtt", "raw.measurements.http", "raw.measurements.fhir")
.setGroupId("flink-measurements-processor")
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.setValueOnlyDeserializer(new RawMeasurementDeserializer())
.build();
DataStream<RawMeasurement> rawStream = env
.fromSource(kafkaSource, WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withIdleness(Duration.ofMinutes(1)),
"Kafka Measurements Source");
// 2. Нормализация: унификация форматов (MQTT/HTTP/FHIR → единая модель)
DataStream<NormalizedMeasurement> normalized = rawStream
.process(new MeasurementNormalizerFunction()) // роутинг по source_protocol
.name("Protocol Normalizer");
// 3. Валидация: проверка диапазонов, качества сигнала, дубликатов
DataStream<ValidatedMeasurement> validated = normalized
.keyBy(m -> m.deviceId)
.process(new MeasurementValidatorFunction(config.validationRules))
.name("Quality Validator");
// 4. Alert detection: окно 30 секунд, детекция критических показателей
validated
.keyBy(m -> m.patientId)
.window(TumblingEventTimeWindows.of(Time.seconds(30)))
.process(new CriticalAlertDetector(config.alertThresholds))
.addSink(new KafkaSink<>("alerts.critical"))
.name("Critical Alert Detector");
// 5. Sink: запись в TimescaleDB с exactly-once через транзакции
validated
.addSink(JdbcSink.exactlyOnceSink(
"INSERT INTO measurements (...) VALUES (?) ON CONFLICT (id, measured_at) DO NOTHING",
new MeasurementRowMapper(),
JdbcExactlyOnceOptions.defaults(),
() -> config.timescaleDataSource()
))
.name("TimescaleDB Sink");
env.execute("Measurements Pipeline v2");
Exactly-once семантика — на уровне и Kafka (транзакционный продюсер), и Flink (checkpointing + JDBC exactly-once sink). Именно это обеспечивает гарантию zero data loss: при сбое Flink восстанавливается из checkpoint и продолжает с последней успешной позиции, не создавая дубликатов.
// Alerting pipeline: правила на основе медицинских порогов
Пороги алертов — не произвольные числа. Они основаны на клинических протоколах и были согласованы с медицинским советником клиента. Система поддерживает три уровня: критический (немедленное уведомление дежурного), предупреждение (уведомление лечащего врача) и информационный (запись в лог).
# Конфигурация порогов (хранится в PostgreSQL, горячее обновление без рестарта)
ALERT_THRESHOLDS = {
"spo2": {
"critical": {"below": 90, "duration_sec": 10}, # SpO2 < 90% в течение 10 сек
"warning": {"below": 94, "duration_sec": 30},
},
"heart_rate": {
"critical_high": {"above": 150, "duration_sec": 15},
"critical_low": {"below": 40, "duration_sec": 10},
"warning_high": {"above": 120, "duration_sec": 60},
},
"blood_pressure_systolic": {
"critical_high": {"above": 180, "duration_sec": 0}, # немедленно
"critical_low": {"below": 80, "duration_sec": 0},
}
}
# Алерт-событие в Kafka (упрощённо)
@dataclass
class CriticalAlert:
alert_id: str # UUID
patient_id: str
device_id: str
measurement_type: str
trigger_value: float
threshold: float
severity: Literal["critical", "warning", "info"]
detected_at: datetime
window_start: datetime
window_end: datetime
notified_staff: list[str] # заполняется notification-service после отправки
// Результаты
Через три месяца после запуска нового pipeline в production цифры следующие:
- Задержка данных: 4 часа → 12 секунд (медиана). P99 — 28 секунд, что укладывается в клиническое требование.
- Zero data loss — ни одного потерянного измерения с момента запуска. Система пережила три пиковых нагрузки во время крупных клинических мероприятий без деградации.
- Real-time алерты — критические уведомления доходят до дежурного персонала в среднем за 18 секунд с момента события на устройстве.
- Аудит регулятора пройден — полный audit trail, неизменяемые записи, воспроизводимость — все требования выполнены. Клиент получил необходимую сертификацию.
- Масштабируемость до 50K устройств подтверждена нагрузочным тестированием. Архитектура горизонтально масштабируется без изменений: добавление Kafka-партиций и Flink task managers.
Самый важный результат — не технический. Врачи начали доверять данным из системы. Это произошло, когда они перестали видеть расхождения между показаниями устройства и тем, что отображается на экране.
// Уроки и антипаттерны
Несколько вещей, которые мы сделали правильно с первого раза — и которые рекомендуем закладывать в любой медицинский data pipeline с самого начала.
Timestamp discipline. У каждого измерения два timestamp: measured_at (время на устройстве) и received_at (время получения системой). Это не косметика — это принципиальное различие. Устройства работают офлайн, синхронизируются пакетами. Без measured_at нельзя правильно реконструировать временной ряд.
Schema evolution. Форматы данных медицинских устройств меняются с каждой прошивкой. Kafka Schema Registry с backward compatibility policy — не опция, а требование. Мы потратили неделю на его настройку в самом начале и не пожалели ни разу.
Идемпотентность на всех уровнях. Устройства иногда отправляют одно измерение несколько раз (retry после потери связи). Система должна это обрабатывать корректно. ON CONFLICT DO NOTHING в SQL — это только последний рубеж; дедупликация должна быть и в Flink-операторах.
Если вы строите data infrastructure для медицинских или регулируемых данных — приходите поговорить до начала разработки. Дешевле заложить правильную архитектуру сразу, чем переделывать под требования регулятора за месяц до сертификации.