Initial commit

This commit is contained in:
Eric
2026-01-16 18:51:16 +08:00
commit 98c057de11
280 changed files with 16665 additions and 0 deletions

View File

@@ -0,0 +1,22 @@
package cn.lingniu.framework.plugin.rocketmq;
import org.springframework.stereotype.Component;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface RocketMqConsumer {
/**
* 自定义消费端名称
*/
String value();
}

View File

@@ -0,0 +1,13 @@
package cn.lingniu.framework.plugin.rocketmq;
import cn.lingniu.framework.plugin.rocketmq.core.producer.GeneralRocketMqProducerInner;
import lombok.extern.slf4j.Slf4j;
import java.io.Serializable;
/**
* 生产基类
*/
@Slf4j
public class RocketMqProducer<T extends Serializable> extends GeneralRocketMqProducerInner<T> {
}

View File

@@ -0,0 +1,15 @@
package cn.lingniu.framework.plugin.rocketmq.config;
/**
* 消费者模式
*/
public enum ConsumeMode {
/**
* 并发消费
*/
CONCURRENTLY,
/**
* 顺序消费
*/
ORDERLY
}

View File

@@ -0,0 +1,105 @@
package cn.lingniu.framework.plugin.rocketmq.config;
import lombok.Data;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
/**
* 消费端配置信息
*/
@Data
public class ConsumerProperties {
/**
* RocketMQ nameServer*
*/
private String nameServer;
/**
* 消费应用BeanName
*/
private String consumerBeanName;
/**
* Topic
*/
private String topic;
/**
* 消费组
*/
private String consumerGroup;
/**
* 消息表达式类型 默认按照TAG
*/
private SelectorType selectorType = SelectorType.TAG;
/**
* 表达式
*/
private String selectorExpress;
/**
* 默认同时消费
*/
private ConsumeMode consumeMode = ConsumeMode.CONCURRENTLY;
/**
* 默认集群消费
*/
private MessageModel messageModel = MessageModel.CLUSTERING;
/**
* 消费者最小线程
*/
private Integer consumeThreadMin;
/**
* 消费者最大线程
*/
private Integer consumeThreadMax;
/**
* 批量消费数量
* @see MessageListenerConcurrently#consumeMessage
*/
private int consumeMessageBatchMaxSize;
/**
* 批量拉取消息数量
*/
private int pullBatchSize = 32;
/**
* 是否批量处理方法
*/
private Boolean consumerbatchMode;
/**
* 消息的最大重试次数
*/
private int maxRetryTimes = 15;
/**
* 客户端名称
*/
private String clientUnitName;
public ConsumerProperties() {
this("", "", "", "*", 20, 64, 1, 32, 4);
this.nameServer = "";
}
public ConsumerProperties(
String consumerGroup,
String consumerBeanName,
String topic,
String selectorExpress,
int consumeThreadMin,
int consumeThreadMax,
int consumeMessageBatchMaxSize,
int pullBatchSize,
int maxRetryTimes
) {
this.selectorType = SelectorType.TAG;
this.messageModel = MessageModel.CLUSTERING;
this.consumerGroup = consumerGroup;
this.topic = topic;
this.consumerBeanName = consumerBeanName;
this.selectorExpress = selectorExpress;
this.consumeThreadMin = consumeThreadMin;
this.consumeThreadMax = consumeThreadMax;
this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
this.consumeMode = ConsumeMode.CONCURRENTLY;
this.pullBatchSize = pullBatchSize;
this.maxRetryTimes = maxRetryTimes;
}
}

View File

@@ -0,0 +1,79 @@
package cn.lingniu.framework.plugin.rocketmq.config;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
/**
* 生产端配置信息--很多优化参数可以扩展
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ProducerProperties {
/**
* RocketMQ nameServer
*/
String nameServer;
/**
* name of producer
*/
private String group;
/**
* 生产端Topic
*/
private String topic;
private String clientUnitName;
/**
* millis of send message timeout
*/
private int sendMsgTimeout = 3000;
/**
* Compress message body threshold, namely, message body larger than 4k will be compressed on default.
*/
private int compressMsgBodyOverHowmuch = 1024 * 4;
/**
* <p> Maximum number of retry to perform internally before claiming sending failure in synchronous mode. </p>
* This may potentially cause message duplication which is up to application developers to resolve.
*/
private int retryTimesWhenSendFailed = 2;
/**
* <p> Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>
* This may potentially cause message duplication which is up to application developers to resolve.
*/
private int retryTimesWhenSendAsyncFailed = 2;
/**
* Indicate whether to retry another broker on sending failure internally.
*/
private boolean retryAnotherBrokerWhenNotStoreOk = false;
/**
* 是否开启异常切换
*/
private boolean sendLatencyFaultEnable = false;
/**
* Maximum allowed message size in bytes.
*/
private int maxMessageSize = 1024 * 1024 * 4; // 4M
/**
* 事务消息线程配置
*/
private Transaction transaction = new Transaction();
/**
* 是否事务型
*/
private Boolean isTransactionMQ = false;
@Getter
@Setter
public static class Transaction {
private int corePoolSize = 5;
private int maximumPoolSize = 10;
private int keepAliveTime = 200;
private int queueCapacity = 2000;
}
}

View File

@@ -0,0 +1,27 @@
package cn.lingniu.framework.plugin.rocketmq.config;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import java.util.HashMap;
import java.util.Map;
@Setter
@Getter
@ConfigurationProperties(prefix = RocketMqConfig.PRE_FIX)
public class RocketMqConfig {
public final static String PRE_FIX = "framework.lingniu.rocketmq";
@Getter
@Setter
private Map<String, ProducerProperties> producers = new HashMap<>();
@Getter
@Setter
private Map<String, ConsumerProperties> consumers = new HashMap<>();
}

View File

@@ -0,0 +1,19 @@
package cn.lingniu.framework.plugin.rocketmq.config;
import org.apache.rocketmq.common.filter.ExpressionType;
/**
* 过滤类型
*/
public enum SelectorType {
/**
* @see ExpressionType#TAG
*/
TAG,
/**
* @see ExpressionType#SQL92
*/
SQL92
}

View File

@@ -0,0 +1,63 @@
package cn.lingniu.framework.plugin.rocketmq.core;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.module.paramnames.ParameterNamesModule;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.TimeZone;
/**
* Jackson 序列化
**/
@Slf4j
public class RocketMqBodyJacksonSerializer implements RocketMqBodySerializer {
public static final String DEFAULT_ZONE = "GMT+08:00";
public static final String DEFAULT_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
private static ObjectMapper objectMapper;
static {
objectMapper = new ObjectMapper();
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
SimpleDateFormat smt = new SimpleDateFormat(DEFAULT_FORMAT);
TimeZone timeZone = TimeZone.getTimeZone(DEFAULT_ZONE);
objectMapper.setDateFormat(smt);
objectMapper.setTimeZone(timeZone);
objectMapper.registerModule(new ParameterNamesModule()).registerModule(new Jdk8Module()).registerModule(new JavaTimeModule());
objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
objectMapper.configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false);
objectMapper.configure(SerializationFeature.WRITE_ENUMS_USING_TO_STRING, true);
}
@Override
public byte[] serialize(Object t) {
try {
return objectMapper.writeValueAsBytes(t);
} catch (Exception ex) {
log.error("JacksonMq序列化异常", ex);
throw new IllegalArgumentException(ex);
}
}
@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
try {
return objectMapper.readValue(bytes, clazz);
} catch (Exception ex) {
log.error(String.format("JacksonMq反序列化异常,原始Json: %s", new String(bytes, StandardCharsets.UTF_8)), ex);
throw new IllegalArgumentException(ex);
}
}
}

