Индексация
Компоненты 
- Indexer сервис - в группе indexer добавлены по одному Consumer на сущность, через который идёт управление частичной индексацией. Индексация запускается с помощью REST API. Сервис позволяет осуществлять частичную индексацию:
- товаров, их остатков и категорий,
- цен и скидок на товары,
- медиа ассетов.
- Core сервисы выполняют CRUD операции над сущностями, от которых зависит поиск. Сервисы выпускают события обновления сущностей в брокере Kafka.
- Kafka брокер управляет событиями. Для централизации логики генерации событий используется библиотека
event-generator
. - Cервис-планировщик Scheduler используется для учёта периода валидности цен.
- Проиндексированные продукты используются в ElasticSearch для ускорения поиска.
Сценарий
Core сервис обрабатывает входящий запрос на CRUD (Create, Update, Delete) операцию по сущности. После успешного выполнения операции, сервис генерирует новое событие, используя библиотеку event-generator
, и публикует его в брокере сообщений Kafka. Если идёт создание / обновление / удаление сразу нескольких сущностей, отдельное событие генерируется на каждое изменение сущности.
Сервис Indexer обрабатывает событие изменения первоначальных данных - генерирует скрипт для обновления индекса, вызывает API для частичной переиндексации в ElasticSearch и сохраняет аудит по проведенной частичной индексации.
Публикация событий
Название топика: <название_сервиса>-change-<название_сущности>. Например, catalog-change-product.
Формат тела сообщения общий для всех событий и структурно состоит из 3 частей:
-
описание события - тип изменений и время. Описание строгое и для всех событий одинаковое.
-
описание сущности - ее тип и ID. Описание строгое и для всех событий одинаковое.
-
полезная нагрузка события. Описание в свободном формате, и задается в зависимости от события. При обновлении полезная нагрузка - это детали произошедшего изменения (какие свойства изменились), контекст события (например, товар, рынок, склад и т.д.). При создании полезная нагрузка - это данные сущности при создании.
{
"timestamp": 1721901863, // описание события
"type": "CREATE or UPDATE or DELETE or READ", // описание события
"entity": {
"type": "PRODUCT or PRICE or PROMO, etc.", // описание сущности
"id": "1" // описание сущности
},
"payload": {
// полезная нагрузка события, в свободном формате
}
}
Сервис Indexer позволяет настраивать реакции на события. В таких настройках задаются правила:
-
на какие события реагирует сервис,
-
какие изменения в каких сущностях и их свойствах отслеживает сервис,
-
правила, по которым будет произведена частичная индексация.
В случае совпадения данных события, сервис выполнит одну из 4 видов операции, согласно заданным в настройках правилам:
-
Создание нового документа происходит только при добавлении нового товара. При использовании операции POST {index}/_update/{id}, новый документ будет создан, если его еще не существует. Следовательно, создавать вручную документ нет необходимости, можно воспользоваться сценарием обновления документа.
-
Обновление документа. Соблюдены требования идемпотентности к операциям обновления. Для того, чтобы обновить документ, используется библиотека
elasticsearch-java
. Классco.elastic.clients.elasticsearch.core.UpdateRequest
позволяет сформировать запрос на обновление документа POST {index}/_update/{id}. -
Обновление документов вскользь. Соблюдены требования идемпотентности к операциям обновления. Во время сложных сценариев, например, при изменении имени категории или изменении изображения, вместо обновления документа генерируется скрипт, который содержит правила обновления и фильтр для поиска документов. Сгенерированный скрипт отсылается вместе с запросом POST <index>/_update_by_query. Такие изменения соответствуют команде обновления продуктов по заданным правилам, у которых ID ассета совпадает.
-
Удаление документа в индексе происходит только при удалении продукта. Вызывается DELETE /{index}/_doc/{id}. Обрабатывается такой сценарий отдельно.
Какие события следует мониторить и отображение их на правила по частичной индексации приведины в таблице ниже. Формат name <- created.name
говорит, что отслеживается свойство name
в созданной сущности, и значение записывается в свойство документа name
.
Событие |
Имя сущности |
Тип события |
Тип операции |
Мониторинг и правила по индексации |
|
|
|
1 |
условие: создание новых документов с ID по значениям
Закешировать запрос GET /api/catalog/v1/categories, кэш обновляется в консьюмерах событий |
|
2 |
условие: обновление документов для каждого варианта с ID по значениям
|
||
|
4 |
удаление документов для каждого варианта с ID по значениям
|
||
|
|
|
3 |
обновить
|
|
3 |
обновить
|
||
|
|
|
2 |
обновить документ с id по значению
Перед реализацией нужно добавить массив stocks. Подробнее тут |
|
2 |
обновить документ с id по значению
|
||
2 |
условие: обновить документ с id по значению
|
|||
|
2 |
обновить документ с id по значению
|
||
|
|
|
2 |
условие: обновить документ с id по значению
|
|
2 |
условие: обновить документы с id по значению
|
||
2 |
условие: обновить документы с id по значению
|
|||
|
2 |
обновить документ с id по значению
|
||
|
|
|
2 |
обновить
|
Планировщик
Для учета периода валидности цен используется сервис-планировщик Scheduler. В зависимости от временных границ (validFrom
и validTo
) создаются задачи на индексацию или удаление цены из индекса.
Отложенная индексация цены учтена:
- при полной индексации;
- при частичной индексации товаров (создание и удаление товара);
- при частичной индексации цен.
Механизм, для того чтобы отловить событие применения новой цены, работает следующим образом:
-
Сервис, на котором запущен “планировщик”, подписывается на события изменения сущности Price;
-
При обработке события, сервис дает команду планировщику запланировать задачу, удалить или обновить ее на время, заданное в событии;
-
Сама запланированная задача выпускает событие о том, что изменилась цена на товаре, передавая новую цену;
-
Сервис Indexer подписывается на событие, которое создал планировщик, и индексирует новую цену;
-
Цена со скидкой может также расчитываться через один дополнительный запрос к промо движку в запланированной задаче.
Условия создания задачи
- Если
active = false
то цена не индексируется и задачи не создаются. - Если
текущее время < validFrom
, создаётся задача на индексацию цены в момент времениvalidFrom
. - Если
текущее время < validTo
, создаётся задача на удаление цены из индекса в момент времениvalidTo
.
Процесс создания задачи
- В сервисе Scheduler создаётся задача по вызову HTTP-эндпоинта
POST <хост_сервиса_indexer>/api/indexer/v1/index/scheduled/price
на заданное время. В ответе возвращаетсяjobId
. - В сервисе Indexer в таблице
dc_scheduled_price_indexation
создаётся запись с полями:job_id
— идентификатор созданной задачи;price_id
— идентификатор цены;launch_timestamp
— время запуска задачи;action
— действие (INDEX
илиDELETE
);price
— данные о цене в формате JSON.
Выполнение задачи
- Когда наступает
launchTimestamp
, сервис Scheduler отправляет HTTP-запрос на эндпоинт<хост_сервиса_indexer>/api/indexer/v1/index/scheduled/price
сервиса Indexer, передаваяjobId
. - Сервис Indexer находит соответствующую запись в таблице
dc_scheduled_price_indexation
и выполняет действие (INDEX
илиDELETE
) в зависимости от значенияaction
. - После выполнения запись удаляется из таблицы.
Настройки планировщика
-
Включена кластеризация (с 1 узлом на текущем этапе);
-
Количество одновременных потоков выполнения полезной нагрузки планировщика — 5;
-
Размер очереди на выполнение задач - нефиксированный, персистентное хранение описаний задач, триггеров и очередей в БД;
-
Схема БД для мета-данных инициализируется при запуске сервиса через утилиту
Liquibase
; -
Необходимо вести историю (аудит) выполнения задач. Конфигурация повторных попыток в случае неудач не задана.
Сценарии по частичному обновлению индекса
Компонент Catalog
Создание продукта
Индексация для нового продукта нужна, только если:
- продукт создается со статусом Опубликован или Снят с продажи,
- сформировать новый документ
ProductDocumentDto
, - запустить процесс обновления одного документа.
{
"entity": {
"type": "product",
"id": id
}
"type": "create",
"payload": ProductDto
}
Обновление продукта
Реакция на событие перед индексацией:
-
при обновлении статуса:
-
если статус меняется на Опубликован или Снят с продажи, то произвести частичную индексацию,
-
учесть возможное удаление продукта, если товар снят с продажи,
-
в противном случае обновлять данные по товару в индексе не нужно,
-
-
если товар находится в статусе Опубликован или Снят с продажи:
-
сформировать частично заполненный документ
ProductDocumentDto
для обновления индекса, -
запустить процесс обновления одного документа,
-
-
иначе, если товар находится в одном из статусов Черновик или Готов к публикации, обновление не нужно.
{
"entity": {
"type": "product",
"id": id
}
"type": "update",
"payload": ProductDto delta
}
Удаление продукта
Реакция на событие перед индексацией:
- забрать ID вариантов по товару,
- сформировать запрос на удаление документов по ID вариантов товара,
- запустить процесс удаления нескольких документов.
{
"entity": {
"type": "product",
"id": id
}
"type": "delete",
"payload": ProductDto
}
Пакетное обновление продукта
Реакция на событие перед индексацией:
- для каждого обновленного товара выполнить действия по обновлению товара.
Для каждого товара в обновлении сформировать отдельное событие.
{
"entity": {
"type": "product",
"id": id
}
"type": "update",
"payload": ProductDto delta
}
Обновление категории
Реакция на событие перед индексацией:
- если обновляется наименование категории или уровень или значение родительской категории:
-
использовать генерацию скрипта по обновлению неопределенных документов,
-
необходимо учесть статус продукта / варианта аналогично сценарию обновления продукта,
-
сформировать скрипт по обновлению документов по ID категории,
-
скрипт обновит все документы, которые используют ID категории.
-
{
"entity": {
"type": "category",
"id": id
}
"type": "update",
"payload": CategoryDto delta
}
Удаление категории
Реакция на событие перед индексацией:
-
использовать генерацию скрипта по обновлению неопределенных документов,
-
необходимо учесть статус продукта / варианта аналогично сценарию обновления документа,
-
сформировать скрипт по обновлению документов по ID категории,
-
скрипт обновит все документы, которые используют ID категории,
{
"entity": {
"type": "category",
"id": id
}
"type": "delete",
"payload": CategoryDto
}
Обновление медиа ассета
Реакция на событие перед индексацией:
-
использовать генерацию скрипта по обновлению неопределенных документов,
-
необходимо учесть статус продукта / варианта аналогично сценарию обновления продукта,
- сформировать скрипт по обновлению документов по ID медиа ассета,
-
скрипт обновит все документы, которые используют ID медиа ассета.
{
"entity": {
"type": "media_asset",
"id": id
}
"type": "update",
"payload": MediaAssetDto delta
}
Удаление медиа ассета
Реакция на событие перед индексацией:
-
использовать генерацию скрипта по обновлению неопределенных документов,
-
необходимо учесть статус продукта / варианта аналогично сценарию обновления продукта,
- сформировать скрипт по обновлению документов по ID медиа ассета,
-
скрипт обновит все документы, которые используют ID медиа ассета.
{
"entity": {
"type": "media_asset",
"id": id
}
"type": "delete",
"payload": MediaAssetDto
}
Компонент Availability 
Создание остатка
Реакция на событие перед индексацией:
-
найти товары, к которым применяется остаток,
-
по каждому товару найти остатки по локациям (если остатки по локациям не больше создаваемого, то индексация не нужна),
- обновить каждый документ в индексе.
Особенности реализации:
- Остатки по товарам не подвязаны на локации. Данных о количестве товара в стоке нет. Хранится только одно общее свойство
stockStatus
со значениямиIN_STOCK
,OUT_OF_STOCK
.
{
"entity": {
"type": "stock",
"id": id
}
"type": "create",
"payload": AvailabilityDto
}
Обновление остатка
Реакция на событие перед индексацией:
- обновление сущности остатка влечет за собой перерасчет остатка, как и в случае с созданием остатка.
{
"entity": {
"type": "stock",
"id": id
}
"type": "update",
"payload": AvailabilityDto delta
}
Удаление остатка
Реакция на событие перед индексацией:
- обновление сущности остатка влечет за собой перерасчет остатка, как и в случае с созданием остатка.
{
"entity": {
"type": "stock",
"id": id
}
"type": "delete",
"payload": AvailabilityDto
}
Компонент Price 
Создание цены
Реакция на событие перед индексацией:
-
найти товары, к которым применяется цена,
-
по каждому такому товару найти цену, расчитать
unitPrice
по валютам, и вычислить значенияdiscountUnitPrice
для каждой валюты аналогично сценарию добавления скидки, -
обновить каждый документ в индексе.
{
"entity": {
"type": "price",
"id": id
}
"type": "create",
"payload": PriceDto
}
Обновление цены
Реакция на событие перед индексацией:
-
использовать генерацию скрипта по обновлению неопределенных документов,
-
вычислить значения
discountUnitPrice
аналогично сценарию с добавлением скидки, -
скрипт обновит все документы, которые используют ассет по ID цены.
{
"entity": {
"type": "price",
"id": id
}
"type": "update",
"payload": PriceDto delta
}
Удаление цены
Реакция на событие перед индексацией:
-
использовать генерацию скрипта по обновлению неопределенных документов,
-
вычислить значения
discountUnitPrice
аналогично сценарию с добавлением скидки, -
скрипт обновит все документы, которые используют ассет по ID цены,
-
удаление сущности цены влечет за собой полный перерасчет, как в случае с созданием скидки.
{
"entity": {
"type": "price",
"id": id
}
"type": "delete",
"payload": PriceDto
}
Компонент Promo 
Создание скидки
Реакция на событие перед индексацией:
-
найти товары, к которым применяется скидка,
-
по каждому товару забрать цену,
unitPrice
по валютам, и вычислить значенияdiscountUnitPrice
для каждой валюты, -
обновить каждый документ в индексе
Особенности реализации:
-
расчет
discountUnitPrice
происходит с использованием BRMS Drools, -
расчет зависит от:
-
способа предоставления скидки: только процент,
-
области применения: все товары или конкретный товар,
-
требования к покупке: только нет требований,
-
метода применения: только автоматическая скидка,
-
целевой аудитории: только все клиенты,
-
способа комбинирования скидок: не суммировать, суммировать с другими,
-
периода действия.
-
{
"entity": {
"type": "promo",
"id": id
}
"type": "create",
"payload": PromoDto
}
Обновление скидки
Реакция на событие перед индексацией:
- обновление сущности скидки влечет за собой полный перерасчет, как в случае с созданием скидки
{
"entity": {
"type": "promo",
"id": id
}
"type": "update",
"payload": PromoDto
}
Удаление скидки
Реакция на событие перед индексацией:
-
учесть, что удаляемая сущность скидки может быть не единственная настроенная в системе,
-
удаление сущности скидки влечет за собой полный перерасчет, как в случае с созданием скидки.
{
"entity": {
"type": "promo",
"id": id
}
"type": "delete",
"payload": PromoDto
}