feat:【IoT 物联网】新版本同步

This commit is contained in:
YunaiV
2025-08-30 09:34:40 +08:00
parent d8fbd0f6c5
commit a89b6d14a8
500 changed files with 19983 additions and 13090 deletions

View File

@@ -69,9 +69,8 @@ public class YudaoRedisMQConsumerAutoConfiguration {
@ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听
public RedisPendingMessageResendJob redisPendingMessageResendJob(List<AbstractRedisStreamMessageListener<?>> listeners,
RedisMQTemplate redisTemplate,
@Value("${spring.application.name}") String groupName,
RedissonClient redissonClient) {
return new RedisPendingMessageResendJob(listeners, redisTemplate, groupName, redissonClient);
return new RedisPendingMessageResendJob(listeners, redisTemplate, redissonClient);
}
/**
@@ -141,14 +140,14 @@ public class YudaoRedisMQConsumerAutoConfiguration {
*
* @return 消费者名字
*/
private static String buildConsumerName() {
public static String buildConsumerName() {
return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID());
}
/**
* 校验 Redis 版本号,是否满足最低的版本号要求!
*/
private static void checkRedisVersion(RedisTemplate<String, ?> redisTemplate) {
public static void checkRedisVersion(RedisTemplate<String, ?> redisTemplate) {
// 获得 Redis 版本
Properties info = redisTemplate.execute((RedisCallback<Properties>) RedisServerCommands::info);
String version = MapUtil.getStr(info, "redis_version");

View File

@@ -35,7 +35,6 @@ public class RedisPendingMessageResendJob {
private final List<AbstractRedisStreamMessageListener<?>> listeners;
private final RedisMQTemplate redisTemplate;
private final String groupName;
private final RedissonClient redissonClient;
/**
@@ -64,13 +63,13 @@ public class RedisPendingMessageResendJob {
private void execute() {
StreamOperations<String, Object, Object> ops = redisTemplate.getRedisTemplate().opsForStream();
listeners.forEach(listener -> {
PendingMessagesSummary pendingMessagesSummary = Objects.requireNonNull(ops.pending(listener.getStreamKey(), groupName));
PendingMessagesSummary pendingMessagesSummary = Objects.requireNonNull(ops.pending(listener.getStreamKey(), listener.getGroup()));
// 每个消费者的 pending 队列消息数量
Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer();
pendingMessagesPerConsumer.forEach((consumerName, pendingMessageCount) -> {
log.info("[processPendingMessage][消费者({}) 消息数量({})]", consumerName, pendingMessageCount);
// 每个消费者的 pending消息的详情信息
PendingMessages pendingMessages = ops.pending(listener.getStreamKey(), Consumer.from(groupName, consumerName), Range.unbounded(), pendingMessageCount);
PendingMessages pendingMessages = ops.pending(listener.getStreamKey(), Consumer.from(listener.getGroup(), consumerName), Range.unbounded(), pendingMessageCount);
if (pendingMessages.isEmpty()) {
return;
}
@@ -91,7 +90,7 @@ public class RedisPendingMessageResendJob {
.ofObject(records.get(0).getValue()) // 设置内容
.withStreamKey(listener.getStreamKey()));
// ack 消息消费完成
redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, records.get(0));
redisTemplate.getRedisTemplate().opsForStream().acknowledge(listener.getGroup(), records.get(0));
log.info("[processPendingMessage][消息({})重新投递成功]", records.get(0).getId());
});
});

View File

@@ -53,6 +53,12 @@ public abstract class AbstractRedisStreamMessageListener<T extends AbstractRedis
this.streamKey = messageType.getDeclaredConstructor().newInstance().getStreamKey();
}
protected AbstractRedisStreamMessageListener(String streamKey, String group) {
this.messageType = null;
this.streamKey = streamKey;
this.group = group;
}
@Override
public void onMessage(ObjectRecord<String, String> message) {
// 消费消息