View File

@@ -0,0 +1,13 @@
package cn.lingniu.framework.plugin.rocketmq.core;
/**
* RocketMq 序列化
**/
public interface RocketMqBodySerializer {
byte[] serialize(Object t);
<T> T deserialize(byte[] bytes, Class<T> clazz);
}

View File

@@ -0,0 +1,315 @@
package cn.lingniu.framework.plugin.rocketmq.core.consumer;
import cn.lingniu.framework.plugin.rocketmq.core.consumer.listener.MRocketMqListener;
import cn.lingniu.framework.plugin.rocketmq.core.consumer.listener.RocketMqListener;
import cn.lingniu.framework.plugin.rocketmq.config.ConsumeMode;
import cn.lingniu.framework.plugin.rocketmq.config.SelectorType;
import cn.lingniu.framework.plugin.rocketmq.core.RocketMqBodySerializer;
import cn.lingniu.framework.plugin.util.validation.ObjectEmptyUtils;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.Assert;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Proxy;
import java.lang.reflect.Type;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
@Slf4j
@SuppressWarnings("all")
public class DefaultRocketMqListenerContainer implements InitializingBean, RocketMqListenerContainer {
@Setter
@Getter
private long suspendCurrentQueueTimeMillis = 1000;
/**
* Message consume retry strategy<br> -1,no retry,put into DLQ directly<br> 0,broker control retry frequency<br>
* >0,client control retry frequency
*/
@Setter
@Getter
private int delayLevelWhenNextConsume = 0;
@Setter
@Getter
private String consumerGroup;
@Setter
@Getter
private String nameServer;
@Setter
@Getter
private String topic;
@Setter
@Getter
private ConsumeMode consumeMode = ConsumeMode.CONCURRENTLY;
@Setter
@Getter
private SelectorType selectorType = SelectorType.TAG;
@Setter
@Getter
private String selectorExpress = "*";
@Setter
@Getter
private MessageModel messageModel = MessageModel.CLUSTERING;
/**
* 最大线程数
*/
@Setter
@Getter
private int consumeThreadMax = 20;
/**
* 最小线程数
*/
@Setter
@Getter
private int consumeThreadMin = 20;
/**
* 最大重试次数
*/
@Getter
@Setter
private int maxRetryTimes = 15;
/**
* 批量消费数量
*/
@Setter
@Getter
private int consumeMessageBatchMaxSize = 1;
/**
* 批量拉取消息数量
*/
@Setter
@Getter
private int pullBatchSize = 32;
@Setter
@Getter
private int pollNameServerInterval = 1000 * 30;
@Setter
@Getter
private int heartbeatBrokerInterval = 1000 * 30;
@Setter
@Getter
private Boolean isBatchMode;
@Getter
@Setter
private String charset = "UTF-8";
@Getter
@Setter
private String clientUnitName;
@Getter
@Setter
private RocketMqBodySerializer serializer;
@Setter
@Getter
private boolean started;
@Setter
private RocketMqListener rocketMQListener;
private DefaultMQPushConsumer consumer;
private Class messageType;
@Override
public void setupMessageListener(RocketMqListener rocketMQListener) {
this.rocketMQListener = rocketMQListener;
}
@Override
public void destroy() {
this.setStarted(false);
if (Objects.nonNull(consumer)) {
consumer.shutdown();
}
log.info("DefaultRocketMQListenerContainer destroyed, {}", this.toString());
}
public synchronized void start() throws MQClientException {
if (this.isStarted()) {
throw new IllegalStateException("DefaultRocketMQListenerContainer already started. " + this.toString());
}
initRocketMQPushConsumer();
this.messageType = getMessageType();
consumer.start();
this.setStarted(true);
if (log.isInfoEnabled()) {
log.info("开启TOPIC: {} 监听端,消费组:{},消息类型:{},线程信息:{} ~ {},消费模式:{},过虑类型:{},标签:{}", this.getTopic(), this.getConsumerGroup(), messageType.getSimpleName(), this.getConsumeThreadMin()
, this.getConsumeThreadMax(), this.consumeMode.name(), this.getSelectorType().name(), this.selectorExpress
);
}
}
public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
private final String topic;
private final String group;
public DefaultMessageListenerConcurrently(String topic, String group) {
this.topic = topic;
this.group = group;
}
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
List<MessageExt> outRetrys = msgs.stream().filter(n -> n.getReconsumeTimes() > maxRetryTimes).collect(Collectors.toList());
msgs = msgs.stream().filter(n -> n.getReconsumeTimes() <= maxRetryTimes).collect(Collectors.toList());
if (ObjectEmptyUtils.isEmpty(msgs)) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
Throwable throwable = null;
try {
if (rocketMQListener instanceof MRocketMqListener) {
((MRocketMqListener) rocketMQListener).onMessage(
msgs.stream().map(m -> {
RocketMqConsumerMessage item = new RocketMqConsumerMessage<>(this.topic);
item.setOriginMsg(m);
item.setMsg(doConvertMessage(m));
return item;
}).collect(Collectors.toList())
);
} else {
msgs.forEach(p -> {rocketMQListener.onMessage(doConvertMessage(p), p);});
}
} catch (Throwable ex) {
throwable = ex;
log.error(String.format("%s 消费 %s:%s 发生系统异常", this.getClass().getSimpleName(), this.topic, this.group), ex);
context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
private final String topic;
private final String group;
public DefaultMessageListenerOrderly(String topic, String group) {
this.topic = topic;
this.group = group;
}
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
List<MessageExt> outRetrys = msgs.stream().filter(n -> n.getReconsumeTimes() > maxRetryTimes).collect(Collectors.toList());
msgs = msgs.stream().filter(n -> n.getReconsumeTimes() <= maxRetryTimes).collect(Collectors.toList());
if (ObjectEmptyUtils.isEmpty(msgs)) {
return ConsumeOrderlyStatus.SUCCESS;
}
Throwable throwable = null;
try {
if (rocketMQListener instanceof MRocketMqListener) {
((MRocketMqListener) rocketMQListener).onMessage(
msgs.stream().map(m -> {
RocketMqConsumerMessage item = new RocketMqConsumerMessage<>(this.topic);
item.setOriginMsg(m);
item.setMsg(doConvertMessage(m));
return item;
}).collect(Collectors.toList())
);
} else {
msgs.forEach(p -> {
rocketMQListener.onMessage(doConvertMessage(p), p);
});
}
} catch (Throwable ex) {
throwable = ex;
log.error(String.format("%s 消费 %s:%s 发生系统异常", this.getClass().getSimpleName(), this.topic, this.group), ex);
context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
}
@Override
public void afterPropertiesSet() throws Exception {
start();
}
@SuppressWarnings("unchecked")
private Object doConvertMessage(MessageExt messageExt) {
if (Objects.equals(messageType, MessageExt.class)) {
return messageExt;
} else {
if (Objects.equals(messageType, String.class)) {
return new String(messageExt.getBody(), Charset.forName(charset));
} else {
return serializer.deserialize(messageExt.getBody(), messageType);
}
}
}
private Class getMessageType() {
if (Proxy.isProxyClass(rocketMQListener.getClass())) {
return rocketMQListener.getClassType();
}
Type[] interfaces = rocketMQListener.getClass().getGenericInterfaces();
if (Objects.nonNull(interfaces)) {
for (Type type : interfaces) {
if (type instanceof ParameterizedType) {
ParameterizedType parameterizedType = (ParameterizedType) type;
if (Objects.equals(parameterizedType.getRawType(), RocketMqListener.class)
|| Objects.equals(parameterizedType.getRawType(), MRocketMqListener.class)) {
Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
return (Class) actualTypeArguments[0];
} else {
return Object.class;
}
}
}
}
return Object.class;
} else {
return Object.class;
}
}
private void initRocketMQPushConsumer() throws MQClientException {
Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required");
Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
Assert.notNull(nameServer, "Property 'nameServer' is required");
Assert.notNull(topic, "Property 'topic' is required");
consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(nameServer);
consumer.setConsumeThreadMax(consumeThreadMax);
consumer.setConsumeThreadMin(consumeThreadMin);
consumer.setPullBatchSize(pullBatchSize);
consumer.setMessageModel(messageModel);
consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
consumer.setUnitName(clientUnitName);
switch (selectorType) {
case TAG:
consumer.subscribe(topic, selectorExpress);
break;
case SQL92:
consumer.subscribe(topic, MessageSelector.bySql(selectorExpress));
break;
default:
throw new IllegalArgumentException("Property 'selectorType' was wrong.");
}
switch (consumeMode) {
case ORDERLY:
consumer.registerMessageListener(new DefaultMessageListenerOrderly(topic, consumerGroup));
break;
case CONCURRENTLY:
consumer.registerMessageListener(new DefaultMessageListenerConcurrently(topic, consumerGroup));
break;
default:
throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
}
}
}

