Подход к работе с Elasticsearch
Общее описание
Для хранения агрегированных данных по каталогу мы используем Elasticsearch.
Во-первых, нам требуется агрегация данных, т.к. невозможно вывести каталог с сортировкой, фильтрацией и пагинацией из БД, т.к. данные распределены по нескольким сервисам. Во-вторых, Elasticsearch из коробки даёт много полезных фич, одна из которых - полнотекстовый морфологический поиск, что важно для поиска товаров.
Для просмотра данных (при разработке или отладке) используется Kibana
Catalog cache
Catalog-cache - сервис, который занимается:
- сбором данных из мастер-сервисов с данными (из pim/offers/cms и т.д.)
- обновлением информации в индексе
- предоставлением эндпоинтов для поиска данных по индексам
Перед тем как разбираться детально с работой индексатора, рассмотрим несколько базовых вещей:
Для работы с Elasticsearch мы используем пакет ensi/laravel-elastic-query. Он предполагает, что для каждого индекса будет создан класс {Name}Index extends \Ensi\LaravelElasticQuery\ElasticIndex
, через который и будет вестись дальнейшая работа с индексом. В catalog-cache есть свой базовый класс для наследования - ElasticIndex - который вводит понятие хеша индекса (md5 от структуры индекса) и добавляет несколько полезных базовых функций.
Для чего нужен хеш индекса? Периодически возникает необходимость вносить изменения в структуру индекса. И это означает, что требуется создать новый индекс (с новой структурой) и заполнить его с нуля данными. При этом старый индекс должен быть в рабочем состоянии, пока новый индекс не наполнится данными, иначе на витрине будет пустой каталог. Чтобы эта схема работала и нужен хеш индекса: мы отдельно отгружаем 1) web-сервис, который только читает 2) индексатор, который записывает данные. Каждый из этих компонентов знает свой хеш индекса и работает с нужной версией.
Дополнительно catalog-cache рассчитан на то, что в будущем могут появляться другие типы индексов, поэтому верхнеуровневые команды не работают напрямую с индексом, они работают с конфигом config/elastic.php, куда можно добавлять новые индексы.
Всё это приводит нас к довольно усложнённой схеме работы сервиса. Рассмотрим её более подробно и с примерами кода:
- Мастер-сервис с данными (например pim) для каждой сущности (данные из которой нужны в индексе) при каждом обновлении (создании/изменении/удалении) отправляет в kafka данные. Обычно это реализуется через Observer с флагом
public $afterCommit = true;
, чтобы не отправлять данные раньше, чем завершится транзакция. Пример реализации - Catalog-cache слушает топики в kafka и забирает все изменения в локальную БД. Пример реализации
- В БД множество таблиц, данные из которых потом будут собраны в одну запись в индексе. Но можно выделить одну "ключевую" таблицу (например offers, для индекса по офферам), которая олицетворяет саму сущность, а все остальные - лишь дообагащают эту сущность. Индексатор опирается именно на эту "ключевую" таблицу, а точнее на поле
updated_at
, чтобы понять пора ли обновлять для этой записи индекс. Таким образом каждая вспомогательная сущность, при своём обновлении, должна обновлять полеupdated_at
у ключевой сущности. Пример реализации: Action, Observer - И вот мы имеем БД, в которой продублированы данные из мастер-сервисов, и теперь нам необходимо эти данные записать в индекс. Это происходит в несколько шагов. Первый из них - запуск раз в минуту консольной команды, которая добавляет в очередь на индексацию все записи, изменившиеся с последнего сканирования. Рассмотрим этот процесс детальнее:
- Сервис поддерживает работу с несколькими индексами, они настраиваются в
config/elastic.php
- Для каждого индекса в конфиге указан action, который занимается добавлением изменённых записей в очередь на индексацию.
- Примерный алгоритм такого action следующий: Получаем из таблицы
indexer_timestamps
время последнего сканирования изменений. Ищем обновлённые записи. Запускаем эти записи в Job на индексацию. Обновляем время запуска вindexer_timestamps
. Пример реализации
- Сервис поддерживает работу с несколькими индексами, они настраиваются в
- Второй шаг - сама индексация. Воркер получает отправленный Job и выполняет его. Сам Job изначально имеет массив идентификаторов записей, которые необходимо проиндексировать. Происходит загрузка данных из БД, формирование json под структуру индекса и отправка пакета данных в эластик (одним запросом). Пример реализации
- Отдельно стоит упомянуть кейс с удалением данных. Если удаляются вспомогательные сущности, то происходит просто обновление
updated_at
у ключевой сущности. Если же удаляется ключевая сущность, то сразу происходит и удаление записи из индекса. Пример реализации
Ещё несколько важных моментов в работе сервиса:
- Дополнительная проблема при работе с индексами возникает на тестовых площадках, где отгружено много версий catalog-cache, каждый из которых хочет иметь свой индексатор и свой индекс. Но при этом на наших площадках общая БД на все отгруженные ветки. Именно чтобы не возникал конфликт индексаторов система работает через
updated_at
, а каждый индексатор сам знает когда он последний раз запускался, проверяет какие записи обновились с того момента, и отправляет их на индексацию в свой индекс. Для случаев, когда структура индекса не изменилась, но логика индексатора поменялась и её нужно проверить - можно заполнить в envAPP_STAGE=QKEY-123
(это значение используется при формировании имени индекса) - Тестирование сервиса подразумевает и тестирование взаимодействия с эластиком. Здесь реализован такой же подход, как и при тестах БД - создаётся тестовый индекс, туда добавляются записи, отправляются реальные запросы в эластик.
- Тут есть проблема: из-за того что эластик не сразу заносит изменения в сам индекс, можно получить кейс, когда вы создали/изменили/удалили запись в индексе, но запрос на поиск не отдаёт эти изменения. Чтобы избежать из-за этого проблем, необходимо принудительно говорить эластику накатить все изменения на индекс с помощью refresh запроса. Это уже реализовано в базовой фабрике
\Tests\Factories\BaseElasticFactory
- Тут есть проблема: из-за того что эластик не сразу заносит изменения в сам индекс, можно получить кейс, когда вы создали/изменили/удалили запись в индексе, но запрос на поиск не отдаёт эти изменения. Чтобы избежать из-за этого проблем, необходимо принудительно говорить эластику накатить все изменения на индекс с помощью refresh запроса. Это уже реализовано в базовой фабрике
- При добавлении в сервис новых вспомогательных сущностей, требуется первично заполнить БД данными из мастер-сервисов. Для этого реализуется отдельная консольная команда, которая обращается в api этих мастер-сервисов и собирает из них данные, обновляя свою БД. Пример реализации