8 800 555-91-99
#

Организуем платформу обработки потоковых данных из Apache Kafka, Spark и Greenplum

Блог · 27 января 2021
Как связаны серверы в дата-центре и распределенные приложения для обработки данных и почему нам пришлось написать свой коннектор для Spark и Greenplum.

Обычно задача платформы обработки потоковых данных состоит в том, чтобы собрать, сохранить и провести аналитику данных, используя их в дальнейшем для бизнес-аналитики, машинного обучения и т.п. Причём:

Требования к железу всегда индивидуальны, но чаще всего начать можно с базовой конфигурации:

А для прототип подойдёт и что-нибудь попроще: например, dedicated-серверы, где можно разворачивать виртуальные машины. Прежде чем масштабировать систему сбора, хранения и обработки данных, лучше её проверить в небольшой песочнице. Мы чаще всего работаем с виртуализацией KVM, но бывает OpenStack в связке с Ceph. А когда всё отработано — можно переходить и к работе на кластерах.

 

Архитектура платформы

Сейчас мы используем примерно такую схему:

На схеме можно условно выделить два основных блока: ПО, которое обрабатывает данные и складывает их в распределенную систему хранения данных, и системы обеспечения CI/CD.

Путь данных от источников до хранилища и дальнейших аналитических запросов, имеющих бизнес-ценность, включает следующие этапы:

Дальше я постараюсь раскрыть, почему выбран тот или иной продукт и какие рассматривали альтернативы. Например, для работы с данными сейчас используем Greenplum (продукт Pivotal), но ещё пробовали использовать Hadoop с HDFS и поверх этого Hive. Однако Greenplum показался быстрее, проще и у него открытый исходный код. Единственная проблема с Pivotal — как и Confluent, они не распространяют и не поддерживают свои продукты в РФ. Поэтому чтобы связать Spark с Greenplum, понадобится использовать что-нибудь опенсорсное (какие есть варианты — обсудим ниже).

Каждый узел платформы обработки данных мониторится. Для этого используем связку Prometheus + Grafana, а также свою систему мониторинга серверных параметров, по которым администраторы реагируют на инциденты раньше, чем наступили бы серьезные последствия.

За обеспечение непрерывной доставки отвечает следующий конвейер:

 

Нагрузочное тестирование

Для того чтобы оценить, насколько построенная схема жизнеспособна, проводим нагрузочное тестирование, которое включает в себя следующие шаги.

Также нагрузочное тестирование может быть частью CI/CD-процесса, тогда мы будем уверены в работоспособности системы после каждого деплоя, а не только после штучной проверки по списку.

 

Нагрузочное тестирование Яндекс.Танком

Первым вариантом, как проводить нагрузочное тестирование, была идея тестировать платформу обработки данных как веб-сайт — с помощью Яндекс.Танка: сделать микросервис, который принимает данные и посылает их в Kafka, и его тестировать. Для этого нужно предварительно подготовить нагрузку («патроны» для танка), чтобы трассировать данные. Это занимает некоторое время, а также оперативную память и место на диске.

В целом удобно — результаты и графики, как проходит нагрузка, можно посмотреть в облачном интерфейсе. Минус в том, что этот способ подходит только для HTTP-подобных протоколов и нагрузить можно только веб-ендпойнты, которые смотрят наружу.

 

Нагрузочное тестирование с Apache JMeter

Другой продукт, который можно использовать для нагрузочного тестирования, — это Apache JMeter. Он больше подходит для тестирования именно распределенных систем, потому что входит в экосистему Apache Software Foundation: для него есть плагины для прямой нагрузки на Kafka; можно нагружать не только веб-интерфейс, но и сразу шину данных; можно пробовать напрямую тестировать БД и есть поддержка сообщества.

Преимущества JMeter в итоге оказались решающими — на нём и остановились. Но перед этим свой велосипед тоже попробовали написать.

 

Нагрузочное тестирование самописным генератором

Если мы хотим обеспечить профилирование всего конвейера обработки данных от источников до СХД с учётом транспорта и обработки, то нужно иметь возможность трассировать и определить ёмкость каждого участка пути.

