### iot物联网网络中间件(2.0) 使用java语言且基于netty, spring boot, redis等开源项目开发来的物联网网络中间件, 支持udp, tcp底层协议和http, mqtt, modbus等上层协议. 主打工业物联网底层网络交互、设备管理、数据存储、大数据处理 #### 主要特性 1. 支持服务端启动监听多个端口, 统一所有协议可使用的api接口 2. 包含一套代理客户端通信协议,支持调用:客户端 -> 服务端 -> 设备 -> 服务端 -> 客户端 3. 支持设备协议对象和其业务对象进行分离(支持默认业务处理器【spring单例注入】和自定义业务处理器) 4. 支持redis, 可以方便的将设备上报的数据进行缓存到redis和从redis获取数据 5. 支持对数据的生产采集和数据的消费进行分离 6. 支持同步和异步调用设备, 支持应用程序客户端和设备服务端和设备三端之间的同步和异步调用 7. 服务端支持设备上线/下线/异常的事件通知, 支持自定义心跳事件, 客户端支持断线重连 8. 丰富的日志打印功能,包括设备上线,下线提示, 一个协议的生命周期(请求或者请求+响应)等 9. 支持本地和服务端的并发测试 #### 支持的协议 1. tcp(固定长度解码, 长度字段解码, 换行符解码,自定义分隔符解码,自定义字节到报文解码)、udp(已完成) 2. mqtt协议客户端(已完成) 3. 新增modbus支持(待开发) 4. 新增plc支持(待开发) #### 更新日志 1. 完成和测试mqtt协议的客户端实现(2021/9/7) #### 合作方式(qq: 97235681) 1. 设备对接(不包括业务, 只负责设备协议部分,按设备协议数量收费(500+)) 2. 物联网系统开发(包括设备和业务完整系统) 3. 为厂家提供专门的设备交互sdk(java) #### 使用教程(以固定长度解码器为例) ##### 添加服务端口 **1. 创建一个springboot应用, 在maven文件里面引入服务端jar包** ``` 2.0.0 com.iteaj iot-server ``` **2. 创建组件报文类** ``` package com.iteaj.iot.test.server.fixed; import com.iteaj.iot.server.ServerMessage; import com.iteaj.iot.test.MessageCreator; /** * 固定长度报文 28 * 8 8 4 8 * 设备编号 + messageId + type + 递增值 * _注:服务端报文必须继承父类:ServerMessage _ */ public class FixedLengthServerMessage extends ServerMessage { // 注意:此构造函数一定要存在 public FixedLengthServerMessage(byte[] message) { super(message); } public FixedLengthServerMessage(MessageHead head) { super(head); } public FixedLengthServerMessage(MessageHead head, MessageBody body) { super(head, body); } // 解析出客户端请求报文的报文头 @Override protected MessageHead doBuild(byte[] message) { return MessageCreator.buildFixedMessageHead(message); } } ``` **3. 创建固定长度解码器组件** ``` package com.iteaj.iot.test.server.fixed; import com.iteaj.iot.AbstractProtocol; import com.iteaj.iot.config.ConnectProperties; import com.iteaj.iot.server.component.FixedLengthFrameDecoderComponent; import com.iteaj.iot.server.protocol.HeartProtocol; import com.iteaj.iot.test.TestProtocolType; /** * 固定长度解码器组件测试 * _注意:此组件必须交由spring容器管理_ * _注意:此处必须指定此组件使用的报文类型_ */ @Component public class TestFixedLengthDecoderComponent extends FixedLengthFrameDecoderComponent { // 构造函数需要指定要监听的端口 public TestFixedLengthDecoderComponent(ConnectProperties connectProperties) { super(connectProperties, 28); } @Override public String getDesc() { return "用于测试服务端固定长度字段解码器[FixedLengthFrameDecoder]"; } /** * 通过报文头获取对应的协议 */ @Override public AbstractProtocol getProtocol(FixedLengthServerMessage message) { TestProtocolType protocolType = message.getHead().getType(); // 心跳类型返回心跳协议 if(protocolType == TestProtocolType.Heart) { return HeartProtocol.getInstance(message); // 客户端主动请求服务器类型 - new一个协议对象 } else if(protocolType == TestProtocolType.CIReq) { return new FixedLengthClientRequestProtocol(message); } else { // 客户端响应服务器协议类型 - 移除保存在服务端的协议 return remove(message.getHead().getMessageId()); } } @Override public String getName() { return "固定长度字段解码组件"; } } ``` ``` // 说明:以下是已经适配好的常用的解码器组件 1. DelimiterBasedFrameDecoderComponent 自定义分隔符解码器组件 2. FixedLengthFrameDecoderComponent 固定长度解码器组件 3. LengthFieldBasedFrameDecoderComponent 长度字段解码器组件 4. LineBasedFrameDecoderComponent 换行符解码器组件 5. ByteToMessageDecoderComponent 自定义字节到报文解码器组件 ``` 4. 创建交互协议对象 交互协议包含两种 ``` // 1. 客户端主动请求服务端,然后服务端响应客户端(比如设备要获取服务器的当前时间) package com.iteaj.iot.test.server.fixed; import com.iteaj.iot.Message; import com.iteaj.iot.ProtocolType; import com.iteaj.iot.server.manager.DevicePipelineManager; import com.iteaj.iot.server.protocol.ClientInitiativeProtocol; import com.iteaj.iot.test.MessageCreator; import com.iteaj.iot.test.TestProtocolType; public class FixedLengthClientRequestProtocol extends ClientInitiativeProtocol { public FixedLengthClientRequestProtocol(FixedLengthServerMessage requestMessage) { super(requestMessage); } // 构建要响应给客户端的报文 @Override protected FixedLengthServerMessage doBuildResponseMessage() { Message.MessageHead head = requestMessage().getHead(); String equipCode = head.getEquipCode(); String messageId = head.getMessageId(); DevicePipelineManager instance = DevicePipelineManager.getInstance(); return MessageCreator.buildFixedLengthServerMessage(equipCode, messageId, instance.size(), head.getType()); } // 解析出客户端请求的报文内容 @Override protected void doBuildRequestMessage(FixedLengthServerMessage requestMessage) { } @Override public ProtocolType protocolType() { return TestProtocolType.CIReq; } } // 2. 服务端主动请求客户端, 然后客户端响应服务端(比如服务端控制设备的开关状态) ``` #### 2. 架构图 ![iot架构图](https://images.gitee.com/uploads/images/2021/0303/210010_da7cfaa4_1230742.png "iot.png") 1. 设备端和设备管理端同步和异步如线3、4所示 * 平台主动向设备发起请求4->3:如果是同步调用则线程将等待设备返回在往下执行业务, 如果设备由于某些原因没有返回则将在指定的超时时间内解锁线程继续向下执行业务。如果是异步调用则在设备响应后会执行业务,并不会锁住当前调用的线程 * 设备主动向平台发起请求3->4:如线3设备主动发起请求给平台, 平台将先解析设备请求的报文然后调用执行业务,执行完业务之后如线4平台将生成需要响应给设备的报文并推送给设备 2. 应用管理端和设备管理端同步和异步(1、2所示)和1相同的流程 3. 应用管理端和设备进行同步和异步调用如:1、2、3、4 * 如果是同步:由应用客户端发起调用如线1所示,发起调用的线程将阻塞, 然后如线4所示设备管理端向设备发起异步调用,接着等待设备响应如线3,再然后设备管理端响应应用客户端如线2,最后将释放应用客户端的线程锁继续向下执行 * 如果是异步:和同步的区别是应用客户端不加锁