View File

@@ -0,0 +1,22 @@
package cn.lingniu.framework.plugin.rocketmq.core.consumer;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import org.apache.rocketmq.common.message.MessageExt;
@Data
@Accessors(chain = true)
@NoArgsConstructor
public class RocketMqConsumerMessage<T> {
public RocketMqConsumerMessage(String topic) {
this.topic = topic;
}
private T msg;
private String topic;
private MessageExt originMsg;
}

View File

@@ -0,0 +1,10 @@
package cn.lingniu.framework.plugin.rocketmq.core.consumer;
import cn.lingniu.framework.plugin.rocketmq.core.consumer.listener.RocketMqListener;
import org.springframework.beans.factory.DisposableBean;
public interface RocketMqListenerContainer extends DisposableBean {
void setupMessageListener(RocketMqListener<?> messageListener);
}

View File

@@ -0,0 +1,55 @@
package cn.lingniu.framework.plugin.rocketmq.core.consumer.listener;
import cn.lingniu.framework.plugin.rocketmq.core.consumer.RocketMqConsumerMessage;
import cn.lingniu.framework.plugin.util.validation.ObjectEmptyUtils;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.Serializable;
import java.lang.reflect.ParameterizedType;
import java.util.List;
@Slf4j
public abstract class GeneralMsgListener<T extends Serializable> implements MRocketMqListener<T>, RocketMqListener<T> {
@Getter
private Class<T> messageType;
/**
* 获取真实参数类型
*/
public GeneralMsgListener() {
ParameterizedType pt = (ParameterizedType) this.getClass().getGenericSuperclass();
this.messageType = (Class<T>) pt.getActualTypeArguments()[0];
}
/**
* 消息消费主体
* @param msg
*/
protected abstract void onMessage(RocketMqConsumerMessage<T> msg);
/**
* 解决代理模式范型丢失问题
*/
@Override
public Class<T> getClassType() {
return this.messageType;
}
@Override
public void onMessage(List<RocketMqConsumerMessage<T>> msgs) {
if (ObjectEmptyUtils.isEmpty(msgs)) {
return;
}
msgs.stream().forEach(n -> onMessage(n));
}
@Override
public final void onMessage(T message, MessageExt messageExt) {
RocketMqConsumerMessage<T> msg = new RocketMqConsumerMessage<>();
msg.setMsg(message);
msg.setOriginMsg(messageExt);
onMessage(msg);
}
}

View File

@@ -0,0 +1,13 @@
package cn.lingniu.framework.plugin.rocketmq.core.consumer.listener;
import cn.lingniu.framework.plugin.rocketmq.core.consumer.RocketMqConsumerMessage;
import java.util.List;
/**
* 批量消费
*/
public interface MRocketMqListener<T> extends RocketMqListener<T> {
void onMessage(List<RocketMqConsumerMessage<T>> msgs);
}

View File

@@ -0,0 +1,18 @@
package cn.lingniu.framework.plugin.rocketmq.core.consumer.listener;
import org.apache.rocketmq.common.message.MessageExt;
/**
* @param <T>
*/
public interface RocketMqListener<T> {
void onMessage(T message, MessageExt messageExt);
/**
* 消费端处理数据类型
*/
default Class<T> getClassType() {
return null;
}
}

View File

