Как мне установить свойства производителя kafka для атомарного потребителя kafka

Я просматривал эту статью, которая объясняет, как гарантировать, что сообщение обрабатывается ровно один раз, выполнив следующие действия:

  • Чтение (тема, раздел, смещение) из базы данных при запуске / перезапуске
  • Прочитать сообщение из определенного (тема, раздел, смещение)
  • Atomically do following things (say for example in same database transaction):
    • Processing message
    • Зафиксировать смещение в базе данных как (тема, раздел, смещение)
    • Вручную зафиксируйте смещение в Kafka, вызвав consumer.commitAsync() или consumer.commitSync()

Я сомневаюсь, каков эффект установки разных значений для разных потребительских свойств:

  1. enable.auto.commit
    Как мне установить это свойство? true или false? В статье говорится, что мы должны установить его на false. Но что может случиться, если я установлю true? При этом я сохраняю смещение во внешнюю базу данных. Таким образом, после сбоя, когда потребитель подключается к сети, он начинает использовать смещение, сохраненное в базе данных. Итак, я чувствую, что значение этого свойства не влияет на запуск / перезапуск.
    Также я не думаю, что будет какое-либо влияние различных значений этого свойства в рамках одного запуска потребителя, поскольку смещение используется для чтения следующего сообщения и того, будет ли мы фиксируем его вручную или автоматически не имеет никакого эффекта (смещение все равно будет таким же).

  2. auto.offset.reset
    Есть два основных значения этого свойства latest и earliest. Если установлено значение latest, он заставит потребителя читать сообщения, помещенные впоследствии, то есть после запуска потребителя. Если установлено значение earliest, потребитель будет читать из первого непрочитанного сообщения. Поскольку оба эти фактора влияют на то, откуда потребитель должен начать чтение сообщения при запуске, я считаю, что это свойство также не повлияет на атомарного потребителя, указанного в статье. Это связано с тем, что в этой реализации вновь запущенный потребитель начинает читать сообщения со смещения, указанного в базе данных.

Верно ли я с обеими вышеупомянутыми мыслями?


person Mahesha999    schedule 25.10.2019    source источник
comment
что вы имеете в виду, говоря о сохранении смещения во внешней базе данных?   -  person Divyanshu Jimmy    schedule 25.10.2019
comment
Во внешнюю базу данных. Если вы читаете статью по ссылке, в ней приведен пример хранения смещения в MySQL.   -  person Mahesha999    schedule 25.10.2019
comment
В случае enable.auto.commit, если вы установите для него значение true, рассмотрите случай, когда потребитель все еще выполняет тяжелую обработку сообщения, но он уже начал читать следующие смещения. Если ваш потребитель потерпел неудачу в этом случае, то считанное смещение будет сохранено в базе данных. Когда система снова запустится, у вас не будет фактического смещения сообщения, которое не удалось во время обработки, следовательно, вы потеряли сообщения.   -  person Divyanshu Jimmy    schedule 25.10.2019


Ответы (2)


  1. enable.auto.commit
    Когда потребитель перезагружается после сбоя, он начинает использовать раздел раздела со смещения, полученного из базы данных. Значение этого свойства будет использоваться в одном безаварийном прогоне потребителя, как и в любом другом сценарии.

    Автоматическая фиксация совершает фиксацию потребителя каждые 5 секунд (значение по умолчанию auto.commit.interval.ms) или на каждой poll() звонки.

    Ручная фиксация (enable.auto.commit=false) помогает избежать отказа от обработки сообщения. Например, если 5-секундный таймер автоматической фиксации истекает между чтением и обработкой сообщения, он может выполнить: (read, commit, process) в этой последовательности. И если потребитель выйдет из строя после фиксации без обработки сообщения (read,commit,crash), это сообщение никогда не будет обработано, потому что на следующем poll() потребитель получит следующее сообщение (поскольку фиксация была успешной). Мы можем предотвратить это, выполнив ручную фиксацию в следующем порядке: (read, process,commit).

    Однако в этом случае существует вероятность того, что потребитель может выйти из строя после обработки без фиксации (read,process,crash). Это приведет к повторной обработке того же сообщения в следующем poll().

    Чтобы избежать этой дублирующей обработки, мы сохраняем смещение во внешней базе данных и получаем его при перезапуске потребителя. Обратите внимание, что сохранение смещения в базе данных и его выборка при перезапуске потребителя также позволяет избежать обработки сообщения в случае последовательности (read,commit,crash), которая может произойти в случае автоматической фиксации.

    Таким образом, вкратце, ручная фиксация не служит никакой цели, когда мы сохраняем смещение во внешней базе данных. Следовательно, мы можем установить enable.auto.commit в любое значение true или false. Однако, если установлено значение false, мы не должны забывать о явной фиксации. В противном случае потребитель будет продолжать читать и обрабатывать одно и то же сообщение.

  2. auto.offset.reset

    Его значение будет иметь значение, если в базе данных нет значения смещения для данного раздела темы. Это произойдет при первом запуске потребителя или при усечении базы данных. В этом случае нам нужно, чтобы потребитель начал использовать с первого сообщения, которое не было получено ни одним из потребителей в его группе потребителей. Для этого нам нужно установить свойство на earliest.

