【同步】BOOT 和 CLOUD 的功能

This commit is contained in:
YunaiV
2026-01-18 19:01:29 +08:00
parent 2e317b165b
commit 304b2f102a
75 changed files with 3249 additions and 155 deletions

View File

@@ -31,4 +31,7 @@ public class IotDevicePageReqVO extends PageParam {
@Schema(description = "设备分组编号", example = "1024")
private Long groupId;
@Schema(description = "网关设备 ID", example = "16380")
private Long gatewayId;
}

View File

@@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.data.sink;
import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum;
import cn.iocoder.yudao.framework.common.pojo.PageParam;
import cn.iocoder.yudao.framework.common.validation.InEnum;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import org.springframework.format.annotation.DateTimeFormat;
@@ -22,6 +23,10 @@ public class IotDataSinkPageReqVO extends PageParam {
@InEnum(CommonStatusEnum.class)
private Integer status;
@Schema(description = "数据目的类型", example = "1")
@InEnum(IotDataSinkTypeEnum.class)
private Integer type;
@Schema(description = "创建时间")
@DateTimeFormat(pattern = FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND)
private LocalDateTime[] createTime;

View File

@@ -21,6 +21,7 @@ import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
import java.util.List;
/**
@@ -56,6 +57,11 @@ public class IotSceneRuleDO extends TenantBaseDO {
*/
private Integer status;
/**
* 最后触发时间
*/
private LocalDateTime lastTriggerTime;
/**
* 场景定义配置
*/

View File

@@ -10,6 +10,35 @@ import lombok.Data;
@Data
public class IotDataSinkTcpConfig extends IotAbstractDataSinkConfig {
/**
* 默认连接超时时间(毫秒)
*/
public static final int DEFAULT_CONNECT_TIMEOUT_MS = 5000;
/**
* 默认读取超时时间(毫秒)
*/
public static final int DEFAULT_READ_TIMEOUT_MS = 10000;
/**
* 默认是否启用 SSL
*/
public static final boolean DEFAULT_SSL = false;
/**
* 默认数据格式
*/
public static final String DEFAULT_DATA_FORMAT = "JSON";
/**
* 默认心跳间隔时间(毫秒)
*/
public static final long DEFAULT_HEARTBEAT_INTERVAL_MS = 30000L;
/**
* 默认重连间隔时间(毫秒)
*/
public static final long DEFAULT_RECONNECT_INTERVAL_MS = 5000L;
/**
* 默认最大重连次数
*/
public static final int DEFAULT_MAX_RECONNECT_ATTEMPTS = 3;
/**
* TCP 服务器地址
*/
@@ -23,17 +52,17 @@ public class IotDataSinkTcpConfig extends IotAbstractDataSinkConfig {
/**
* 连接超时时间(毫秒)
*/
private Integer connectTimeoutMs = 5000;
private Integer connectTimeoutMs = DEFAULT_CONNECT_TIMEOUT_MS;
/**
* 读取超时时间(毫秒)
*/
private Integer readTimeoutMs = 10000;
private Integer readTimeoutMs = DEFAULT_READ_TIMEOUT_MS;
/**
* 是否启用 SSL
*/
private Boolean ssl = false;
private Boolean ssl = DEFAULT_SSL;
/**
* SSL 证书路径(当 ssl=true 时需要)
@@ -43,21 +72,21 @@ public class IotDataSinkTcpConfig extends IotAbstractDataSinkConfig {
/**
* 数据格式JSON 或 BINARY
*/
private String dataFormat = "JSON";
private String dataFormat = DEFAULT_DATA_FORMAT;
/**
* 心跳间隔时间毫秒0 表示不启用心跳
*/
private Long heartbeatIntervalMs = 30000L;
private Long heartbeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL_MS;
/**
* 重连间隔时间(毫秒)
*/
private Long reconnectIntervalMs = 5000L;
private Long reconnectIntervalMs = DEFAULT_RECONNECT_INTERVAL_MS;
/**
* 最大重连次数
*/
private Integer maxReconnectAttempts = 3;
private Integer maxReconnectAttempts = DEFAULT_MAX_RECONNECT_ATTEMPTS;
}

View File

@@ -13,6 +13,51 @@ import lombok.Data;
@Data
public class IotDataSinkWebSocketConfig extends IotAbstractDataSinkConfig {
/**
* 默认连接超时时间(毫秒)
*/
public static final int DEFAULT_CONNECT_TIMEOUT_MS = 5000;
/**
* 默认发送超时时间(毫秒)
*/
public static final int DEFAULT_SEND_TIMEOUT_MS = 10000;
/**
* 默认心跳间隔时间(毫秒)
*/
public static final long DEFAULT_HEARTBEAT_INTERVAL_MS = 30000L;
/**
* 默认心跳消息内容
*/
public static final String DEFAULT_HEARTBEAT_MESSAGE = "{\"type\":\"heartbeat\"}";
/**
* 默认是否启用 SSL 证书验证
*/
public static final boolean DEFAULT_VERIFY_SSL_CERT = true;
/**
* 默认数据格式
*/
public static final String DEFAULT_DATA_FORMAT = "JSON";
/**
* 默认重连间隔时间(毫秒)
*/
public static final long DEFAULT_RECONNECT_INTERVAL_MS = 5000L;
/**
* 默认最大重连次数
*/
public static final int DEFAULT_MAX_RECONNECT_ATTEMPTS = 3;
/**
* 默认是否启用压缩
*/
public static final boolean DEFAULT_ENABLE_COMPRESSION = false;
/**
* 默认消息发送重试次数
*/
public static final int DEFAULT_SEND_RETRY_COUNT = 1;
/**
* 默认消息发送重试间隔(毫秒)
*/
public static final long DEFAULT_SEND_RETRY_INTERVAL_MS = 1000L;
/**
* WebSocket 服务器地址
* 例如ws://localhost:8080/ws 或 wss://example.com/ws
@@ -22,22 +67,22 @@ public class IotDataSinkWebSocketConfig extends IotAbstractDataSinkConfig {
/**
* 连接超时时间(毫秒)
*/
private Integer connectTimeoutMs = 5000;
private Integer connectTimeoutMs = DEFAULT_CONNECT_TIMEOUT_MS;
/**
* 发送超时时间(毫秒)
*/
private Integer sendTimeoutMs = 10000;
private Integer sendTimeoutMs = DEFAULT_SEND_TIMEOUT_MS;
/**
* 心跳间隔时间毫秒0 表示不启用心跳
*/
private Long heartbeatIntervalMs = 30000L;
private Long heartbeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL_MS;
/**
* 心跳消息内容JSON 格式)
*/
private String heartbeatMessage = "{\"type\":\"heartbeat\"}";
private String heartbeatMessage = DEFAULT_HEARTBEAT_MESSAGE;
/**
* 子协议列表(逗号分隔)
@@ -52,36 +97,36 @@ public class IotDataSinkWebSocketConfig extends IotAbstractDataSinkConfig {
/**
* 是否启用 SSL 证书验证(仅对 wss:// 生效)
*/
private Boolean verifySslCert = true;
private Boolean verifySslCert = DEFAULT_VERIFY_SSL_CERT;
/**
* 数据格式JSON 或 TEXT
*/
private String dataFormat = "JSON";
private String dataFormat = DEFAULT_DATA_FORMAT;
/**
* 重连间隔时间(毫秒)
*/
private Long reconnectIntervalMs = 5000L;
private Long reconnectIntervalMs = DEFAULT_RECONNECT_INTERVAL_MS;
/**
* 最大重连次数
*/
private Integer maxReconnectAttempts = 3;
private Integer maxReconnectAttempts = DEFAULT_MAX_RECONNECT_ATTEMPTS;
/**
* 是否启用压缩
*/
private Boolean enableCompression = false;
private Boolean enableCompression = DEFAULT_ENABLE_COMPRESSION;
/**
* 消息发送重试次数
*/
private Integer sendRetryCount = 1;
private Integer sendRetryCount = DEFAULT_SEND_RETRY_COUNT;
/**
* 消息发送重试间隔(毫秒)
*/
private Long sendRetryIntervalMs = 1000L;
private Long sendRetryIntervalMs = DEFAULT_SEND_RETRY_INTERVAL_MS;
}

View File

@@ -31,6 +31,7 @@ public interface IotDeviceMapper extends BaseMapperX<IotDeviceDO> {
.eqIfPresent(IotDeviceDO::getDeviceType, reqVO.getDeviceType())
.likeIfPresent(IotDeviceDO::getNickname, reqVO.getNickname())
.eqIfPresent(IotDeviceDO::getState, reqVO.getStatus())
.eqIfPresent(IotDeviceDO::getGatewayId, reqVO.getGatewayId())
.apply(ObjectUtil.isNotNull(reqVO.getGroupId()), "FIND_IN_SET(" + reqVO.getGroupId() + ",group_ids) > 0")
.orderByDesc(IotDeviceDO::getId));
}

View File

@@ -35,4 +35,8 @@ public interface IotDataRuleMapper extends BaseMapperX<IotDataRuleDO> {
return selectList(IotDataRuleDO::getStatus, status);
}
default IotDataRuleDO selectByName(String name) {
return selectOne(IotDataRuleDO::getName, name);
}
}