@@ -0,0 +1,164 @@
package cn.lingniu.framework.plugin.rocketmq.core.producer;
import cn.lingniu.framework.plugin.rocketmq.core.producer.call.SendMessageOnFail;
import cn.lingniu.framework.plugin.rocketmq.core.producer.call.SendMessageOnSuccess;
import cn.lingniu.framework.plugin.util.json.JsonUtil;
import cn.lingniu.framework.plugin.util.validation.ObjectEmptyUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.beans.factory.DisposableBean;
import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* 生产内部基类
*/
@Slf4j
public abstract class GeneralRocketMqProducerInner<T extends Serializable> implements RocketMqProducerInner<T>, DisposableBean {
private RocketMqTemplate rocketMQTemplate;
private String topic;
private String tag;
@Override
public void setRocketMQTemplate(RocketMqTemplate template, String topic) {
rocketMQTemplate = template;
this.topic = topic;
}
@Override
public void asyncSend(RocketMqSendMsgBody<T> message, SendMessageOnFail<T> onMsgFail) {
asyncSend(message, (m, r) -> {}, onMsgFail);
}
@Override
public void asyncSend(RocketMqSendMsgBody<T> message) {
asyncSend(message, (m, r) -> {}, (m, ex) -> {});
}
@Override
public void asyncSend(RocketMqSendMsgBody<T> message, SendMessageOnSuccess<T> onMsgSuccess, SendMessageOnFail<T> onMsgFail) {
rocketMQTemplate.asyncSend(ObjectEmptyUtils.isNotEmpty(message.getTopic()) ? message.getTopic() : this.topic,
ObjectEmptyUtils.isNotEmpty(message.getMessageTag()) ? message.getMessageTag() : this.tag,
message.getBody(), buildMessageProperties(message), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
if (null != onMsgSuccess) {
onMsgSuccess.call(message.getBody(), sendResult);
}
if (log.isInfoEnabled() && ObjectEmptyUtils.isNotEmpty(message) && ObjectEmptyUtils.isNotEmpty(message.getMessageKey())) {
log.info("消息:{} 异步写入:{} 成功", message.getMessageKey(), ObjectEmptyUtils.isNotEmpty(message.getTopic()) ? message.getTopic() : topic);
}
}
@Override
public void onException(Throwable e) {
log.error(message.getClass().getSimpleName() + " MQ异步写入失败:" + JsonUtil.bean2Json(message), e);
if (null != onMsgFail) {
onMsgFail.call(message.getBody(), e);
}
}
}, rocketMQTemplate.getProducer().getSendMsgTimeout());
}
@Override
public SendResult syncSend(RocketMqSendMsgBody<T> message, SendMessageOnSuccess<T> onMsgSuccess, SendMessageOnFail<T> onMsgFail) {
SendResult result = rocketMQTemplate.syncSend(ObjectEmptyUtils.isNotEmpty(message.getTopic()) ? message.getTopic() : this.topic,
ObjectEmptyUtils.isNotEmpty(message.getMessageTag()) ? message.getMessageTag() : this.tag, message.getBody(), buildMessageProperties(message));
if (!result.getSendStatus().equals(SendStatus.SEND_OK)) {
log.error("消息 {} 同步写入 {} 失败,结果:{}", message.getMessageKey(), ObjectEmptyUtils.isNotEmpty(message.getTopic()) ? message.getTopic() : this.topic, result.getSendStatus());
if (!ObjectEmptyUtils.isEmpty(onMsgFail)) {
onMsgFail.call(message.getBody(), null);
}
} else {
onMsgSuccess.call(message.getBody(), result);
}
return result;
}
@Override
public SendResult syncSend(RocketMqSendMsgBody<T> message, MessageQueueSelector selector, String selectorKey, SendMessageOnSuccess<T> onMsgSuccess, SendMessageOnFail<T> onMsgFail) {
SendResult result = rocketMQTemplate.syncSend(ObjectEmptyUtils.isNotEmpty(message.getTopic()) ? message.getTopic() : this.topic,
ObjectEmptyUtils.isNotEmpty(message.getMessageTag()) ? message.getMessageTag() : this.tag, message.getBody(), buildMessageProperties(message), selector, selectorKey);
if (!result.getSendStatus().equals(SendStatus.SEND_OK)) {
log.error("消息 {} 同步写入 {} 失败,结果:{}", message.getMessageKey(), ObjectEmptyUtils.isNotEmpty(message.getTopic()) ? message.getTopic() : this.topic, result.getSendStatus());
if (!ObjectEmptyUtils.isEmpty(onMsgFail)) {
onMsgFail.call(message.getBody(), null);
}
} else {
onMsgSuccess.call(message.getBody(), result);
}
return result;
}
@Override
public SendResult syncSend(RocketMqSendMsgBody<T> message, String selectorKey, SendMessageOnSuccess<T> onMsgSuccess, SendMessageOnFail<T> onMsgFail) {
return syncSend(message, new SelectMessageQueueByHash(), selectorKey, onMsgSuccess, onMsgFail);
}
@Override
public SendResult syncSend(RocketMqSendMsgBody<T> message) {
return syncSend(message, (m, r) -> {}, null);
}
@Override
public SendResult syncSendInTransaction(RocketMqSendMsgBody<T> message, Object arg) {
return syncSendInTransaction(message, arg, (n, r) -> {}, (t, e) -> {});
}
@Override
public SendResult syncSendInTransaction(RocketMqSendMsgBody<T> message, Object arg, SendMessageOnSuccess<T> onMsgSuccess, SendMessageOnFail<T> onMsgFail) {
SendResult result = rocketMQTemplate.sendMessageInTransaction(ObjectEmptyUtils.isNotEmpty(message.getTopic()) ? message.getTopic() : this.topic,
ObjectEmptyUtils.isNotEmpty(message.getMessageTag()) ? message.getMessageTag() : this.tag,
message.getBody(), buildMessageProperties(message), arg);
if (log.isInfoEnabled() && ObjectEmptyUtils.isNotEmpty(message) && ObjectEmptyUtils.isNotEmpty(message.getMessageKey())) {
log.info("消息:{} 事务写入:{} 成功", message.getMessageKey(), ObjectEmptyUtils.isNotEmpty(message.getTopic()) ? message.getTopic() : topic);
}
if (!result.getSendStatus().equals(SendStatus.SEND_OK)) {
log.error("消息 {} 事务写入 {} 失败,结果:{}", message.getMessageKey(), ObjectEmptyUtils.isNotEmpty(message.getTopic()) ? message.getTopic() : this.topic, result.getSendStatus());
} else {
onMsgSuccess.call(message.getBody(), result);
}
return result;
}
@Override
public void asyncSend(Collection<RocketMqSendMsgBody<T>> messageBodies) {
if (ObjectEmptyUtils.isEmpty(messageBodies)) {
return;
}
RocketMqSendMsgBody<T> message = messageBodies.stream().findAny().get();
rocketMQTemplate.asyncSend(message.getTopic(), message.getMessageTag(),
messageBodies.stream().filter(ObjectEmptyUtils::isNotEmpty).collect(Collectors.toList()),
n -> ((RocketMqSendMsgBody<T>) n).getBody(),
n -> buildMessageProperties((RocketMqSendMsgBody<T>) n),
rocketMQTemplate.getProducer().getSendMsgTimeout());
}
protected final Map<String, Object> buildMessageProperties(RocketMqSendMsgBody<T> message) {
Map<String, Object> properties = new HashMap<>();
if (ObjectEmptyUtils.isNotEmpty(message.getMessageKey())) {
properties.put(MessageConst.PROPERTY_KEYS, message.getMessageKey());
}
if (ObjectEmptyUtils.isNotEmpty(message.getDelayLevel())) {
properties.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, message.getDelayLevel());
}
return properties;
}
@Override
public void destroy() {
Optional.ofNullable(this.rocketMQTemplate).ifPresent(r -> r.destroy());
}
}

View File