person Mahesha999    schedule 01.11.2019

Если вы используете Kafka Stream, он поддерживает шаблон потока Exactly-once, вы можете сослаться на

Шаблон потока «Ровно один раз» - это просто возможность выполнить операцию чтения-обработки-записи ровно один раз. Означает, что вы потребляете одно сообщение за раз, получаете процесс, публикуете его в другой теме и фиксируете. Таким образом, фиксация будет обрабатываться Stream автоматически, по одному сообщению за раз.

Если вы не используете конкретный вариант использования, для которого лучше использовать Kafka при условии фиксации, иначе вам нужно обработать многие сценарии сбоя вручную, например. что произошло, если возникла проблема с подключением к БД ... и если обработка миллиона или записей требует очень частого доступа к БД .. Kafka уже хранит зафиксированное смещение, которое лучше с точки зрения производительности, не требует какого-либо внешнего подключения к БД.

_consumer_offsets хранит информацию о подтвержденных смещениях для каждой темы, раздела для каждой группы потребителей. Который использует фиксацию для обновления деталей смещения по этой теме

Если у вас есть конкретный вариант использования, когда требуется атомарная транзакция, или вы хотите сохранить детали смещения рядом с вашим результатом, вы можете управлять внешним хранилищем смещений, как указано здесь

enable.auto.commit: В случае использования внешнего управления смещением вы не используете сценарий фиксации, поэтому не имеет значения, включен или отключен enable.auto.commit. Вы будете продолжать получать сообщения, используя метод seek (раздел TopicPartition, длинное смещение) и сохраняя смещение извне. В случае перезапуска начните выборку последнего сохраненного смещения. Единственное влияние, если какая-то встроенная панель управления, такая как Confluent Control-Center, Grafana и т. Д., Используемая для поддержки темы Kafka, не будет отражаться в случае, если не будет сделана ручная фиксация и enable.auto.commit false. .

auto.offset.reset Да, это влияет только при первом запуске, но поскольку вы используете для получения сообщения из определенного раздела и смещения, никакого воздействия.

======================= Обновлено =======================

enable.auto.commit - если true (по умолчанию), периодически фиксировать смещение последнего сообщения, переданного приложению. Принятое смещение будет использоваться при перезапуске процесса, чтобы продолжить с того места, где он остановился.

auto.commit.interval.ms - частота в миллисекундах, с которой смещения потребителя фиксируются (записываются) в хранилище смещений.

Примечание. если enable.auto.commit false, auto.commit.interval.ms не используется.

В случае enable.auto.commit истинный метод фиксации будет вызывать при каждом опросе, и если auto.commit.interval.ms будет передан, смещение будет зафиксировано

  1. интервал опроса> интервал фиксации: смещение фиксации во время интервала опроса
  2. интервал опроса ‹интервал фиксации: метод фиксации будет вызывать при каждом опросе, но смещение будет зафиксировано при последовательном опросе () после того, как интервал фиксации пройден.
person sun007    schedule 25.10.2019
comment
Я думаю, потоки Kafka подходят, когда у нас есть kafka как на входе, так и на выходе (конвейер чтения-обработки-записи). У меня на выходе нет Кафки. (Поправьте меня, если я ошибаюсь.) - person Mahesha999; 25.10.2019
comment
[... продолжение предыдущего комментария] Также официальный документ API говорит: Если результаты потребления хранятся в реляционной базе данных, сохранение смещения в базе данных также может позволить зафиксировать как результаты, так и смещение в одной транзакции. Таким образом, либо транзакция будет успешной, и смещение будет обновлено в зависимости от того, что было использовано, либо результат не будет сохранен, а смещение не будет обновлено. - person Mahesha999; 25.10.2019
comment
да, вы правы для этого требования да ... спасибо, что указали, я также обновил свой ответ - person sun007; 26.10.2019
comment
Я предполагаю, что если я сохраню enable.auto.commit до false, а затем не буду вызывать syncCommit() или asyncCommit() , то потребитель будет продолжать получать то же сообщение. Верно? Можете ли вы сказать в режиме автоматической фиксации, происходит ли фиксация между каждым последующим poll()? Или он зафиксируется только после установленного интервала? (Я так не чувствую, потому что poll() требуется последнее смещение) - person Mahesha999; 30.10.2019
comment
Да, потребитель будет продолжать получать сообщение, если enable.auto.commit false и не вызывает фиксацию, он использует метод поиска для получения следующего индекса. позвольте обновить ответ соответственно - person sun007; 30.10.2019