Один из подходов, который используется в Zipkin и Jaeger, состоит в генерации огромного количества пакетов данных с уникальными идентификаторами, создании с их помощью нагрузки и отслеживании всех этапов за счёт trace_id. При этом на каждом участке можно сделать обработчик и измерить, за сколько данные проходят участок пути от источника до обработчика, от обработчика до БД и т.д.

Общая схема понятна и позволяет подробно оценить возможности системы обработки данных, но чаще всего это избыточный метод, который требует дополнительных усилий и ресурсов. Обычно лучше использовать JMeter.

 

Брокер сообщений: Apache Kafka vs RabbitMQ

Выбрали Apache Kafka благодаря широким возможностям именно в контексте работы с большими данными и нагрузками, такими как хранение журнала, масштабируемость и достоверность. 

 

Apache Spark vs Flink

Для того чтобы выбрать один из этих фреймворков, решили попробовать оба. Подробное сравнение проводили 3 года назад, и тогда бросилось в глаза, что документации, примеров больше у Apache Spark, да и в целом распространенность у него больше. Но проведенное нагрузочное тестирование простых примеров, написанных на обоих фреймворках, показало лучшие результаты у Apache Flink. Например, Игорь Кураленок из "Яндекс.Облако" на недавнем митапе упоминал, что для потоковых данных рекомендуется использовать Apache Flink. Поэтому, судя по всему, этот фреймворк тоже зрелый и его можно использовать в разработке.

Но мы сделали выбор в пользу Apache Spark.

 

Требования к СХД

Три самых важных параметра, которые нужно оценить и учесть, составляя требования к СХД:

Сложность работы СХД в случае обработки больших объемов потоковых данных в том, что нужно одновременно и быстро писать, и читать. Аналитики обычно хотят работать со свежими данными, желательно, с теми, которые только что пришли, — чтобы сразу что-то смотреть и делать свои аналитические выводы. Нагрузка на СХД при этом возрастает, и нужно следить, к каким репликам должны подключаться аналитики, и вообще, должны ли они подключаться к репликам или лучше работать напрямую.

Единственное требование с точки зрения железа — держать серверы в одном дата-центре.

К сожалению, точные параметры назвать трудно, они зависят от каждого конкретного случая и отличаются для разных объемов данных и выполняемых задач. Иногда может быть достаточно 4-х выделенных серверов, иногда нужно 12. В каких-то случаях нужны большие диски, а в каких-то лучше шардирование и диски по 1-2 ТБайт. Бывает, что общий объем хранилища измеряется петабайтами, но для нас более привычны объемы в несколько десятков терабайт.

 

Холодные хранилища данных

Пробовали работать с разными холодными хранилищами, в том числе, с Apache Hadoop и Cassandra.

Наш опыт работы с Cassandra небольшой, использовать её напрямую в системе аналитики сложнее. Это более медленная система и лучше подходит для долговременных хранилищ. Мы же говорим о быстрой обработке больших объемов данных — как структурированных, так и неструктурированных. Они попадают в озеро данных на Hadoop и хранятся там в чистом виде, пока не понадобятся, а потом уже передаются в тот же Greenplum и там обрабатываются.

В работе с Apache Hadoop использовали такую систему:

 

Pivotal Greenplum

Как уже писал ранее, в контексте работы с аналитикой остановились на Greenplum. High availability обеспечиваем за счет:

Spark+Greenplum = ?

Один из ключевых моментов статьи: каким образом можно обработать потоковые данные и записать их в систему хранения данных, которая сделана на Greenplum? Как уже говорил, несмотря на то, что Greenplum — продукт с открытым исходным кодом, вся обвязка и поддержка требует лицензии и контакта с западным рынком. Сейчас в России продукт не поддерживается.

Чтобы связать Spark и Greenplum, есть несколько вариантов:

Наш Spark Greenplum connector распространяется под лицензией MIT, поддерживает DataSource API v2 Spark (только для Spark v2). Сейчас он ещё в альфа-версии и работает только на чтение, в дальнейшем доработаем и на запись, но пока, как и в предыдущем варианте, можно записывать при помощи JDBC-запросов через копирование.

Мы используем cookies для быстрой и удобной работы сайта. Продолжая пользоваться сайтом, вы даёте согласие и принимаете политику обработки персональных данных