@@ -0,0 +1,36 @@
package cn.lingniu.framework.plugin.rocketmq.core.producer;
import cn.lingniu.framework.plugin.rocketmq.core.producer.call.SendMessageOnFail;
import cn.lingniu.framework.plugin.rocketmq.core.producer.call.SendMessageOnSuccess;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import java.io.Serializable;
import java.util.Collection;
/**
* 生产者接口
*/
public interface RocketMqProducerInner<T extends Serializable> {
void setRocketMQTemplate(RocketMqTemplate template, String topic);
void asyncSend(RocketMqSendMsgBody<T> message, SendMessageOnSuccess<T> onMsgSuccess, SendMessageOnFail<T> onMsgFail);
void asyncSend(RocketMqSendMsgBody<T> message, SendMessageOnFail<T> onMsgFail);
void asyncSend(RocketMqSendMsgBody<T> message);
SendResult syncSend(RocketMqSendMsgBody<T> message);
SendResult syncSend(RocketMqSendMsgBody<T> message, SendMessageOnSuccess<T> onMsgSuccess, SendMessageOnFail<T> onMsgFail);
SendResult syncSend(RocketMqSendMsgBody<T> message, MessageQueueSelector selector, String selectorKey, SendMessageOnSuccess<T> onMsgSuccess, SendMessageOnFail<T> onMsgFail);
SendResult syncSend(RocketMqSendMsgBody<T> message, String selectorKey, SendMessageOnSuccess<T> onMsgSuccess, SendMessageOnFail<T> onMsgFail);
void asyncSend(Collection<RocketMqSendMsgBody<T>> msgs);
SendResult syncSendInTransaction(RocketMqSendMsgBody<T> message, Object args);
SendResult syncSendInTransaction(RocketMqSendMsgBody<T> message, Object args, SendMessageOnSuccess<T> onMsgSuccess, SendMessageOnFail<T> onMsgFail);
}

View File

@@ -0,0 +1,27 @@
package cn.lingniu.framework.plugin.rocketmq.core.producer;
import lombok.Data;
import lombok.experimental.Accessors;
import java.io.Serializable;
/**
* mq 发送消息body
*/
@Data
@Accessors(chain = true)
public class RocketMqSendMsgBody<T extends Serializable> {
T body;
String messageKey;
String messageTag;
String topic;
/**
* 级别的定时消息如下:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
*/
Integer delayLevel;
}

View File

