后端:增加商品修改时,发送 MQ 消息

后端:增加搜索服务,监听 MQ 消息,建立商品索引
This commit is contained in:
YunaiV
2019-04-25 20:12:01 +08:00
parent cddffabeba
commit f529985c40
41 changed files with 642 additions and 251 deletions

View File

@@ -3,7 +3,7 @@ package cn.iocoder.mall.search.biz.convert;
import cn.iocoder.mall.order.api.bo.CalcSkuPriceBO;
import cn.iocoder.mall.product.api.bo.ProductSpuDetailBO;
import cn.iocoder.mall.promotion.api.bo.PromotionActivityBO;
import cn.iocoder.mall.search.api.bo.ESProductBO;
import cn.iocoder.mall.search.api.bo.ProductBO;
import cn.iocoder.mall.search.biz.dataobject.ESProductDO;
import org.mapstruct.Mapper;
import org.mapstruct.Mappings;
@@ -34,6 +34,6 @@ public interface ProductSearchConvert {
return product;
}
List<ESProductBO> convert(List<ESProductDO> list);
List<ProductBO> convert(List<ESProductDO> list);
}

View File

@@ -54,11 +54,13 @@ public interface ProductRepository extends ElasticsearchRepository<ESProductDO,
nativeSearchQueryBuilder.withQuery(QueryBuilders.matchAllQuery());
}
// 排序
if (CollectionUtil.isEmpty(sortFields)) {
nativeSearchQueryBuilder.withSort(SortBuilders.scoreSort().order(SortOrder.DESC));
} else {
if (!CollectionUtil.isEmpty(sortFields)) {
sortFields.forEach(sortField -> nativeSearchQueryBuilder.withSort(SortBuilders.fieldSort(sortField.getField())
.order(SortOrder.fromString(sortField.getOrder()))));
} else if (StringUtil.hasText(keyword)) {
nativeSearchQueryBuilder.withSort(SortBuilders.scoreSort().order(SortOrder.DESC));
} else {
nativeSearchQueryBuilder.withSort(SortBuilders.fieldSort("sort").order(SortOrder.DESC));
}
// 执行查询
return search(nativeSearchQueryBuilder.build());

View File

@@ -0,0 +1,28 @@
package cn.iocoder.mall.search.biz.mq;
import cn.iocoder.common.framework.vo.CommonResult;
import cn.iocoder.mall.product.api.message.ProductUpdateMessage;
import cn.iocoder.mall.search.api.ProductSearchService;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
@Service
@RocketMQMessageListener(
topic = ProductUpdateMessage.TOPIC,
consumerGroup = "search-consumer-group-" + ProductUpdateMessage.TOPIC
)
public class PayTransactionPaySuccessConsumer implements RocketMQListener<ProductUpdateMessage> {
@Autowired
private ProductSearchService productSearchService;
@Override
public void onMessage(ProductUpdateMessage message) {
CommonResult<Boolean> result = productSearchService.save(message.getId());
Assert.isTrue(result.isSuccess(), String.format("重构商品 ES 索引,必然成功。实际结果是 %s", result));
}
}

View File

@@ -1,26 +1,39 @@
package cn.iocoder.mall.search.biz.service;
import cn.iocoder.common.framework.util.CollectionUtil;
import cn.iocoder.common.framework.util.StringUtil;
import cn.iocoder.common.framework.vo.CommonResult;
import cn.iocoder.common.framework.vo.SortingField;
import cn.iocoder.mall.order.api.CartService;
import cn.iocoder.mall.order.api.bo.CalcSkuPriceBO;
import cn.iocoder.mall.product.api.ProductCategoryService;
import cn.iocoder.mall.product.api.ProductSpuService;
import cn.iocoder.mall.product.api.bo.ProductCategoryBO;
import cn.iocoder.mall.product.api.bo.ProductSpuDetailBO;
import cn.iocoder.mall.search.api.ProductSearchService;
import cn.iocoder.mall.search.api.bo.ESProductPageBO;
import cn.iocoder.mall.search.api.bo.ProductConditionBO;
import cn.iocoder.mall.search.api.bo.ProductPageBO;
import cn.iocoder.mall.search.api.dto.ProductConditionDTO;
import cn.iocoder.mall.search.api.dto.ProductSearchPageDTO;
import cn.iocoder.mall.search.biz.convert.ProductSearchConvert;
import cn.iocoder.mall.search.biz.dao.ProductRepository;
import cn.iocoder.mall.search.biz.dataobject.ESProductDO;
import com.alibaba.dubbo.config.annotation.Reference;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Service
@@ -31,10 +44,14 @@ public class ProductSearchServiceImpl implements ProductSearchService {
@Autowired
private ProductRepository productRepository;
@Autowired
private ElasticsearchTemplate elasticsearchTemplate; // 因为需要使用到聚合操作,只好引入 ElasticsearchTemplate 。
@Reference(validation = "true")
private ProductSpuService productSpuService;
@Reference(validation = "true")
private ProductCategoryService productCategoryService;
@Reference(validation = "true")
private CartService cartService;
@Override
@@ -57,9 +74,22 @@ public class ProductSearchServiceImpl implements ProductSearchService {
lastId = spus.get(spus.size() - 1).getId();
}
}
// 返回成功
return CommonResult.success(rebuildCounts);
}
@Override
public CommonResult<Boolean> save(Integer id) {
// 获得商品性情
CommonResult<ProductSpuDetailBO> result = productSpuService.getProductSpuDetail(id);
Assert.isTrue(result.isSuccess(), "获得商品详情必然成功");
// 存储到 ES 中
ESProductDO product = convert(result.getData());
productRepository.save(product);
// 返回成功
return CommonResult.success(Boolean.TRUE);
}
@SuppressWarnings("OptionalGetWithoutIsPresent")
private ESProductDO convert(ProductSpuDetailBO spu) {
// 获得最小价格的 SKU ,用于下面的价格计算
@@ -72,13 +102,13 @@ public class ProductSearchServiceImpl implements ProductSearchService {
}
@Override
public CommonResult<ESProductPageBO> searchPage(ProductSearchPageDTO searchPageDTO) {
public CommonResult<ProductPageBO> getSearchPage(ProductSearchPageDTO searchPageDTO) {
checkSortFieldInvalid(searchPageDTO.getSorts());
// 执行查询
Page<ESProductDO> searchPage = productRepository.search(searchPageDTO.getCid(), searchPageDTO.getKeyword(),
searchPageDTO.getPageNo(), searchPageDTO.getPageSize(), searchPageDTO.getSorts());
// 转换结果
ESProductPageBO resultPage = new ESProductPageBO()
ProductPageBO resultPage = new ProductPageBO()
.setList(ProductSearchConvert.INSTANCE.convert(searchPage.getContent()))
.setTotal((int) searchPage.getTotalElements());
return CommonResult.success(resultPage);
@@ -92,4 +122,46 @@ public class ProductSearchServiceImpl implements ProductSearchService {
String.format("排序字段(%s) 不在允许范围内", sortingField.getField())));
}
@Override
public CommonResult<ProductConditionBO> getSearchCondition(ProductConditionDTO conditionDTO) {
// 创建 ES 搜索条件
NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder();
// 筛选
if (StringUtil.hasText(conditionDTO.getKeyword())) { // 如果有 keyword ,就去匹配
nativeSearchQueryBuilder.withQuery(QueryBuilders.multiMatchQuery(conditionDTO.getKeyword(),
"name", "sellPoint", "categoryName"));
} else {
nativeSearchQueryBuilder.withQuery(QueryBuilders.matchAllQuery());
}
// 聚合
if (conditionDTO.getFields().contains(ProductConditionDTO.FIELD_CATEGORY)) { // 商品分类
nativeSearchQueryBuilder.addAggregation(AggregationBuilders.terms("cids").field("cid"));
}
// 执行查询
ProductConditionBO condition = elasticsearchTemplate.query(nativeSearchQueryBuilder.build(), response -> {
ProductConditionBO result = new ProductConditionBO();
// categoryIds 聚合
Aggregation categoryIdsAggregation = response.getAggregations().get("cids");
if (categoryIdsAggregation != null) {
result.setCategories(new ArrayList<>());
for (LongTerms.Bucket bucket : (((LongTerms) categoryIdsAggregation).getBuckets())) {
result.getCategories().add(new ProductConditionBO.Category().setId(bucket.getKeyAsNumber().intValue()));
}
}
// 返回结果
return result;
});
// 聚合其它数据源
if (!CollectionUtil.isEmpty(condition.getCategories())) {
// 查询指定的 ProductCategoryBO 数组,并转换成 ProductCategoryBO Map
Map<Integer, ProductCategoryBO> categoryMap = productCategoryService.getListByIds(
condition.getCategories().stream().map(ProductConditionBO.Category::getId).collect(Collectors.toList()))
.stream().collect(Collectors.toMap(ProductCategoryBO::getId, category -> category));
// 设置分类名
condition.getCategories().forEach(category -> category.setName(categoryMap.get(category.getId()).getName()));
}
// 返回结果
return CommonResult.success(condition);
}
}

View File

@@ -18,3 +18,9 @@ dubbo:
name: dubbo
scan:
base-packages: cn.iocoder.mall.search.biz.service
# rocketmq
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: search-producer-group