1. 接入 spring cloud stream,支持多租户

2. 弱化 spring cloud dubbo 集成,可通过加入依赖自动实现
This commit is contained in:
YunaiV
2022-06-22 23:59:19 +08:00
parent 4807547d73
commit 4381d938be
21 changed files with 125 additions and 474 deletions

View File

@@ -20,12 +20,6 @@
<url>https://github.com/YunaiV/ruoyi-vue-pro</url>
<dependencies>
<!-- DB 相关 -->
<dependency>
<groupId>cn.iocoder.cloud</groupId>
<artifactId>yudao-spring-boot-starter-redis</artifactId>
</dependency>
<!-- MQ 相关 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>

View File

@@ -1,15 +1,10 @@
package cn.iocoder.yudao.framework.mq.config;
import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor;
import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration;
import com.alibaba.cloud.stream.binder.rocketmq.convert.RocketMQMessageConverter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.messaging.converter.*;
import java.util.ArrayList;
@@ -21,19 +16,8 @@ import java.util.List;
* @author 芋道源码
*/
@Configuration
@AutoConfigureAfter(YudaoRedisAutoConfiguration.class)
@Slf4j
public class YudaoMQAutoConfiguration {
@Bean
public RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate,
List<RedisMessageInterceptor> interceptors) {
RedisMQTemplate redisMQTemplate = new RedisMQTemplate(redisTemplate);
// 添加拦截器
interceptors.forEach(redisMQTemplate::addInterceptor);
return redisMQTemplate;
}
/**
* 覆盖 {@link RocketMQMessageConverter} 的配置,去掉 fastjson 的转换器,解决不兼容的问题
*/

View File

@@ -1,87 +0,0 @@
package cn.iocoder.yudao.framework.mq.core;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor;
import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage;
import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessage;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.ArrayList;
import java.util.List;
/**
* Redis MQ 操作模板类
*
* @author 芋道源码
*/
@AllArgsConstructor
public class RedisMQTemplate {
@Getter
private final RedisTemplate<String, ?> redisTemplate;
/**
* 拦截器数组
*/
@Getter
private final List<RedisMessageInterceptor> interceptors = new ArrayList<>();
/**
* 发送 Redis 消息,基于 Redis pub/sub 实现
*
* @param message 消息
*/
public <T extends AbstractChannelMessage> void send(T message) {
try {
sendMessageBefore(message);
// 发送消息
redisTemplate.convertAndSend(message.getChannel(), JsonUtils.toJsonString(message));
} finally {
sendMessageAfter(message);
}
}
/**
* 发送 Redis 消息,基于 Redis Stream 实现
*
* @param message 消息
* @return 消息记录的编号对象
*/
public <T extends AbstractStreamMessage> RecordId send(T message) {
try {
sendMessageBefore(message);
// 发送消息
return redisTemplate.opsForStream().add(StreamRecords.newRecord()
.ofObject(JsonUtils.toJsonString(message)) // 设置内容
.withStreamKey(message.getStreamKey())); // 设置 stream key
} finally {
sendMessageAfter(message);
}
}
/**
* 添加拦截器
*
* @param interceptor 拦截器
*/
public void addInterceptor(RedisMessageInterceptor interceptor) {
interceptors.add(interceptor);
}
private void sendMessageBefore(AbstractRedisMessage message) {
// 正序
interceptors.forEach(interceptor -> interceptor.sendMessageBefore(message));
}
private void sendMessageAfter(AbstractRedisMessage message) {
// 倒序
for (int i = interceptors.size() - 1; i >= 0; i--) {
interceptors.get(i).sendMessageAfter(message);
}
}
}

View File

@@ -1,26 +0,0 @@
package cn.iocoder.yudao.framework.mq.core.interceptor;
import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
/**
* {@link AbstractRedisMessage} 消息拦截器
* 通过拦截器,作为插件机制,实现拓展。
* 例如说,多租户场景下的 MQ 消息处理
*
* @author 芋道源码
*/
public interface RedisMessageInterceptor {
default void sendMessageBefore(AbstractRedisMessage message) {
}
default void sendMessageAfter(AbstractRedisMessage message) {
}
default void consumeMessageBefore(AbstractRedisMessage message) {
}
default void consumeMessageAfter(AbstractRedisMessage message) {
}
}

View File

@@ -1,29 +0,0 @@
package cn.iocoder.yudao.framework.mq.core.message;
import lombok.Data;
import java.util.HashMap;
import java.util.Map;
/**
* Redis 消息抽象基类
*
* @author 芋道源码
*/
@Data
public abstract class AbstractRedisMessage {
/**
* 头
*/
private Map<String, String> headers = new HashMap<>();
public String getHeader(String key) {
return headers.get(key);
}
public void addHeader(String key, String value) {
headers.put(key, value);
}
}

View File

@@ -0,0 +1,4 @@
/**
* TODO 芋艿,后续删除,临时占位
*/
package cn.iocoder.yudao.framework.mq.core;

View File

@@ -1,21 +0,0 @@
package cn.iocoder.yudao.framework.mq.core.pubsub;
import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
import com.fasterxml.jackson.annotation.JsonIgnore;
/**
* Redis Channel Message 抽象类
*
* @author 芋道源码
*/
public abstract class AbstractChannelMessage extends AbstractRedisMessage {
/**
* 获得 Redis Channel
*
* @return Channel
*/
@JsonIgnore // 避免序列化。原因是Redis 发布 Channel 消息的时候,已经会指定。
public abstract String getChannel();
}