@@ -0,0 +1,326 @@
package cn.lingniu.framework.plugin.rocketmq.core.producer;
import cn.lingniu.framework.plugin.rocketmq.core.RocketMqBodySerializer;
import cn.lingniu.framework.plugin.util.validation.ObjectEmptyUtils;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageQueue;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* RockMQTemplate
*/
@Slf4j
public class RocketMqTemplate implements InitializingBean, DisposableBean {
@Getter
@Setter
private DefaultMQProducer producer;
@Getter
@Setter
private RocketMqBodySerializer serializer;
@Getter
@Setter
private String charset = "UTF-8";
public SendResult syncSend(Message message) {
return this.syncSend(message, producer.getSendMsgTimeout());
}
public SendResult syncSend(Message message, long timeout) {
try {
SendResult sendResult = producer.send(message, timeout);
return sendResult;
} catch (Exception e) {
log.error("send message failed. destination:{}, message:{} ", message.getTopic() + ":" + message.getTags(), message);
throw new RuntimeException(e.getMessage(), e);
}
}
public SendResult syncSend(String topic, String tag, Object msgObj, Map<String, Object> properties) {
return this.syncSend(topic, tag, msgObj, properties, this.producer.getSendMsgTimeout());
}
/**
* 同步发送消息
* @param topic
* @param tag
* @param msgObj
* @param properties
* @param timeout
* @return
*/
public SendResult syncSend(String topic, String tag, Object msgObj, Map<String, Object> properties, long timeout) {
Message message = createMessage(topic, tag, msgObj, properties);
return this.syncSend(message, timeout);
}
public void asyncSend(Message message, SendCallback sendCallback) {
this.asyncSend(message, sendCallback, producer.getSendMsgTimeout());
}
public void asyncSend(Message message, SendCallback sendCallback, long timeout) {
try {
producer.send(message, sendCallback, timeout);
} catch (Exception e) {
log.error("send message failed. destination:{}, message:{} ", message.getTopic() + ":" + message.getTags(), message);
throw new RuntimeException(e.getMessage(), e);
}
}
public void asyncSend(String topic, String tag, Collection<Object> message, Function<Object, Object> bodyFun, Function<Object, Map<String, Object>> properties, long timeout) {
List<Message> messages = message.stream().filter(ObjectEmptyUtils::isNotEmpty)
.map(n -> createMessage(topic, tag, bodyFun.apply(n), ObjectEmptyUtils.isNotEmpty(properties) ? properties.apply(n) : null))
.collect(Collectors.toList());
try {
producer.send(messages, timeout);
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
public void asyncSend(String topic, String tag, Object msgObj, Map<String, Object> properties, SendCallback sendCallback) {
this.asyncSend(topic, tag, msgObj, properties, sendCallback, producer.getSendMsgTimeout());
}
/**
* 异步发送消息
* @param topic
* @param tag
* @param msgObj
* @param properties
* @param sendCallback
* @param timeout
*/
public void asyncSend(String topic, String tag, Object msgObj, Map<String, Object> properties, SendCallback sendCallback, long timeout) {
Message message = createMessage(topic, tag, msgObj, properties);
this.asyncSend(message, sendCallback, timeout);
}
public void sendOneWay(Message message) {
try {
producer.sendOneway(message);
} catch (Exception e) {
log.error("send message failed. destination:{}, message:{} ", message.getTopic() + ":" + message.getTags(), message);
throw new RuntimeException(e.getMessage(), e);
}
}
/**
* 不关心消息是否送达
* @param topic
* @param tag
* @param msgObj
* @param properties
*/
public void sendOneWay(String topic, String tag, Object msgObj, Map<String, Object> properties) {
Message message = createMessage(topic, tag, msgObj, properties);
sendOneWay(message);
}
public SendResult syncSend(Message message, MessageQueue messageQueue) {
return this.syncSend(message, messageQueue, producer.getSendMsgTimeout());
}
public SendResult syncSend(Message message, MessageQueue messageQueue, long timeout) {
try {
SendResult sendResult = producer.send(message, messageQueue, timeout);
return sendResult;
} catch (Exception e) {
log.error("send message failed. destination:{}, message:{} ", message.getTopic() + ":" + message.getTags(), message);
throw new RuntimeException(e.getMessage(), e);
}
}
public SendResult syncSend(String topic, String tag, Object msgObj, Map<String, Object> properties, MessageQueue mq) {
return this.syncSend(topic, tag, msgObj, properties, mq, producer.getSendMsgTimeout());
}
/**
* 同步发送到指定队列
* @param topic
* @param tag
* @param msgObj
* @param properties
* @param mq
* @param timeout
*/
public SendResult syncSend(String topic, String tag, Object msgObj, Map<String, Object> properties, MessageQueue mq, long timeout) {
Message message = createMessage(topic, tag, msgObj, properties);
return this.syncSend(message, mq, timeout);
}
public void asyncSend(Message message, MessageQueue messageQueue, SendCallback sendCallback) {
this.asyncSend(message, messageQueue, sendCallback, producer.getSendMsgTimeout());
}
public void asyncSend(Message message, MessageQueue messageQueue, SendCallback sendCallback, long timeout) {
try {
producer.send(message, messageQueue, sendCallback, timeout);
} catch (Exception e) {
log.error("send message failed. destination:{}, message:{} ", message.getTopic() + ":" + message.getTags(), message);
throw new RuntimeException(e.getMessage(), e);
}
}
/**
* 异步发送到指定队列
*
* @param topic
* @param tag
* @param msgObj
* @param properties
* @param mq
* @param sendCallback
*/
public void asyncSend(String topic, String tag, Object msgObj, Map<String, Object> properties, MessageQueue mq, SendCallback sendCallback) {
this.asyncSend(topic, tag, msgObj, properties, mq, sendCallback, producer.getSendMsgTimeout());
}
public void asyncSend(String topic, String tag, Object msgObj, Map<String, Object> properties, MessageQueue mq, SendCallback sendCallback, long timeout) {
Message message = createMessage(topic, tag, msgObj, properties);
this.asyncSend(message, mq, sendCallback, timeout);
}
/**
* 不关心是否送达到指定队列
*
* @param message
* @param messageQueue
*/
public void sendOneWay(Message message, MessageQueue messageQueue) {
try {
producer.sendOneway(message, messageQueue);
} catch (Exception e) {
log.error("send message failed. destination:{}, message:{} ", message.getTopic() + ":" + message.getTags(), message);
throw new RuntimeException(e.getMessage(), e);
}
}
public void sendOneWay(String topic, String tag, Object msgObj, Map<String, Object> properties, MessageQueue mq) {
Message message = createMessage(topic, tag, msgObj, properties);
this.sendOneWay(message, mq);
}
public SendResult syncSend(Message message, MessageQueueSelector selector, Object arg) {
return syncSend(message, selector, arg, producer.getSendMsgTimeout());
}
public SendResult syncSend(Message message, MessageQueueSelector selector, Object arg, long timeout) {
try {
SendResult sendResult = producer.send(message, selector, arg, timeout);
return sendResult;
} catch (Exception e) {
log.error("send message failed. destination:{}, message:{} ", message.getTopic() + ":" + message.getTags(), message);
throw new RuntimeException(e.getMessage(), e);
}
}
public SendResult syncSend(String topic, String tag, Object msgObj, Map<String, Object> properties, MessageQueueSelector selector, Object arg) {
return this.syncSend(topic, tag, msgObj, properties, selector, arg, producer.getSendMsgTimeout());
}
public SendResult syncSend(String topic, String tag, Object msgObj, Map<String, Object> properties, MessageQueueSelector selector, Object arg, long timeout) {
Message message = createMessage(topic, tag, msgObj, properties);
return this.syncSend(message, selector, arg, timeout);
}
public void asyncSend(Message message, MessageQueueSelector selector, Object arg, SendCallback sendCallback) {
this.asyncSend(message, selector, arg, sendCallback, producer.getSendMsgTimeout());
}
public void asyncSend(Message message, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout) {
try {
producer.send(message, selector, arg, sendCallback, timeout);
} catch (Exception e) {
log.error("send message failed. destination:{}, message:{} ", message.getTopic() + ":" + message.getTags(), message);
throw new RuntimeException(e.getMessage(), e);
}
}
public void asyncSend(String topic, String tag, Object msgObj, Map<String, Object> properties, MessageQueueSelector selector, Object arg, SendCallback sendCallback) {
this.asyncSend(topic, tag, msgObj, properties, selector, arg, sendCallback, producer.getSendMsgTimeout());
}
public void asyncSend(String topic, String tag, Object msgObj, Map<String, Object> properties, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout) {
Message message = createMessage(topic, tag, msgObj, properties);
this.asyncSend(message, selector, arg, sendCallback, timeout);
}
public TransactionSendResult sendMessageInTransaction(String topic, String tag, Object msgObj, Map<String, Object> properties, Object arg) {
Message message = createMessage(topic, tag, msgObj, properties);
try {
TransactionSendResult result = producer.sendMessageInTransaction(message, arg);
return result;
} catch (Exception e) {
log.error("send transaction message failed. destination:{}, message:{} ", message.getTopic() + ":" + message.getTags(), message);
throw new RuntimeException(e.getMessage(), e);
}
}
public void sendOneWay(Message message, MessageQueueSelector selector, Object arg) {
try {
producer.sendOneway(message, selector, arg);
} catch (Exception e) {
log.error("send message failed. destination:{}, message:{} ", message.getTopic() + ":" + message.getTags(), message);
throw new RuntimeException(e.getMessage(), e);
}
}
public void sendOneWay(String topic, String tag, Object msgObj, Map<String, Object> properties, MessageQueueSelector selector, Object arg) {
Message message = createMessage(topic, tag, msgObj, properties);
this.sendOneWay(message, selector, arg);
}
@Override
public void afterPropertiesSet() throws Exception {
Assert.notNull(producer, "Property 'producer' is required");
producer.start();
}
@Override
public void destroy() {
Optional.ofNullable(producer).ifPresent(p -> p.shutdown());
}
private Message createMessage(String topic, String tag, Object msgObj, Map<String, Object> properties) {
Message rocketMsg = new Message(topic, tag, serializer.serialize(msgObj));
if (!CollectionUtils.isEmpty(properties)) {
rocketMsg.setFlag((Integer) properties.getOrDefault("FLAG", 0));
rocketMsg.setWaitStoreMsgOK((Boolean) properties.getOrDefault(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, true));
Optional.ofNullable((String) properties.get(MessageConst.PROPERTY_KEYS)).ifPresent(keys -> rocketMsg.setKeys(keys));
Optional.ofNullable((Integer) properties.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL)).ifPresent(delay -> rocketMsg.setDelayTimeLevel(delay));
properties.entrySet().stream().filter(entry -> !MessageConst.STRING_HASH_SET.contains(entry.getKey()) && !Objects.equals(entry.getKey(), "FLAG"))
.forEach(entry -> {rocketMsg.putUserProperty(entry.getKey(), String.valueOf(entry.getValue()));});
}
return rocketMsg;
}
}

View File

@@ -0,0 +1,8 @@
package cn.lingniu.framework.plugin.rocketmq.core.producer.call;
@FunctionalInterface
public interface SendMessageOnFail<T> {
void call(T message, Throwable ex);
}

View File

@@ -0,0 +1,9 @@
package cn.lingniu.framework.plugin.rocketmq.core.producer.call;
import org.apache.rocketmq.client.producer.SendResult;
@FunctionalInterface
public interface SendMessageOnSuccess<T> {
void call(T message, SendResult result);
}

View File

@@ -0,0 +1,48 @@
package cn.lingniu.framework.plugin.rocketmq.core.producer.call;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @description: TransactionListenerImpl
**/
@Slf4j
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}

View File

@@ -0,0 +1,33 @@
package cn.lingniu.framework.plugin.rocketmq.init;
import cn.lingniu.framework.plugin.rocketmq.config.RocketMqConfig;
import cn.lingniu.framework.plugin.rocketmq.core.RocketMqBodyJacksonSerializer;
import cn.lingniu.framework.plugin.rocketmq.core.RocketMqBodySerializer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
/**
* RocketMQ 自动装载
*/
@Configuration
@EnableConfigurationProperties(RocketMqConfig.class)
@ConditionalOnClass({MQClientAPIImpl.class, DefaultMQPushConsumer.class})
@Import({RocketMqStartAutoConfiguration.class})
@Slf4j
public class RocketMqInit {
@Bean
@ConditionalOnMissingBean
public RocketMqBodySerializer rocketMQSerializer() {
return new RocketMqBodyJacksonSerializer();
}
}

View File

