Apache Kafka — повторяющаяся перебалансировка с большим количеством потребителей

Контекст

Мы используем Kafka для обработки больших сообщений, очень редко до 10 МБ, но в основном в диапазоне 500 КБ. Обработка сообщения может занять до 30 секунд, но иногда и минуту.

Проблема

Обработка данных с меньшим количеством потребителей (примерно до 50) не приводит к повторяющейся повторной балансировке брокером, и обработка работает нормально. Любая перебалансировка в этом масштабе также довольно быстрая, в основном менее минуты, согласно журналам брокера.

Как только потребители масштабируются до 100 или 200, потребители постоянно перебалансируются с интервалами примерно до 5 минут. Это приводит к 5 минутам работы/потребления, за которыми следуют 5 минут восстановления баланса, а затем снова то же самое. Потребительские услуги не терпят неудачу, просто перебалансируются без реальной видимой причины. Это приводит к снижению пропускной способности при масштабировании потребителей.

При масштабировании до 200 потребителей обработка выполняется со средней скоростью 2 сообщения в минуту на потребителя. Скорость обработки для одного потребителя без перебалансировки составляет около 6 сообщений в минуту.

Я не подозреваю, что сеть центров обработки данных является проблемой, поскольку у нас есть некоторые потребители, выполняющие другой вид обработки сообщений, и у них нет проблем с передачей от 100 до 1000 сообщений в минуту.

Кто-то еще испытал этот шаблон и нашел простое решение, например. изменение определенного параметра конфигурации?

Дополнительная информация

Брокеры Kafka имеют версию 2.0, и их 10 в разных центрах обработки данных. Репликация установлена ​​на 3. Разделов для этой темы 500. Выдержка из конкретной конфигурации broker для лучшей обработки больших сообщений:

  • сжатие.тип=lz4
  • message.max.bytes=10000000 # 10 МБ
  • replica.fetch.max.bytes=10000000 # 10 МБ
  • group.max.session.timeout.ms=1320000 # 22 мин.
  • offset.retention.minutes=10080 # 7 дней

На стороне потребителя мы используем Java-клиент с прослушивателем перебалансировки, который очищает любые буферизованные сообщения из отозванных разделов. Этот буфер имеет размер 10 сообщений. Потребительские клиенты используют клиентский API версии 2.1, обновление java-клиента с 2.0 до 2.1, по-видимому, значительно сокращает журналы брокера следующего типа на этих больших числах потребителей (мы получали их почти для каждого клиента и каждой повторной балансировки ранее):

INFO [GroupCoordinator 2]: Member svc002 in group my_group has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)

Потребители находятся в другом центре обработки данных, чем брокеры. Фиксация смещений выполняется асинхронно. Рекуррентный опрос выполняется в потоке, который заполняет буфер с тайм-аутом 15 секунд; как только буфер заполнен, поток спит несколько секунд и опрашивает только тогда, когда в буфере есть свободное место. Выдержка из конфигурации для варианта использования больших сообщений:

  • max.partition.fetch.bytes.config=200000000 # 200 МБ
  • макс.опрос.записи.config=2
  • session.timeout.ms.config=1200000 # 20 мин.

Файл журнала

Ниже приводится выдержка из файла журнала брокера, который управляет этой конкретной группой в течение 30-минутного периода времени. Именование сокращено до my_group и mytopic. Есть также несколько записей из не относящейся к теме темы.

19:47:36,786] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3213 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:47:36,810] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3213 (kafka.coordinator.group.GroupCoordinator)
19:47:51,788] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3213 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:48:46,851] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3214 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:48:46,902] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3214 (kafka.coordinator.group.GroupCoordinator)
19:50:10,846] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
19:54:29,365] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3214 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:57:22,161] INFO [ProducerStateManager partition=unrelated_topic-329] Writing producer snapshot at offset 88002 (kafka.log.ProducerStateManager)
19:57:22,162] INFO [Log partition=unrelated_topic-329, dir=/kafkalog] Rolled new log segment at offset 88002 in 11 ms. (kafka.log.Log)
19:59:14,022] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3215 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:59:14,061] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3215 (kafka.coordinator.group.GroupCoordinator)
20:00:10,846] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
20:02:57,821] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3215 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
20:06:51,360] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3216 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
20:06:51,391] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3216 (kafka.coordinator.group.GroupCoordinator)
20:10:10,846] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
20:10:46,863] INFO [ReplicaFetcher replicaId=2, leaderId=8, fetcherId=0] Node 8 was unable to process the fetch request with (sessionId=928976035, epoch=216971): FETCH_SESSION_ID_NOT_FOUND. (org.apache.kafka.clients.FetchSessionHandler)
20:11:19,236] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3216 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
20:13:54,851] INFO [ProducerStateManager partition=mytopic-321] Writing producer snapshot at offset 123640 (kafka.log.ProducerStateManager)
20:13:54,851] INFO [Log partition=mytopic-321, dir=/kafkalog] Rolled new log segment at offset 123640 in 14 ms. (kafka.log.Log)
20:14:30,686] INFO [ProducerStateManager partition=mytopic-251] Writing producer snapshot at offset 133509 (kafka.log.ProducerStateManager)
20:14:30,686] INFO [Log partition=mytopic-251, dir=/kafkalog] Rolled new log segment at offset 133509 in 1 ms. (kafka.log.Log)
20:16:01,892] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3217 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
20:16:01,938] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3217 (kafka.coordinator.group.GroupCoordinator)

Большое спасибо за любую помощь в этом вопросе.


person calloc_org    schedule 24.11.2018    source источник


Ответы (1)


После некоторой дальнейшей разработки и тонкой настройки нам удалось взять проблему под контроль.

Во-первых, кажется, что некоторые сервисы по-прежнему обрабатываются с превышением лимита, и это очень редко приводит к их сбою. Последующий уход вызвал перебалансировку, а затем присоединение примерно через 6-7 минут, что также вызвало перебалансировку. Мы сократили это еще больше, оптимизировав наши услуги с точки зрения пропускной способности.

Вторым фактором была базовая сеть докеров, которую мы используем для масштабирования сервисов. По умолчанию интервал подтверждения очень короткий (5 секунд), так что любая тяжелая работа и сетевая нагрузка на узле-потребителе могут удалить его из роя докеров на очень короткий промежуток времени. Докер отвечает на это прерывание перемещением этих сервисов на другие узлы (повторная балансировка) с последующей повторной балансировкой, когда узел возвращается в оперативный режим. Поскольку сервисы имеют длительное время запуска, составляющее 5-7 минут, это приводит к повторной балансировке нескольких раз на каждом из этих событий.

Третий фактор — это ошибки в потребляющих службах, из-за которых одна из них время от времени зависала, скажем, на 1 % в час. Это снова вызывает два повторных баланса: один уходит, один присоединяется.

В совокупности эти проблемы привели к наблюдаемой проблеме. Последняя версия Kafka также, кажется, выводит больше информации о том, почему сервис покидает группу потребителей. Было бы неплохо, если бы Kafka продолжала предоставлять данные потребителям, которые все еще стабильны, я могу добавить запрос на эту функцию. Тем не менее, теперь он работает стабильно с достойной производительностью.

person calloc_org    schedule 22.01.2019