为了更好的阅读体验,建议移步至笔者的博客阅读:JetLinks设备接入的认识与理解
官网:https://www.jetlinks.cn/
JetLinks 有两个产品:JetLinks-lot和JetLinks-view
官方文档:
JetLinks 是可支持多种方式接入设备的物联网设备管理平台
https://hanta.yuque.com/px7kg1/yfac2l/fwqriw24lp3cy2lw
JetLinks IOT 是一个开源的、企业级的物联网平台,它集成了设备管理、数据安全通信、消息订阅、规则引擎等一系列物联网核心能力,支持以平台适配设备的方式连接海量设备,采集设备数据上云,提供云端API,通过调用云端API实现远程控制。JetLinks物联网平台还支持多种设备接入协议,并提供了丰富的协议库。
支持:多协议(MQTT、HTTP、CoAP、UDP、TCP、WebSocket)自定义编解码插件接入;
支持:云平台对接接入;
支持: ModBus/TCP、OPC UA通道接入;
支持:基于GB/T 28181国标协议视频接入;
支持:自研边缘计算网关接入。
https://hanta.yuque.com/px7kg1/yfac2l/tvlxz93cht8zyl94
通过不同层级功能职责的封装、组合,以支持多设备、多协议接入平台
设备连接层:支持MQTT、TCP、UDP、CoAP、HTTP、WebSocket协议,提供统一设备接入的能力。
设备管理层:提供设备注册、配置、维护和监控的功能,支持设备属性、状态实时展示和历史属性、设备日志记录查询等。
业务逻辑层:提供规则引擎、数据转发和数据解析等功能,支持多种业务场景下的数据处理和交互操作。
应用开发层:提供RESTAPI和WebSocket接口,支持前端对接和自定义应用开发。同时还提供了可视化的数据展示和操作页面,方便用户快速搭建物联网应用系统。
设备接入JetLinks物联网平台后,可实现:设备通讯、数据的采集、认证、流转、存储、分析和实时监控
开发者需要自行实现编解码器逻辑,才可以让平台对设备数据进行全面管理
https://hanta.yuque.com/px7kg1/yfac2l/dagxgfzc3vnul0sn
产品是指一组具有相同功能和规格的设备集合,通常由同一家生产厂家制造。
设备可能是传感器、执行器、控制器等各种不同类型的物联网设备,它们可以通过网络连接到物联网平台。通过将这些设备组合到一个产品中,企业可以对这些设备进行统一管理和监控,以便更有效地控制其行为和状态。
设备是指物理存在的、可通过网络连接的单个物联网设备。
设备可以是各种类型的物品,例如传感器、执行器、控制器等。这些设备通过物联网连接到平台,以便与其他设备或应用程序进行通信、交换数据和接收命令。
物模型说明:http://doc.jetlinks.cn/function-description/metadata_description.html
物模型是物理空间中的实体在云端的数字化表示,有 4 个纬度:属性、功能、事件、标签。
属性:用于描述设备运行时具体信息和状态。例如温湿度传感器包含“温度”、“湿度”两个属性。
功能:设备可被外部调用的能力或方法,可设置输入参数和输出参数。相比于属性,服务可通过一条指令实现更复杂的业务逻辑
事件:用于描述设备上报云端的多个参数,多用于复杂报文结构或设备本身在某个阈值触发的报文。
标签:统一为设备添加拓展字段,添加后将在设备信息页显示。
社区版后端工程:
- github 仓库:https://github.com/jetlinks/jetlinks-community
- gitee 仓库:https://gitee.com/jetlinks/jetlinks-community
社区版系统模块说明:https://hanta.yuque.com/px7kg1/nn1gdr/gfqb3xmxg8fsvyxf#lR7Pd
技术栈 | 描述 |
---|---|
Java8 | 编程语言 |
hsweb Framework | 业务基础框架 |
Spring Boot 2.7.x | 响应式web支持 |
vert.x,netty | 高性能网络框架 |
R2DBC | 关系型数据库响应式驱动 |
Postgresql | 关系型数据库,可更换为mysql、sqlserver |
ElasticSearch | 设备数据与日志存储,可更换为其他中间件 |
Redis | 用户信息与权限缓存、设备注册中心缓存 |
scalecube | 基于JVM的分布式服务框架,支持响应式 |
micrometer | 监控指标框架 |
响应式编程:http://doc.jetlinks.cn/dev-guide/reactor.html
事件驱动:http://doc.jetlinks.cn/dev-guide/event-driver.html
添加自定义模块:https://hanta.yuque.com/px7kg1/dev/wdymp6flcfa1vwh5
设备接入流程:http://doc.jetlinks.cn/function-description/device_message_description.html#%E8%AE%BE%E5%A4%87%E6%8E%A5%E5%85%A5%E6%B5%81%E7%A8%8B
HTTP协议设备接入:https://hanta.yuque.com/px7kg1/yfac2l/qlr6nz5btr5rwrgk
开发者自行实现自定义协议,官方教程:http://doc.jetlinks.cn/dev-guide/custom-message-protocol.html
官方提供了协议开发示例工程:https://github.com/jetlinks/jetlinks-official-protocol
JetLinks 官方协议 jar 包:https://github.com/jetlinks/jetlinks-official-protocol/blob/v3/package/jetlinks-official-protocol-3.0.0.jar
编写自定义编解码器
:
创建org.jetlinks.core.message.codec.DeviceMessageCodec
接口实现类,重写encode()
、decode()
、getSupportTransport()
方法
编写协议的元信息
创建org.jetlinks.core.metadata.DefaultConfigMetadata
对象并设置对应属性
编写自定义设备协议支持提供商
:
创建org.jetlinks.core.spi.ProtocolSupportProvider
接口实现类,并重写create()
方法,
在create()
方法中将:将自定义编解码器
注册到协议中
配置路由配置:
在org.jetlinks.core.spi.ProtocolSupportProvider
接口实现类的create()
方法中创建org.jetlinks.core.defaults.CompositeProtocolSupport
对象,在其中配置路由配置、身份认证(可选)
将协议包上传到协议管理中
将上述的协议包和网络组件进行绑定
配置:产品信息
绑定:上述的自定义网络组件(官方定义:设备接入)
配置:认证信息
配置存储策略
配置:物模型
启用:产品
行式存储
ElasticSearch-行式存储是系统默认情况下使用的存储方案。每一个属性值都保存为一条索引记录。
典型应用场景:设备每次只会上报一部分属性, 以及支持读取部分属性数据的时候。
列式存储
一个属性作为一列,一条属性消息作为一条索引记录进行存储。
典型应用场景:适合设备每次都上报所有的属性值的场景。
关于协议包:https://hanta.yuque.com/px7kg1/nn1gdr/kcqv8dn8y6778t2a
协议包主要包含 4 个部分
数据传输协议:协议包约定了常见的网络通信协议,例如MQTT、HTTP、TCP、CoAP等,来实现物联网设备与JetLinks平台之间的数据传输。开发者可根据设备实际情况选择对应的通信协议。
数据解析标准:协议包定义了一套设备数据解析标准,使得各种类型的物联网设备通过网络协议传输至JetLinks后,根据协议包内的数据解析标准将不同类型的报文转换成平台统一的消息。
设备管理功能:协议包内可以获取平台内定义的设备数据,包括设备信息、设备配置、设备状态等,方便开发者在接入设备时获取设备相关数据进行自定义的业务逻辑处理。
身份认证:协议包支持物联网设备的身份认证,用户可以在协议包内编写身份认证逻辑来验证连接的客户端身份,以保护设备和数据的安全。
自定义 DeviceMessageCodec 接口实现类,重写 encode()、decode() 方法
重写 DeviceMessageCodec 接口中的encode()
方法
重写 DeviceMessageCodec 接口中的decode()
方法
自定义 ProtocolSupportProvider 接口实现类,配置元数据信息
配置路由与 DeviceMessage 的绑定关系
自定义 Authenticator 接口实现并配置
协议加载设计:https://hanta.yuque.com/px7kg1/nn1gdr/gascdx49ia6u4lsf
平台统一设备消息定义:http://doc.jetlinks.cn/function-description/device_message_description.html
POST
/api/file/upload
表单请求,接收参数名为:file 的文件数据对象
Content-Disposition: form-data; name="file"; filename="jetlinks-official-protocol-3.0.0.jar"
Content-Type: application/octet-stream
接口类:org.jetlinks.community.io.file.web.FileManagerController#upload
获取文件信息,并将文件数据保存到本地指定目录
默认文件目录为:./data/files/yyyyMMdd/
重命名 jar 文件名,生成规则:md5(uuid())
计算当前文件的 md5 和 sha256 值
将文件相关信息保存到数据库中,数据对象:org.jetlinks.community.io.file.FileEntity
保存成功的文件数据记录主键和文件信息一起通过接口返回
返回文件数据相关记录信息,核心信息:
{ "message": "success", "result": { "id": "9c9ce661a1fadb8019ca50145b33a074", "name": "jetlinks-official-protocol-3.0.0.jar", "extension": "jar", "length": 102512, "md5": "24504ceb0d6570b84b86e6180d9fca9f", "sha256": "fb0c6144ad056326e26eb829c13759b5080da095c7bb02386c7f064ac059f24e", "createTime": 1699859432789, "creatorId": "1199596756811550720", "options": [], "others": { "accessKey": "c24b19b0c91119c6673fa1a06a4d2ae0" } }, "status": 200, "timestamp": 1699859454032 }
PATCH
/api/protocol
接口类:org.jetlinks.community.device.web.ProtocolSupportController
{
"id": "1722876422724329472",
"name": "官方协议v3.0",
"description": "",
"type": "jar",
"state": 1,
"creatorId": "1199596756811550720",
"createTime": 1699600723328,
"configuration": {
"location": "http://localhost:5173/api/file/9c9ce661a1fadb8019ca50145b33a074?accessKey=c24b19b0c91119c6673fa1a06a4d2ae0"
}
}
org.jetlinks.community.device.entity.ProtocolSupportEntity
前端逻辑:将步骤1 的响应结果拼接成:文件地址(用户不可编辑)+ 用户填写的协议包基本信息(名称、类型、说明)
org.jetlinks.community.device.web.ProtocolSupportController
实现了org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController
接口。
org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController
接口又继承了三个接口:org.hswebframework.web.crud.web.reactive.ReactiveServiceSaveController
、org.hswebframework.web.crud.web.reactive.ReactiveServiceQueryController
、org.hswebframework.web.crud.web.reactive.ReactiveServiceDeleteController
ProtocolSupportController
package org.jetlinks.community.device.web;
import org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController;
import org.jetlinks.community.device.service.LocalProtocolSupportService;
@RestController
@RequestMapping("/protocol")
public class ProtocolSupportController
implements ReactiveServiceCrudController<ProtocolSupportEntity, String> {
@Autowired
@Getter
private LocalProtocolSupportService service;
}
ReactiveServiceCrudController
package org.hswebframework.web.crud.web.reactive;
public interface ReactiveServiceCrudController<E, K> extends
ReactiveServiceSaveController<E, K>,
ReactiveServiceQueryController<E, K>,
ReactiveServiceDeleteController<E, K> {
}
PATH /api/protocol
接口实际由:ReactiveServiceSaveController
接口提供的默认 save()
方法处理数据,最终调用getService()
方法进行save()
操作。
package org.hswebframework.web.crud.web.reactive;
import org.hswebframework.web.authorization.annotation.Authorize;
public interface ReactiveServiceSaveController<E, K> {
@Authorize(ignore = true)
ReactiveCrudService<E, K> getService();
@PatchMapping
@Operation(summary = "保存数据", description = "如果传入了id,并且对应数据存在,则尝试覆盖,不存在则新增.")
default Mono<SaveResult> save(@RequestBody Flux<E> payload) {
return Authentication
.currentReactive()
.flatMapMany(auth -> payload.map(entity -> applyAuthentication(entity, auth)))
.switchIfEmpty(payload)
.as(getService()::save);
}
}
由于ProtocolSupportController
注入了org.jetlinks.community.device.service.LocalProtocolSupportService
,并且属性名为:service,因此ProtocolSupportController
的getService()
就是ReactiveServiceSaveController
接口的getService()
方法实现。显而易见,确定协议的核心逻辑就在:LocalProtocolSupportService
的save()
方法。
org.jetlinks.community.device.service.LocalProtocolSupportService
类继承了org.hswebframework.web.crud.service.GenericReactiveCrudService
抽象类,而GenericReactiveCrudService
抽象类又实现了org.hswebframework.web.crud.service.ReactiveCrudService
接口,在ReactiveCrudService
中有save()
方法
ProtocolSupportController
package org.jetlinks.community.device.service;
import org.jetlinks.community.reference.DataReferenceManager;
import org.jetlinks.supports.protocol.management.ProtocolSupportManager;
@Service
public class LocalProtocolSupportService extends GenericReactiveCrudService<ProtocolSupportEntity, String> {
@Autowired
private ProtocolSupportManager supportManager;
@Autowired
private DataReferenceManager referenceManager;
}
GenericReactiveCrudService
package org.hswebframework.web.crud.service;
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
import org.springframework.beans.factory.annotation.Autowired;
public abstract class GenericReactiveCrudService<E, K> implements ReactiveCrudService<E, K> {
@Autowired
private ReactiveRepository<E, K> repository;
@Override
public ReactiveRepository<E, K> getRepository() {
return repository;
}
}
GenericReactiveCrudService 注入了 ReactiveRepository 接口,该接口的实现类为:org.hswebframework.ezorm.rdb.mapping.defaults.DefaultReactiveRepository
,里面实现了save()
方法:
package org.hswebframework.ezorm.rdb.mapping.defaults;
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
public class DefaultReactiveRepository<E, K> extends DefaultRepository<E> implements ReactiveRepository<E, K> {
@Override
public Mono<SaveResult> save(Publisher<E> data) {
return Flux
.from(data)
.collectList()
.filter(CollectionUtils::isNotEmpty)
.flatMap(list -> doSave(list).reactive().as(this::setupLogger))
.defaultIfEmpty(SaveResult.of(0, 0));
}
}
上述doSave()
方法是org.hswebframework.ezorm.rdb.mapping.defaults.DefaultRepository
抽象类提供的默认方法:
package org.hswebframework.ezorm.rdb.mapping.defaults;
import org.hswebframework.ezorm.rdb.mapping.events.EventResultOperator;
public abstract class DefaultRepository<E> {
protected SaveResultOperator doSave(Collection<E> data) {
RDBTableMetadata table = getTable();
UpsertOperator upsert = operator.dml().upsert(table.getFullName());
return EventResultOperator.create(
() -> {
upsert.columns(getProperties());
List<String> ignore = new ArrayList<>();
for (E e : data) {
upsert.values(Stream.of(getProperties())
.map(property -> getInsertColumnValue(e, property, (prop, val) -> ignore.add(prop)))
.toArray());
}
upsert.ignoreUpdate(ignore.toArray(new String[0]));
return upsert.execute();
},
SaveResultOperator.class,
table,
MappingEventTypes.save_before,
MappingEventTypes.save_after,
getDefaultContextKeyValue(instance(data),
type("batch"),
tableMetadata(table),
upsert(upsert))
);
}
}
上述EventResultOperator
的create()
方法中,发布了EntitySavedEvent
事件(通过 Spring的ApplicationEventPublisher 发送事件)。
在org.jetlinks.community.device.service.ProtocolSupportHandler
中订阅了EntitySavedEvent
事件:
package org.jetlinks.community.device.service;
import lombok.AllArgsConstructor;
import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.community.reference.DataReferenceManager;
import org.jetlinks.supports.protocol.management.ProtocolSupportLoader;
import org.jetlinks.supports.protocol.management.ProtocolSupportManager;
@Component
@AllArgsConstructor
public class ProtocolSupportHandler {
private final DataReferenceManager referenceManager;
private ProtocolSupportLoader loader;
private ProtocolSupportManager supportManager;
@EventListener
public void handleCreated(EntityCreatedEvent<ProtocolSupportEntity> event) {
event.async(reloadProtocol(event.getEntity()));
}
@EventListener
public void handleSaved(EntitySavedEvent<ProtocolSupportEntity> event) {
event.async(reloadProtocol(event.getEntity()));
}
@EventListener
public void handleModify(EntityModifyEvent<ProtocolSupportEntity> event) {
event.async(reloadProtocol(event.getAfter()));
}
// 重新加载协议
private Mono<Void> reloadProtocol(Collection<ProtocolSupportEntity> protocol) {
return Flux
.fromIterable(protocol)
.filter(entity -> entity.getState() != null)
.map(entity -> entity.getState() == 1 ? entity.toDeployDefinition() : entity.toUnDeployDefinition())
.flatMap(def -> loader
//加载一下检验是否正确,然后就卸载
.load(def)
.doOnNext(ProtocolSupport::dispose)
.thenReturn(def))
.onErrorMap(err -> new BusinessException("error.unable_to_load_protocol", 500, err.getMessage()))
.flatMap(supportManager::save)
.then();
}
}
上述ProtocolSupportLoader
接口的实现类为org.jetlinks.community.protocol.SpringProtocolSupportLoader
,其中load()
方法会动态加载 jar 包为org.jetlinks.core.spi.ProtocolSupportProvider
接口实现,并执行create()
方法。
{
"message": "success",
"result": {
"added": 0,
"updated": 1,
"total": 1
},
"status": 200,
"timestamp": 1699859463951
}
通过org.jetlinks.community.device.service.ProtocolSupportHandler
监听EntityCreatedEvent
、EntitySavedEvent
、EntityModifyEvent
事件,调用ProtocolSupportLoader
的load()
方法加载协议
在 ProtocolSupportLoader 的 load() 方法中:会调用 org.jetlinks.core.spi.ProtocolSupportProvider 接口实现,并执行 create() 方法
通过org.jetlinks.community.protocol.LazyInitManagementProtocolSupports
实现org.springframework.boot.CommandLineRunner
接口,在项目启动时执行init()
方法,调用ProtocolSupportLoader
的load()
方法加载协议
在 ProtocolSupportLoader 的 load() 方法中:会调用 org.jetlinks.core.spi.ProtocolSupportProvider 接口实现,并执行 create() 方法
为了更好的阅读体验,建议移步至笔者的博客阅读:JetLinks设备接入的认识与理解