View File

@@ -21,6 +21,7 @@ public interface IotDataSinkMapper extends BaseMapperX<IotDataSinkDO> {
return selectPage(reqVO, new LambdaQueryWrapperX<IotDataSinkDO>()
.likeIfPresent(IotDataSinkDO::getName, reqVO.getName())
.eqIfPresent(IotDataSinkDO::getStatus, reqVO.getStatus())
.eqIfPresent(IotDataSinkDO::getType, reqVO.getType())
.betweenIfPresent(IotDataSinkDO::getCreateTime, reqVO.getCreateTime())
.orderByDesc(IotDataSinkDO::getId));
}
@@ -29,4 +30,8 @@ public interface IotDataSinkMapper extends BaseMapperX<IotDataSinkDO> {
return selectList(IotDataSinkDO::getStatus, status);
}
default IotDataSinkDO selectByName(String name) {
return selectOne(IotDataSinkDO::getName, name);
}
}

View File

@@ -41,7 +41,7 @@ public class IotAlertConfigServiceImpl implements IotAlertConfigService {
public Long createAlertConfig(IotAlertConfigSaveReqVO createReqVO) {
// 校验关联数据是否存在
sceneRuleService.validateSceneRuleList(createReqVO.getSceneRuleIds());
adminUserApi.validateUserList(createReqVO.getReceiveUserIds()).checkError();
adminUserApi.validateUserList(createReqVO.getReceiveUserIds());
// 插入
IotAlertConfigDO alertConfig = BeanUtils.toBean(createReqVO, IotAlertConfigDO.class);
@@ -55,7 +55,7 @@ public class IotAlertConfigServiceImpl implements IotAlertConfigService {
validateAlertConfigExists(updateReqVO.getId());
// 校验关联数据是否存在
sceneRuleService.validateSceneRuleList(updateReqVO.getSceneRuleIds());
adminUserApi.validateUserList(updateReqVO.getReceiveUserIds()).checkError();
adminUserApi.validateUserList(updateReqVO.getReceiveUserIds());
// 更新
IotAlertConfigDO updateObj = BeanUtils.toBean(updateReqVO, IotAlertConfigDO.class);

View File

@@ -382,7 +382,7 @@ public class IotDeviceServiceImpl implements IotDeviceService {
return;
}
// 2.2.2 如果存在,判断是否允许更新
if (updateSupport) {
if (!updateSupport) {
throw exception(DEVICE_KEY_EXISTS);
}
updateDevice(new IotDeviceSaveReqVO().setId(existDevice.getId())

View File

@@ -1,6 +1,7 @@
package cn.iocoder.yudao.module.iot.service.device.property;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
@@ -145,6 +146,12 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
IotDataSpecsDataTypeEnum.STRUCT.getDataType(), IotDataSpecsDataTypeEnum.ARRAY.getDataType())) {
// 特殊STRUCT 和 ARRAY 类型,在 TDengine 里,有没对应数据类型,只能通过 JSON 来存储
properties.put((String) key, JsonUtils.toJsonString(value));
} else if (IotDataSpecsDataTypeEnum.DOUBLE.getDataType().equals(thingModel.getProperty().getDataType())) {
properties.put((String) key, Convert.toDouble(value));
} else if (IotDataSpecsDataTypeEnum.FLOAT.getDataType().equals(thingModel.getProperty().getDataType())) {
properties.put((String) key, Convert.toFloat(value));
} else if (IotDataSpecsDataTypeEnum.BOOL.getDataType().equals(thingModel.getProperty().getDataType())) {
properties.put((String) key, Convert.toByte(value));
} else {
properties.put((String) key, value);
}

View File

@@ -32,6 +32,7 @@ import java.util.*;
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertSet;
import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.DATA_RULE_NAME_EXISTS;
import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.DATA_RULE_NOT_EXISTS;
/**
@@ -62,6 +63,8 @@ public class IotDataRuleServiceImpl implements IotDataRuleService {
@Override
@CacheEvict(value = RedisKeyConstants.DATA_RULE_LIST, allEntries = true)
public Long createDataRule(IotDataRuleSaveReqVO createReqVO) {
// 校验名称唯一
validateDataRuleNameUnique(null, createReqVO.getName());
// 校验数据源配置和数据目的
validateDataRuleConfig(createReqVO);
// 新增
@@ -75,6 +78,8 @@ public class IotDataRuleServiceImpl implements IotDataRuleService {
public void updateDataRule(IotDataRuleSaveReqVO updateReqVO) {
// 校验存在
validateDataRuleExists(updateReqVO.getId());
// 校验名称唯一
validateDataRuleNameUnique(updateReqVO.getId(), updateReqVO.getName());
// 校验数据源配置和数据目的
validateDataRuleConfig(updateReqVO);
@@ -98,6 +103,29 @@ public class IotDataRuleServiceImpl implements IotDataRuleService {
}
}
/**
* 校验数据流转规则名称唯一性
*
* @param id 数据流转规则编号(用于更新时排除自身)
* @param name 数据流转规则名称
*/
private void validateDataRuleNameUnique(Long id, String name) {
if (StrUtil.isBlank(name)) {
return;
}
IotDataRuleDO dataRule = dataRuleMapper.selectByName(name);
if (dataRule == null) {
return;
}
// 如果 id 为空,说明不用比较是否为相同 id 的规则
if (id == null) {
throw exception(DATA_RULE_NAME_EXISTS);
}
if (!dataRule.getId().equals(id)) {
throw exception(DATA_RULE_NAME_EXISTS);
}
}
/**
* 校验数据流转规则配置
*

View File

@@ -1,6 +1,7 @@
package cn.iocoder.yudao.module.iot.service.rule.data;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.data.sink.IotDataSinkPageReqVO;
@@ -19,6 +20,7 @@ import java.util.List;
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.DATA_SINK_DELETE_FAIL_USED_BY_RULE;
import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.DATA_SINK_NAME_EXISTS;
import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.DATA_SINK_NOT_EXISTS;
/**
@@ -39,6 +41,9 @@ public class IotDataSinkServiceImpl implements IotDataSinkService {
@Override
public Long createDataSink(IotDataSinkSaveReqVO createReqVO) {
// 校验名称唯一
validateDataSinkNameUnique(null, createReqVO.getName());
// 新增
IotDataSinkDO dataBridge = BeanUtils.toBean(createReqVO, IotDataSinkDO.class);
dataSinkMapper.insert(dataBridge);
return dataBridge.getId();
@@ -48,6 +53,8 @@ public class IotDataSinkServiceImpl implements IotDataSinkService {
public void updateDataSink(IotDataSinkSaveReqVO updateReqVO) {
// 校验存在
validateDataBridgeExists(updateReqVO.getId());
// 校验名称唯一
validateDataSinkNameUnique(updateReqVO.getId(), updateReqVO.getName());
// 更新
IotDataSinkDO updateObj = BeanUtils.toBean(updateReqVO, IotDataSinkDO.class);
dataSinkMapper.updateById(updateObj);
@@ -71,6 +78,29 @@ public class IotDataSinkServiceImpl implements IotDataSinkService {
}
}
/**
* 校验数据流转目的名称唯一性
*
* @param id 数据流转目的编号(用于更新时排除自身)
* @param name 数据流转目的名称
*/
private void validateDataSinkNameUnique(Long id, String name) {
if (StrUtil.isBlank(name)) {
return;
}
IotDataSinkDO dataSink = dataSinkMapper.selectByName(name);
if (dataSink == null) {
return;
}
// 如果 id 为空,说明不用比较是否为相同 id 的目的
if (id == null) {
throw exception(DATA_SINK_NAME_EXISTS);
}
if (!dataSink.getId().equals(id)) {
throw exception(DATA_SINK_NAME_EXISTS);
}
}
@Override
public IotDataSinkDO getDataSink(Long id) {
return dataSinkMapper.selectById(id);

View File

@@ -7,8 +7,6 @@ import cn.iocoder.yudao.module.iot.service.rule.data.action.tcp.IotTcpClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.Duration;
/**
* TCP 的 {@link IotDataRuleAction} 实现类
* <p>
@@ -23,9 +21,6 @@ import java.time.Duration;
public class IotTcpDataRuleAction extends
IotDataRuleCacheableAction<IotDataSinkTcpConfig, IotTcpClient> {
private static final Duration CONNECT_TIMEOUT = Duration.ofSeconds(5);
private static final Duration SEND_TIMEOUT = Duration.ofSeconds(10);
@Override
public Integer getType() {
return IotDataSinkTypeEnum.TCP.getType();

View File

@@ -0,0 +1,85 @@
package cn.iocoder.yudao.module.iot.service.rule.data.action;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkWebSocketConfig;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataSinkTypeEnum;
import cn.iocoder.yudao.module.iot.service.rule.data.action.websocket.IotWebSocketClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* WebSocket 的 {@link IotDataRuleAction} 实现类
* <p>
* 负责将设备消息发送到外部 WebSocket 服务器
* 支持 ws:// 和 wss:// 协议,支持 JSON 和 TEXT 数据格式
* 使用连接池管理 WebSocket 连接,提高性能和资源利用率
*
* @author HUIHUI
*/
@Component
@Slf4j
public class IotWebSocketDataRuleAction extends
IotDataRuleCacheableAction<IotDataSinkWebSocketConfig, IotWebSocketClient> {
@Override
public Integer getType() {
return IotDataSinkTypeEnum.WEBSOCKET.getType();
}
@Override
protected IotWebSocketClient initProducer(IotDataSinkWebSocketConfig config) throws Exception {
// 1. 参数校验
if (StrUtil.isBlank(config.getServerUrl())) {
throw new IllegalArgumentException("WebSocket 服务器地址不能为空");
}
if (!StrUtil.startWithAny(config.getServerUrl(), "ws://", "wss://")) {
throw new IllegalArgumentException("WebSocket 服务器地址必须以 ws:// 或 wss:// 开头");
}
// 2.1 创建 WebSocket 客户端
IotWebSocketClient webSocketClient = new IotWebSocketClient(
config.getServerUrl(),
config.getConnectTimeoutMs(),
config.getSendTimeoutMs(),
config.getDataFormat()
);
// 2.2 连接服务器
webSocketClient.connect();
log.info("[initProducer][WebSocket 客户端创建并连接成功,服务器: {},数据格式: {}]",
config.getServerUrl(), config.getDataFormat());
return webSocketClient;
}
@Override
protected void closeProducer(IotWebSocketClient producer) throws Exception {
if (producer != null) {
producer.close();
}
}
@Override
protected void execute(IotDeviceMessage message, IotDataSinkWebSocketConfig config) throws Exception {
try {
// 1.1 获取或创建 WebSocket 客户端
// TODO @puhui999需要加锁保证必须连接上
IotWebSocketClient webSocketClient = getProducer(config);
// 1.2 检查连接状态,如果断开则重新连接
if (!webSocketClient.isConnected()) {
log.warn("[execute][WebSocket 连接已断开,尝试重新连接,服务器: {}]", config.getServerUrl());
webSocketClient.connect();
}
// 2.1 发送消息
webSocketClient.sendMessage(message);
// 2.2 记录发送成功日志
log.info("[execute][message({}) config({}) 发送成功WebSocket 服务器: {}]",
message, config, config.getServerUrl());
} catch (Exception e) {
log.error("[execute][message({}) config({}) 发送失败WebSocket 服务器: {}]",
message, config, config.getServerUrl(), e);
throw e;
}
}
}

View File

@@ -2,6 +2,7 @@ package cn.iocoder.yudao.module.iot.service.rule.data.action.tcp;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkTcpConfig;
import lombok.extern.slf4j.Slf4j;
import javax.net.ssl.SSLSocketFactory;
@@ -30,6 +31,7 @@ public class IotTcpClient {
private final Integer connectTimeoutMs;
private final Integer readTimeoutMs;
private final Boolean ssl;
// TODO @puhui999sslCertPath 是不是没在用?
private final String sslCertPath;
private final String dataFormat;
@@ -38,16 +40,16 @@ public class IotTcpClient {
private BufferedReader reader;
private final AtomicBoolean connected = new AtomicBoolean(false);
// TODO @puhui999default 值IotDataSinkTcpConfig.java 枚举起来哈;
public IotTcpClient(String host, Integer port, Integer connectTimeoutMs, Integer readTimeoutMs,
Boolean ssl, String sslCertPath, String dataFormat) {
this.host = host;
this.port = port;
this.connectTimeoutMs = connectTimeoutMs != null ? connectTimeoutMs : 5000;
this.readTimeoutMs = readTimeoutMs != null ? readTimeoutMs : 10000;
this.ssl = ssl != null ? ssl : false;
this.connectTimeoutMs = connectTimeoutMs != null ? connectTimeoutMs : IotDataSinkTcpConfig.DEFAULT_CONNECT_TIMEOUT_MS;
this.readTimeoutMs = readTimeoutMs != null ? readTimeoutMs : IotDataSinkTcpConfig.DEFAULT_READ_TIMEOUT_MS;
this.ssl = ssl != null ? ssl : IotDataSinkTcpConfig.DEFAULT_SSL;
this.sslCertPath = sslCertPath;
this.dataFormat = dataFormat != null ? dataFormat : "JSON";
// TODO @puhui999可以使用 StrUtil.defaultIfBlank 方法简化
this.dataFormat = dataFormat != null ? dataFormat : IotDataSinkTcpConfig.DEFAULT_DATA_FORMAT;
}
/**
@@ -99,9 +101,8 @@ public class IotTcpClient {
}
try {
// TODO @puhui999枚举值
String messageData;
if ("JSON".equalsIgnoreCase(dataFormat)) {
if (IotDataSinkTcpConfig.DEFAULT_DATA_FORMAT.equalsIgnoreCase(dataFormat)) {
// JSON 格式
messageData = JsonUtils.toJsonString(message);
} else {

View File

@@ -0,0 +1,177 @@
package cn.iocoder.yudao.module.iot.service.rule.data.action.websocket;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.config.IotDataSinkWebSocketConfig;
import lombok.extern.slf4j.Slf4j;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* IoT WebSocket 客户端
* <p>
* 负责与外部 WebSocket 服务器建立连接并发送设备消息
* 支持 ws:// 和 wss:// 协议,支持 JSON 和 TEXT 数据格式
* 基于 Java 11+ 内置的 java.net.http.WebSocket 实现
*
* @author HUIHUI
*/
@Slf4j
public class IotWebSocketClient implements WebSocket.Listener {
private final String serverUrl;
private final Integer connectTimeoutMs;
private final Integer sendTimeoutMs;
private final String dataFormat;
private WebSocket webSocket;
private final AtomicBoolean connected = new AtomicBoolean(false);
private final StringBuilder messageBuffer = new StringBuilder();
public IotWebSocketClient(String serverUrl, Integer connectTimeoutMs, Integer sendTimeoutMs, String dataFormat) {
this.serverUrl = serverUrl;
this.connectTimeoutMs = connectTimeoutMs != null ? connectTimeoutMs : IotDataSinkWebSocketConfig.DEFAULT_CONNECT_TIMEOUT_MS;
this.sendTimeoutMs = sendTimeoutMs != null ? sendTimeoutMs : IotDataSinkWebSocketConfig.DEFAULT_SEND_TIMEOUT_MS;
this.dataFormat = dataFormat != null ? dataFormat : IotDataSinkWebSocketConfig.DEFAULT_DATA_FORMAT;
}
/**
* 连接到 WebSocket 服务器
*/
@SuppressWarnings("resource")
public void connect() throws Exception {
if (connected.get()) {
log.warn("[connect][WebSocket 客户端已经连接,无需重复连接]");
return;
}
try {
HttpClient httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofMillis(connectTimeoutMs))
.build();
CompletableFuture<WebSocket> future = httpClient.newWebSocketBuilder()
.connectTimeout(Duration.ofMillis(connectTimeoutMs))
.buildAsync(URI.create(serverUrl), this);
// 等待连接完成
webSocket = future.get(connectTimeoutMs, TimeUnit.MILLISECONDS);
connected.set(true);
log.info("[connect][WebSocket 客户端连接成功,服务器地址: {}]", serverUrl);
} catch (Exception e) {
close();
log.error("[connect][WebSocket 客户端连接失败,服务器地址: {}]", serverUrl, e);
throw e;
}
}
@Override
public void onOpen(WebSocket webSocket) {
log.debug("[onOpen][WebSocket 连接已打开]");
webSocket.request(1);
}
@Override
public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
messageBuffer.append(data);
if (last) {
log.debug("[onText][收到 WebSocket 消息: {}]", messageBuffer);
messageBuffer.setLength(0);
}
webSocket.request(1);
return null;
}
@Override
public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
connected.set(false);
log.info("[onClose][WebSocket 连接已关闭,状态码: {},原因: {}]", statusCode, reason);
return null;
}
@Override
public void onError(WebSocket webSocket, Throwable error) {
connected.set(false);
log.error("[onError][WebSocket 发生错误]", error);
}
/**
* 发送设备消息
*
* @param message 设备消息
* @throws Exception 发送异常
*/
public void sendMessage(IotDeviceMessage message) throws Exception {
if (!connected.get() || webSocket == null) {
throw new IllegalStateException("WebSocket 客户端未连接");
}
try {
String messageData;
if (IotDataSinkWebSocketConfig.DEFAULT_DATA_FORMAT.equalsIgnoreCase(dataFormat)) {
messageData = JsonUtils.toJsonString(message);
} else {
messageData = message.toString();
}
// 发送消息并等待完成
CompletableFuture<WebSocket> future = webSocket.sendText(messageData, true);
future.get(sendTimeoutMs, TimeUnit.MILLISECONDS);
log.debug("[sendMessage][发送消息成功,设备 ID: {},消息长度: {}]",
message.getDeviceId(), messageData.length());
} catch (Exception e) {
log.error("[sendMessage][发送消息失败,设备 ID: {}]", message.getDeviceId(), e);
throw e;
}
}
/**
* 关闭连接
*/
public void close() {
if (!connected.get() && webSocket == null) {
return;
}
try {
if (webSocket != null) {
webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "客户端主动关闭")
.orTimeout(5, TimeUnit.SECONDS)
.exceptionally(e -> {
log.warn("[close][发送关闭帧失败]", e);
return null;
});
}
connected.set(false);
log.info("[close][WebSocket 客户端连接已关闭,服务器地址: {}]", serverUrl);
} catch (Exception e) {
log.error("[close][关闭 WebSocket 客户端连接异常]", e);
}
}
/**
* 检查连接状态
*
* @return 是否已连接
*/
public boolean isConnected() {
return connected.get() && webSocket != null;
}
@Override
public String toString() {
return "IotWebSocketClient{" +
"serverUrl='" + serverUrl + '\'' +
", dataFormat='" + dataFormat + '\'' +
", connected=" + connected.get() +
'}';
}
}

View File

@@ -30,6 +30,7 @@ import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
import org.springframework.validation.annotation.Validated;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.List;
@@ -392,9 +393,25 @@ public class IotSceneRuleServiceImpl implements IotSceneRuleService {
}
});
});
// 3. 更新最后触发时间
updateLastTriggerTime(sceneRule.getId());
});
}
/**
* 更新规则场景的最后触发时间
*
* @param id 规则场景编号
*/
private void updateLastTriggerTime(Long id) {
try {
sceneRuleMapper.updateById(new IotSceneRuleDO().setId(id).setLastTriggerTime(LocalDateTime.now()));
} catch (Exception e) {
log.error("[updateLastTriggerTime][规则场景编号({}) 更新最后触发时间异常]", id, e);
}
}
private IotSceneRuleServiceImpl getSelf() {
return SpringUtil.getBean(IotSceneRuleServiceImpl.class);
}

