【同步】BOOT 和 CLOUD 的功能

This commit is contained in:
YunaiV
2025-10-02 17:51:49 +08:00
parent ec3a391981
commit f02c004736
90 changed files with 3143 additions and 1365 deletions

View File

@@ -17,6 +17,8 @@ import lombok.Data;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible = true)
@JsonSubTypes({
@JsonSubTypes.Type(value = IotDataSinkHttpConfig.class, name = "1"),
@JsonSubTypes.Type(value = IotDataSinkTcpConfig.class, name = "2"),
@JsonSubTypes.Type(value = IotDataSinkWebSocketConfig.class, name = "3"),
@JsonSubTypes.Type(value = IotDataSinkMqttConfig.class, name = "10"),
@JsonSubTypes.Type(value = IotDataSinkRedisConfig.class, name = "21"),
@JsonSubTypes.Type(value = IotDataSinkRocketMQConfig.class, name = "30"),

View File

@@ -0,0 +1,63 @@
package cn.iocoder.yudao.module.iot.dal.dataobject.rule.config;
import lombok.Data;
/**
* IoT TCP 配置 {@link IotAbstractDataSinkConfig} 实现类
*
* @author HUIHUI
*/
@Data
public class IotDataSinkTcpConfig extends IotAbstractDataSinkConfig {
/**
* TCP 服务器地址
*/
private String host;
/**
* TCP 服务器端口
*/
private Integer port;
/**
* 连接超时时间(毫秒)
*/
private Integer connectTimeoutMs = 5000;
/**
* 读取超时时间(毫秒)
*/
private Integer readTimeoutMs = 10000;
/**
* 是否启用 SSL
*/
private Boolean ssl = false;
/**
* SSL 证书路径(当 ssl=true 时需要)
*/
private String sslCertPath;
/**
* 数据格式JSON 或 BINARY
*/
private String dataFormat = "JSON";
/**
* 心跳间隔时间毫秒0 表示不启用心跳
*/
private Long heartbeatIntervalMs = 30000L;
/**
* 重连间隔时间(毫秒)
*/
private Long reconnectIntervalMs = 5000L;
/**
* 最大重连次数
*/
private Integer maxReconnectAttempts = 3;
}

View File

@@ -0,0 +1,87 @@
package cn.iocoder.yudao.module.iot.dal.dataobject.rule.config;
import lombok.Data;
/**
* IoT WebSocket 配置 {@link IotAbstractDataSinkConfig} 实现类
* <p>
* 配置设备消息通过 WebSocket 协议发送到外部 WebSocket 服务器
* 支持 WebSocket (ws://) 和 WebSocket Secure (wss://) 连接
*
* @author HUIHUI
*/
@Data
public class IotDataSinkWebSocketConfig extends IotAbstractDataSinkConfig {
/**
* WebSocket 服务器地址
* 例如ws://localhost:8080/ws 或 wss://example.com/ws
*/
private String serverUrl;
/**
* 连接超时时间(毫秒)
*/
private Integer connectTimeoutMs = 5000;
/**
* 发送超时时间(毫秒)
*/
private Integer sendTimeoutMs = 10000;
/**
* 心跳间隔时间毫秒0 表示不启用心跳
*/
private Long heartbeatIntervalMs = 30000L;
/**
* 心跳消息内容JSON 格式)
*/
private String heartbeatMessage = "{\"type\":\"heartbeat\"}";
/**
* 子协议列表(逗号分隔)
*/
private String subprotocols;
/**
* 自定义请求头JSON 格式)
*/
private String customHeaders;
/**
* 是否启用 SSL 证书验证(仅对 wss:// 生效)
*/
private Boolean verifySslCert = true;
/**
* 数据格式JSON 或 TEXT
*/
private String dataFormat = "JSON";
/**
* 重连间隔时间(毫秒)
*/
private Long reconnectIntervalMs = 5000L;
/**
* 最大重连次数
*/
private Integer maxReconnectAttempts = 3;
/**
* 是否启用压缩
*/
private Boolean enableCompression = false;
/**
* 消息发送重试次数
*/
private Integer sendRetryCount = 1;
/**
* 消息发送重试间隔(毫秒)
*/
private Long sendRetryIntervalMs = 1000L;
}

View File

@@ -41,7 +41,7 @@ public class IotAlertConfigServiceImpl implements IotAlertConfigService {
public Long createAlertConfig(IotAlertConfigSaveReqVO createReqVO) {
// 校验关联数据是否存在
sceneRuleService.validateSceneRuleList(createReqVO.getSceneRuleIds());
adminUserApi.validateUserList(createReqVO.getReceiveUserIds());
adminUserApi.validateUserList(createReqVO.getReceiveUserIds()).checkError();
// 插入
IotAlertConfigDO alertConfig = BeanUtils.toBean(createReqVO, IotAlertConfigDO.class);
@@ -55,7 +55,7 @@ public class IotAlertConfigServiceImpl implements IotAlertConfigService {
validateAlertConfigExists(updateReqVO.getId());
// 校验关联数据是否存在
sceneRuleService.validateSceneRuleList(updateReqVO.getSceneRuleIds());
adminUserApi.validateUserList(updateReqVO.getReceiveUserIds());
adminUserApi.validateUserList(updateReqVO.getReceiveUserIds()).checkError();
// 更新
IotAlertConfigDO updateObj = BeanUtils.toBean(updateReqVO, IotAlertConfigDO.class);

View File

@@ -1,8 +1,8 @@
package cn.iocoder.yudao.module.iot.service.rule.data.action;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkKafkaConfig;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;

View File

@@ -0,0 +1,91 @@
package cn.iocoder.yudao.module.iot.service.rule.data.action;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkTcpConfig;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
import cn.iocoder.yudao.module.iot.service.rule.data.action.tcp.IotTcpClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.Duration;
/**
* TCP 的 {@link IotDataRuleAction} 实现类
* <p>
* 负责将设备消息发送到外部 TCP 服务器
* 支持普通 TCP 和 SSL TCP 连接,支持 JSON 和 BINARY 数据格式
* 使用连接池管理 TCP 连接,提高性能和资源利用率
*
* @author HUIHUI
*/
@Component
@Slf4j
public class IotTcpDataRuleAction extends
IotDataRuleCacheableAction<IotDataSinkTcpConfig, IotTcpClient> {
private static final Duration CONNECT_TIMEOUT = Duration.ofSeconds(5);
private static final Duration SEND_TIMEOUT = Duration.ofSeconds(10);
@Override
public Integer getType() {
return IotDataSinkTypeEnum.TCP.getType();
}
@Override
protected IotTcpClient initProducer(IotDataSinkTcpConfig config) throws Exception {
// 1.1 参数校验
if (config.getHost() == null || config.getHost().trim().isEmpty()) {
throw new IllegalArgumentException("TCP 服务器地址不能为空");
}
if (config.getPort() == null || config.getPort() <= 0 || config.getPort() > 65535) {
throw new IllegalArgumentException("TCP 服务器端口无效");
}
// 2.1 创建 TCP 客户端
IotTcpClient tcpClient = new IotTcpClient(
config.getHost(),
config.getPort(),
config.getConnectTimeoutMs(),
config.getReadTimeoutMs(),
config.getSsl(),
config.getSslCertPath(),
config.getDataFormat()
);
// 2.2 连接服务器
tcpClient.connect();
log.info("[initProducer][TCP 客户端创建并连接成功,服务器: {}:{}SSL: {},数据格式: {}]",
config.getHost(), config.getPort(), config.getSsl(), config.getDataFormat());
return tcpClient;
}
@Override
protected void closeProducer(IotTcpClient producer) throws Exception {
if (producer != null) {
producer.close();
}
}
@Override
protected void execute(IotDeviceMessage message, IotDataSinkTcpConfig config) throws Exception {
try {
// 1.1 获取或创建 TCP 客户端
IotTcpClient tcpClient = getProducer(config);
// 1.2 检查连接状态,如果断开则重新连接
if (!tcpClient.isConnected()) {
log.warn("[execute][TCP 连接已断开,尝试重新连接,服务器: {}:{}]", config.getHost(), config.getPort());
tcpClient.connect();
}
// 2.1 发送消息并等待结果
tcpClient.sendMessage(message);
// 2.2 记录发送成功日志
log.info("[execute][message({}) config({}) 发送成功TCP 服务器: {}:{}]",
message, config, config.getHost(), config.getPort());
} catch (Exception e) {
log.error("[execute][message({}) config({}) 发送失败TCP 服务器: {}:{}]",
message, config, config.getHost(), config.getPort(), e);
throw e;
}
}
}

View File

@@ -0,0 +1,184 @@
package cn.iocoder.yudao.module.iot.service.rule.data.action.tcp;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import lombok.extern.slf4j.Slf4j;
import javax.net.ssl.SSLSocketFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* IoT TCP 客户端
* <p>
* 负责与外部 TCP 服务器建立连接并发送设备消息
* 支持 JSON 和 BINARY 两种数据格式,支持 SSL 加密连接
*
* @author HUIHUI
*/
@Slf4j
public class IotTcpClient {
private final String host;
private final Integer port;
private final Integer connectTimeoutMs;
private final Integer readTimeoutMs;
private final Boolean ssl;
private final String sslCertPath;
private final String dataFormat;
private Socket socket;
private OutputStream outputStream;
private BufferedReader reader;
private final AtomicBoolean connected = new AtomicBoolean(false);
// TODO @puhui999default 值IotDataSinkTcpConfig.java 枚举起来哈;
public IotTcpClient(String host, Integer port, Integer connectTimeoutMs, Integer readTimeoutMs,
Boolean ssl, String sslCertPath, String dataFormat) {
this.host = host;
this.port = port;
this.connectTimeoutMs = connectTimeoutMs != null ? connectTimeoutMs : 5000;
this.readTimeoutMs = readTimeoutMs != null ? readTimeoutMs : 10000;
this.ssl = ssl != null ? ssl : false;
this.sslCertPath = sslCertPath;
this.dataFormat = dataFormat != null ? dataFormat : "JSON";
}
/**
* 连接到 TCP 服务器
*/
public void connect() throws Exception {
if (connected.get()) {
log.warn("[connect][TCP 客户端已经连接,无需重复连接]");
return;
}
try {
if (ssl) {
// SSL 连接
SSLSocketFactory sslSocketFactory = (SSLSocketFactory) SSLSocketFactory.getDefault();
socket = sslSocketFactory.createSocket();
} else {
// 普通连接
socket = new Socket();
}
// 连接服务器
socket.connect(new InetSocketAddress(host, port), connectTimeoutMs);
socket.setSoTimeout(readTimeoutMs);
// 获取输入输出流
outputStream = socket.getOutputStream();
reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
// 更新状态
connected.set(true);
log.info("[connect][TCP 客户端连接成功,服务器地址: {}:{}]", host, port);
} catch (Exception e) {
close();
log.error("[connect][TCP 客户端连接失败,服务器地址: {}:{}]", host, port, e);
throw e;
}
}
/**
* 发送设备消息
*
* @param message 设备消息
* @throws Exception 发送异常
*/
public void sendMessage(IotDeviceMessage message) throws Exception {
if (!connected.get()) {
throw new IllegalStateException("TCP 客户端未连接");
}
try {
// TODO @puhui999枚举值
String messageData;
if ("JSON".equalsIgnoreCase(dataFormat)) {
// JSON 格式
messageData = JsonUtils.toJsonString(message);
} else {
// BINARY 格式(这里简化为字符串,实际可能需要自定义二进制协议)
messageData = message.toString();
}
// 发送消息
outputStream.write(messageData.getBytes(StandardCharsets.UTF_8));
outputStream.write('\n'); // 添加换行符作为消息分隔符
outputStream.flush();
log.debug("[sendMessage][发送消息成功,设备 ID: {},消息长度: {}]",
message.getDeviceId(), messageData.length());
} catch (Exception e) {
log.error("[sendMessage][发送消息失败,设备 ID: {}]", message.getDeviceId(), e);
throw e;
}
}
/**
* 关闭连接
*/
public void close() {
if (!connected.get()) {
return;
}
try {
// 关闭资源
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
log.warn("[close][关闭输入流失败]", e);
}
}
if (outputStream != null) {
try {
outputStream.close();
} catch (IOException e) {
log.warn("[close][关闭输出流失败]", e);
}
}
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
log.warn("[close][关闭 Socket 失败]", e);
}
}
// 更新状态
connected.set(false);
log.info("[close][TCP 客户端连接已关闭,服务器地址: {}:{}]", host, port);
} catch (Exception e) {
log.error("[close][关闭 TCP 客户端连接异常]", e);
}
}
/**
* 检查连接状态
*
* @return 是否已连接
*/
public boolean isConnected() {
return connected.get() && socket != null && !socket.isClosed();
}
@Override
public String toString() {
return "IotTcpClient{" +
"host='" + host + '\'' +
", port=" + port +
", ssl=" + ssl +
", dataFormat='" + dataFormat + '\'' +
", connected=" + connected.get() +
'}';
}
}

View File

@@ -16,14 +16,17 @@ import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.product.IotProductDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotSceneRuleDO;
import cn.iocoder.yudao.module.iot.dal.mysql.rule.IotSceneRuleMapper;
import cn.iocoder.yudao.module.iot.dal.redis.RedisKeyConstants;
import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleTriggerTypeEnum;
import cn.iocoder.yudao.module.iot.framework.job.core.IotSchedulerManager;
import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.service.product.IotProductService;
import cn.iocoder.yudao.module.iot.service.rule.scene.action.IotSceneRuleAction;
import cn.iocoder.yudao.module.iot.service.rule.scene.matcher.IotSceneRuleMatcherManager;
import cn.iocoder.yudao.module.iot.service.rule.scene.timer.IotSceneRuleTimerHandler;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
import org.springframework.validation.annotation.Validated;
@@ -47,9 +50,6 @@ public class IotSceneRuleServiceImpl implements IotSceneRuleService {
@Resource
private IotSceneRuleMapper sceneRuleMapper;
// TODO @puhui999定时任务基于它调度
@Resource(name = "iotSchedulerManager")
private IotSchedulerManager schedulerManager;
@Resource
private IotProductService productService;
@Resource
@@ -59,38 +59,68 @@ public class IotSceneRuleServiceImpl implements IotSceneRuleService {
private IotSceneRuleMatcherManager sceneRuleMatcherManager;
@Resource
private List<IotSceneRuleAction> sceneRuleActions;
@Resource
private IotSceneRuleTimerHandler timerHandler;
@Override
@CacheEvict(value = RedisKeyConstants.SCENE_RULE_LIST, allEntries = true)
public Long createSceneRule(IotSceneRuleSaveReqVO createReqVO) {
IotSceneRuleDO sceneRule = BeanUtils.toBean(createReqVO, IotSceneRuleDO.class);
sceneRuleMapper.insert(sceneRule);
// 注册定时触发器
timerHandler.registerTimerTriggers(sceneRule);
return sceneRule.getId();
}
@Override
@CacheEvict(value = RedisKeyConstants.SCENE_RULE_LIST, allEntries = true)
public void updateSceneRule(IotSceneRuleSaveReqVO updateReqVO) {
// 校验存在
validateSceneRuleExists(updateReqVO.getId());
// 更新
IotSceneRuleDO updateObj = BeanUtils.toBean(updateReqVO, IotSceneRuleDO.class);
sceneRuleMapper.updateById(updateObj);
// 更新定时触发器
timerHandler.updateTimerTriggers(updateObj);
}
@Override
@CacheEvict(value = RedisKeyConstants.SCENE_RULE_LIST, allEntries = true)
public void updateSceneRuleStatus(Long id, Integer status) {
// 校验存在
// 1. 校验存在
validateSceneRuleExists(id);
// 更新状态
// 2. 更新状态
IotSceneRuleDO updateObj = new IotSceneRuleDO().setId(id).setStatus(status);
sceneRuleMapper.updateById(updateObj);
// 3. 根据状态管理定时触发器
if (CommonStatusEnum.isEnable(status)) {
// 启用时,获取完整的场景规则信息并注册定时触发器
IotSceneRuleDO sceneRule = sceneRuleMapper.selectById(id);
if (sceneRule != null) {
timerHandler.registerTimerTriggers(sceneRule);
}
} else {
// 禁用时,暂停定时触发器
timerHandler.pauseTimerTriggers(id);
}
}
@Override
@CacheEvict(value = RedisKeyConstants.SCENE_RULE_LIST, allEntries = true)
public void deleteSceneRule(Long id) {
// 校验存在
// 1. 校验存在
validateSceneRuleExists(id);
// 删除
// 2. 删除
sceneRuleMapper.deleteById(id);
// 3. 删除定时触发器
timerHandler.unregisterTimerTriggers(id);
}
private void validateSceneRuleExists(Long id) {
@@ -126,16 +156,14 @@ public class IotSceneRuleServiceImpl implements IotSceneRuleService {
return sceneRuleMapper.selectListByStatus(status);
}
// TODO 芋艿,缓存待实现 @puhui999
@Override
@Cacheable(value = RedisKeyConstants.SCENE_RULE_LIST, key = "#productId + '_' + #deviceId ")
@TenantIgnore // 忽略租户隔离:因为 IotSceneRuleMessageHandler 调用时,一般未传递租户,所以需要忽略
public List<IotSceneRuleDO> getSceneRuleListByProductIdAndDeviceIdFromCache(Long productId, Long deviceId) {
List<IotSceneRuleDO> list = sceneRuleMapper.selectList();
// 只返回启用状态的规则场景
List<IotSceneRuleDO> enabledList = filterList(list,
sceneRule -> CommonStatusEnum.isEnable(sceneRule.getStatus()));
// 1. 查询启用状态的规则场景
List<IotSceneRuleDO> enabledList = sceneRuleMapper.selectList(IotSceneRuleDO::getStatus, CommonStatusEnum.ENABLE.getStatus());
// 根据 productKey 和 deviceName 进行匹配
// 2. 根据 productKey 和 deviceName 进行匹配
return filterList(enabledList, sceneRule -> {
if (CollUtil.isEmpty(sceneRule.getTriggers())) {
return false;
@@ -144,21 +172,19 @@ public class IotSceneRuleServiceImpl implements IotSceneRuleService {
for (IotSceneRuleDO.Trigger trigger : sceneRule.getTriggers()) {
// 检查触发器是否匹配指定的产品和设备
try {
// 1. 检查产品是否匹配
if (trigger.getProductId() == null) {
return false;
}
if (trigger.getDeviceId() == null) {
// 检查产品是否匹配
if (trigger.getProductId() == null || trigger.getDeviceId() == null) {
return false;
}
// 检查是否是全部设备的特殊标识
if (IotDeviceDO.DEVICE_ID_ALL.equals(trigger.getDeviceId())) {
return true; // 匹配所有设备
return true;
}
// 检查具体设备 ID 是否匹配
return ObjUtil.equal(productId, trigger.getProductId()) && ObjUtil.equal(deviceId, trigger.getDeviceId());
} catch (Exception e) {
log.warn("[isMatchProductAndDevice][产品({}) 设备({}) 匹配触发器异常]", productId, deviceId, e);
log.warn("[getSceneRuleListByProductIdAndDeviceIdFromCache][产品({}) 设备({}) 匹配触发器异常]",
productId, deviceId, e);
return false;
}
}
@@ -168,9 +194,10 @@ public class IotSceneRuleServiceImpl implements IotSceneRuleService {
@Override
public void executeSceneRuleByDevice(IotDeviceMessage message) {
// TODO @芋艿:这里的 tenantId通过设备获取@puhui999
TenantUtils.execute(message.getTenantId(), () -> {
// 1. 获得设备匹配的规则场景
// 1.1 这里的 tenantId通过设备获取
IotDeviceDO device = deviceService.getDeviceFromCache(message.getDeviceId());
TenantUtils.execute(device.getTenantId(), () -> {
// 1.2 获得设备匹配的规则场景
List<IotSceneRuleDO> sceneRules = getMatchedSceneRuleListByMessage(message);
if (CollUtil.isEmpty(sceneRules)) {
return;
@@ -214,16 +241,16 @@ public class IotSceneRuleServiceImpl implements IotSceneRuleService {
*/
private List<IotSceneRuleDO> getMatchedSceneRuleListByMessage(IotDeviceMessage message) {
// 1. 匹配设备
// TODO @芋艿:可能需要 getSelf(); 缓存 @puhui999
// TODO 缓存 @puhui999:可能需要 getSelf()
// 1.1 通过 deviceId 获取设备信息
IotDeviceDO device = deviceService.getDeviceFromCache(message.getDeviceId());
IotDeviceDO device = getSelf().deviceService.getDeviceFromCache(message.getDeviceId());
if (device == null) {
log.warn("[getMatchedSceneRuleListByMessage][设备({}) 不存在]", message.getDeviceId());
return List.of();
}
// 1.2 通过 productId 获取产品信息
IotProductDO product = productService.getProductFromCache(device.getProductId());
IotProductDO product = getSelf().productService.getProductFromCache(device.getProductId());
if (product == null) {
log.warn("[getMatchedSceneRuleListByMessage][产品({}) 不存在]", device.getProductId());
return List.of();
@@ -273,7 +300,6 @@ public class IotSceneRuleServiceImpl implements IotSceneRuleService {
*/
private boolean matchSingleTrigger(IotDeviceMessage message, IotSceneRuleDO.Trigger trigger, IotSceneRuleDO sceneRule) {
try {
// 2. 检查触发器的条件分组
return sceneRuleMatcherManager.isMatched(message, trigger) && isTriggerConditionGroupsMatched(message, trigger, sceneRule);
} catch (Exception e) {
log.error("[matchSingleTrigger][触发器匹配异常] sceneRuleId: {}, triggerType: {}, message: {}",
@@ -290,18 +316,19 @@ public class IotSceneRuleServiceImpl implements IotSceneRuleService {
* @param sceneRule 场景规则(用于日志)
* @return 是否匹配
*/
private boolean isTriggerConditionGroupsMatched(IotDeviceMessage message, IotSceneRuleDO.Trigger trigger, IotSceneRuleDO sceneRule) {
// 如果没有条件分组,则认为匹配成功(只依赖基础触发器匹配)
private boolean isTriggerConditionGroupsMatched(IotDeviceMessage message,
IotSceneRuleDO.Trigger trigger,
IotSceneRuleDO sceneRule) {
// 1. 如果没有条件分组,则认为匹配成功(只依赖基础触发器匹配)
if (CollUtil.isEmpty(trigger.getConditionGroups())) {
return true;
}
// 检查条件分组:分组与分组之间是"或"的关系,条件与条件之间是"且"的关系
// 2. 检查条件分组:分组与分组之间是"或"的关系,条件与条件之间是"且"的关系
for (List<IotSceneRuleDO.TriggerCondition> conditionGroup : trigger.getConditionGroups()) {
if (CollUtil.isEmpty(conditionGroup)) {
continue;
}
// 检查当前分组中的所有条件是否都匹配(且关系)
boolean allConditionsMatched = true;
for (IotSceneRuleDO.TriggerCondition condition : conditionGroup) {
@@ -310,14 +337,13 @@ public class IotSceneRuleServiceImpl implements IotSceneRuleService {
break;
}
}
// 如果当前分组的所有条件都匹配,则整个触发器匹配成功
if (allConditionsMatched) {
return true;
}
}
// 所有分组都不匹配
// 3. 所有分组都不匹配
return false;
}
@@ -352,13 +378,13 @@ public class IotSceneRuleServiceImpl implements IotSceneRuleService {
sceneRules.forEach(sceneRule -> {
// 2. 遍历规则场景的动作
sceneRule.getActions().forEach(actionConfig -> {
// 3.1 获取对应的动作 Action 数组
// 2.1 获取对应的动作 Action 数组
List<IotSceneRuleAction> actions = filterList(sceneRuleActions,
action -> action.getType().getType().equals(actionConfig.getType()));
if (CollUtil.isEmpty(actions)) {
return;
}
// 3.2 执行动作
// 2.2 执行动作
actions.forEach(action -> {
try {
action.execute(message, sceneRule, actionConfig);

View File

@@ -14,7 +14,6 @@ import java.util.List;
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertList;
// TODO @puhui999、@芋艿:未测试;需要场景联动开发完
/**
* IoT 告警恢复的 {@link IotSceneRuleAction} 实现类
*

View File

@@ -17,7 +17,6 @@ import org.springframework.stereotype.Component;
import javax.annotation.Nullable;
import java.util.List;
// TODO @puhui999、@芋艿:未测试;需要场景联动开发完
/**
* IoT 告警触发的 {@link IotSceneRuleAction} 实现类
*

View File

@@ -1,6 +1,10 @@
package cn.iocoder.yudao.module.iot.service.rule.scene.action;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotSceneRuleDO;
import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleActionTypeEnum;
import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
@@ -9,8 +13,11 @@ import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
/**
* IoT 设备控制的 {@link IotSceneRuleAction} 实现类
* IoT 设备属性设置的 {@link IotSceneRuleAction} 实现类
*
* @author 芋道源码
*/
@@ -23,28 +30,108 @@ public class IotDeviceControlSceneRuleAction implements IotSceneRuleAction {
@Resource
private IotDeviceMessageService deviceMessageService;
// TODO @puhui999这里
@Override
public void execute(IotDeviceMessage message,
IotSceneRuleDO rule, IotSceneRuleDO.Action actionConfig) {
//IotSceneRuleDO.ActionDeviceControl control = actionConfig.getDeviceControl();
//Assert.notNull(control, "设备控制配置不能为空");
//// 遍历每个设备,下发消息
//control.getDeviceNames().forEach(deviceName -> {
// IotDeviceDO device = deviceService.getDeviceFromCache(control.getProductKey(), deviceName);
// if (device == null) {
// log.error("[execute][message({}) actionConfig({}) 对应的设备不存在]", message, actionConfig);
// return;
// }
// try {
// // TODO @芋艿:@puhui999这块可能要改从 type => method
// IotDeviceMessage downstreamMessage = deviceMessageService.sendDeviceMessage(IotDeviceMessage.requestOf(
// control.getType() + control.getIdentifier(), control.getData()).setDeviceId(device.getId()));
// log.info("[execute][message({}) actionConfig({}) 下发消息({})成功]", message, actionConfig, downstreamMessage);
// } catch (Exception e) {
// log.error("[execute][message({}) actionConfig({}) 下发消息失败]", message, actionConfig, e);
// }
//});
// 1. 参数校验
if (actionConfig.getDeviceId() == null) {
log.error("[execute][规则场景({}) 动作配置({}) 设备编号不能为空]", rule.getId(), actionConfig);
return;
}
if (StrUtil.isEmpty(actionConfig.getIdentifier())) {
log.error("[execute][规则场景({}) 动作配置({}) 属性标识符不能为空]", rule.getId(), actionConfig);
return;
}
// 2. 判断是否为全部设备
if (IotDeviceDO.DEVICE_ID_ALL.equals(actionConfig.getDeviceId())) {
executeForAllDevices(message, rule, actionConfig);
} else {
executeForSingleDevice(message, rule, actionConfig);
}
}
/**
* 为单个设备执行属性设置
*/
private void executeForSingleDevice(IotDeviceMessage message,
IotSceneRuleDO rule, IotSceneRuleDO.Action actionConfig) {
// 1. 获取设备信息
IotDeviceDO device = deviceService.getDeviceFromCache(actionConfig.getDeviceId());
if (device == null) {
log.error("[executeForSingleDevice][规则场景({}) 动作配置({}) 对应的设备({}) 不存在]",
rule.getId(), actionConfig, actionConfig.getDeviceId());
return;
}
// 2. 执行属性设置
executePropertySetForDevice(rule, actionConfig, device);
}
/**
* 为产品下的所有设备执行属性设置
*/
private void executeForAllDevices(IotDeviceMessage message,
IotSceneRuleDO rule, IotSceneRuleDO.Action actionConfig) {
// 1. 参数校验
if (actionConfig.getProductId() == null) {
log.error("[executeForAllDevices][规则场景({}) 动作配置({}) 产品编号不能为空]", rule.getId(), actionConfig);
return;
}
// 2. 获取产品下的所有设备
List<IotDeviceDO> devices = deviceService.getDeviceListByProductId(actionConfig.getProductId());
if (CollUtil.isEmpty(devices)) {
log.warn("[executeForAllDevices][规则场景({}) 动作配置({}) 产品({}) 下没有设备]",
rule.getId(), actionConfig, actionConfig.getProductId());
return;
}
// 3. 遍历所有设备执行属性设置
for (IotDeviceDO device : devices) {
executePropertySetForDevice(rule, actionConfig, device);
}
}
/**
* 为指定设备执行属性设置
*/
private void executePropertySetForDevice(IotSceneRuleDO rule, IotSceneRuleDO.Action actionConfig, IotDeviceDO device) {
// 1. 构建属性设置消息
IotDeviceMessage downstreamMessage = buildPropertySetMessage(actionConfig, device);
if (downstreamMessage == null) {
log.error("[executePropertySetForDevice][规则场景({}) 动作配置({}) 设备({}) 构建属性设置消息失败]",
rule.getId(), actionConfig, device.getId());
return;
}
// 2. 发送设备消息
try {
IotDeviceMessage result = deviceMessageService.sendDeviceMessage(downstreamMessage, device);
log.info("[executePropertySetForDevice][规则场景({}) 动作配置({}) 设备({}) 属性设置消息({}) 发送成功]",
rule.getId(), actionConfig, device.getId(), result.getId());
} catch (Exception e) {
log.error("[executePropertySetForDevice][规则场景({}) 动作配置({}) 设备({}) 属性设置消息发送失败]",
rule.getId(), actionConfig, device.getId(), e);
}
}
/**
* 构建属性设置消息
*
* @param actionConfig 动作配置
* @param device 设备信息
* @return 设备消息
*/
private IotDeviceMessage buildPropertySetMessage(IotSceneRuleDO.Action actionConfig, IotDeviceDO device) {
try {
// 属性设置参数格式: {"properties": {"identifier": value}}
Object params = Map.of("properties", Map.of(actionConfig.getIdentifier(), actionConfig.getParams()));
return IotDeviceMessage.requestOf(IotDeviceMessageMethodEnum.PROPERTY_SET.getMethod(), params);
} catch (Exception e) {
log.error("[buildPropertySetMessage][构建属性设置消息异常]", e);
return null;
}
}
@Override

View File

@@ -0,0 +1,145 @@
package cn.iocoder.yudao.module.iot.service.rule.scene.action;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotSceneRuleDO;
import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleActionTypeEnum;
import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.service.device.message.IotDeviceMessageService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
/**
* IoT 设备服务调用的 {@link IotSceneRuleAction} 实现类
*
* @author HUIHUI
*/
@Component
@Slf4j
public class IotDeviceServiceInvokeSceneRuleAction implements IotSceneRuleAction {
@Resource
private IotDeviceService deviceService;
@Resource
private IotDeviceMessageService deviceMessageService;
@Override
public void execute(IotDeviceMessage message,
IotSceneRuleDO rule, IotSceneRuleDO.Action actionConfig) {
// 1. 参数校验
if (actionConfig.getDeviceId() == null) {
log.error("[execute][规则场景({}) 动作配置({}) 设备编号不能为空]", rule.getId(), actionConfig);
return;
}
if (StrUtil.isEmpty(actionConfig.getIdentifier())) {
log.error("[execute][规则场景({}) 动作配置({}) 服务标识符不能为空]", rule.getId(), actionConfig);
return;
}
// 2. 判断是否为全部设备
if (IotDeviceDO.DEVICE_ID_ALL.equals(actionConfig.getDeviceId())) {
executeForAllDevices(message, rule, actionConfig);
} else {
executeForSingleDevice(message, rule, actionConfig);
}
}
/**
* 为单个设备执行服务调用
*/
private void executeForSingleDevice(IotDeviceMessage message,
IotSceneRuleDO rule, IotSceneRuleDO.Action actionConfig) {
// 1. 获取设备信息
IotDeviceDO device = deviceService.getDeviceFromCache(actionConfig.getDeviceId());
if (device == null) {
log.error("[executeForSingleDevice][规则场景({}) 动作配置({}) 对应的设备({}) 不存在]",
rule.getId(), actionConfig, actionConfig.getDeviceId());
return;
}
// 2. 执行服务调用
executeServiceInvokeForDevice(rule, actionConfig, device);
}
/**
* 为产品下的所有设备执行服务调用
*/
private void executeForAllDevices(IotDeviceMessage message,
IotSceneRuleDO rule, IotSceneRuleDO.Action actionConfig) {
// 1. 参数校验
if (actionConfig.getProductId() == null) {
log.error("[executeForAllDevices][规则场景({}) 动作配置({}) 产品编号不能为空]", rule.getId(), actionConfig);
return;
}
// 2. 获取产品下的所有设备
List<IotDeviceDO> devices = deviceService.getDeviceListByProductId(actionConfig.getProductId());
if (CollUtil.isEmpty(devices)) {
log.warn("[executeForAllDevices][规则场景({}) 动作配置({}) 产品({}) 下没有设备]",
rule.getId(), actionConfig, actionConfig.getProductId());
return;
}
// 3. 遍历所有设备执行服务调用
for (IotDeviceDO device : devices) {
executeServiceInvokeForDevice(rule, actionConfig, device);
}
}
/**
* 为指定设备执行服务调用
*/
private void executeServiceInvokeForDevice(IotSceneRuleDO rule, IotSceneRuleDO.Action actionConfig, IotDeviceDO device) {
// 1. 构建服务调用消息
IotDeviceMessage downstreamMessage = buildServiceInvokeMessage(actionConfig, device);
if (downstreamMessage == null) {
log.error("[executeServiceInvokeForDevice][规则场景({}) 动作配置({}) 设备({}) 构建服务调用消息失败]",
rule.getId(), actionConfig, device.getId());
return;
}
// 2. 发送设备消息
try {
IotDeviceMessage result = deviceMessageService.sendDeviceMessage(downstreamMessage, device);
log.info("[executeServiceInvokeForDevice][规则场景({}) 动作配置({}) 设备({}) 服务调用消息({}) 发送成功]",
rule.getId(), actionConfig, device.getId(), result.getId());
} catch (Exception e) {
log.error("[executeServiceInvokeForDevice][规则场景({}) 动作配置({}) 设备({}) 服务调用消息发送失败]",
rule.getId(), actionConfig, device.getId(), e);
}
}
/**
* 构建服务调用消息
*
* @param actionConfig 动作配置
* @param device 设备信息
* @return 设备消息
*/
private IotDeviceMessage buildServiceInvokeMessage(IotSceneRuleDO.Action actionConfig, IotDeviceDO device) {
try {
// 服务调用参数格式: {"identifier": "serviceId", "params": {...}}
Object params = Map.of(
"identifier", actionConfig.getIdentifier(),
"params", actionConfig.getParams() != null ? actionConfig.getParams() : Map.of()
);
return IotDeviceMessage.requestOf(IotDeviceMessageMethodEnum.SERVICE_INVOKE.getMethod(), params);
} catch (Exception e) {
log.error("[buildServiceInvokeMessage][构建服务调用消息异常]", e);
return null;
}
}
@Override
public IotSceneRuleActionTypeEnum getType() {
return IotSceneRuleActionTypeEnum.DEVICE_SERVICE_INVOKE;
}
}

View File

@@ -4,10 +4,8 @@ import cn.iocoder.yudao.module.iot.service.rule.scene.matcher.condition.IotScene
import cn.iocoder.yudao.module.iot.service.rule.scene.matcher.trigger.IotSceneRuleTriggerMatcher;
/**
* IoT 场景规则匹配器基础接口
* <p>
* 定义所有匹配器的通用行为,包括优先级、名称和启用状态
* <p>
* IoT 场景规则匹配器基础接口:定义所有匹配器的通用行为,包括优先级、名称和启用状态
*
* - {@link IotSceneRuleTriggerMatcher} 触发器匹配器
* - {@link IotSceneRuleConditionMatcher} 条件匹配器
*

View File

@@ -18,10 +18,8 @@ import java.util.Map;
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertList;
/**
* IoT 场景规则匹配器工具类
* <p>
* 提供通用的条件评估逻辑和工具方法,供触发器和条件匹配器使用
* <p>
* IoT 场景规则匹配器工具类:提供通用的条件评估逻辑和工具方法,供触发器和条件匹配器使用
*
* 该类包含了匹配器实现中常用的工具方法,如条件评估、参数校验、日志记录等
*
* @author HUIHUI
@@ -91,7 +89,7 @@ public final class IotSceneRuleMatcherHelper {
Map<String, Object> springExpressionVariables = new HashMap<>();
// 设置源值
springExpressionVariables.put(IotSceneRuleConditionOperatorEnum.SPRING_EXPRESSION_SOURCE, sourceValue);
springExpressionVariables.put(IotSceneRuleConditionOperatorEnum.SPRING_EXPRESSION_SOURCE, StrUtil.toString(sourceValue));
// 处理参数值
if (StrUtil.isNotBlank(paramValue)) {

View File

@@ -16,9 +16,7 @@ import java.util.function.Function;
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertMap;
/**
* IoT 场景规则匹配器统一管理器
* <p>
* 负责管理所有匹配器(触发器匹配器和条件匹配器),并提供统一的匹配入口
* IoT 场景规则匹配器统一管理器:负责管理所有匹配器(触发器匹配器和条件匹配器),并提供统一的匹配入口
*
* @author HUIHUI
*/
@@ -44,13 +42,12 @@ public class IotSceneRuleMatcherManager {
return;
}
// 按优先级排序并过滤启用的匹配器
// 1.1 按优先级排序并过滤启用的匹配器
List<IotSceneRuleMatcher> allMatchers = matchers.stream()
.filter(IotSceneRuleMatcher::isEnabled)
.sorted(Comparator.comparing(IotSceneRuleMatcher::getPriority))
.toList();
// 分离触发器匹配器和条件匹配器
// 1.2 分离触发器匹配器和条件匹配器
List<IotSceneRuleTriggerMatcher> triggerMatchers = allMatchers.stream()
.filter(matcher -> matcher instanceof IotSceneRuleTriggerMatcher)
.map(matcher -> (IotSceneRuleTriggerMatcher) matcher)
@@ -60,7 +57,7 @@ public class IotSceneRuleMatcherManager {
.map(matcher -> (IotSceneRuleConditionMatcher) matcher)
.toList();
// 构建触发器匹配器映射表
// 2.1 构建触发器匹配器映射表
this.triggerMatchers = convertMap(triggerMatchers, IotSceneRuleTriggerMatcher::getSupportedTriggerType,
Function.identity(),
(existing, replacement) -> {
@@ -70,7 +67,7 @@ public class IotSceneRuleMatcherManager {
existing.getSupportedTriggerType() : replacement.getSupportedTriggerType());
return existing.getPriority() <= replacement.getPriority() ? existing : replacement;
}, LinkedHashMap::new);
// 构建条件匹配器映射表
// 2.2 构建条件匹配器映射表
this.conditionMatchers = convertMap(conditionMatchers, IotSceneRuleConditionMatcher::getSupportedConditionType,
Function.identity(),
(existing, replacement) -> {
@@ -82,7 +79,7 @@ public class IotSceneRuleMatcherManager {
},
LinkedHashMap::new);
// 日志输出初始化信息
// 3. 日志输出初始化信息
log.info("[IotSceneRuleMatcherManager][初始化完成,共加载({})个匹配器,其中触发器匹配器({})个,条件匹配器({})个]",
allMatchers.size(), this.triggerMatchers.size(), this.conditionMatchers.size());
this.triggerMatchers.forEach((type, matcher) ->
@@ -135,7 +132,7 @@ public class IotSceneRuleMatcherManager {
return false;
}
// 根据条件类型查找对应的匹配器
// 1. 根据条件类型查找对应的匹配器
IotSceneRuleConditionTypeEnum conditionType = IotSceneRuleConditionTypeEnum.typeOf(condition.getType());
if (conditionType == null) {
log.warn("[isConditionMatched][conditionType({}) 未知的条件类型]", condition.getType());
@@ -147,7 +144,7 @@ public class IotSceneRuleMatcherManager {
return false;
}
// 执行匹配逻辑
// 2. 执行匹配逻辑
try {
return matcher.matches(message, condition);
} catch (Exception e) {

View File

@@ -17,15 +17,13 @@ import java.time.format.DateTimeFormatter;
import java.util.List;
/**
* 当前时间条件匹配器
* <p>
* 处理时间相关的子条件匹配逻辑
* 当前时间条件匹配器处理时间相关的子条件匹配逻辑
*
* @author HUIHUI
*/
@Component
@Slf4j
public class CurrentTimeConditionMatcher implements IotSceneRuleConditionMatcher {
public class IotCurrentTimeConditionMatcher implements IotSceneRuleConditionMatcher {
/**
* 时间格式化器 - HH:mm:ss

View File

@@ -1,5 +1,6 @@
package cn.iocoder.yudao.module.iot.service.rule.scene.matcher.condition;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotSceneRuleDO;
@@ -7,15 +8,14 @@ import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleConditionTypeEnum;
import cn.iocoder.yudao.module.iot.service.rule.scene.matcher.IotSceneRuleMatcherHelper;
import org.springframework.stereotype.Component;
/**
* 设备属性条件匹配器
* <p>
* 处理设备属性相关的子条件匹配逻辑
* 设备属性条件匹配器处理设备属性相关的子条件匹配逻辑
*
* @author HUIHUI
*/
@Component
public class DevicePropertyConditionMatcher implements IotSceneRuleConditionMatcher {
public class IotDevicePropertyConditionMatcher implements IotSceneRuleConditionMatcher {
@Override
public IotSceneRuleConditionTypeEnum getSupportedConditionType() {
@@ -43,10 +43,10 @@ public class DevicePropertyConditionMatcher implements IotSceneRuleConditionMatc
return false;
}
// 2.1. 获取属性值
Object propertyValue = message.getParams();
// 2.1. 获取属性值 - 使用工具类方法正确提取属性值
Object propertyValue = IotDeviceMessageUtils.extractPropertyValue(message, condition.getIdentifier());
if (propertyValue == null) {
IotSceneRuleMatcherHelper.logConditionMatchFailure(message, condition, "消息中属性值为空");
IotSceneRuleMatcherHelper.logConditionMatchFailure(message, condition, "消息中属性值为空或未找到指定属性");
return false;
}

View File

@@ -1,20 +1,19 @@
package cn.iocoder.yudao.module.iot.service.rule.scene.matcher.condition;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotSceneRuleDO;
import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleConditionTypeEnum;
import cn.iocoder.yudao.module.iot.service.rule.scene.matcher.IotSceneRuleMatcherHelper;
import org.springframework.stereotype.Component;
/**
* 设备状态条件匹配器
* <p>
* 处理设备状态相关的子条件匹配逻辑
* 设备状态条件匹配器处理设备状态相关的子条件匹配逻辑
*
* @author HUIHUI
*/
@Component
public class DeviceStateConditionMatcher implements IotSceneRuleConditionMatcher {
public class IotDeviceStateConditionMatcher implements IotSceneRuleConditionMatcher {
@Override
public IotSceneRuleConditionTypeEnum getSupportedConditionType() {
@@ -35,8 +34,9 @@ public class DeviceStateConditionMatcher implements IotSceneRuleConditionMatcher
return false;
}
// 2.1 获取设备状态值
Object stateValue = message.getParams();
// 2.1 获取设备状态值 - 使用工具类方法正确提取状态值
// 对于设备状态条件状态值通过 getIdentifier 获取实际是从 params.state 字段
String stateValue = IotDeviceMessageUtils.getIdentifier(message);
if (stateValue == null) {
IotSceneRuleMatcherHelper.logConditionMatchFailure(message, condition, "消息中设备状态值为空");
return false;

View File

@@ -6,12 +6,9 @@ import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleConditionTypeEnum;
import cn.iocoder.yudao.module.iot.service.rule.scene.matcher.IotSceneRuleMatcher;
/**
* IoT 场景规则条件匹配器接口
* <p>
* 专门处理子条件匹配逻辑,如设备状态、属性值、时间条件等
* <p>
* 条件匹配器负责判断设备消息是否满足场景规则的附加条件,
* 在触发器匹配成功后进行进一步的条件筛选
* IoT 场景规则条件匹配器接口:专门处理子条件的匹配逻辑,如设备状态、属性值、时间条件等
*
* 条件匹配器负责判断设备消息是否满足场景规则的附加条件,在触发器匹配成功后进行进一步的条件筛选
*
* @author HUIHUI
*/

View File

@@ -10,14 +10,12 @@ import cn.iocoder.yudao.module.iot.service.rule.scene.matcher.IotSceneRuleMatche
import org.springframework.stereotype.Component;
/**
* 设备事件上报触发器匹配器
* <p>
* 处理设备事件上报的触发器匹配逻辑
* 设备事件上报触发器匹配器处理设备事件上报的触发器匹配逻辑
*
* @author HUIHUI
*/
@Component
public class DeviceEventPostTriggerMatcher implements IotSceneRuleTriggerMatcher {
public class IotDeviceEventPostTriggerMatcher implements IotSceneRuleTriggerMatcher {
@Override
public IotSceneRuleTriggerTypeEnum getSupportedTriggerType() {

View File

@@ -9,14 +9,12 @@ import cn.iocoder.yudao.module.iot.service.rule.scene.matcher.IotSceneRuleMatche
import org.springframework.stereotype.Component;
/**
* 设备属性上报触发器匹配器
* <p>
* 处理设备属性数据上报的触发器匹配逻辑
* 设备属性上报触发器匹配器处理设备属性数据上报的触发器匹配逻辑
*
* @author HUIHUI
*/
@Component
public class DevicePropertyPostTriggerMatcher implements IotSceneRuleTriggerMatcher {
public class IotDevicePropertyPostTriggerMatcher implements IotSceneRuleTriggerMatcher {
@Override
public IotSceneRuleTriggerTypeEnum getSupportedTriggerType() {
@@ -52,10 +50,10 @@ public class DevicePropertyPostTriggerMatcher implements IotSceneRuleTriggerMatc
return false;
}
// 2.1 获取属性值
Object propertyValue = message.getParams();
// 2.1 获取属性值 - 使用工具类方法正确提取属性值
Object propertyValue = IotDeviceMessageUtils.extractPropertyValue(message, trigger.getIdentifier());
if (propertyValue == null) {
IotSceneRuleMatcherHelper.logTriggerMatchFailure(message, trigger, "消息中属性值为空");
IotSceneRuleMatcherHelper.logTriggerMatchFailure(message, trigger, "消息中属性值为空或未找到指定属性");
return false;
}

View File

@@ -9,14 +9,12 @@ import cn.iocoder.yudao.module.iot.service.rule.scene.matcher.IotSceneRuleMatche
import org.springframework.stereotype.Component;
/**
* 设备服务调用触发器匹配器
* <p>
* 处理设备服务调用的触发器匹配逻辑
* 设备服务调用触发器匹配器处理设备服务调用的触发器匹配逻辑
*
* @author HUIHUI
*/
@Component
public class DeviceServiceInvokeTriggerMatcher implements IotSceneRuleTriggerMatcher {
public class IotDeviceServiceInvokeTriggerMatcher implements IotSceneRuleTriggerMatcher {
@Override
public IotSceneRuleTriggerTypeEnum getSupportedTriggerType() {
@@ -46,7 +44,7 @@ public class DeviceServiceInvokeTriggerMatcher implements IotSceneRuleTriggerMat
// 2. 对于服务调用触发器通常只需要匹配服务标识符即可
// 不需要检查操作符和值因为服务调用本身就是触发条件
// TODO @puhui999: 服务调用时校验输入参数是否匹配条件
// TODO @puhui999: 服务调用时校验输入参数是否匹配条件
IotSceneRuleMatcherHelper.logTriggerMatchSuccess(message, trigger);
return true;
}

View File

@@ -2,20 +2,19 @@ package cn.iocoder.yudao.module.iot.service.rule.scene.matcher.trigger;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotSceneRuleDO;
import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleTriggerTypeEnum;
import cn.iocoder.yudao.module.iot.service.rule.scene.matcher.IotSceneRuleMatcherHelper;
import org.springframework.stereotype.Component;
/**
* 设备状态更新触发器匹配器
* <p>
* 处理设备上下线状态变更的触发器匹配逻辑
* 设备状态更新触发器匹配器处理设备上下线状态变更的触发器匹配逻辑
*
* @author HUIHUI
*/
@Component
public class DeviceStateUpdateTriggerMatcher implements IotSceneRuleTriggerMatcher {
public class IotDeviceStateUpdateTriggerMatcher implements IotSceneRuleTriggerMatcher {
@Override
public IotSceneRuleTriggerTypeEnum getSupportedTriggerType() {
@@ -43,16 +42,17 @@ public class DeviceStateUpdateTriggerMatcher implements IotSceneRuleTriggerMatch
return false;
}
// 2.1 获取设备状态值
Object stateValue = message.getParams();
if (stateValue == null) {
// 2.1 获取设备状态值 - 使用工具类方法正确提取状态值
// 对于状态更新消息状态值通过 getIdentifier 获取实际是从 params.state 字段
String stateIdentifier = IotDeviceMessageUtils.getIdentifier(message);
if (stateIdentifier == null) {
IotSceneRuleMatcherHelper.logTriggerMatchFailure(message, trigger, "消息中设备状态值为空");
return false;
}
// 2.2 使用条件评估器进行匹配
// TODO @puhui999: 状态匹配重新实现
boolean matched = IotSceneRuleMatcherHelper.evaluateCondition(stateValue, trigger.getOperator(), trigger.getValue());
// 状态值通常是字符串或数字直接使用标识符作为状态值
boolean matched = IotSceneRuleMatcherHelper.evaluateCondition(stateIdentifier, trigger.getOperator(), trigger.getValue());
if (matched) {
IotSceneRuleMatcherHelper.logTriggerMatchSuccess(message, trigger);
} else {

View File

@@ -6,12 +6,9 @@ import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleTriggerTypeEnum;
import cn.iocoder.yudao.module.iot.service.rule.scene.matcher.IotSceneRuleMatcher;
/**
* IoT 场景规则触发器匹配器接口
* <p>
* 专门处理主触发条件的匹配逻辑,如设备消息类型、定时器等
* <p>
* 触发器匹配器负责判断设备消息是否满足场景规则的主触发条件,
* 是场景规则执行的第一道门槛
* IoT 场景规则触发器匹配器接口:专门处理主触发条件的匹配逻辑,如设备消息类型、定时器等
*
* 触发器匹配器负责判断设备消息是否满足场景规则的主触发条件,是场景规则执行的第一道门槛
*
* @author HUIHUI
*/

View File

@@ -9,15 +9,14 @@ import org.quartz.CronExpression;
import org.springframework.stereotype.Component;
/**
* 定时触发器匹配器
* <p>
* 处理定时触发的触发器匹配逻辑
* 定时触发器匹配器处理定时触发的触发器匹配逻辑
*
* 注意定时触发器不依赖设备消息主要用于定时任务场景
*
* @author HUIHUI
*/
@Component
public class TimerTriggerMatcher implements IotSceneRuleTriggerMatcher {
public class IotTimerTriggerMatcher implements IotSceneRuleTriggerMatcher {
@Override
public IotSceneRuleTriggerTypeEnum getSupportedTriggerType() {

View File

@@ -0,0 +1,154 @@
package cn.iocoder.yudao.module.iot.service.rule.scene.timer;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ObjUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotSceneRuleDO;
import cn.iocoder.yudao.module.iot.enums.rule.IotSceneRuleTriggerTypeEnum;
import cn.iocoder.yudao.module.iot.framework.job.core.IotSchedulerManager;
import cn.iocoder.yudao.module.iot.job.rule.IotSceneRuleJob;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.quartz.SchedulerException;
import org.springframework.stereotype.Component;
import java.util.List;
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.filterList;
/**
* IoT 场景规则定时触发器处理器:负责管理定时触发器的注册、更新、删除等操作
*
* @author HUIHUI
*/
@Component
@Slf4j
public class IotSceneRuleTimerHandler {
@Resource(name = "iotSchedulerManager")
private IotSchedulerManager schedulerManager;
/**
* 注册场景规则的定时触发器
*
* @param sceneRule 场景规则
*/
public void registerTimerTriggers(IotSceneRuleDO sceneRule) {
// 1. 过滤出定时触发器
if (sceneRule == null || CollUtil.isEmpty(sceneRule.getTriggers())) {
return;
}
List<IotSceneRuleDO.Trigger> timerTriggers = filterList(sceneRule.getTriggers(),
trigger -> ObjUtil.equals(trigger.getType(), IotSceneRuleTriggerTypeEnum.TIMER.getType()));
if (CollUtil.isEmpty(timerTriggers)) {
return;
}
// 2. 注册每个定时触发器
timerTriggers.forEach(trigger -> registerSingleTimerTrigger(sceneRule, trigger));
}
/**
* 更新场景规则的定时触发器
*
* @param sceneRule 场景规则
*/
public void updateTimerTriggers(IotSceneRuleDO sceneRule) {
if (sceneRule == null) {
return;
}
// 1. 先删除旧的定时任务
unregisterTimerTriggers(sceneRule.getId());
// 2.1 如果场景规则已禁用,则不重新注册
if (CommonStatusEnum.isDisable(sceneRule.getStatus())) {
log.info("[updateTimerTriggers][场景规则({}) 已禁用,不注册定时触发器]", sceneRule.getId());
return;
}
// 2.2 重新注册定时触发器
registerTimerTriggers(sceneRule);
}
/**
* 注销场景规则的定时触发器
*
* @param sceneRuleId 场景规则 ID
*/
public void unregisterTimerTriggers(Long sceneRuleId) {
if (sceneRuleId == null) {
return;
}
String jobName = buildJobName(sceneRuleId);
try {
schedulerManager.deleteJob(jobName);
log.info("[unregisterTimerTriggers][场景规则({}) 定时触发器注销成功]", sceneRuleId);
} catch (SchedulerException e) {
log.error("[unregisterTimerTriggers][场景规则({}) 定时触发器注销失败]", sceneRuleId, e);
}
}
/**
* 暂停场景规则的定时触发器
*
* @param sceneRuleId 场景规则 ID
*/
public void pauseTimerTriggers(Long sceneRuleId) {
if (sceneRuleId == null) {
return;
}
String jobName = buildJobName(sceneRuleId);
try {
schedulerManager.pauseJob(jobName);
log.info("[pauseTimerTriggers][场景规则({}) 定时触发器暂停成功]", sceneRuleId);
} catch (SchedulerException e) {
log.error("[pauseTimerTriggers][场景规则({}) 定时触发器暂停失败]", sceneRuleId, e);
}
}
/**
* 注册单个定时触发器
*
* @param sceneRule 场景规则
* @param trigger 定时触发器配置
*/
private void registerSingleTimerTrigger(IotSceneRuleDO sceneRule, IotSceneRuleDO.Trigger trigger) {
// 1. 参数校验
if (StrUtil.isBlank(trigger.getCronExpression())) {
log.error("[registerSingleTimerTrigger][场景规则({}) 定时触发器缺少 CRON 表达式]", sceneRule.getId());
return;
}
try {
// 2.1 构建任务名称和数据
String jobName = buildJobName(sceneRule.getId());
// 2.2 注册定时任务
schedulerManager.addOrUpdateJob(
IotSceneRuleJob.class,
jobName,
trigger.getCronExpression(),
IotSceneRuleJob.buildJobDataMap(sceneRule.getId())
);
log.info("[registerSingleTimerTrigger][场景规则({}) 定时触发器注册成功CRON: {}]",
sceneRule.getId(), trigger.getCronExpression());
} catch (SchedulerException e) {
log.error("[registerSingleTimerTrigger][场景规则({}) 定时触发器注册失败CRON: {}]",
sceneRule.getId(), trigger.getCronExpression(), e);
}
}
/**
* 构建任务名称
*
* @param sceneRuleId 场景规则 ID
* @return 任务名称
*/
private String buildJobName(Long sceneRuleId) {
return "iot_scene_rule_timer_" + sceneRuleId;
}
}