完成 xxl-job 的接入

This commit is contained in:
YunaiV
2022-06-25 10:18:04 +08:00
parent 4381d938be
commit 3774afe553
27 changed files with 284 additions and 715 deletions

View File

@@ -1,13 +1,10 @@
package cn.iocoder.yudao.framework.tenant.config;
import cn.hutool.core.annotation.AnnotationUtil;
import cn.iocoder.yudao.framework.common.enums.WebFilterOrderEnum;
import cn.iocoder.yudao.framework.mybatis.core.util.MyBatisUtils;
import cn.iocoder.yudao.framework.quartz.core.handler.JobHandler;
import cn.iocoder.yudao.framework.tenant.core.aop.TenantIgnoreAspect;
import cn.iocoder.yudao.framework.tenant.core.db.TenantDatabaseInterceptor;
import cn.iocoder.yudao.framework.tenant.core.job.TenantJob;
import cn.iocoder.yudao.framework.tenant.core.job.TenantJobHandlerDecorator;
import cn.iocoder.yudao.framework.tenant.core.job.TenantJobAspect;
import cn.iocoder.yudao.framework.tenant.core.mq.TenantChannelInterceptor;
import cn.iocoder.yudao.framework.tenant.core.mq.TenantFunctionAroundWrapper;
import cn.iocoder.yudao.framework.tenant.core.security.TenantSecurityWebFilter;
@@ -19,6 +16,7 @@ import cn.iocoder.yudao.framework.web.core.handler.GlobalExceptionHandler;
import cn.iocoder.yudao.module.system.api.tenant.TenantApi;
import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.TenantLineInnerInterceptor;
import com.xxl.job.core.executor.XxlJobExecutor;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -103,19 +101,25 @@ public class YudaoTenantAutoConfiguration {
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
if (!(bean instanceof JobHandler)) {
if (!(bean instanceof XxlJobExecutor)) {
return bean;
}
// 有 TenantJob 注解的情况下,才会进行处理
if (!AnnotationUtil.hasAnnotation(bean.getClass(), TenantJob.class)) {
return bean;
}
// 使用 TenantJobHandlerDecorator 装饰
return new TenantJobHandlerDecorator(tenantFrameworkService, (JobHandler) bean);
// // 有 TenantJob 注解的情况下,才会进行处理
// if (!AnnotationUtil.hasAnnotation(bean.getClass(), TenantJob.class)) {
// return bean;
// }
//
// // 使用 TenantJobHandlerDecorator 装饰
// return new TenantJobHandlerDecorator(tenantFrameworkService, (JobHandler) bean);
return bean;
}
};
}
@Bean
public TenantJobAspect tenantJobAspect(TenantFrameworkService tenantFrameworkService) {
return new TenantJobAspect(tenantFrameworkService);
}
}

View File

@@ -0,0 +1,76 @@
package cn.iocoder.yudao.framework.tenant.core.job;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.exceptions.ExceptionUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.framework.tenant.core.service.TenantFrameworkService;
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import java.lang.annotation.Annotation;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Aspect
@RequiredArgsConstructor
@Slf4j
public class TenantJobAspect {
private final TenantFrameworkService tenantFrameworkService;
@Around("@annotation(xxlJob)")
public Object around(ProceedingJoinPoint joinPoint, XxlJob xxlJob) throws Throwable {
// 如果非多租户 Job则跳过
TenantJob tenantJob = getClassAnnotation(joinPoint, TenantJob.class);
if (tenantJob == null) {
return joinPoint.proceed();
}
// 如果是多租户 Job则会按照租户逐个执行 Job 的逻辑
execute(joinPoint, xxlJob);
return null; // JobHandler 无返回
}
private void execute(ProceedingJoinPoint joinPoint, XxlJob xxlJob) {
// 获得租户列表
List<Long> tenantIds = tenantFrameworkService.getTenantIds();
if (CollUtil.isEmpty(tenantIds)) {
return;
}
// 逐个租户,执行 Job
Map<Long, String> results = new ConcurrentHashMap<>();
tenantIds.parallelStream().forEach(tenantId -> { // TODO 芋艿:先通过 parallel 实现并行1多个租户是一条执行日志2异常的情况
TenantUtils.execute(tenantId, () -> {
try {
joinPoint.proceed();
} catch (Throwable e) {
results.put(tenantId, ExceptionUtil.getRootCauseMessage(e));
// 打印异常
XxlJobHelper.log(StrUtil.format("[多租户({}) 执行任务({}),发生异常:{}]",
tenantId, xxlJob.value(), ExceptionUtils.getStackTrace(e)));
}
});
});
// 如果 results 非空,说明发生了异常,标记 XXL-Job 执行失败
if (CollUtil.isNotEmpty(results)) {
XxlJobHelper.handleFail(JsonUtils.toJsonString(results));
}
}
@SuppressWarnings("SameParameterValue")
private static <T extends Annotation> T getClassAnnotation(ProceedingJoinPoint joinPoint, Class<T> annotationClass) {
return ((MethodSignature) joinPoint.getSignature()).getMethod().getDeclaringClass().getAnnotation(annotationClass);
}
}

View File

@@ -1,58 +0,0 @@
package cn.iocoder.yudao.framework.tenant.core.job;
import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.framework.quartz.core.handler.JobHandler;
import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
import cn.iocoder.yudao.framework.tenant.core.service.TenantFrameworkService;
import lombok.AllArgsConstructor;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 多租户 JobHandler 装饰器
* 任务执行时,会按照租户逐个执行 Job 的逻辑
*
* 注意,需要保证 JobHandler 的幂等性。因为 Job 因为某个租户执行失败重试时,之前执行成功的租户也会再次执行。
*
* @author 芋道源码
*/
@AllArgsConstructor
public class TenantJobHandlerDecorator implements JobHandler {
private final TenantFrameworkService tenantFrameworkService;
/**
* 被装饰的 Job
*/
private final JobHandler jobHandler;
@Override
public final String execute(String param) throws Exception {
// 获得租户列表
List<Long> tenantIds = tenantFrameworkService.getTenantIds();
if (CollUtil.isEmpty(tenantIds)) {
return null;
}
// 逐个租户,执行 Job
Map<Long, String> results = new ConcurrentHashMap<>();
tenantIds.parallelStream().forEach(tenantId -> { // TODO 芋艿:先通过 parallel 实现并行1多个租户是一条执行日志2异常的情况
try {
// 设置租户
TenantContextHolder.setTenantId(tenantId);
// 执行 Job
String result = jobHandler.execute(param);
// 添加结果
results.put(tenantId, result);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
TenantContextHolder.clear();
}
});
return JsonUtils.toJsonString(results);
}
}