View File

@@ -1,103 +0,0 @@
package cn.iocoder.yudao.framework.mq.core.pubsub;
import cn.hutool.core.util.TypeUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor;
import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
import lombok.Setter;
import lombok.SneakyThrows;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import java.lang.reflect.Type;
import java.util.List;
/**
* Redis Pub/Sub 监听器抽象类,用于实现广播消费
*
* @param <T> 消息类型。一定要填写噢,不然会报错
*
* @author 芋道源码
*/
public abstract class AbstractChannelMessageListener<T extends AbstractChannelMessage> implements MessageListener {
/**
* 消息类型
*/
private final Class<T> messageType;
/**
* Redis Channel
*/
private final String channel;
/**
* RedisMQTemplate
*/
@Setter
private RedisMQTemplate redisMQTemplate;
@SneakyThrows
protected AbstractChannelMessageListener() {
this.messageType = getMessageClass();
this.channel = messageType.newInstance().getChannel();
}
/**
* 获得 Sub 订阅的 Redis Channel 通道
*
* @return channel
*/
public final String getChannel() {
return channel;
}
@Override
public final void onMessage(Message message, byte[] bytes) {
T messageObj = JsonUtils.parseObject(message.getBody(), messageType);
try {
consumeMessageBefore(messageObj);
// 消费消息
this.onMessage(messageObj);
} finally {
consumeMessageAfter(messageObj);
}
}
/**
* 处理消息
*
* @param message 消息
*/
public abstract void onMessage(T message);
/**
* 通过解析类上的泛型,获得消息类型
*
* @return 消息类型
*/
@SuppressWarnings("unchecked")
private Class<T> getMessageClass() {
Type type = TypeUtil.getTypeArgument(getClass(), 0);
if (type == null) {
throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
}
return (Class<T>) type;
}
private void consumeMessageBefore(AbstractRedisMessage message) {
assert redisMQTemplate != null;
List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
// 正序
interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message));
}
private void consumeMessageAfter(AbstractRedisMessage message) {
assert redisMQTemplate != null;
List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
// 倒序
for (int i = interceptors.size() - 1; i >= 0; i--) {
interceptors.get(i).consumeMessageAfter(message);
}
}
}

View File

@@ -1,21 +0,0 @@
package cn.iocoder.yudao.framework.mq.core.stream;
import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
import com.fasterxml.jackson.annotation.JsonIgnore;
/**
* Redis Stream Message 抽象类
*
* @author 芋道源码
*/
public abstract class AbstractStreamMessage extends AbstractRedisMessage {
/**
* 获得 Redis Stream Key
*
* @return Channel
*/
@JsonIgnore // 避免序列化
public abstract String getStreamKey();
}

View File

@@ -1,113 +0,0 @@
package cn.iocoder.yudao.framework.mq.core.stream;
import cn.hutool.core.util.TypeUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor;
import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.stream.StreamListener;
import java.lang.reflect.Type;
import java.util.List;
/**
* Redis Stream 监听器抽象类,用于实现集群消费
*
* @param <T> 消息类型。一定要填写噢,不然会报错
*
* @author 芋道源码
*/
public abstract class AbstractStreamMessageListener<T extends AbstractStreamMessage>
implements StreamListener<String, ObjectRecord<String, String>> {
/**
* 消息类型
*/
private final Class<T> messageType;
/**
* Redis Channel
*/
@Getter
private final String streamKey;
/**
* Redis 消费者分组,默认使用 spring.application.name 名字
*/
@Value("${spring.application.name}")
@Getter
private String group;
/**
* RedisMQTemplate
*/
@Setter
private RedisMQTemplate redisMQTemplate;
@SneakyThrows
protected AbstractStreamMessageListener() {
this.messageType = getMessageClass();
this.streamKey = messageType.newInstance().getStreamKey();
}
@Override
public void onMessage(ObjectRecord<String, String> message) {
// 消费消息
T messageObj = JsonUtils.parseObject(message.getValue(), messageType);
try {
consumeMessageBefore(messageObj);
// 消费消息
this.onMessage(messageObj);
// ack 消息消费完成
redisMQTemplate.getRedisTemplate().opsForStream().acknowledge(group, message);
// TODO 芋艿:需要额外考虑以下几个点:
// 1. 处理异常的情况
// 2. 发送日志;以及事务的结合
// 3. 消费日志;以及通用的幂等性
// 4. 消费失败的重试https://zhuanlan.zhihu.com/p/60501638
} finally {
consumeMessageAfter(messageObj);
}
}
/**
* 处理消息
*
* @param message 消息
*/
public abstract void onMessage(T message);
/**
* 通过解析类上的泛型,获得消息类型
*
* @return 消息类型
*/
@SuppressWarnings("unchecked")
private Class<T> getMessageClass() {
Type type = TypeUtil.getTypeArgument(getClass(), 0);
if (type == null) {
throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
}
return (Class<T>) type;
}
private void consumeMessageBefore(AbstractRedisMessage message) {
assert redisMQTemplate != null;
List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
// 正序
interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message));
}
private void consumeMessageAfter(AbstractRedisMessage message) {
assert redisMQTemplate != null;
List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
// 倒序
for (int i = interceptors.size() - 1; i >= 0; i--) {
interceptors.get(i).consumeMessageAfter(message);
}
}
}