1. 增加 XXL-Job starter
2. 迁移 pay 服务的 Job 逻辑
This commit is contained in:
@@ -37,6 +37,12 @@
|
||||
<artifactId>mall-spring-boot-starter-rocketmq</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- Job 相关 -->
|
||||
<dependency>
|
||||
<groupId>cn.iocoder.mall</groupId>
|
||||
<artifactId>mall-spring-boot-starter-xxl-job</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- Registry 和 Config 相关 -->
|
||||
<dependency>
|
||||
<groupId>com.alibaba.cloud</groupId>
|
||||
|
||||
@@ -46,9 +46,15 @@ public class DubboReferencePool {
|
||||
@Value("${dubbo.application.name}")
|
||||
private String dubboApplicationName;
|
||||
|
||||
public ReferenceMeta getReferenceMeta(String notifyUrl) {
|
||||
DubboReferencePool.ReferenceMeta referenceMeta = referenceMetaCache.getUnchecked(notifyUrl);
|
||||
Assert.notNull(referenceMeta, String.format("notifyUrl(%s) 不存在对应的 ReferenceMeta 对象", notifyUrl));
|
||||
return referenceMeta;
|
||||
}
|
||||
|
||||
private ReferenceMeta createGenericService(String notifyUrl) {
|
||||
// 使用 # 号分隔,格式为 服务名#方法名#版本号
|
||||
List<String> notifyUrlParts = StringUtils.split(notifyUrl, "#");
|
||||
List<String> notifyUrlParts = this.parseNotifyUrl(notifyUrl);
|
||||
// 创建 ApplicationConfig 对象
|
||||
ApplicationConfig application = new ApplicationConfig();
|
||||
application.setName(dubboApplicationName);
|
||||
@@ -69,10 +75,9 @@ public class DubboReferencePool {
|
||||
return new ReferenceMeta(reference, genericService, notifyUrlParts.get(1));
|
||||
}
|
||||
|
||||
public ReferenceMeta getReferenceMeta(String notifyUrl) {
|
||||
DubboReferencePool.ReferenceMeta referenceMeta = referenceMetaCache.getUnchecked(notifyUrl);
|
||||
Assert.notNull(referenceMeta, String.format("notifyUrl(%s) 不存在对应的 ReferenceMeta 对象", notifyUrl));
|
||||
return referenceMeta;
|
||||
// TODO 芋艿,后续重构成一个对象
|
||||
private List<String> parseNotifyUrl(String notifyUrl) {
|
||||
return StringUtils.split(notifyUrl, "#");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,12 @@
|
||||
package cn.iocoder.mall.payservice.config;
|
||||
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.EnableAspectJAutoProxy;
|
||||
|
||||
/**
|
||||
* Spring Aop 配置类
|
||||
*/
|
||||
@Configuration
|
||||
@EnableAspectJAutoProxy(proxyTargetClass = true, exposeProxy = true)
|
||||
public class AopConfiguration {
|
||||
}
|
||||
@@ -4,6 +4,8 @@ import cn.iocoder.mall.payservice.dal.mysql.dataobject.notify.PayNotifyTaskDO;
|
||||
import cn.iocoder.mall.payservice.mq.producer.message.PayRefundSuccessMessage;
|
||||
import cn.iocoder.mall.payservice.mq.producer.message.PayTransactionSuccessMessage;
|
||||
import org.mapstruct.Mapper;
|
||||
import org.mapstruct.Mapping;
|
||||
import org.mapstruct.Mappings;
|
||||
import org.mapstruct.factory.Mappers;
|
||||
|
||||
@Mapper
|
||||
@@ -11,8 +13,18 @@ public interface PayNotifyConvert {
|
||||
|
||||
PayNotifyConvert INSTANCE = Mappers.getMapper(PayNotifyConvert.class);
|
||||
|
||||
PayTransactionSuccessMessage convertTransaction(PayNotifyTaskDO payTransactionNotifyTaskDO);
|
||||
@Mappings({
|
||||
@Mapping(source = "transaction.transactionId", target = "transactionId"),
|
||||
@Mapping(source = "transaction.orderId", target = "orderId"),
|
||||
})
|
||||
PayTransactionSuccessMessage convertTransaction(PayNotifyTaskDO entity);
|
||||
|
||||
@Mappings({
|
||||
@Mapping(source = "refund.transactionId", target = "transactionId"),
|
||||
@Mapping(source = "refund.orderId", target = "orderId"),
|
||||
@Mapping(source = "refund.refundId", target = "refundId"),
|
||||
})
|
||||
PayRefundSuccessMessage convertRefund(PayNotifyTaskDO entity);
|
||||
|
||||
PayRefundSuccessMessage convertRefund(PayNotifyTaskDO payTransactionNotifyTaskDO);
|
||||
|
||||
}
|
||||
|
||||
@@ -5,7 +5,6 @@ import cn.iocoder.mall.payservice.dal.mysql.dataobject.transaction.PayTransactio
|
||||
import cn.iocoder.mall.payservice.dal.mysql.dataobject.transaction.PayTransactionExtensionDO;
|
||||
import cn.iocoder.mall.payservice.enums.notify.PayNotifyStatusEnum;
|
||||
import cn.iocoder.mall.payservice.enums.notify.PayNotifyType;
|
||||
import cn.iocoder.mall.payservice.service.transaction.PayTransactionService;
|
||||
import com.baomidou.mybatisplus.annotation.TableField;
|
||||
import com.baomidou.mybatisplus.annotation.TableName;
|
||||
import com.baomidou.mybatisplus.extension.handlers.FastjsonTypeHandler;
|
||||
@@ -56,25 +55,18 @@ public class PayNotifyTaskDO extends DeletableDO {
|
||||
* 外键 {@link PayNotifyStatusEnum}
|
||||
*/
|
||||
private Integer status;
|
||||
/**
|
||||
* 是否激活中,即处于正在 MQ 异步通知中
|
||||
*
|
||||
* @see cn.iocoder.mall.payservice.job.notify.PayNotifyRetryJob
|
||||
*/
|
||||
private Boolean active;
|
||||
/**
|
||||
* 下一次通知时间
|
||||
*/
|
||||
private Date nextNotifyTime;
|
||||
/**
|
||||
* 最后一次执行时间
|
||||
*
|
||||
* 这个字段,需要结合 {@link #nextNotifyTime} 一起使用。
|
||||
*
|
||||
* 1. 初始时,{@link PayTransactionService#updateTransactionPaySuccess(Integer, String)}
|
||||
* nextNotifyTime 为当前时间 + 15 秒
|
||||
* lastExecuteTime 为空
|
||||
* 并发送给 MQ ,执行执行
|
||||
*
|
||||
* 2. MQ 消费时,更新 lastExecuteTime 为当时时间
|
||||
*
|
||||
* 3. 定时任务,扫描 nextNotifyTime < lastExecuteTime 的任务
|
||||
* nextNotifyTime 为当前时间 + N 秒。具体的 N ,由第几次通知决定
|
||||
* lastExecuteTime 为当前时间
|
||||
*/
|
||||
private Date lastExecuteTime;
|
||||
/**
|
||||
|
||||
@@ -16,16 +16,21 @@ public interface PayNotifyTaskMapper extends BaseMapper<PayNotifyTaskDO> {
|
||||
*
|
||||
* 1. status 非成功
|
||||
* 2. nextNotifyTime 小于当前时间
|
||||
* 3. lastExecuteTime > nextNotifyTime
|
||||
* 3. active 为 false 并未正在执行中
|
||||
*
|
||||
* @return PayTransactionNotifyTaskDO 数组
|
||||
*/
|
||||
default List<PayNotifyTaskDO> selectListByNotify() {
|
||||
return selectList(new QueryWrapper<PayNotifyTaskDO>()
|
||||
.in("status", PayNotifyStatusEnum.WAITING.getName(), PayNotifyStatusEnum.REQUEST_SUCCESS.getName(),
|
||||
PayNotifyStatusEnum.REQUEST_FAILURE.getName())
|
||||
.in("status", PayNotifyStatusEnum.WAITING.getStatus(), PayNotifyStatusEnum.REQUEST_SUCCESS.getStatus(),
|
||||
PayNotifyStatusEnum.REQUEST_FAILURE.getStatus())
|
||||
.le("next_notify_time", "NOW()")
|
||||
.gt("last_execute_time", "next_notify_time"));
|
||||
.eq("active", Boolean.FALSE));
|
||||
}
|
||||
|
||||
default int update(PayNotifyTaskDO update, Integer whereNotifyTimes) {
|
||||
return update(update, new QueryWrapper<PayNotifyTaskDO>()
|
||||
.eq("id", update.getId()).eq("notify_times", whereNotifyTimes));
|
||||
}
|
||||
|
||||
//
|
||||
|
||||
@@ -0,0 +1,51 @@
|
||||
package cn.iocoder.mall.payservice.job.notify;
|
||||
|
||||
import cn.iocoder.mall.payservice.dal.mysql.dataobject.notify.PayNotifyTaskDO;
|
||||
import cn.iocoder.mall.payservice.dal.mysql.mapper.notify.PayNotifyTaskMapper;
|
||||
import cn.iocoder.mall.payservice.service.notify.PayNotifyService;
|
||||
import com.xxl.job.core.biz.model.ReturnT;
|
||||
import com.xxl.job.core.handler.IJobHandler;
|
||||
import com.xxl.job.core.handler.annotation.XxlJob;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 支付通知重试 Job
|
||||
*
|
||||
* 由于 RocketMQ 不支持指定时间的延迟消息,所以我们需要通过 Job 扫描到达 {@link PayNotifyTaskDO#getNextNotifyTime()} 时间的任务。
|
||||
* 扫描到后,通过发送 MQ 去异步通知,提高通知效率。
|
||||
*
|
||||
* 考虑到 MQ 执行可能存在延迟的情况,导致一个 {@link PayNotifyTaskDO} 同时触发多个通知,通过 {@link PayNotifyTaskDO#getActive()} 标记解决。
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class PayNotifyRetryJob extends IJobHandler {
|
||||
|
||||
@Autowired
|
||||
private PayNotifyTaskMapper payNotifyTaskMapper;
|
||||
|
||||
@Autowired
|
||||
private PayNotifyService payNotifyService;
|
||||
|
||||
@Override
|
||||
@XxlJob("payNotifyRetryJob")
|
||||
public ReturnT<String> execute(String param) {
|
||||
// 获得需要通知的任务
|
||||
List<PayNotifyTaskDO> notifyTasks = payNotifyTaskMapper.selectListByNotify();
|
||||
|
||||
// 循环任务,发送通知
|
||||
for (PayNotifyTaskDO notifyTask : notifyTasks) {
|
||||
// 发送 MQ
|
||||
payNotifyService.sendNotifyMessage(notifyTask);
|
||||
|
||||
// 标记任务执行中。考虑到 MQ 可能会存在先于该操作执行完,所以更新时,增加一个 notifyTimes 作为额外条件,避免覆盖更新的问题。
|
||||
PayNotifyTaskDO updateNotifyTask = new PayNotifyTaskDO().setId(notifyTask.getId()).setActive(true);
|
||||
payNotifyTaskMapper.update(updateNotifyTask, notifyTask.getNotifyTimes());
|
||||
}
|
||||
return new ReturnT<>("执行通知数:" + notifyTasks.size());
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1 @@
|
||||
package cn.iocoder.mall.payservice.job;
|
||||
@@ -2,6 +2,7 @@ package cn.iocoder.mall.payservice.mq.consumer;
|
||||
|
||||
import cn.iocoder.common.framework.util.DateUtil;
|
||||
import cn.iocoder.common.framework.util.ExceptionUtil;
|
||||
import cn.iocoder.common.framework.vo.CommonResult;
|
||||
import cn.iocoder.mall.payservice.common.dubbo.DubboReferencePool;
|
||||
import cn.iocoder.mall.payservice.dal.mysql.dataobject.notify.PayNotifyLogDO;
|
||||
import cn.iocoder.mall.payservice.dal.mysql.dataobject.notify.PayNotifyTaskDO;
|
||||
@@ -10,14 +11,15 @@ import cn.iocoder.mall.payservice.dal.mysql.mapper.notify.PayNotifyTaskMapper;
|
||||
import cn.iocoder.mall.payservice.enums.notify.PayNotifyStatusEnum;
|
||||
import cn.iocoder.mall.payservice.mq.producer.message.AbstractPayNotifySuccessMessage;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.util.Calendar;
|
||||
import java.util.Date;
|
||||
import java.util.Map;
|
||||
|
||||
public abstract class AbstractPayNotifySuccessMQConsumer<T extends AbstractPayNotifySuccessMessage> implements RocketMQListener<T> {
|
||||
public abstract class AbstractPayNotifySuccessMQConsumer<T extends AbstractPayNotifySuccessMessage> {
|
||||
// implements RocketMQListener<T> TODO 芋艿,理论来说,可以实现 RocketMQListener 接口,然后 execute 作为 onMessage 的具体实现。但是新版本貌似不行,后续在排查下;
|
||||
|
||||
@Autowired
|
||||
private DubboReferencePool dubboReferencePool;
|
||||
@@ -27,21 +29,22 @@ public abstract class AbstractPayNotifySuccessMQConsumer<T extends AbstractPayNo
|
||||
@Autowired
|
||||
private PayNotifyLogMapper payTransactionNotifyLogMapper;
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void onMessage(T message) {
|
||||
// 获得 ReferenceMeta 对象
|
||||
DubboReferencePool.ReferenceMeta referenceMeta = dubboReferencePool.getReferenceMeta(message.getNotifyUrl());
|
||||
public void execute(T message) {
|
||||
// 发起调用
|
||||
String response = null; // RPC / HTTP 调用的响应
|
||||
CommonResult<Boolean> invokeResult = null; // RPC / HTTP 调用的响应
|
||||
Throwable invokeException = null; //
|
||||
PayNotifyTaskDO updateTask = new PayNotifyTaskDO() // 更新 PayTransactionNotifyTaskDO 对象
|
||||
.setId(message.getId())
|
||||
.setActive(false) // 标记本地通知已经完成
|
||||
.setLastExecuteTime(new Date())
|
||||
.setNotifyTimes(message.getNotifyTimes() + 1);
|
||||
try {
|
||||
// 获得 ReferenceMeta 对象
|
||||
DubboReferencePool.ReferenceMeta referenceMeta = dubboReferencePool.getReferenceMeta(message.getNotifyUrl());
|
||||
// TODO 芋艿,这里要优化下,不要在事务里,进行 RPC 调用
|
||||
response = invoke(message, referenceMeta);
|
||||
if ("success".equals(response)) { // 情况一,请求成功且返回成功
|
||||
invokeResult = invoke(message, referenceMeta);
|
||||
if (invokeResult.isSuccess()) { // 情况一,请求成功且返回成功
|
||||
// 更新通知成功
|
||||
updateTask.setStatus(PayNotifyStatusEnum.SUCCESS.getStatus());
|
||||
payNotifyTaskMapper.updateById(updateTask);
|
||||
@@ -53,8 +56,8 @@ public abstract class AbstractPayNotifySuccessMQConsumer<T extends AbstractPayNo
|
||||
payNotifyTaskMapper.updateById(updateTask);
|
||||
}
|
||||
} catch (Throwable e) { // 请求失败
|
||||
invokeException = e;
|
||||
// 更新通知请求失败
|
||||
response = ExceptionUtil.getRootCauseMessage(e);
|
||||
handleFailure(updateTask, PayNotifyStatusEnum.REQUEST_FAILURE.getStatus());
|
||||
payNotifyTaskMapper.updateById(updateTask);
|
||||
// 抛出异常,回滚事务
|
||||
@@ -63,7 +66,9 @@ public abstract class AbstractPayNotifySuccessMQConsumer<T extends AbstractPayNo
|
||||
} finally {
|
||||
// 插入 PayTransactionNotifyLogDO 日志
|
||||
PayNotifyLogDO notifyLog = new PayNotifyLogDO().setNotifyId(message.getId())
|
||||
.setRequest(JSON.toJSONString(message)).setResponse(response).setStatus(updateTask.getStatus());
|
||||
.setStatus(updateTask.getStatus())
|
||||
.setRequest(JSON.toJSONString(message))
|
||||
.setResponse(invokeResult != null ? JSON.toJSONString(invokeResult) : ExceptionUtil.getRootCauseMessage(invokeException));
|
||||
payTransactionNotifyLogMapper.insert(notifyLog);
|
||||
}
|
||||
}
|
||||
@@ -77,8 +82,26 @@ public abstract class AbstractPayNotifySuccessMQConsumer<T extends AbstractPayNo
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract String invoke(T message, DubboReferencePool.ReferenceMeta referenceMeta);
|
||||
protected abstract CommonResult<Boolean> invoke(T message, DubboReferencePool.ReferenceMeta referenceMeta);
|
||||
|
||||
protected abstract void afterInvokeSuccess(T message);
|
||||
|
||||
/**
|
||||
* 将 Dubbo 泛化调用的结果,解析成 CommonResult
|
||||
*
|
||||
* 目前,约定 Dubbo 返回的结果为 CommonResult<Boolean>
|
||||
*
|
||||
* @param dubboResult Dubbo 调用结果
|
||||
* @return CommonResult 结果
|
||||
*/
|
||||
protected static CommonResult<Boolean> parseDubboGenericResult(Object dubboResult) {
|
||||
// TODO 芋艿,目前暂时这么实现,未来找下更合适的
|
||||
Map<String, Object> dubboResultMap = (Map<String, Object>) dubboResult;
|
||||
CommonResult<Boolean> commonResult = new CommonResult<>();
|
||||
commonResult.setCode((Integer) dubboResultMap.get("code"));
|
||||
commonResult.setMessage((String) dubboResultMap.get("message"));
|
||||
commonResult.setData((Boolean) dubboResultMap.get("data"));
|
||||
return commonResult;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package cn.iocoder.mall.payservice.mq.consumer;
|
||||
|
||||
import cn.iocoder.common.framework.vo.CommonResult;
|
||||
import cn.iocoder.mall.payservice.common.dubbo.DubboReferencePool;
|
||||
import cn.iocoder.mall.payservice.dal.mysql.dataobject.refund.PayRefundDO;
|
||||
import cn.iocoder.mall.payservice.dal.mysql.mapper.refund.PayRefundMapper;
|
||||
@@ -25,15 +26,22 @@ public class PayRefundSuccessMQConsumer extends AbstractPayNotifySuccessMQConsum
|
||||
private PayRefundMapper payRefundMapper;
|
||||
|
||||
@Override
|
||||
protected String invoke(PayRefundSuccessMessage message, DubboReferencePool.ReferenceMeta referenceMeta) {
|
||||
public void onMessage(PayRefundSuccessMessage message) {
|
||||
super.execute(message);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected CommonResult<Boolean> invoke(PayRefundSuccessMessage message, DubboReferencePool.ReferenceMeta referenceMeta) {
|
||||
// 查询支付交易
|
||||
PayRefundDO refund = payRefundMapper.selectById(message.getRefundId());
|
||||
Assert.notNull(refund, String.format("回调消息(%s) 退款单不能为空", message.toString()));
|
||||
// 执行调用
|
||||
GenericService genericService = referenceMeta.getService();
|
||||
String methodName = referenceMeta.getMethodName();
|
||||
return (String) genericService.$invoke(methodName, new String[]{String.class.getName(), Integer.class.getName()},
|
||||
Object dubboResult = genericService.$invoke(methodName,
|
||||
new String[]{String.class.getName(), Integer.class.getName()},
|
||||
new Object[]{message.getOrderId(), refund.getPrice()});
|
||||
return parseDubboGenericResult(dubboResult);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package cn.iocoder.mall.payservice.mq.consumer;
|
||||
|
||||
import cn.iocoder.common.framework.vo.CommonResult;
|
||||
import cn.iocoder.mall.payservice.common.dubbo.DubboReferencePool;
|
||||
import cn.iocoder.mall.payservice.dal.mysql.dataobject.transaction.PayTransactionDO;
|
||||
import cn.iocoder.mall.payservice.dal.mysql.mapper.transaction.PayTransactionMapper;
|
||||
@@ -25,15 +26,22 @@ public class PayTransactionSuccessMQConsumer extends AbstractPayNotifySuccessMQC
|
||||
private PayTransactionMapper payTransactionMapper;
|
||||
|
||||
@Override
|
||||
protected String invoke(PayTransactionSuccessMessage message, DubboReferencePool.ReferenceMeta referenceMeta) {
|
||||
public void onMessage(PayTransactionSuccessMessage message) {
|
||||
super.execute(message);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected CommonResult<Boolean> invoke(PayTransactionSuccessMessage message, DubboReferencePool.ReferenceMeta referenceMeta) {
|
||||
// 查询支付交易
|
||||
PayTransactionDO transaction = payTransactionMapper.selectById(message.getTransactionId());
|
||||
Assert.notNull(transaction, String.format("回调消息(%s) 订单交易不能为空", message.toString()));
|
||||
// 执行调用
|
||||
GenericService genericService = referenceMeta.getService();
|
||||
String methodName = referenceMeta.getMethodName();
|
||||
return (String) genericService.$invoke(methodName, new String[]{String.class.getName(), Integer.class.getName()},
|
||||
Object dubboResult = genericService.$invoke(methodName,
|
||||
new String[]{String.class.getName(), Integer.class.getName()},
|
||||
new Object[]{message.getOrderId(), transaction.getPrice()});
|
||||
return parseDubboGenericResult(dubboResult);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -17,8 +17,7 @@ public class PayMQProducer {
|
||||
@Autowired
|
||||
private RocketMQTemplate template;
|
||||
|
||||
public void sendPayRefundNotifyTaskMessage(PayRefundSuccessMessage message, Integer refundId, Integer transactionId, String orderId) {
|
||||
message.setRefundId(refundId).setTransactionId(transactionId).setOrderId(orderId);
|
||||
public void sendPayRefundNotifyTaskMessage(PayRefundSuccessMessage message) {
|
||||
try {
|
||||
SendResult sendResult = template.syncSend(PayTransactionSuccessMessage.TOPIC, message);
|
||||
if (!SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
|
||||
@@ -29,8 +28,7 @@ public class PayMQProducer {
|
||||
}
|
||||
}
|
||||
|
||||
public void sendPayTransactionNotifyTaskMessage(PayTransactionSuccessMessage message, Integer transactionId, String orderId) {
|
||||
message.setTransactionId(transactionId).setOrderId(orderId);
|
||||
public void sendPayTransactionNotifyTaskMessage(PayTransactionSuccessMessage message) {
|
||||
try {
|
||||
SendResult sendResult = template.syncSend(PayTransactionSuccessMessage.TOPIC, message);
|
||||
if (!SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package cn.iocoder.mall.payservice.service.notify;
|
||||
|
||||
import cn.iocoder.mall.payservice.dal.mysql.dataobject.notify.PayNotifyTaskDO;
|
||||
import cn.iocoder.mall.payservice.dal.mysql.dataobject.refund.PayRefundDO;
|
||||
import cn.iocoder.mall.payservice.dal.mysql.dataobject.transaction.PayTransactionDO;
|
||||
import cn.iocoder.mall.payservice.dal.mysql.dataobject.transaction.PayTransactionExtensionDO;
|
||||
@@ -15,4 +16,7 @@ public interface PayNotifyService {
|
||||
// TODO 芋艿:后续优化下,不要暴露 entity 出来
|
||||
void addPayTransactionNotifyTask(PayTransactionDO transaction, PayTransactionExtensionDO extension);
|
||||
|
||||
// TODO 芋艿:后续优化下,不要暴露 entity 出来
|
||||
void sendNotifyMessage(PayNotifyTaskDO notifyTask);
|
||||
|
||||
}
|
||||
|
||||
@@ -39,8 +39,7 @@ public class PayNotifyServiceImpl implements PayNotifyService {
|
||||
payNotifyTaskMapper.insert(payNotifyTaskDO);
|
||||
|
||||
// 发送 MQ 消息
|
||||
payMQProducer.sendPayRefundNotifyTaskMessage(PayNotifyConvert.INSTANCE.convertRefund(payNotifyTaskDO),
|
||||
refund.getId(), refund.getTransactionId(), refund.getOrderId());
|
||||
sendNotifyMessage(payNotifyTaskDO);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -54,14 +53,24 @@ public class PayNotifyServiceImpl implements PayNotifyService {
|
||||
payNotifyTaskMapper.insert(payNotifyTaskDO);
|
||||
|
||||
// 发送 MQ 消息
|
||||
payMQProducer.sendPayTransactionNotifyTaskMessage(PayNotifyConvert.INSTANCE.convertTransaction(payNotifyTaskDO),
|
||||
transaction.getId(), transaction.getOrderId());
|
||||
sendNotifyMessage(payNotifyTaskDO);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendNotifyMessage(PayNotifyTaskDO notifyTask) {
|
||||
if (PayNotifyType.TRANSACTION.getType().equals(notifyTask.getType())) {
|
||||
payMQProducer.sendPayTransactionNotifyTaskMessage(PayNotifyConvert.INSTANCE.convertTransaction(notifyTask));
|
||||
} else if (PayNotifyType.REFUND.getType().equals(notifyTask.getType())) {
|
||||
payMQProducer.sendPayRefundNotifyTaskMessage(PayNotifyConvert.INSTANCE.convertRefund(notifyTask));
|
||||
} else {
|
||||
throw new IllegalArgumentException(String.format("通知任务(%s) 无法发送通知消息", notifyTask.toString()));
|
||||
}
|
||||
}
|
||||
|
||||
private PayNotifyTaskDO createBasePayNotifyTaskDO(String appId, String notifyUrl) {
|
||||
return new PayNotifyTaskDO()
|
||||
.setAppId(appId)
|
||||
.setStatus(PayNotifyStatusEnum.WAITING.getStatus())
|
||||
.setStatus(PayNotifyStatusEnum.WAITING.getStatus()).setActive(true)
|
||||
.setNotifyTimes(0).setMaxNotifyTimes(PayNotifyTaskDO.NOTIFY_FREQUENCY.length + 1)
|
||||
.setNextNotifyTime(DateUtil.addDate(Calendar.SECOND, PayNotifyTaskDO.NOTIFY_FREQUENCY[0]))
|
||||
.setNotifyUrl(notifyUrl);
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
spring:
|
||||
# 数据源配置项
|
||||
datasource:
|
||||
url: jdbc:mysql://400-infra.server.iocoder.cn:3306/mall_pay?useSSL=false&useUnicode=true&characterEncoding=UTF-8
|
||||
url: jdbc:mysql://400-infra.server.iocoder.cn:3306/mall_pay?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=CTT
|
||||
driver-class-name: com.mysql.jdbc.Driver
|
||||
username: root
|
||||
password: 3WLiVUBEwTbvAfsh
|
||||
@@ -19,3 +19,13 @@ dubbo:
|
||||
registry:
|
||||
# address: spring-cloud://400-infra.server.iocoder.cn:8848 # 指定 Dubbo 服务注册中心的地址
|
||||
address: nacos://400-infra.server.iocoder.cn:8848?namespace=dev # 指定 Dubbo 服务注册中心的地址
|
||||
|
||||
# XXL-Job 配置项
|
||||
xxl:
|
||||
job:
|
||||
admin:
|
||||
addresses: http://127.0.0.1:9099/
|
||||
executor:
|
||||
appname: ${spring.application.name}
|
||||
logpath: /data/applogs/xxl-job/
|
||||
accessToken:
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
spring:
|
||||
# 数据源配置项
|
||||
datasource:
|
||||
url: jdbc:mysql://400-infra.server.iocoder.cn:3306/mall_pay?useSSL=false&useUnicode=true&characterEncoding=UTF-8
|
||||
url: jdbc:mysql://400-infra.server.iocoder.cn:3306/mall_pay?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=CTT
|
||||
driver-class-name: com.mysql.jdbc.Driver
|
||||
username: root
|
||||
password: 3WLiVUBEwTbvAfsh
|
||||
@@ -22,3 +22,19 @@ dubbo:
|
||||
# Dubbo 服务提供者的配置
|
||||
provider:
|
||||
tag: ${DUBBO_TAG} # Dubbo 路由分组
|
||||
|
||||
# XXL-Job 配置项
|
||||
xxl:
|
||||
job:
|
||||
enabled: false # 本地开发时,关闭 XXL-Job
|
||||
admin:
|
||||
addresses: http://400-infra.server.iocoder.cn:9099
|
||||
executor:
|
||||
appname: ${spring.application.name}
|
||||
accessToken:
|
||||
|
||||
|
||||
# MyBatis Plus 配置
|
||||
mybatis-plus:
|
||||
configuration:
|
||||
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl # 本地开发环境下,多打印 SQL 到控制台
|
||||
|
||||
@@ -0,0 +1,49 @@
|
||||
package cn.iocoder.mall.payservice.common.dubbo;
|
||||
|
||||
import cn.iocoder.common.framework.vo.CommonResult;
|
||||
import org.apache.dubbo.config.ApplicationConfig;
|
||||
import org.apache.dubbo.config.ReferenceConfig;
|
||||
import org.apache.dubbo.config.RegistryConfig;
|
||||
import org.apache.dubbo.rpc.service.GenericService;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public class DubboGenericInvokerTest {
|
||||
|
||||
public static void main(String[] args) {
|
||||
ApplicationConfig application = new ApplicationConfig();
|
||||
application.setName("api-generic-consumer");
|
||||
|
||||
RegistryConfig registry = new RegistryConfig();
|
||||
registry.setAddress("nacos://400-infra.server.iocoder.cn:8848?namespace=dev");
|
||||
|
||||
application.setRegistry(registry);
|
||||
|
||||
ReferenceConfig<GenericService> reference = new ReferenceConfig<>();
|
||||
// 弱类型接口名
|
||||
reference.setInterface("cn.iocoder.mall.tradeservice.rpc.order.TradeOrderRpc");
|
||||
reference.setVersion("1.0.0");
|
||||
// 声明为泛化接口
|
||||
reference.setGeneric(true);
|
||||
|
||||
reference.setApplication(application);
|
||||
|
||||
// 用com.alibaba.dubbo.rpc.service.GenericService可以替代所有接口引用
|
||||
GenericService genericService = reference.get();
|
||||
|
||||
Object result = genericService.$invoke("updateTradeOrderPaySuccess",
|
||||
new String[]{String.class.getName(), Integer.class.getName()},
|
||||
new Object[]{"1", 100});
|
||||
CommonResult<Boolean> commonResult = parseCommonResult((Map<String, Object>) result);
|
||||
System.out.println(result);
|
||||
}
|
||||
|
||||
private static CommonResult<Boolean> parseCommonResult(Map<String, Object> dubboResult) {
|
||||
CommonResult<Boolean> commonResult = new CommonResult<>();
|
||||
commonResult.setCode((Integer) dubboResult.get("code"));
|
||||
commonResult.setMessage((String) dubboResult.get("message"));
|
||||
commonResult.setData((Boolean) dubboResult.get("data"));
|
||||
return commonResult;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1 @@
|
||||
package cn.iocoder.mall.payservice.common;
|
||||
Reference in New Issue
Block a user