protocols;
@Data
public static class RpcProperties {
@@ -65,582 +76,158 @@ public class IotGatewayProperties {
}
+ /**
+ * 协议实例配置
+ */
@Data
public static class ProtocolProperties {
/**
- * HTTP 组件配置
+ * 协议实例 ID,如 "http-alink"、"tcp-binary"
*/
- private HttpProperties http;
-
+ @NotEmpty(message = "协议实例 ID 不能为空")
+ private String id;
/**
- * EMQX 组件配置
+ * 是否启用
*/
- private EmqxProperties emqx;
-
+ @NotNull(message = "是否启用不能为空")
+ private Boolean enabled = true;
/**
- * TCP 组件配置
+ * 协议类型
+ *
+ * @see cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum
*/
- private TcpProperties tcp;
-
- /**
- * MQTT 组件配置
- */
- private MqttProperties mqtt;
-
- /**
- * MQTT WebSocket 组件配置
- */
- private MqttWsProperties mqttWs;
-
- /**
- * UDP 组件配置
- */
- private UdpProperties udp;
-
- /**
- * CoAP 组件配置
- */
- private CoapProperties coap;
-
- /**
- * WebSocket 组件配置
- */
- private WebSocketProperties websocket;
-
- }
-
- @Data
- public static class HttpProperties {
-
- /**
- * 是否开启
- */
- @NotNull(message = "是否开启不能为空")
- private Boolean enabled;
+ @NotEmpty(message = "协议类型不能为空")
+ private String protocol;
/**
* 服务端口
- */
- private Integer serverPort;
-
- /**
- * 是否开启 SSL
- */
- @NotNull(message = "是否开启 SSL 不能为空")
- private Boolean sslEnabled = false;
-
- /**
- * SSL 证书路径
- */
- private String sslKeyPath;
- /**
- * SSL 证书路径
- */
- private String sslCertPath;
-
- }
-
- @Data
- public static class EmqxProperties {
-
- /**
- * 是否开启
- */
- @NotNull(message = "是否开启不能为空")
- private Boolean enabled;
-
- /**
- * HTTP 服务端口(默认:8090)
- */
- private Integer httpPort = 8090;
-
- /**
- * MQTT 服务器地址
- */
- @NotEmpty(message = "MQTT 服务器地址不能为空")
- private String mqttHost;
-
- /**
- * MQTT 服务器端口(默认:1883)
- */
- @NotNull(message = "MQTT 服务器端口不能为空")
- private Integer mqttPort = 1883;
-
- /**
- * MQTT 用户名
- */
- @NotEmpty(message = "MQTT 用户名不能为空")
- private String mqttUsername;
-
- /**
- * MQTT 密码
- */
- @NotEmpty(message = "MQTT 密码不能为空")
- private String mqttPassword;
-
- /**
- * MQTT 客户端的 SSL 开关
- */
- @NotNull(message = "MQTT 是否开启 SSL 不能为空")
- private Boolean mqttSsl = false;
-
- /**
- * MQTT 客户端 ID(如果为空,系统将自动生成)
- */
- @NotEmpty(message = "MQTT 客户端 ID 不能为空")
- private String mqttClientId;
-
- /**
- * MQTT 订阅的主题
- */
- @NotEmpty(message = "MQTT 主题不能为空")
- private List<@NotEmpty(message = "MQTT 主题不能为空") String> mqttTopics;
-
- /**
- * 默认 QoS 级别
*
- * 0 - 最多一次
- * 1 - 至少一次
- * 2 - 刚好一次
+ * 不同协议含义不同:
+ * 1. TCP/UDP/HTTP/WebSocket/MQTT/CoAP:对应网关自身监听的服务端口
+ * 2. EMQX:对应网关提供给 EMQX 回调的 HTTP Hook 端口(/mqtt/auth、/mqtt/acl、/mqtt/event)
*/
- private Integer mqttQos = 1;
+ @NotNull(message = "服务端口不能为空")
+ private Integer port;
+ /**
+ * 序列化类型(可选)
+ *
+ * @see cn.iocoder.yudao.module.iot.core.enums.IotSerializeTypeEnum
+ *
+ * 为什么是可选的呢?
+ * 1. {@link IotProtocolTypeEnum#HTTP}、{@link IotProtocolTypeEnum#COAP} 协议,目前强制是 JSON 格式
+ * 2. {@link IotProtocolTypeEnum#EMQX} 协议,目前支持根据产品(设备)配置的序列化类型来解析
+ */
+ private String serialize;
+
+ // ========== SSL 配置 ==========
/**
- * 连接超时时间(秒)
+ * SSL 配置(可选,配置文件中不配置则为 null)
*/
- private Integer connectTimeoutSeconds = 10;
+ @Valid
+ private SslConfig ssl;
+
+ // ========== 各协议配置 ==========
/**
- * 重连延迟时间(毫秒)
+ * HTTP 协议配置
*/
- private Long reconnectDelayMs = 5000L;
+ @Valid
+ private IotHttpConfig http;
+ /**
+ * WebSocket 协议配置
+ */
+ @Valid
+ private IotWebSocketConfig websocket;
/**
- * 是否启用 Clean Session (清理会话)
- * true: 每次连接都是新会话,Broker 不保留离线消息和订阅关系。
- * 对于网关这类“永远在线”且会主动重新订阅的应用,建议为 true。
+ * TCP 协议配置
*/
- private Boolean cleanSession = true;
+ @Valid
+ private IotTcpConfig tcp;
+ /**
+ * UDP 协议配置
+ */
+ @Valid
+ private IotUdpConfig udp;
/**
- * 心跳间隔(秒)
- * 用于保持连接活性,及时发现网络中断。
+ * CoAP 协议配置
*/
- private Integer keepAliveIntervalSeconds = 60;
+ @Valid
+ private IotCoapConfig coap;
/**
- * 最大未确认消息队列大小
- * 限制已发送但未收到 Broker 确认的 QoS 1/2 消息数量,用于流量控制。
+ * MQTT 协议配置
*/
- private Integer maxInflightQueue = 10000;
+ @Valid
+ private IotMqttConfig mqtt;
+ /**
+ * EMQX 协议配置
+ */
+ @Valid
+ private IotEmqxConfig emqx;
/**
- * 是否信任所有 SSL 证书
- * 警告:此配置会绕过证书验证,仅建议在开发和测试环境中使用!
- * 在生产环境中,应设置为 false,并配置正确的信任库。
+ * Modbus TCP Client 协议配置
*/
- private Boolean trustAll = false;
+ @Valid
+ private IotModbusTcpClientConfig modbusTcpClient;
/**
- * 遗嘱消息配置 (用于网关异常下线时通知其他系统)
+ * Modbus TCP Server 协议配置
*/
- private final Will will = new Will();
-
- /**
- * 高级 SSL/TLS 配置 (用于生产环境)
- */
- private final Ssl sslOptions = new Ssl();
-
- /**
- * 遗嘱消息 (Last Will and Testament)
- */
- @Data
- public static class Will {
-
- /**
- * 是否启用遗嘱消息
- */
- private boolean enabled = false;
- /**
- * 遗嘱消息主题
- */
- private String topic;
- /**
- * 遗嘱消息内容
- */
- private String payload;
- /**
- * 遗嘱消息 QoS 等级
- */
- private Integer qos = 1;
- /**
- * 遗嘱消息是否作为保留消息发布
- */
- private boolean retain = true;
-
- }
-
- /**
- * 高级 SSL/TLS 配置
- */
- @Data
- public static class Ssl {
-
- /**
- * 密钥库(KeyStore)路径,例如:classpath:certs/client.jks
- * 包含客户端自己的证书和私钥,用于向服务端证明身份(双向认证)。
- */
- private String keyStorePath;
- /**
- * 密钥库密码
- */
- private String keyStorePassword;
- /**
- * 信任库(TrustStore)路径,例如:classpath:certs/trust.jks
- * 包含服务端信任的 CA 证书,用于验证服务端的身份,防止中间人攻击。
- */
- private String trustStorePath;
- /**
- * 信任库密码
- */
- private String trustStorePassword;
-
- }
+ @Valid
+ private IotModbusTcpServerConfig modbusTcpServer;
}
+ /**
+ * SSL 配置
+ */
@Data
- public static class TcpProperties {
-
- /**
- * 是否开启
- */
- @NotNull(message = "是否开启不能为空")
- private Boolean enabled;
-
- /**
- * 服务器端口
- */
- private Integer port = 8091;
-
- /**
- * 心跳超时时间(毫秒)
- */
- private Long keepAliveTimeoutMs = 30000L;
-
- /**
- * 最大连接数
- */
- private Integer maxConnections = 1000;
-
- /**
- * 是否启用SSL
- */
- private Boolean sslEnabled = false;
-
- /**
- * SSL证书路径
- */
- private String sslCertPath;
-
- /**
- * SSL私钥路径
- */
- private String sslKeyPath;
-
- }
-
- @Data
- public static class MqttProperties {
-
- /**
- * 是否开启
- */
- @NotNull(message = "是否开启不能为空")
- private Boolean enabled;
-
- /**
- * 服务器端口
- */
- private Integer port = 1883;
-
- /**
- * 最大消息大小(字节)
- */
- private Integer maxMessageSize = 8192;
-
- /**
- * 连接超时时间(秒)
- */
- private Integer connectTimeoutSeconds = 60;
- /**
- * 保持连接超时时间(秒)
- */
- private Integer keepAliveTimeoutSeconds = 300;
+ public static class SslConfig {
/**
* 是否启用 SSL
*/
- private Boolean sslEnabled = false;
- /**
- * SSL 配置
- */
- private SslOptions sslOptions = new SslOptions();
-
- /**
- * SSL 配置选项
- */
- @Data
- public static class SslOptions {
-
- /**
- * 密钥证书选项
- */
- private io.vertx.core.net.KeyCertOptions keyCertOptions;
- /**
- * 信任选项
- */
- private io.vertx.core.net.TrustOptions trustOptions;
- /**
- * SSL 证书路径
- */
- private String certPath;
- /**
- * SSL 私钥路径
- */
- private String keyPath;
- /**
- * 信任存储路径
- */
- private String trustStorePath;
- /**
- * 信任存储密码
- */
- private String trustStorePassword;
-
- }
-
- }
-
- @Data
- public static class MqttWsProperties {
-
- /**
- * 是否开启
- */
- @NotNull(message = "是否开启不能为空")
- private Boolean enabled;
-
- /**
- * WebSocket 服务器端口(默认:8083)
- */
- private Integer port = 8083;
-
- /**
- * WebSocket 路径(默认:/mqtt)
- */
- @NotEmpty(message = "WebSocket 路径不能为空")
- private String path = "/mqtt";
-
- /**
- * 最大消息大小(字节)
- */
- private Integer maxMessageSize = 8192;
-
- /**
- * 连接超时时间(秒)
- */
- private Integer connectTimeoutSeconds = 60;
-
- /**
- * 保持连接超时时间(秒)
- */
- private Integer keepAliveTimeoutSeconds = 300;
-
- /**
- * 是否启用 SSL(wss://)
- */
- private Boolean sslEnabled = false;
-
- /**
- * SSL 配置
- */
- private SslOptions sslOptions = new SslOptions();
-
- /**
- * WebSocket 子协议(通常为 "mqtt" 或 "mqttv3.1")
- */
- @NotEmpty(message = "WebSocket 子协议不能为空")
- private String subProtocol = "mqtt";
-
- /**
- * 最大帧大小(字节)
- */
- private Integer maxFrameSize = 65536;
-
- /**
- * SSL 配置选项
- */
- @Data
- public static class SslOptions {
-
- /**
- * 密钥证书选项
- */
- private io.vertx.core.net.KeyCertOptions keyCertOptions;
-
- /**
- * 信任选项
- */
- private io.vertx.core.net.TrustOptions trustOptions;
-
- /**
- * SSL 证书路径
- */
- private String certPath;
-
- /**
- * SSL 私钥路径
- */
- private String keyPath;
-
- /**
- * 信任存储路径
- */
- private String trustStorePath;
-
- /**
- * 信任存储密码
- */
- private String trustStorePassword;
-
- }
-
- }
-
- @Data
- public static class UdpProperties {
-
- /**
- * 是否开启
- */
- @NotNull(message = "是否开启不能为空")
- private Boolean enabled;
-
- /**
- * 服务端口(默认 8093)
- */
- private Integer port = 8093;
-
- /**
- * 接收缓冲区大小(默认 64KB)
- */
- private Integer receiveBufferSize = 65536;
-
- /**
- * 发送缓冲区大小(默认 64KB)
- */
- private Integer sendBufferSize = 65536;
-
- /**
- * 会话超时时间(毫秒,默认 60 秒)
- *
- * 用于清理不活跃的设备地址映射
- */
- private Long sessionTimeoutMs = 60000L;
-
- /**
- * 会话清理间隔(毫秒,默认 30 秒)
- */
- private Long sessionCleanIntervalMs = 30000L;
-
- }
-
- @Data
- public static class CoapProperties {
-
- /**
- * 是否开启
- */
- @NotNull(message = "是否开启不能为空")
- private Boolean enabled;
-
- /**
- * 服务端口(CoAP 默认端口 5683)
- */
- @NotNull(message = "服务端口不能为空")
- private Integer port = 5683;
-
- /**
- * 最大消息大小(字节)
- */
- @NotNull(message = "最大消息大小不能为空")
- private Integer maxMessageSize = 1024;
-
- /**
- * ACK 超时时间(毫秒)
- */
- @NotNull(message = "ACK 超时时间不能为空")
- private Integer ackTimeout = 2000;
-
- /**
- * 最大重传次数
- */
- @NotNull(message = "最大重传次数不能为空")
- private Integer maxRetransmit = 4;
-
- }
-
- @Data
- public static class WebSocketProperties {
-
- /**
- * 是否开启
- */
- @NotNull(message = "是否开启不能为空")
- private Boolean enabled;
-
- /**
- * 服务器端口(默认:8094)
- */
- private Integer port = 8094;
-
- /**
- * WebSocket 路径(默认:/ws)
- */
- @NotEmpty(message = "WebSocket 路径不能为空")
- private String path = "/ws";
-
- /**
- * 最大消息大小(字节,默认 64KB)
- */
- private Integer maxMessageSize = 65536;
-
- /**
- * 最大帧大小(字节,默认 64KB)
- */
- private Integer maxFrameSize = 65536;
-
- /**
- * 空闲超时时间(秒,默认 60)
- */
- private Integer idleTimeoutSeconds = 60;
-
- /**
- * 是否启用 SSL(wss://)
- */
- private Boolean sslEnabled = false;
+ @NotNull(message = "是否启用 SSL 不能为空")
+ private Boolean ssl = false;
/**
* SSL 证书路径
*/
+ @NotEmpty(message = "SSL 证书路径不能为空")
private String sslCertPath;
/**
* SSL 私钥路径
*/
+ @NotEmpty(message = "SSL 私钥路径不能为空")
private String sslKeyPath;
+ /**
+ * 密钥库(KeyStore)路径
+ *
+ * 包含客户端自己的证书和私钥,用于向服务端证明身份(双向认证)
+ */
+ private String keyStorePath;
+ /**
+ * 密钥库密码
+ */
+ private String keyStorePassword;
+
+ /**
+ * 信任库(TrustStore)路径
+ *
+ * 包含服务端信任的 CA 证书,用于验证服务端的身份
+ */
+ private String trustStorePath;
+ /**
+ * 信任库密码
+ */
+ private String trustStorePassword;
+
}
}
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/AbstractIotProtocolDownstreamSubscriber.java
similarity index 57%
rename from yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxDownstreamSubscriber.java
rename to yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/AbstractIotProtocolDownstreamSubscriber.java
index 61bf12376..efd61e13a 100644
--- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/emqx/IotEmqxDownstreamSubscriber.java
+++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/AbstractIotProtocolDownstreamSubscriber.java
@@ -1,49 +1,53 @@
-package cn.iocoder.yudao.module.iot.gateway.protocol.emqx;
+package cn.iocoder.yudao.module.iot.gateway.protocol;
+import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
-import cn.iocoder.yudao.module.iot.gateway.protocol.emqx.router.IotEmqxDownstreamHandler;
-import jakarta.annotation.PostConstruct;
+import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
- * IoT 网关 EMQX 订阅者:接收下行给设备的消息
+ * IoT 协议下行消息订阅者抽象类
+ *
+ * 负责接收来自消息总线的下行消息,并委托给子类进行业务处理
*
* @author 芋道源码
*/
+@AllArgsConstructor
@Slf4j
-public class IotEmqxDownstreamSubscriber implements IotMessageSubscriber {
+public abstract class AbstractIotProtocolDownstreamSubscriber implements IotMessageSubscriber {
- private final IotEmqxDownstreamHandler downstreamHandler;
+ private final IotProtocol protocol;
private final IotMessageBus messageBus;
- private final IotEmqxUpstreamProtocol protocol;
-
- public IotEmqxDownstreamSubscriber(IotEmqxUpstreamProtocol protocol, IotMessageBus messageBus) {
- this.protocol = protocol;
- this.messageBus = messageBus;
- this.downstreamHandler = new IotEmqxDownstreamHandler(protocol);
- }
-
- @PostConstruct
- public void init() {
- messageBus.register(this);
- }
-
@Override
public String getTopic() {
return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(protocol.getServerId());
}
+ /**
+ * 保证点对点消费,需要保证独立的 Group,所以使用 Topic 作为 Group
+ */
@Override
public String getGroup() {
- // 保证点对点消费,需要保证独立的 Group,所以使用 Topic 作为 Group
return getTopic();
}
+ @Override
+ public void start() {
+ messageBus.register(this);
+ log.info("[start][{} 下行消息订阅成功,Topic:{}]", protocol.getType().name(), getTopic());
+ }
+
+ @Override
+ public void stop() {
+ messageBus.unregister(this);
+ log.info("[stop][{} 下行消息订阅已停止,Topic:{}]", protocol.getType().name(), getTopic());
+ }
+
@Override
public void onMessage(IotDeviceMessage message) {
log.debug("[onMessage][接收到下行消息, messageId: {}, method: {}, deviceId: {}]",
@@ -51,18 +55,25 @@ public class IotEmqxDownstreamSubscriber implements IotMessageSubscriber protocols = new ArrayList<>();
+
+ @Getter
+ private volatile boolean running = false;
+
+ public IotProtocolManager(IotGatewayProperties gatewayProperties) {
+ this.gatewayProperties = gatewayProperties;
+ }
+
+ @Override
+ public void start() {
+ if (running) {
+ return;
+ }
+ List protocolConfigs = gatewayProperties.getProtocols();
+ if (CollUtil.isEmpty(protocolConfigs)) {
+ log.info("[start][没有配置协议实例,跳过启动]");
+ return;
+ }
+
+ for (IotGatewayProperties.ProtocolProperties config : protocolConfigs) {
+ if (BooleanUtil.isFalse(config.getEnabled())) {
+ log.info("[start][协议实例 {} 未启用,跳过]", config.getId());
+ continue;
+ }
+ IotProtocol protocol = createProtocol(config);
+ if (protocol == null) {
+ continue;
+ }
+ protocol.start();
+ protocols.add(protocol);
+ }
+ running = true;
+ log.info("[start][协议管理器启动完成,共启动 {} 个协议实例]", protocols.size());
+ }
+
+ @Override
+ public void stop() {
+ if (!running) {
+ return;
+ }
+ for (IotProtocol protocol : protocols) {
+ try {
+ protocol.stop();
+ } catch (Exception e) {
+ log.error("[stop][协议实例 {} 停止失败]", protocol.getId(), e);
+ }
+ }
+ protocols.clear();
+ running = false;
+ log.info("[stop][协议管理器已停止]");
+ }
+
+ /**
+ * 创建协议实例
+ *
+ * @param config 协议实例配置
+ * @return 协议实例
+ */
+ @SuppressWarnings({"EnhancedSwitchMigration"})
+ private IotProtocol createProtocol(IotGatewayProperties.ProtocolProperties config) {
+ IotProtocolTypeEnum protocolType = IotProtocolTypeEnum.of(config.getProtocol());
+ if (protocolType == null) {
+ log.error("[createProtocol][协议实例 {} 的协议类型 {} 不存在]", config.getId(), config.getProtocol());
+ return null;
+ }
+ switch (protocolType) {
+ case HTTP:
+ return createHttpProtocol(config);
+ case TCP:
+ return createTcpProtocol(config);
+ case UDP:
+ return createUdpProtocol(config);
+ case COAP:
+ return createCoapProtocol(config);
+ case WEBSOCKET:
+ return createWebSocketProtocol(config);
+ case MQTT:
+ return createMqttProtocol(config);
+ case EMQX:
+ return createEmqxProtocol(config);
+ case MODBUS_TCP_CLIENT:
+ return createModbusTcpClientProtocol(config);
+ case MODBUS_TCP_SERVER:
+ return createModbusTcpServerProtocol(config);
+ default:
+ throw new IllegalArgumentException(String.format(
+ "[createProtocol][协议实例 %s 的协议类型 %s 暂不支持]", config.getId(), protocolType));
+ }
+ }
+
+ /**
+ * 创建 HTTP 协议实例
+ *
+ * @param config 协议实例配置
+ * @return HTTP 协议实例
+ */
+ private IotHttpProtocol createHttpProtocol(IotGatewayProperties.ProtocolProperties config) {
+ return new IotHttpProtocol(config);
+ }
+
+ /**
+ * 创建 TCP 协议实例
+ *
+ * @param config 协议实例配置
+ * @return TCP 协议实例
+ */
+ private IotTcpProtocol createTcpProtocol(IotGatewayProperties.ProtocolProperties config) {
+ return new IotTcpProtocol(config);
+ }
+
+ /**
+ * 创建 UDP 协议实例
+ *
+ * @param config 协议实例配置
+ * @return UDP 协议实例
+ */
+ private IotUdpProtocol createUdpProtocol(IotGatewayProperties.ProtocolProperties config) {
+ return new IotUdpProtocol(config);
+ }
+
+ /**
+ * 创建 CoAP 协议实例
+ *
+ * @param config 协议实例配置
+ * @return CoAP 协议实例
+ */
+ private IotCoapProtocol createCoapProtocol(IotGatewayProperties.ProtocolProperties config) {
+ return new IotCoapProtocol(config);
+ }
+
+ /**
+ * 创建 WebSocket 协议实例
+ *
+ * @param config 协议实例配置
+ * @return WebSocket 协议实例
+ */
+ private IotWebSocketProtocol createWebSocketProtocol(IotGatewayProperties.ProtocolProperties config) {
+ return new IotWebSocketProtocol(config);
+ }
+
+ /**
+ * 创建 MQTT 协议实例
+ *
+ * @param config 协议实例配置
+ * @return MQTT 协议实例
+ */
+ private IotMqttProtocol createMqttProtocol(IotGatewayProperties.ProtocolProperties config) {
+ return new IotMqttProtocol(config);
+ }
+
+ /**
+ * 创建 EMQX 协议实例
+ *
+ * @param config 协议实例配置
+ * @return EMQX 协议实例
+ */
+ private IotEmqxProtocol createEmqxProtocol(IotGatewayProperties.ProtocolProperties config) {
+ return new IotEmqxProtocol(config);
+ }
+
+ /**
+ * 创建 Modbus TCP Client 协议实例
+ *
+ * @param config 协议实例配置
+ * @return Modbus TCP Client 协议实例
+ */
+ private IotModbusTcpClientProtocol createModbusTcpClientProtocol(IotGatewayProperties.ProtocolProperties config) {
+ return new IotModbusTcpClientProtocol(config);
+ }
+
+ /**
+ * 创建 Modbus TCP Server 协议实例
+ *
+ * @param config 协议实例配置
+ * @return Modbus TCP Server 协议实例
+ */
+ private IotModbusTcpServerProtocol createModbusTcpServerProtocol(IotGatewayProperties.ProtocolProperties config) {
+ return new IotModbusTcpServerProtocol(config);
+ }
+
+}
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapConfig.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapConfig.java
new file mode 100644
index 000000000..45fe3007e
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapConfig.java
@@ -0,0 +1,36 @@
+package cn.iocoder.yudao.module.iot.gateway.protocol.coap;
+
+import jakarta.validation.constraints.Min;
+import jakarta.validation.constraints.NotNull;
+import lombok.Data;
+
+/**
+ * IoT CoAP 协议配置
+ *
+ * @author 芋道源码
+ */
+@Data
+public class IotCoapConfig {
+
+ /**
+ * 最大消息大小(字节)
+ */
+ @NotNull(message = "最大消息大小不能为空")
+ @Min(value = 64, message = "最大消息大小必须大于 64 字节")
+ private Integer maxMessageSize = 1024;
+
+ /**
+ * ACK 超时时间(毫秒)
+ */
+ @NotNull(message = "ACK 超时时间不能为空")
+ @Min(value = 100, message = "ACK 超时时间必须大于 100 毫秒")
+ private Integer ackTimeoutMs = 2000;
+
+ /**
+ * 最大重传次数
+ */
+ @NotNull(message = "最大重传次数不能为空")
+ @Min(value = 0, message = "最大重传次数必须大于等于 0")
+ private Integer maxRetransmit = 4;
+
+}
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapDownstreamSubscriber.java
deleted file mode 100644
index d01cdc416..000000000
--- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapDownstreamSubscriber.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package cn.iocoder.yudao.module.iot.gateway.protocol.coap;
-
-import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
-import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
-import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
-import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
-import jakarta.annotation.PostConstruct;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
-/**
- * IoT 网关 CoAP 订阅者:接收下行给设备的消息
- *
- * @author 芋道源码
- */
-@RequiredArgsConstructor
-@Slf4j
-public class IotCoapDownstreamSubscriber implements IotMessageSubscriber {
-
- private final IotCoapUpstreamProtocol protocol;
-
- private final IotMessageBus messageBus;
-
- @PostConstruct
- public void init() {
- messageBus.register(this);
- }
-
- @Override
- public String getTopic() {
- return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(protocol.getServerId());
- }
-
- @Override
- public String getGroup() {
- // 保证点对点消费,需要保证独立的 Group,所以使用 Topic 作为 Group
- return getTopic();
- }
-
- @Override
- public void onMessage(IotDeviceMessage message) {
- // 如需支持,可通过 CoAP Observe 模式实现(设备订阅资源,服务器推送变更)
- log.warn("[onMessage][IoT 网关 CoAP 协议暂不支持下行消息,忽略消息:{}]", message);
- }
-
-}
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapProtocol.java
new file mode 100644
index 000000000..d797ef8bc
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapProtocol.java
@@ -0,0 +1,168 @@
+package cn.iocoder.yudao.module.iot.gateway.protocol.coap;
+
+import cn.hutool.core.lang.Assert;
+import cn.hutool.extra.spring.SpringUtil;
+import cn.iocoder.yudao.module.iot.core.enums.IotProtocolTypeEnum;
+import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
+import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
+import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties.ProtocolProperties;
+import cn.iocoder.yudao.module.iot.gateway.protocol.IotProtocol;
+import cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.downstream.IotCoapDownstreamSubscriber;
+import cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstream.*;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.californium.core.CoapResource;
+import org.eclipse.californium.core.CoapServer;
+import org.eclipse.californium.core.config.CoapConfig;
+import org.eclipse.californium.elements.config.Configuration;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * IoT CoAP 协议实现
+ *
+ * 基于 Eclipse Californium 实现,支持:
+ * 1. 认证:POST /auth
+ * 2. 设备动态注册:POST /auth/register/device
+ * 3. 子设备动态注册:POST /auth/register/sub-device/{productKey}/{deviceName}
+ * 4. 属性上报:POST /topic/sys/{productKey}/{deviceName}/thing/property/post
+ * 5. 事件上报:POST /topic/sys/{productKey}/{deviceName}/thing/event/post
+ *
+ * @author 芋道源码
+ */
+@Slf4j
+public class IotCoapProtocol implements IotProtocol {
+
+ /**
+ * 协议配置
+ */
+ private final ProtocolProperties properties;
+ /**
+ * 服务器 ID(用于消息追踪,全局唯一)
+ */
+ @Getter
+ private final String serverId;
+
+ /**
+ * 运行状态
+ */
+ @Getter
+ private volatile boolean running = false;
+
+ /**
+ * CoAP 服务器
+ */
+ private CoapServer coapServer;
+
+ /**
+ * 下行消息订阅者
+ */
+ private IotCoapDownstreamSubscriber downstreamSubscriber;
+
+ public IotCoapProtocol(ProtocolProperties properties) {
+ IotCoapConfig coapConfig = properties.getCoap();
+ Assert.notNull(coapConfig, "CoAP 协议配置(coap)不能为空");
+ this.properties = properties;
+ this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort());
+ }
+
+ @Override
+ public String getId() {
+ return properties.getId();
+ }
+
+ @Override
+ public IotProtocolTypeEnum getType() {
+ return IotProtocolTypeEnum.COAP;
+ }
+
+ @Override
+ public void start() {
+ if (running) {
+ log.warn("[start][IoT CoAP 协议 {} 已经在运行中]", getId());
+ return;
+ }
+
+ try {
+ // 1.1 创建 CoAP 配置
+ IotCoapConfig coapConfig = properties.getCoap();
+ Configuration config = Configuration.createStandardWithoutFile();
+ config.set(CoapConfig.COAP_PORT, properties.getPort());
+ config.set(CoapConfig.MAX_MESSAGE_SIZE, coapConfig.getMaxMessageSize());
+ config.set(CoapConfig.ACK_TIMEOUT, coapConfig.getAckTimeoutMs(), TimeUnit.MILLISECONDS);
+ config.set(CoapConfig.MAX_RETRANSMIT, coapConfig.getMaxRetransmit());
+ // 1.2 创建 CoAP 服务器
+ coapServer = new CoapServer(config);
+
+ // 2.1 添加 /auth 认证资源
+ IotCoapAuthHandler authHandler = new IotCoapAuthHandler(serverId);
+ IotCoapAuthResource authResource = new IotCoapAuthResource(authHandler);
+ coapServer.add(authResource);
+ // 2.2 添加 /auth/register/device 设备动态注册资源(一型一密)
+ IotCoapRegisterHandler registerHandler = new IotCoapRegisterHandler();
+ IotCoapRegisterResource registerResource = new IotCoapRegisterResource(registerHandler);
+ // 2.3 添加 /auth/register/sub-device/{productKey}/{deviceName} 子设备动态注册资源
+ IotCoapRegisterSubHandler registerSubHandler = new IotCoapRegisterSubHandler();
+ IotCoapRegisterSubResource registerSubResource = new IotCoapRegisterSubResource(registerSubHandler);
+ authResource.add(new CoapResource("register") {{
+ add(registerResource);
+ add(registerSubResource);
+ }});
+ // 2.4 添加 /topic 根资源(用于上行消息)
+ IotCoapUpstreamHandler upstreamHandler = new IotCoapUpstreamHandler(serverId);
+ IotCoapUpstreamTopicResource topicResource = new IotCoapUpstreamTopicResource(serverId, upstreamHandler);
+ coapServer.add(topicResource);
+
+ // 3. 启动服务器
+ coapServer.start();
+ running = true;
+ log.info("[start][IoT CoAP 协议 {} 启动成功,端口:{},serverId:{}]",
+ getId(), properties.getPort(), serverId);
+
+ // 4. 启动下行消息订阅者
+ IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
+ this.downstreamSubscriber = new IotCoapDownstreamSubscriber(this, messageBus);
+ this.downstreamSubscriber.start();
+ } catch (Exception e) {
+ log.error("[start][IoT CoAP 协议 {} 启动失败]", getId(), e);
+ stop0();
+ throw e;
+ }
+ }
+
+ @Override
+ public void stop() {
+ if (!running) {
+ return;
+ }
+ stop0();
+ }
+
+ private void stop0() {
+ // 1. 停止下行消息订阅者
+ if (downstreamSubscriber != null) {
+ try {
+ downstreamSubscriber.stop();
+ log.info("[stop][IoT CoAP 协议 {} 下行消息订阅者已停止]", getId());
+ } catch (Exception e) {
+ log.error("[stop][IoT CoAP 协议 {} 下行消息订阅者停止失败]", getId(), e);
+ }
+ downstreamSubscriber = null;
+ }
+
+ // 2. 关闭 CoAP 服务器
+ if (coapServer != null) {
+ try {
+ coapServer.stop();
+ coapServer.destroy();
+ coapServer = null;
+ log.info("[stop][IoT CoAP 协议 {} 服务器已停止]", getId());
+ } catch (Exception e) {
+ log.error("[stop][IoT CoAP 协议 {} 服务器停止失败]", getId(), e);
+ }
+ }
+ running = false;
+ log.info("[stop][IoT CoAP 协议 {} 已停止]", getId());
+ }
+
+}
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapUpstreamProtocol.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapUpstreamProtocol.java
deleted file mode 100644
index e10bd9889..000000000
--- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/IotCoapUpstreamProtocol.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package cn.iocoder.yudao.module.iot.gateway.protocol.coap;
-
-import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
-import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
-import cn.iocoder.yudao.module.iot.gateway.protocol.coap.router.*;
-import jakarta.annotation.PostConstruct;
-import jakarta.annotation.PreDestroy;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.eclipse.californium.core.CoapResource;
-import org.eclipse.californium.core.CoapServer;
-import org.eclipse.californium.core.config.CoapConfig;
-import org.eclipse.californium.elements.config.Configuration;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * IoT 网关 CoAP 协议:接收设备上行消息
- *
- * 基于 Eclipse Californium 实现,支持:
- * 1. 认证:POST /auth
- * 2. 属性上报:POST /topic/sys/{productKey}/{deviceName}/thing/property/post
- * 3. 事件上报:POST /topic/sys/{productKey}/{deviceName}/thing/event/post
- *
- * @author 芋道源码
- */
-@Slf4j
-public class IotCoapUpstreamProtocol {
-
- private final IotGatewayProperties.CoapProperties coapProperties;
-
- private CoapServer coapServer;
-
- @Getter
- private final String serverId;
-
- public IotCoapUpstreamProtocol(IotGatewayProperties.CoapProperties coapProperties) {
- this.coapProperties = coapProperties;
- this.serverId = IotDeviceMessageUtils.generateServerId(coapProperties.getPort());
- }
-
- @PostConstruct
- public void start() {
- try {
- // 1.1 创建网络配置(Californium 3.x API)
- Configuration config = Configuration.createStandardWithoutFile();
- config.set(CoapConfig.COAP_PORT, coapProperties.getPort());
- config.set(CoapConfig.MAX_MESSAGE_SIZE, coapProperties.getMaxMessageSize());
- config.set(CoapConfig.ACK_TIMEOUT, coapProperties.getAckTimeout(), TimeUnit.MILLISECONDS);
- config.set(CoapConfig.MAX_RETRANSMIT, coapProperties.getMaxRetransmit());
- // 1.2 创建 CoAP 服务器
- coapServer = new CoapServer(config);
-
- // 2.1 添加 /auth 认证资源
- IotCoapAuthHandler authHandler = new IotCoapAuthHandler();
- IotCoapAuthResource authResource = new IotCoapAuthResource(this, authHandler);
- coapServer.add(authResource);
- // 2.2 添加 /auth/register/device 设备动态注册资源(一型一密)
- IotCoapRegisterHandler registerHandler = new IotCoapRegisterHandler();
- IotCoapRegisterResource registerResource = new IotCoapRegisterResource(registerHandler);
- authResource.add(new CoapResource("register") {{
- add(registerResource);
- }});
- // 2.3 添加 /topic 根资源(用于上行消息)
- IotCoapUpstreamHandler upstreamHandler = new IotCoapUpstreamHandler();
- IotCoapUpstreamTopicResource topicResource = new IotCoapUpstreamTopicResource(this, upstreamHandler);
- coapServer.add(topicResource);
-
- // 3. 启动服务器
- coapServer.start();
- log.info("[start][IoT 网关 CoAP 协议启动成功,端口:{},资源:/auth, /auth/register/device, /topic]", coapProperties.getPort());
- } catch (Exception e) {
- log.error("[start][IoT 网关 CoAP 协议启动失败]", e);
- throw e;
- }
- }
-
- @PreDestroy
- public void stop() {
- if (coapServer != null) {
- try {
- coapServer.stop();
- log.info("[stop][IoT 网关 CoAP 协议已停止]");
- } catch (Exception e) {
- log.error("[stop][IoT 网关 CoAP 协议停止失败]", e);
- }
- }
- }
-
-}
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/downstream/IotCoapDownstreamSubscriber.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/downstream/IotCoapDownstreamSubscriber.java
new file mode 100644
index 000000000..3309d2cd4
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/downstream/IotCoapDownstreamSubscriber.java
@@ -0,0 +1,27 @@
+package cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.downstream;
+
+import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
+import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
+import cn.iocoder.yudao.module.iot.gateway.protocol.AbstractIotProtocolDownstreamSubscriber;
+import cn.iocoder.yudao.module.iot.gateway.protocol.coap.IotCoapProtocol;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * IoT 网关 CoAP 订阅者:接收下行给设备的消息
+ *
+ * @author 芋道源码
+ */
+@Slf4j
+public class IotCoapDownstreamSubscriber extends AbstractIotProtocolDownstreamSubscriber {
+
+ public IotCoapDownstreamSubscriber(IotCoapProtocol protocol, IotMessageBus messageBus) {
+ super(protocol, messageBus);
+ }
+
+ @Override
+ protected void handleMessage(IotDeviceMessage message) {
+ // 如需支持,可通过 CoAP Observe 模式实现(设备订阅资源,服务器推送变更)
+ log.warn("[handleMessage][IoT 网关 CoAP 协议暂不支持下行消息,忽略消息:{}]", message);
+ }
+
+}
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstream/IotCoapAbstractHandler.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstream/IotCoapAbstractHandler.java
new file mode 100644
index 000000000..994fb147d
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/coap/handler/upstream/IotCoapAbstractHandler.java
@@ -0,0 +1,186 @@
+package cn.iocoder.yudao.module.iot.gateway.protocol.coap.handler.upstream;
+
+import cn.hutool.core.collection.CollUtil;
+import cn.hutool.core.util.ArrayUtil;
+import cn.hutool.core.util.ObjUtil;
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.extra.spring.SpringUtil;
+import cn.iocoder.yudao.framework.common.exception.ServiceException;
+import cn.iocoder.yudao.framework.common.pojo.CommonResult;
+import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
+import cn.iocoder.yudao.module.iot.core.topic.IotDeviceIdentity;
+import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.californium.core.coap.CoAP;
+import org.eclipse.californium.core.coap.MediaTypeRegistry;
+import org.eclipse.californium.core.coap.Option;
+import org.eclipse.californium.core.server.resources.CoapExchange;
+
+import java.util.List;
+
+import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.*;
+import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
+
+/**
+ * IoT 网关 CoAP 协议的处理器抽象基类:提供通用的前置处理(认证)、请求解析、响应处理、全局的异常捕获等
+ *
+ * @author 芋道源码
+ */
+@Slf4j
+public abstract class IotCoapAbstractHandler {
+
+ /**
+ * 自定义 CoAP Option 编号,用于携带 Token
+ *
+ * CoAP Option 范围 2048-65535 属于实验/自定义范围
+ */
+ public static final int OPTION_TOKEN = 2088;
+
+ private final IotDeviceTokenService deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class);
+
+ /**
+ * 处理 CoAP 请求(模板方法)
+ *
+ * @param exchange CoAP 交换对象
+ */
+ public final void handle(CoapExchange exchange) {
+ try {
+ // 1. 前置处理
+ beforeHandle(exchange);
+
+ // 2. 执行业务逻辑
+ CommonResult