mq:移除默认的 spring cloud stream 和 bus,使用原生的 spring-rocketmq、spring-kafka、spring-rabbitmq 替代,降低学习成本,提升使用灵活性。

This commit is contained in:
YunaiV
2023-11-02 13:06:05 +08:00
parent 02693836b2
commit 2450d7afdc
31 changed files with 0 additions and 497 deletions

View File

@@ -1,32 +0,0 @@
package cn.iocoder.yudao.framework.tenant.core.mq;
import cn.hutool.core.util.ReflectUtil;
import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import java.util.Map;
import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
/**
* 多租户的 {@link ChannelInterceptor} 实现类
* 发送消息时,设置租户编号到 Header 上
*
* @author 芋道源码
*/
public class TenantChannelInterceptor implements ChannelInterceptor {
@Override
@SuppressWarnings({"unchecked", "NullableProblems"})
public Message<?> preSend(Message<?> message, MessageChannel channel) {
Long tenantId = TenantContextHolder.getTenantId();
if (tenantId != null) {
Map<String, Object> headers = (Map<String, Object>) ReflectUtil.getFieldValue(message.getHeaders(), "headers");
headers.put(HEADER_TENANT_ID, tenantId);
}
return message;
}
}

View File

@@ -1,36 +0,0 @@
package cn.iocoder.yudao.framework.tenant.core.mq;
import cn.hutool.core.map.MapUtil;
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
import org.springframework.cloud.function.context.catalog.FunctionAroundWrapper;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry;
import org.springframework.messaging.Message;
import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
/**
* 多租户 FunctionAroundWrapper 实现类
* 消费消息时,设置租户编号到 Context 上
*
* @author 芋道源码
*/
public class TenantFunctionAroundWrapper extends FunctionAroundWrapper {
@Override
protected Object doApply(Object input, SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) {
// 如果不是 MQ 消息,则直接跳过
if (!(input instanceof Message)) {
return targetFunction.apply(input);
}
// 如果没有多租户,则直接跳过
Message<?> message = (Message<?>) input;
Long tenantId = MapUtil.getLong(message.getHeaders(), HEADER_TENANT_ID);
if (tenantId == null) {
return targetFunction.apply(input);
}
// 如果有多租户,则使用多租户上下文
return TenantUtils.execute(tenantId, () -> targetFunction.apply(input));
}
}

View File

@@ -21,17 +21,6 @@
<dependencies>
<!-- MQ 相关 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<!-- 引入 Spring Cloud Alibaba Stream RocketMQ 相关依赖,将 RocketMQ 作为消息队列,并实现对其的自动配置 -->
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<!-- 引入基于 RocketMQ 的 Spring Cloud Bus 的实现的依赖,并实现对其的自动配置 -->
<artifactId>spring-cloud-starter-bus-rocketmq</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -1,15 +1,6 @@
package cn.iocoder.yudao.framework.mq.config;
import com.alibaba.cloud.stream.binder.rocketmq.convert.RocketMQMessageConverter;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.*;
import java.util.ArrayList;
import java.util.List;
/**
* 消息队列配置类
@@ -19,19 +10,4 @@ import java.util.List;
@AutoConfiguration
public class YudaoMQAutoConfiguration {
/**
* 覆盖 {@link RocketMQMessageConverter} 的配置,去掉 fastjson 的转换器,解决不兼容的问题
*/
@Bean(RocketMQMessageConverter.DEFAULT_NAME)
@ConditionalOnMissingBean(name = { RocketMQMessageConverter.DEFAULT_NAME })
public CompositeMessageConverter rocketMQMessageConverter() {
List<MessageConverter> messageConverters = new ArrayList<>();
ByteArrayMessageConverter byteArrayMessageConverter = new ByteArrayMessageConverter();
byteArrayMessageConverter.setContentTypeResolver(null);
messageConverters.add(byteArrayMessageConverter);
messageConverters.add(new StringMessageConverter());
messageConverters.add(new MappingJackson2MessageConverter());
return new CompositeMessageConverter(messageConverters);
}
}

View File

@@ -1,41 +0,0 @@
package cn.iocoder.yudao.framework.mq.core.bus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.bus.ServiceMatcher;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import javax.annotation.Resource;
/**
* 基于 Spring Cloud Bus 实现的 Producer 抽象类
*
* @author 芋道源码
*/
public abstract class AbstractBusProducer {
@Resource
protected ApplicationEventPublisher applicationEventPublisher;
@Resource
protected ServiceMatcher serviceMatcher;
@Value("${spring.application.name}")
protected String applicationName;
protected void publishEvent(RemoteApplicationEvent event) {
applicationEventPublisher.publishEvent(event);
}
/**
* @return 只广播给自己服务的实例
*/
protected String selfDestinationService() {
return applicationName + ":**";
}
protected String getBusId() {
return serviceMatcher.getBusId();
}
}