Перейти к основному содержимому

Подход к работе с Elasticsearch

Общее описание

Для хранения агрегированных данных по каталогу мы используем Elasticsearch.

Во-первых, нам требуется агрегация данных, т.к. невозможно вывести каталог с сортировкой, фильтрацией и пагинацией из БД, т.к. данные распределены по нескольким сервисам. Во-вторых, Elasticsearch из коробки даёт много полезных фич, одна из которых - полнотекстовый морфологический поиск, что важно для поиска товаров.

Для просмотра данных (при разработке или отладке) используется Kibana

Catalog cache

Catalog-cache - сервис, который занимается:

  • сбором данных из мастер-сервисов с данными (из pim/offers/cms и т.д.)
  • обновлением информации в индексе
  • предоставлением эндпоинтов для поиска данных по индексам

Перед тем как разбираться детально с работой индексатора, рассмотрим несколько базовых вещей:

Для работы с Elasticsearch мы используем пакет ensi/laravel-elastic-query. Он предполагает, что для каждого индекса будет создан класс {Name}Index extends \Ensi\LaravelElasticQuery\ElasticIndex, через который и будет вестись дальнейшая работа с индексом. В catalog-cache есть свой базовый класс для наследования - ElasticIndex - который вводит понятие хеша индекса (md5 от структуры индекса) и добавляет несколько полезных базовых функций.

Для чего нужен хеш индекса? Периодически возникает необходимость вносить изменения в структуру индекса. И это означает, что требуется создать новый индекс (с новой структурой) и заполнить его с нуля данными. При этом старый индекс должен быть в рабочем состоянии, пока новый индекс не наполнится данными, иначе на витрине будет пустой каталог. Чтобы эта схема работала и нужен хеш индекса: мы отдельно отгружаем 1) web-сервис, который только читает 2) индексатор, который записывает данные. Каждый из этих компонентов знает свой хеш индекса и работает с нужной версией.

Дополнительно catalog-cache рассчитан на то, что в будущем могут появляться другие типы индексов, поэтому верхнеуровневые команды не работают напрямую с индексом, они работают с конфигом config/elastic.php, куда можно добавлять новые индексы.

Всё это приводит нас к довольно усложнённой схеме работы сервиса. Рассмотрим её более подробно и с примерами кода:

  1. Мастер-сервис с данными (например pim) для каждой сущности (данные из которой нужны в индексе) при каждом обновлении (создании/изменении/удалении) отправляет в kafka данные. Обычно это реализуется через Observer с флагом public $afterCommit = true;, чтобы не отправлять данные раньше, чем завершится транзакция. Пример реализации
  2. Catalog-cache слушает топики в kafka и забирает все изменения в локальную БД. Пример реализации
  3. В БД множество таблиц, данные из которых потом будут собраны в одну запись в индексе. Но можно выделить одну "ключевую" таблицу (например offers, для индекса по офферам), которая олицетворяет саму сущность, а все остальные - лишь дообагащают эту сущность. Индексатор опирается именно на эту "ключевую" таблицу, а точнее на поле updated_at, чтобы понять пора ли обновлять для этой записи индекс. Таким образом каждая вспомогательная сущность, при своём обновлении, должна обновлять поле updated_at у ключевой сущности. Пример реализации: Action, Observer
  4. И вот мы имеем БД, в которой продублированы данные из мастер-сервисов, и теперь нам необходимо эти данные записать в индекс. Это происходит в несколько шагов. Первый из них - запуск раз в минуту консольной команды, которая добавляет в очередь на индексацию все записи, изменившиеся с последнего сканирования. Рассмотрим этот процесс детальнее:
    • Сервис поддерживает работу с несколькими индексами, они настраиваются в config/elastic.php
    • Для каждого индекса в конфиге указан action, который занимается добавлением изменённых записей в очередь на индексацию.
    • Примерный алгоритм такого action следующий: Получаем из таблицы indexer_timestamps время последнего сканирования изменений. Ищем обновлённые записи. Запускаем эти записи в Job на индексацию. Обновляем время запуска в indexer_timestamps. Пример реализации
  5. Второй шаг - сама индексация. Воркер получает отправленный Job и выполняет его. Сам Job изначально имеет массив идентификаторов записей, которые необходимо проиндексировать. Происходит загрузка данных из БД, формирование json под структуру индекса и отправка пакета данных в эластик (одним запросом). Пример реализации
  6. Отдельно стоит упомянуть кейс с удалением данных. Если удаляются вспомогательные сущности, то происходит просто обновление updated_at у ключевой сущности. Если же удаляется ключевая сущность, то сразу происходит и удаление записи из индекса. Пример реализации

Ещё несколько важных моментов в работе сервиса:

  1. Дополнительная проблема при работе с индексами возникает на тестовых площадках, где отгружено много версий catalog-cache, каждый из которых хочет иметь свой индексатор и свой индекс. Но при этом на наших площадках общая БД на все отгруженные ветки. Именно чтобы не возникал конфликт индексаторов система работает через updated_at, а каждый индексатор сам знает когда он последний раз запускался, проверяет какие записи обновились с того момента, и отправляет их на индексацию в свой индекс. Для случаев, когда структура индекса не изменилась, но логика индексатора поменялась и её нужно проверить - можно заполнить в env APP_STAGE=QKEY-123 (это значение используется при формировании имени индекса)
  2. Тестирование сервиса подразумевает и тестирование взаимодействия с эластиком. Здесь реализован такой же подход, как и при тестах БД - создаётся тестовый индекс, туда добавляются записи, отправляются реальные запросы в эластик.
    • Тут есть проблема: из-за того что эластик не сразу заносит изменения в сам индекс, можно получить кейс, когда вы создали/изменили/удалили запись в индексе, но запрос на поиск не отдаёт эти изменения. Чтобы избежать из-за этого проблем, необходимо принудительно говорить эластику накатить все изменения на индекс с помощью refresh запроса. Это уже реализовано в базовой фабрике \Tests\Factories\BaseElasticFactory
  3. При добавлении в сервис новых вспомогательных сущностей, требуется первично заполнить БД данными из мастер-сервисов. Для этого реализуется отдельная консольная команда, которая обращается в api этих мастер-сервисов и собирает из них данные, обновляя свою БД. Пример реализации