Описание работы Kafka
Общее описание
Kafka - это такой сетевой "лог-файл" в который можно писать сообщения и из которого их можно читать. Kafka часто используется как интеграционная шина, поэтому её можно сравшивать с RabbitMQ, и в первую очередь сравнить их нужно чтобы подчеркнуть различия.
RabbitMQ - это броккер сообщений. Он занимается передачей сообщений. В него можно послать сообщение, и оно, вероятно, будет доставлено адресату. После доставки сообщение удаляется из системы. Если при обработке сообщения возникла ошибка, сообщение может быть заново помещено в очередь.
В свою очередь кафка - это хранилище упорядоченных сообщений. У них нет адресата. Они не удаляются из системы если их кто-то прочитал. Подразумевается, что одно и то же сообщение может быть прочитано разными потребителями (consumer) с разными целями, и факт прочтения сообщения одним потребителем никак не влияет на других. Чтобы отслеживать какие сообщения уже были прочитаны потребителем, вводится понятие "смещение" (offset) - номер сообщения в последовательности.
Kafka хранит у себя текущее смещение для каждого известного потребителя, поэтому потребителю не нужно хранить его у себя, достаточно сделать запрос вида "дай следующее сообщение", а после "увеличь смещение на единицу".
Конкретное описание
Сообщения пишутся в топик (topic). Топик - это последовательность сообщений. У топика нет схемы, в него могут писать и из него могут читать кто угодно и что угодно. Процесс, который пишет называется продюсер (producer). Процесс который читает - консюмер (consumer). Внутри топик состоит из партиций (partition). Партиция это уже реальный файл на диске. Если вы хотите распределить нагрузку по нескольким серверам, кафки, то вам нужно настроить топик так, чтобы у него было больше одной партиции.
Партиции ещё интересны тем, что они задают максимальное количество параллельных потоков конкурентного чтения. Не параллельного, а именно конкурентного. Вы можете иметь сколько угодно консюмеров читающих из одного топика. Все они получат каждое сообщение. Но если вам нужно сделать так чтобы сообщение было гарантировано прочитано только одним консюмером из определённой группы, вам нужно создать столько партиций сколько параллельных потоков вы хотите сделать.
Чтобы настроить несколько консюмеров на конкурентное чтение сообщений, нужно задать им одинаковый ID консюмер-группы (consumer group). Если у консюмера задана консюмер-группа, то кафка начинает сохранять смещения не для конкретного консюмера, а для группы. Таким образом, когда первый консюмер прочитает сообщение, он увеличит смещение, и второй консюмер иэ той же группы получит уже следующее сообщение, а не то же самое.
Настройка сервиса для работы с кафкой
Мы используем расширения php для работы с кафкой phprdkafka. Оно уже установлено в базовом docker образе и ничего дополнительно делать не нужно. Для удобства есть три пакета для ларавеля, которые скрывают низкоуровневые операции, предоставляя более простой интерфейс для чтения и записи сообщений в топики.
ensi/laravel-phprdkafka
Этот пакет добавляет в систему менеджер подключений к кафке, по аналогии с менеджером подключений к БД.
Вы можете описать в config/kafka.php
параметры подключения, отдельно для консюмеров, отдельно для продюсеров, можно задать несколько подключений с разными именами.
И в дальнейшем можно получать уже сконфигурированные объекты RdKafka\Producer
и RdKafka\KafkaConsumer
просто вызывая соотвествующие методы у сервиса KafkaManager, получив его
из сервис-контейнера или же обращаясь к нему через фасад Kafka
.
И консюмеры и продюсеры могут иметь немного разные настройки подключения, поэтому, чтобы не дублировать те настройки, которые у всех клиентов одинаковые,
добавлена сущность "соединение" (connection).
В соединении описываются такие настройки как: адрес кафки, параметры авторизации и т.д.
В консюмере/продюсере мы указываем какое соединение будет использоваться и добавляем специфичные для текущего клиента параметры
Пример конфигурации:
# config/kafka.php
return [
'connections' => [
'default' => [
'settings' => [
'metadata.broker.list' => env('KAFKA_BROKER_LIST'),
'security.protocol' => env('KAFKA_SECURITY_PROTOCOL', 'plaintext'),
'sasl.mechanisms' => env('KAFKA_SASL_MECHANISMS'),
'sasl.username' => env('KAFKA_SASL_USERNAME'),
'sasl.password' => env('KAFKA_SASL_PASSWORD'),
'log_level' => env('KAFKA_DEBUG', false) ? (string)LOG_DEBUG : (string)LOG_INFO,
'debug' => env('KAFKA_DEBUG', false) ? 'all' : null,
],
'topics' => [
'foobars' => $contour . '.domain.fact.foobars.1'
]
]
],
'consumers' => [
'default' => [
'connection' => 'default',
'additional-settings' => [
'group.id' => env('KAFKA_CONSUMER_GROUP_ID', env('APP_NAME')),
'enable.auto.commit' => true,
'auto.offset.reset' => 'beginning',
],
],
],
'producers' => [
'default' => [
'connection' => 'default',
'additional-settings' => [
'compression.codec' => env('KAFKA_PRODUCER_COMPRESSION_CODEC', 'snappy'),
],
],
],
];
Настраивая подключение вы можете использовать любые параметры rdkafka. В пакете уже установлены некоторые параметры по умолчанию:
group.id
- идентификатор консюмер-группы. Он задаётся всегда, не важно планируете вы использовать один инстанс консюмера или несколько. Когда вам захочется отмасштабировать консюмеры, они уже будут в одной группе, останется только следить за тем чтобы хватало партиций.enable.auto.commit
- автоматически обновляет смещение через некоторое время после получения сообщенияauto.offset.reset
- задаёт поведение в случае, когда для текущей консюмер-группы в кафке не сохранено смещение в каком-то топике.beginning
означает что будут прочитаны все сообщения самого начала.
Помимо прочего, в соединении указывается список всех топиков, которые будут использоваться приложением.
'topics' => [
'foobars' => $contour . '.domain.fact.foobars.1'
]
В дальнейшем, в коде нужно будет ссылаться на кокретный топик используя ключ из этого списка, а не реальное название.
ensi/laravel-phprdkafka-consumer
Этот пакет добавляет в систему artisan команду kafka:consume topic
и возможность настроить обработчик сообщения.
Пример настройки:
# config/kafka-consumer.php
return [
'global_middleware' => [ TraceEventKafkaMiddleware::class ],
'stop_signals' => [SIGTERM, SIGINT],
'processors' => [
[
'topic' => 'foobars', // ключ из kafka.connections.<name>.topics
'consumer' => 'default',
'type' => 'action',
'class' => \App\Domain\Kafka\Actions\Listen\ListenOfferAction::class,
'queue' => false,
'consume_timeout' => 5000,
],
]
];
Самое главное здесь - это то что вы указываете каким классом будут обрабатываться сообщения из конкретного топика, а так же
какой консюмер будет использоваться для получения сообщений.
Если вам для какого-то топика нужно задать другие настройки консюмера, то нужно создать другое подключение в config/kafka.php
и сослаться на него.
Класс обработчик - это просто класс с методом execute(RdKafka\Message $message)
, внутри которого вы описываете логику обработки сообщения.
class ListenOfferAction
{
public function execute(Message $message)
{
// ...
}
}
Кроме того вы можете задавать middleware, которые очень похожи на http middleware:
class TraceEventKafkaMiddleware {
public function handle(Message $message, Closure $next): mixed
{
// ...
return $next($message);
}
}
ensi/laravel-phprdkafka-producer
Этот пакет предоставляет обёртку над RdKafka\Producer
, которая выполняет рутинные задачи.
$producer = new HighLevelProducer("my-topic", "my-producer");
$producer->sendOne($payload);
Под капотом HighLevelProducer получает объект продюсера, настройки которого описаны в config/kafka.php
,
отправляет сообщение в топик, выполняет flush чтобы сообщение гарантировано ушло из буфера в кафку.
Настройка топиков
У топиков есть множество параметров, которые нужно задать при их создании, вот самые важные из них:
- количество партиций - влияет на максимальный размер консюмер-группы
- количество реплик - влияет на надёжность хранения данных
- количество данных в топике, которые нужно хранить, а лишние (самые старые) удалять
- время по истечении которого нужно удалять сообщения из топика
Эти параметры задаются не в консюмере или проюсере, а отдельно, при создании топика. Топик создаётся в ci/cd пайплайне при отгрузке сервиса, в котором используется этот топик, не важно является ли сервис источником данных или потребителем.
Пайплайн работает следующим образом:
- запускает команду
php artisan kafka:find-not-created-topics
, которая ищет в кафке топики, котрые перечислены в конфигеconfig/kafka.php
- если каких-то топиков нет, то формируется список отсутствующих топиков
- читается файл с настройками топиков из репозитория с конфигами развёртывания
- создаются топики
Вот пример файла настроек топиков:
topics:
- name: prod.all.fct.my-topic.0
partitions: 2
replicas: 1
config:
- name: retention.ms
value: 60480000 # 7 days
- name: retention.bytes
value: 1073741824 # 1 Gb
Если необходимый сервису топик не описан в таком файле, то пайплайн завершается с ошибкой.
Список топиков для локальной разработки
Тот же механизм описания топиков используется и для того чтобы создать все необходимые топики в локальной кафке. В воркспейсе есть аналогичный файл с топиками и скрипт, который их создаёт.