View File

@@ -15,6 +15,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
/**
* IoT 设备属性设置的 {@link IotSceneRuleAction} 实现类

View File

@@ -15,7 +15,9 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* IoT 设备服务调用的 {@link IotSceneRuleAction} 实现类

View File

@@ -36,11 +36,12 @@ public class IotDevicePropertyPostTriggerMatcher implements IotSceneRuleTriggerM
return false;
}
// 1.3 检查标识符是否匹配
String messageIdentifier = IotDeviceMessageUtils.getIdentifier(message);
if (!IotSceneRuleMatcherHelper.isIdentifierMatched(trigger.getIdentifier(), messageIdentifier)) {
IotSceneRuleMatcherHelper.logTriggerMatchFailure(message, trigger, "标识符不匹配,期望: " +
trigger.getIdentifier() + ", 实际: " + messageIdentifier);
// 1.3 检查消息中是否包含触发器指定的属性标识符
// 注意:属性上报可能同时上报多个属性,所以需要判断 trigger.getIdentifier() 是否在 message 的 params 中
// TODO @puhui999可以考虑 notXXX 方法,简化代码(尽量取反)
if (!IotDeviceMessageUtils.containsIdentifier(message, trigger.getIdentifier())) {
IotSceneRuleMatcherHelper.logTriggerMatchFailure(message, trigger, "消息中不包含属性: " +
trigger.getIdentifier());
return false;
}