@@ -0,0 +1,208 @@
package cn.lingniu.framework.plugin.rocketmq.init;
import cn.lingniu.framework.plugin.core.context.SpringBeanApplicationContext;
import cn.lingniu.framework.plugin.rocketmq.RocketMqConsumer;
import cn.lingniu.framework.plugin.rocketmq.RocketMqProducer;
import cn.lingniu.framework.plugin.rocketmq.core.consumer.DefaultRocketMqListenerContainer;
import cn.lingniu.framework.plugin.rocketmq.core.consumer.listener.RocketMqListener;
import cn.lingniu.framework.plugin.rocketmq.config.ConsumerProperties;
import cn.lingniu.framework.plugin.rocketmq.config.ProducerProperties;
import cn.lingniu.framework.plugin.rocketmq.config.RocketMqConfig;
import cn.lingniu.framework.plugin.rocketmq.core.producer.RocketMqProducerInner;
import cn.lingniu.framework.plugin.rocketmq.core.producer.RocketMqTemplate;
import cn.lingniu.framework.plugin.rocketmq.core.RocketMqBodySerializer;
import cn.lingniu.framework.plugin.util.validation.ObjectEmptyUtils;
import cn.lingniu.framework.plugin.util.string.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.web.context.WebServerInitializedEvent;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.annotation.Async;
import org.springframework.util.Assert;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
@Slf4j
@Configuration
@ConditionalOnClass(DefaultMQPushConsumer.class)
@EnableConfigurationProperties(RocketMqConfig.class)
public class RocketMqStartAutoConfiguration implements ApplicationContextAware {
@Autowired
private RocketMqConfig rocketMQConfig;
@Autowired
private RocketMqBodySerializer mqSerializer;
private AtomicLong counter = new AtomicLong(0);
private ConfigurableApplicationContext applicationContext;
@Async
@Order(1000)
@EventListener(WebServerInitializedEvent.class)
public void afterStart(WebServerInitializedEvent event) {
if ("management".equalsIgnoreCase(event.getApplicationContext().getServerNamespace())) {
return;
}
startContainer();
}
public void startContainer() {
Map<String, RocketMqProducer> producerBeans = SpringBeanApplicationContext.getBeans(RocketMqProducer.class);
Optional.ofNullable(producerBeans).ifPresent(b -> b.forEach((k, v) -> {
List<String> keys = Arrays.asList(StringUtil.split(v.getClass().getName(), "."));
if (ObjectEmptyUtils.isNotEmpty(keys)) {
String beanName = keys.get(keys.size() - 1);
if (ObjectEmptyUtils.isNotEmpty(rocketMQConfig.getProducers())) {
String mapName = rocketMQConfig.getProducers().keySet().stream().filter(m -> m.equalsIgnoreCase(beanName)).findAny().orElse(null);
if (ObjectEmptyUtils.isNotEmpty(mapName)) {
registerContainer(k, v, rocketMQConfig.getProducers().get(mapName));
} else {
log.error("{} 生产端配置异常,未开启!", k);
}
}
}
}));
//region 配置消费端
Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMqConsumer.class);
Optional.ofNullable(beans).ifPresent(b -> b.forEach((k, v) -> {
List<String> keys = Arrays.asList(StringUtil.split(replacePattern(v.toString(), "\\@[A-Za-z0-9]+$", ""), "."));
if (ObjectEmptyUtils.isNotEmpty(keys)) {
String beanName = keys.get(keys.size() - 1);
if (ObjectEmptyUtils.isNotEmpty(rocketMQConfig.getConsumers())
&& (rocketMQConfig.getConsumers().entrySet().stream().anyMatch(n -> beanName.equalsIgnoreCase(n.getKey()))
|| rocketMQConfig.getConsumers().entrySet().stream().anyMatch(n -> beanName.equalsIgnoreCase(n.getValue().getConsumerBeanName())))) {
if (rocketMQConfig.getConsumers().entrySet().stream().anyMatch(n -> beanName.equalsIgnoreCase(n.getValue().getConsumerBeanName()))) {
registerContainer(k, v, rocketMQConfig.getConsumers().entrySet().stream().filter(n -> beanName.equalsIgnoreCase(n.getValue().getConsumerBeanName()))
.findAny().get().getValue());
return;
}
registerContainer(k, v, rocketMQConfig.getConsumers().entrySet().stream().filter(r -> beanName.equalsIgnoreCase(r.getKey())).findAny().get().getValue());
} else {
log.error("{} 消费端配置异常,未开启!", k);
}
}
}));
}
private void registerContainer(String beanName, Object bean, ProducerProperties producerProperties) {
Class<?> clazz = AopUtils.getTargetClass(bean);
if (!RocketMqProducerInner.class.isAssignableFrom(bean.getClass())) {
throw new IllegalStateException(clazz + " is not instance of " + RocketMqProducerInner.class.getName());
}
RocketMqProducerInner producer = (RocketMqProducerInner) bean;
String groupName = producerProperties.getGroup();
Assert.hasText(groupName, "[framework.lingniu.rocketmq.producers.xxx.group] must not be null");
DefaultMQProducer defaultMQProducer = !(producer instanceof TransactionListener) ?
new DefaultMQProducer(producerProperties.getGroup())
: new TransactionMQProducer(producerProperties.getGroup());
if (producer instanceof TransactionListener) {
((TransactionMQProducer) defaultMQProducer).setExecutorService(new ThreadPoolExecutor(
producerProperties.getTransaction().getCorePoolSize(), producerProperties.getTransaction().getMaximumPoolSize(),
producerProperties.getTransaction().getKeepAliveTime(), TimeUnit.SECONDS,
new ArrayBlockingQueue<>(producerProperties.getTransaction().getQueueCapacity()), r -> {
Thread thread = new Thread(r);
thread.setName(String.format("client-transaction-msg-check-thread-%s", groupName));
return thread;
})
);
((TransactionMQProducer) defaultMQProducer).setTransactionListener((TransactionListener) producer);
}
defaultMQProducer.setNamesrvAddr(producerProperties.getNameServer());
defaultMQProducer.setSendMsgTimeout(producerProperties.getSendMsgTimeout());
defaultMQProducer.setRetryTimesWhenSendFailed(producerProperties.getRetryTimesWhenSendFailed());
defaultMQProducer.setRetryTimesWhenSendAsyncFailed(producerProperties.getRetryTimesWhenSendAsyncFailed());
defaultMQProducer.setMaxMessageSize(producerProperties.getMaxMessageSize());
defaultMQProducer.setCompressMsgBodyOverHowmuch(producerProperties.getCompressMsgBodyOverHowmuch());
defaultMQProducer.setRetryAnotherBrokerWhenNotStoreOK(producerProperties.isRetryAnotherBrokerWhenNotStoreOk());
defaultMQProducer.setUnitName(producerProperties.getClientUnitName());
defaultMQProducer.setSendLatencyFaultEnable(producerProperties.isSendLatencyFaultEnable());
RocketMqTemplate template = new RocketMqTemplate();
template.setProducer(defaultMQProducer);
template.setSerializer(mqSerializer);
producer.setRocketMQTemplate(template, producerProperties.getTopic());
try {
template.afterPropertiesSet();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private void registerContainer(String beanName, Object bean, ConsumerProperties consumerProperties) {
Class<?> clazz = AopUtils.getTargetClass(bean);
if (!RocketMqListener.class.isAssignableFrom(bean.getClass())) {
log.warn("{} is not instance of {}", clazz, RocketMqListener.class.getName());
return;
}
RocketMqListener rocketMQListener = (RocketMqListener) bean;
BeanDefinitionBuilder beanBuilder = BeanDefinitionBuilder.rootBeanDefinition(DefaultRocketMqListenerContainer.class);
if (ObjectEmptyUtils.isNotEmpty(consumerProperties)) {
beanBuilder.addPropertyValue("nameServer", consumerProperties.getNameServer());
beanBuilder.addPropertyValue("topic", consumerProperties.getTopic());
beanBuilder.addPropertyValue("consumerGroup", consumerProperties.getConsumerGroup());
beanBuilder.addPropertyValue("consumeMode", consumerProperties.getConsumeMode());
beanBuilder.addPropertyValue("consumeThreadMax", consumerProperties.getConsumeThreadMax());
beanBuilder.addPropertyValue("consumeThreadMin", consumerProperties.getConsumeThreadMin());
beanBuilder.addPropertyValue("messageModel", consumerProperties.getMessageModel());
beanBuilder.addPropertyValue("selectorExpress", consumerProperties.getSelectorExpress());
beanBuilder.addPropertyValue("selectorType", consumerProperties.getSelectorType());
beanBuilder.addPropertyValue("rocketMQListener", rocketMQListener);
beanBuilder.addPropertyValue("consumeMessageBatchMaxSize", consumerProperties.getConsumeMessageBatchMaxSize());
beanBuilder.addPropertyValue("pullBatchSize", consumerProperties.getPullBatchSize());
beanBuilder.addPropertyValue("maxRetryTimes", consumerProperties.getMaxRetryTimes());
beanBuilder.addPropertyValue("isBatchMode", consumerProperties.getConsumerbatchMode());
beanBuilder.addPropertyValue("clientUnitName", consumerProperties.getClientUnitName());
} else {
log.error("消费端 {} 缺少配置信息,请查看!", rocketMQListener.getClass().getSimpleName());
}
beanBuilder.addPropertyValue("serializer", mqSerializer);
beanBuilder.setDestroyMethodName("destroy");
String containerBeanName = String.format("%s_%s", DefaultRocketMqListenerContainer.class.getName(), counter.incrementAndGet());
DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getBeanFactory();
beanFactory.registerBeanDefinition(containerBeanName, beanBuilder.getBeanDefinition());
DefaultRocketMqListenerContainer container = beanFactory.getBean(containerBeanName, DefaultRocketMqListenerContainer.class);
if (!container.isStarted()) {
try {
container.start();
} catch (Exception e) {
log.error("started container failed. {}", container, e);
throw new RuntimeException(e);
}
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
}
private String replacePattern(final String source, final String regex, final String replacement) {
return Pattern.compile(regex, Pattern.DOTALL).matcher(source).replaceAll(replacement);
}
}

View File

@@ -0,0 +1,3 @@
# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
cn.lingniu.framework.plugin.rocketmq.init.RocketMqInit

View File

@@ -0,0 +1,118 @@
# 【重要】RocketMq服务端搭建和更多详细资料---参考官网
## 概述 (Overview)
1. 基于 RocketMqClient 封装的分布式消息处理组件
2. 核心能力
* 消息处理能力
- 双模式监听:支持批量消息处理(MRocketMQListener)和单条消息处理(RocketMQListener)
- 生产端支持:通过继承 GeneralMqProducer 快速实现消息生产功能
- 消费端支持:通过继承 GeneralMsgListener 和实现监听接口处理消息
- 多样化消息类型:支持普通消息、定时消息、延迟消息等多种消息类型
* 配置管理
- 统一配置前缀:所有配置项使用 framework.lingniu.rocketmq 作为根路径
- NameServer配置支持直接配置 nameServer 地址连接RocketMQ集群
- 消费策略配置:支持配置消费线程数、批量大小、重试次数等参数
3. 适用场景:
- 异步任务处理:数据同步等后台任务处理
- 系统解耦:微服务间通过消息队列进行松耦合通信
- 流量削峰:应对突发业务高峰,平滑处理大量消息
- 可靠消息传输:通过重试机制保证重要业务消息不丢失
## 如何配置--更多参数参考RedissonConfig/RedissonProperties
```yaml 最小配置例子
framework:
lingniu:
# rocketmq 最小化配置示例
rocketmq:
consumers:
TestC1Consumer:
nameServer: mq_test_n1.tst.mid:9876
consumerGroup: consumer1
topic: test1-yw-topic
consumerBeanName: TestC1Consumer
producers:
TestMqProducer:
nameServer: mq_test_n1.tst.mid:9876
group: test1-yw-topic-producer
topic: test1-yw-topic
TestTransProducer:
nameServer: mq_test_n1.tst.mid:9876
group: test1-yw-trans-topic-producer
topic: test1-yw-trans-topic
isTransactionMQ: true
```
```yaml 详细配置
framework:
lingniu:
# rocketmq所有配置示例
rocketmq:
# 是否开启数据源加密
encryptEnabled: false
# 消费者配置
consumers:
# 消费者bean名称
consumer1:
nameServer: mq_test_n1.tst.mid:9876
# 消费组
consumerGroup: consumer1
# 消费对象名字
consumerBeanName: consumer1
# Topic
topic: test1-yw-topic
# 订阅表达式,默认为 *
selectorExpress: "*"
# 消费者最小线程数
consumeThreadMin: 20
# 消费者最大线程数
consumeThreadMax: 32
# 批量消费数量
consumeMessageBatchMaxSize: 1
# 批量拉取消息数量
pullBatchSize: 32
# 消息的最大重试次数
maxRetryTimes: 16
# 生产者配置
producers:
# 生产者的bean名称
TestMqProducer:
nameServer: mq_test_n1.tst.mid:9876
# 生产者组
group: test1-yw-topic-producer
# 生产端Topic
topic: test1-yw-topic
# 发送超时ms默认 3000
sendMsgTimeout: 3000
# 消息体压缩阀值kb默认 4096
compressMsgBodyOverHowmuch: 4096
# 同步发送消息失败,是否重新发送,默认 2
retryTimesWhenSendFailed: 2
# 异步发送消息失败,是否重新发送,默认 2
retryTimesWhenSendAsyncFailed: 2
# 重试另一个Broker当发送消息失败, 默认 false
retryAnotherBrokerWhenNotStoreOk: false
# 默认不启用延迟容错通过统计每个队列的发送耗时情况来计算broker是否可用
sendLatencyFaultEnable: false
TestTransMqProducer:
group: test1-yw-trans-topic-producer
topic: test1-yw-trans-topic
# 是否事务型
isTransactionMQ: true
# 事务消息线程配置
transaction:
corePoolSize: 5
# 本地事务执行结果查询线程池配置
maximumPoolSize: 10
keepAliveTime: 200
queueCapacity: 2000
```