将支付成功、退款成功的 MQ 消费逻辑进行迁移
This commit is contained in:
@@ -0,0 +1,30 @@
|
||||
package cn.iocoder.mall.payservice.enums.refund;
|
||||
|
||||
import lombok.Getter;
|
||||
|
||||
/**
|
||||
* 支付退款状态枚举
|
||||
*/
|
||||
@Getter
|
||||
public enum PayRefundStatus {
|
||||
|
||||
WAITING(1, "处理中"),
|
||||
SUCCESS(2, "成功"),
|
||||
FAILURE(3, "失败"), // 例如说,支付单超时
|
||||
;
|
||||
|
||||
/**
|
||||
* 状态
|
||||
*/
|
||||
private final Integer value;
|
||||
/**
|
||||
* 名字
|
||||
*/
|
||||
private final String name;
|
||||
|
||||
PayRefundStatus(Integer value, String name) {
|
||||
this.value = value;
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,78 @@
|
||||
package cn.iocoder.mall.payservice.common.dubbo;
|
||||
|
||||
import cn.iocoder.common.framework.util.StringUtils;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import com.google.common.cache.CacheLoader;
|
||||
import com.google.common.cache.LoadingCache;
|
||||
import lombok.Data;
|
||||
import org.apache.dubbo.config.ApplicationConfig;
|
||||
import org.apache.dubbo.config.ReferenceConfig;
|
||||
import org.apache.dubbo.config.RegistryConfig;
|
||||
import org.apache.dubbo.rpc.service.GenericService;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Component
|
||||
public class DubboReferencePool {
|
||||
|
||||
@Data
|
||||
public class ReferenceMeta {
|
||||
|
||||
private final ReferenceConfig config; // TODO 芋艿,后续需要做销毁
|
||||
private final GenericService service;
|
||||
private final String methodName;
|
||||
|
||||
private ReferenceMeta(ReferenceConfig config, GenericService service, String methodName) {
|
||||
this.config = config;
|
||||
this.service = service;
|
||||
this.methodName = methodName;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private LoadingCache<String, ReferenceMeta> referenceMetaCache = CacheBuilder.newBuilder()
|
||||
.build(new CacheLoader<String, ReferenceMeta>() {
|
||||
@Override
|
||||
public ReferenceMeta load(String notifyUrl) {
|
||||
return createGenericService(notifyUrl);
|
||||
}
|
||||
});
|
||||
|
||||
@Value("${dubbo.registry.address}")
|
||||
private String dubboRegistryAddress;
|
||||
@Value("${dubbo.application.name}")
|
||||
private String dubboApplicationName;
|
||||
|
||||
private ReferenceMeta createGenericService(String notifyUrl) {
|
||||
// 使用 # 号分隔,格式为 服务名#方法名#版本号
|
||||
List<String> notifyUrlParts = StringUtils.split(notifyUrl, "#");
|
||||
// 创建 ApplicationConfig 对象
|
||||
ApplicationConfig application = new ApplicationConfig();
|
||||
application.setName(dubboApplicationName);
|
||||
// 创建 RegistryConfig 对象
|
||||
RegistryConfig registry = new RegistryConfig();
|
||||
// registry.setAddress("zookeeper://127.0.0.1:2181");
|
||||
registry.setAddress(dubboRegistryAddress);
|
||||
application.setRegistry(registry);
|
||||
// 创建 ReferenceConfig 对象
|
||||
ReferenceConfig<GenericService> reference = new ReferenceConfig<>();
|
||||
reference.setInterface(notifyUrlParts.get(0)); // 弱类型接口名
|
||||
reference.setGeneric(true); // 声明为泛化接口
|
||||
reference.setApplication(application);
|
||||
reference.setVersion(notifyUrlParts.size() > 2 ? notifyUrlParts.get(2) : "1.0.0"); // 如果未配置服务的版本号,则默认使用 1.0.0
|
||||
// 获得 GenericService 对象
|
||||
GenericService genericService = reference.get();
|
||||
// 构建最终的 ReferenceMeta 对象
|
||||
return new ReferenceMeta(reference, genericService, notifyUrlParts.get(1));
|
||||
}
|
||||
|
||||
public ReferenceMeta getReferenceMeta(String notifyUrl) {
|
||||
DubboReferencePool.ReferenceMeta referenceMeta = referenceMetaCache.getUnchecked(notifyUrl);
|
||||
Assert.notNull(referenceMeta, String.format("notifyUrl(%s) 不存在对应的 ReferenceMeta 对象", notifyUrl));
|
||||
return referenceMeta;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1 @@
|
||||
package cn.iocoder.mall.payservice.common;
|
||||
@@ -1,6 +1,7 @@
|
||||
package cn.iocoder.mall.payservice.dal.mysql.dataobject.refund;
|
||||
|
||||
import cn.iocoder.mall.mybatis.core.dataobject.DeletableDO;
|
||||
import cn.iocoder.mall.payservice.enums.refund.PayRefundStatus;
|
||||
import com.baomidou.mybatisplus.annotation.TableName;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
@@ -68,7 +69,7 @@ public class PayRefundDO extends DeletableDO {
|
||||
/**
|
||||
* 退款状态
|
||||
*
|
||||
* @see cn.iocoder.mall.pay.api.constant.PayRefundStatus
|
||||
* 外键 {@link PayRefundStatus}
|
||||
*/
|
||||
private Integer status;
|
||||
/**
|
||||
|
||||
@@ -0,0 +1,57 @@
|
||||
package cn.iocoder.mall.payservice.dal.mysql.mapper.refund;
|
||||
|
||||
import cn.iocoder.mall.payservice.dal.mysql.dataobject.refund.PayRefundDO;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
@Repository
|
||||
public interface PayRefundMapper extends BaseMapper<PayRefundDO> {
|
||||
|
||||
default int update(PayRefundDO entity, Integer whereStatus) {
|
||||
return update(entity, new QueryWrapper<PayRefundDO>()
|
||||
.eq("id", entity.getId()).eq("status", whereStatus));
|
||||
}
|
||||
|
||||
default PayRefundDO selectByRefundCode(String refundCode) {
|
||||
return selectOne(new QueryWrapper<PayRefundDO>()
|
||||
.eq("refund_code", refundCode));
|
||||
}
|
||||
|
||||
|
||||
// <if test="createBeginTime != null">
|
||||
// AND create_time >= #{createBeginTime}
|
||||
// </if>
|
||||
// <if test="createEndTime != null">
|
||||
// AND #{createEndTime} >= create_time
|
||||
// </if>
|
||||
// <if test="finishBeginTime != null">
|
||||
// AND finish_time >= #{finishBeginTime}
|
||||
// </if>
|
||||
// <if test="finishEndTime != null">
|
||||
// AND #{finishEndTime} >= finish_time
|
||||
// </if>
|
||||
// <if test="status != null">
|
||||
// AND status = #{status}
|
||||
// </if>
|
||||
// <if test="payChannel != null">
|
||||
// AND pay_channel = #{payChannel}
|
||||
// </if>
|
||||
|
||||
// List<PayRefundDO> selectListByPage(@Param("createBeginTime") Date createBeginTime,
|
||||
// @Param("createEndTime") Date createEndTime,
|
||||
// @Param("finishBeginTime") Date finishBeginTime,
|
||||
// @Param("finishEndTime") Date finishEndTime,
|
||||
// @Param("status") Integer status,
|
||||
// @Param("payChannel") Integer payChannel,
|
||||
// @Param("offset") Integer offset,
|
||||
// @Param("limit") Integer limit);
|
||||
//
|
||||
// Integer selectCountByPage(@Param("createBeginTime") Date createBeginTime,
|
||||
// @Param("createEndTime") Date createEndTime,
|
||||
// @Param("finishBeginTime") Date finishBeginTime,
|
||||
// @Param("finishEndTime") Date finishEndTime,
|
||||
// @Param("status") Integer status,
|
||||
// @Param("payChannel") Integer payChannel);
|
||||
|
||||
}
|
||||
@@ -7,6 +7,42 @@ import org.springframework.stereotype.Repository;
|
||||
|
||||
@Repository
|
||||
public interface PayTransactionMapper extends BaseMapper<PayTransactionDO> {
|
||||
//
|
||||
// UPDATE `transaction`
|
||||
// SET refund_total = refund_total + ${refundTotalIncr}
|
||||
// WHERE price >= refund_total + ${refundTotalIncr}
|
||||
//
|
||||
|
||||
// int updateForRefundTotal(@Param("id") Integer id,
|
||||
// @Param("refundTotalIncr") Integer refundTotalIncr);
|
||||
|
||||
// <if test="createBeginTime != null">
|
||||
// AND create_time >= #{createBeginTime}
|
||||
// </if>
|
||||
// <if test="createEndTime != null">
|
||||
// AND #{createEndTime} >= create_time
|
||||
// </if>
|
||||
// <if test="paymentBeginTime != null">
|
||||
// AND payment_time >= #{paymentBeginTime}
|
||||
// </if>
|
||||
// <if test="paymentEndTime != null">
|
||||
// AND #{paymentEndTime} >= payment_time
|
||||
// </if>
|
||||
// <if test="status != null">
|
||||
// AND status = #{status}
|
||||
// </if>
|
||||
// <if test="hasRefund == true">
|
||||
// AND refund_total > 0
|
||||
// </if>
|
||||
// <if test="hasRefund == false">
|
||||
// AND refund_total = 0
|
||||
// </if>
|
||||
// <if test="payChannel != null">
|
||||
// AND pay_channel = #{payChannel}
|
||||
// </if>
|
||||
// <if test="orderSubject != null">
|
||||
// order_subject LIKE "%"#{orderSubject}"%"
|
||||
// </if>
|
||||
|
||||
// default IPage<PayTransactionDO> selectPage(TransactionPageBO pageBO) {
|
||||
// return selectPage(new Page<>(pageBO.getPageNo(), pageBO.getPageSize()),
|
||||
|
||||
@@ -0,0 +1,84 @@
|
||||
package cn.iocoder.mall.payservice.mq.consumer;
|
||||
|
||||
import cn.iocoder.common.framework.util.DateUtil;
|
||||
import cn.iocoder.common.framework.util.ExceptionUtil;
|
||||
import cn.iocoder.mall.payservice.common.dubbo.DubboReferencePool;
|
||||
import cn.iocoder.mall.payservice.dal.mysql.dataobject.notify.PayNotifyLogDO;
|
||||
import cn.iocoder.mall.payservice.dal.mysql.dataobject.notify.PayNotifyTaskDO;
|
||||
import cn.iocoder.mall.payservice.dal.mysql.mapper.notify.PayNotifyLogMapper;
|
||||
import cn.iocoder.mall.payservice.dal.mysql.mapper.notify.PayNotifyTaskMapper;
|
||||
import cn.iocoder.mall.payservice.enums.notify.PayNotifyStatusEnum;
|
||||
import cn.iocoder.mall.payservice.mq.producer.message.AbstractPayNotifySuccessMessage;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.util.Calendar;
|
||||
import java.util.Date;
|
||||
|
||||
public abstract class AbstractPayNotifySuccessMQConsumer<T extends AbstractPayNotifySuccessMessage> implements RocketMQListener<T> {
|
||||
|
||||
@Autowired
|
||||
private DubboReferencePool dubboReferencePool;
|
||||
|
||||
@Autowired
|
||||
private PayNotifyTaskMapper payNotifyTaskMapper;
|
||||
@Autowired
|
||||
private PayNotifyLogMapper payTransactionNotifyLogMapper;
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void onMessage(T message) {
|
||||
// 获得 ReferenceMeta 对象
|
||||
DubboReferencePool.ReferenceMeta referenceMeta = dubboReferencePool.getReferenceMeta(message.getNotifyUrl());
|
||||
// 发起调用
|
||||
String response = null; // RPC / HTTP 调用的响应
|
||||
PayNotifyTaskDO updateTask = new PayNotifyTaskDO() // 更新 PayTransactionNotifyTaskDO 对象
|
||||
.setId(message.getId())
|
||||
.setLastExecuteTime(new Date())
|
||||
.setNotifyTimes(message.getNotifyTimes() + 1);
|
||||
try {
|
||||
// TODO 芋艿,这里要优化下,不要在事务里,进行 RPC 调用
|
||||
response = invoke(message, referenceMeta);
|
||||
if ("success".equals(response)) { // 情况一,请求成功且返回成功
|
||||
// 更新通知成功
|
||||
updateTask.setStatus(PayNotifyStatusEnum.SUCCESS.getStatus());
|
||||
payNotifyTaskMapper.updateById(updateTask);
|
||||
// 需要更新支付交易单通知应用成功
|
||||
afterInvokeSuccess(message);
|
||||
} else { // 情况二,请求成功且返回失败
|
||||
// 更新通知请求成功,但是结果失败
|
||||
handleFailure(updateTask, PayNotifyStatusEnum.REQUEST_SUCCESS.getStatus());
|
||||
payNotifyTaskMapper.updateById(updateTask);
|
||||
}
|
||||
} catch (Throwable e) { // 请求失败
|
||||
// 更新通知请求失败
|
||||
response = ExceptionUtil.getRootCauseMessage(e);
|
||||
handleFailure(updateTask, PayNotifyStatusEnum.REQUEST_FAILURE.getStatus());
|
||||
payNotifyTaskMapper.updateById(updateTask);
|
||||
// 抛出异常,回滚事务
|
||||
// TODO 芋艿,此处不能抛出异常。因为,会导致 MQ + 定时任务多重试。此处的目标是,事务回滚 + 吃掉事务。另外,最后的 finally 的日志,要插入成功。
|
||||
// throw e;
|
||||
} finally {
|
||||
// 插入 PayTransactionNotifyLogDO 日志
|
||||
PayNotifyLogDO notifyLog = new PayNotifyLogDO().setNotifyId(message.getId())
|
||||
.setRequest(JSON.toJSONString(message)).setResponse(response).setStatus(updateTask.getStatus());
|
||||
payTransactionNotifyLogMapper.insert(notifyLog);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleFailure(PayNotifyTaskDO updateTask, Integer defaultStatus) {
|
||||
if (updateTask.getNotifyTimes() >= PayNotifyTaskDO.NOTIFY_FREQUENCY.length) {
|
||||
updateTask.setStatus(PayNotifyStatusEnum.FAILURE.getStatus());
|
||||
} else {
|
||||
updateTask.setNextNotifyTime(DateUtil.addDate(Calendar.SECOND, PayNotifyTaskDO.NOTIFY_FREQUENCY[updateTask.getNotifyTimes()]));
|
||||
updateTask.setStatus(defaultStatus);
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract String invoke(T message, DubboReferencePool.ReferenceMeta referenceMeta);
|
||||
|
||||
protected abstract void afterInvokeSuccess(T message);
|
||||
|
||||
}
|
||||
@@ -0,0 +1,45 @@
|
||||
package cn.iocoder.mall.payservice.mq.consumer;
|
||||
|
||||
import cn.iocoder.mall.payservice.common.dubbo.DubboReferencePool;
|
||||
import cn.iocoder.mall.payservice.dal.mysql.dataobject.refund.PayRefundDO;
|
||||
import cn.iocoder.mall.payservice.dal.mysql.mapper.refund.PayRefundMapper;
|
||||
import cn.iocoder.mall.payservice.mq.producer.message.PayRefundSuccessMessage;
|
||||
import org.apache.dubbo.rpc.service.GenericService;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
@Service
|
||||
@RocketMQMessageListener(
|
||||
topic = PayRefundSuccessMessage.TOPIC,
|
||||
consumerGroup = "pay-consumer-group-" + PayRefundSuccessMessage.TOPIC
|
||||
)
|
||||
public class PayRefundSuccessMQConsumer extends AbstractPayNotifySuccessMQConsumer<PayRefundSuccessMessage>
|
||||
implements RocketMQListener<PayRefundSuccessMessage> {
|
||||
|
||||
@Autowired
|
||||
private PayRefundMapper payRefundMapper;
|
||||
|
||||
@Override
|
||||
protected String invoke(PayRefundSuccessMessage message, DubboReferencePool.ReferenceMeta referenceMeta) {
|
||||
// 查询支付交易
|
||||
PayRefundDO refund = payRefundMapper.selectById(message.getRefundId());
|
||||
Assert.notNull(refund, String.format("回调消息(%s) 退款单不能为空", message.toString()));
|
||||
// 执行调用
|
||||
GenericService genericService = referenceMeta.getService();
|
||||
String methodName = referenceMeta.getMethodName();
|
||||
return (String) genericService.$invoke(methodName, new String[]{String.class.getName(), Integer.class.getName()},
|
||||
new Object[]{message.getOrderId(), refund.getPrice()});
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void afterInvokeSuccess(PayRefundSuccessMessage message) {
|
||||
PayRefundDO updateRefund = new PayRefundDO().setId(message.getRefundId()).setFinishTime(new Date());
|
||||
payRefundMapper.updateById(updateRefund);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,45 @@
|
||||
package cn.iocoder.mall.payservice.mq.consumer;
|
||||
|
||||
import cn.iocoder.mall.payservice.common.dubbo.DubboReferencePool;
|
||||
import cn.iocoder.mall.payservice.dal.mysql.dataobject.transaction.PayTransactionDO;
|
||||
import cn.iocoder.mall.payservice.dal.mysql.mapper.transaction.PayTransactionMapper;
|
||||
import cn.iocoder.mall.payservice.mq.producer.message.PayTransactionSuccessMessage;
|
||||
import org.apache.dubbo.rpc.service.GenericService;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
@Service
|
||||
@RocketMQMessageListener(
|
||||
topic = PayTransactionSuccessMessage.TOPIC,
|
||||
consumerGroup = "pay-consumer-group-" + PayTransactionSuccessMessage.TOPIC
|
||||
)
|
||||
public class PayTransactionSuccessMQConsumer extends AbstractPayNotifySuccessMQConsumer<PayTransactionSuccessMessage>
|
||||
implements RocketMQListener<PayTransactionSuccessMessage> {
|
||||
|
||||
@Autowired
|
||||
private PayTransactionMapper payTransactionMapper;
|
||||
|
||||
@Override
|
||||
protected String invoke(PayTransactionSuccessMessage message, DubboReferencePool.ReferenceMeta referenceMeta) {
|
||||
// 查询支付交易
|
||||
PayTransactionDO transaction = payTransactionMapper.selectById(message.getTransactionId());
|
||||
Assert.notNull(transaction, String.format("回调消息(%s) 订单交易不能为空", message.toString()));
|
||||
// 执行调用
|
||||
GenericService genericService = referenceMeta.getService();
|
||||
String methodName = referenceMeta.getMethodName();
|
||||
return (String) genericService.$invoke(methodName, new String[]{String.class.getName(), Integer.class.getName()},
|
||||
new Object[]{message.getOrderId(), transaction.getPrice()});
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void afterInvokeSuccess(PayTransactionSuccessMessage message) {
|
||||
PayTransactionDO updateTransaction = new PayTransactionDO().setId(message.getTransactionId()).setFinishTime(new Date());
|
||||
payTransactionMapper.updateById(updateTransaction);
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user