本文讲解4.6版jxTMS中的数据总线,整个系列的文章请查看:docker版jxTMS使用指南:4.6版升级内容
docker版本的使用,请查看:docker版jxTMS使用指南
4.0版jxTMS的说明,请查看:4.0版升级内容
4.2版jxTMS的说明,请查看:4.2版升级内容
4.4版jxTMS的说明,请查看:4.4版升级内容
jxTMS是建构在rabbitMQ之上的,主系统和数据采集系统都是挂在rabbitMQ上的。包括:
主系统的web模块,用来和web前端交互,将页面的静态json描述映射到用户session中的一个个动态页面,作为桥梁来完成web页面上的各种控件和后端capa中的输入输出语句间的勾连与映射,同时将用户行为映射为capa中的事件。也就是说,完成capa编程模型中的ui相关工作
主系统中代理各组织的ORG模块,用于映射一个个组织,完成组织架构、角色、用户的映射与管理,组织中各种资源的统一管理,为capa准备上下文、数据库连接,将每个事件的响应中所有的数据库增删改操作纳入同一个数据库事件进行提交活错误时回滚
以catalogService为核心的服务扩展。catalogService是jxTMS中服务体系的核心机制,主系统中建设有服务中心,统一负责各服务的注册、保活与调用。新的服务,通过MQ向catalogService进行注册加入到系统中,其它模块可以通过catalogService来查询并访问相应的服务
数据采集系统就是以服务的方式挂在MQ上,成为jxTMS主系统的扩展。所以将数据采集系统进一步拆分为分布式处理,主要有两个措施:
横向拆分,即当数据采集压力过大时,可以将所有的站点分片,分别由不同的数据采集系统来采集。数据采集系统目前主要是通过mqtt订阅站点名的主题来接收前方设备发送的数据的,所以只要将所有站点分片提供给不同的数据采集服务即可
纵向拆分,之前在本地数据总线的文章中也介绍过了,数据采集系统是一个完整的数据采集、解析、整理、保存的框架。由于数据的加工、应用都是面向业务的,所以强烈建议不要将数据的加工、应用放到数据采集系统中来执行,而是通过数据总线进行隔离,由独立的数据加工处理、应用等模块来执行
数据总线功能组件包括两种工作模式、两种参与角色。
1、工作模式
数据总线提供了两种数据服务模式:
主从模式:即从端向主端发起的数据查询与获取请求,然后主端返回响应的数据。一条数据总线上只有一个主端,从端数量不限
广播模式:所有参与方都可以将数据广播给总线上的其它各方
需要说明的是,数据总线的这两种模式同时存在,即一条数据总线可以有一个主端来为各从端提供数据查询服务,而所有从端都可以广播以及监听广播。
2、参与角色
根据上面工作模式的说明,可以看到数据总线包括两种参与角色:
主从模式下的主端,主要是作为数据中心,为其它成员提供数据查询与读取服务【也可执行数据设置】
主从模式下的从端,在必要时向主端请求数据
而广播模式下所有参与方都是平等的,都可自由的广播数据与监听总线来超收数据。
注:数据总线可以有多条。每条不同的数据总线都有自己的服务主端与广播地址
使用数据源前需导入:
from jx.jxDataBus import dataBus
构造函数(self, maxSize=0, serviceDual=None, broadcastAddr=_broadcast_dataBusService, serviceName=_serviceName_dataBusService)
参数:
maxSize:缓存数据的容量
serviceDual:主端的服务命令处理函数,从端部需要提供
broadcastAddr:本数据总线的广播地址
serviceName:本数据总线的服务名
示例:
#设备专用的数据总线
serviceName_deviceData = 'deviceData'
broadcast_deviceData = 'deviceData'
service_deviceData = 'deviceData.service'
dataBus.registerService(serviceName_deviceData, dataBus(broadcastAddr=broadcast_deviceData,serviceName=service_deviceData))
说明:
数据总线内置了一个LRU的缓存,如果指定了maxSize,则会启用该缓存,而当其中的数据容量超过此限额时,就会以LRU的方式淘汰掉最老的数据
registerInterest(cls, name, receiveFunc, interestedVisitorList):
注册本条数据总线上的一个兴趣点
参数:
name:兴趣点的名字
receiveFunc:接收函数
interestedVisitorList:前述用getInterestedVisitor获取到的访问器的列表
返回值:
无
unRegisterInterest(cls, name):
取消注册一个兴趣点
参数:
name:兴趣点的名字
registerSource(cls, type, source, siteType=None, siteName=None):
注册本条数据总线上的一个数据源
参数:
type:数据源的类型
source:数据源的名字
siteType:数据源所在站点的类型
siteName:数据源所在站点的名字
unRegisterSource(cls, source):
取消注册一个数据源
参数:
source:数据源的名字
inform(self, sendObj, source, data)
通过本数据总线广播一条数据
参数:
sendObj:拥有数据的对象,即可以通过sendObj.data()取得数据
source:发送数据者的名字
data:发送的数据
注:广播数据前前需registerSource,接收数据需registerInterest
list(self, ty, st=None, sn=None)
通过本数据总线获取查询某类型的所有数据
参数:
ty:数据源的类型
st:数据源所在站点的类型
sn:数据源所在站点的名字
get(self, ty, source, st=None, sn=None)
通过本数据总线获取查询某指定数据源的当前数据
参数:
ty:数据源的类型
source:数据源的名字
st:数据源所在站点的类型
sn:数据源所在站点的名字
注:数据总线是针对【某站点的某设备的当前数据】这一场景开发的,所以get就是读取某数据源的当前数据。如果应用场景有所不同,要注意相应的变换
getInterestedVisitor(cls, str)
获取表达式的访问器。在注册一个兴趣点【某个发送数据的数据源】时,需要指定相应的访问器。该访问器是一个表达式形式,用来过滤符合要求的数据源。
参数:
str:表达式
返回值:
该表达式所对应的访问器
示例:
#对类型是【dt_test】名字是【d_name】的数据源发送的数据感兴趣,【all】对应的是变量名,这里用不到
v = dataBus.getInterestedVisitor('dt_test.d_name.all')
#对类型是【dt_test】的数据源发送的数据感兴趣,【all】对应的是变量名,这里用不到
v = dataBus.getInterestedVisitor('dt_test.*.all')
registerServiceInterest(cls, serviceName, name, receiveFunc, interestedVisitorList):
注册一个兴趣点
参数:
serviceName:数据总线名【即上述的多条数据总线中的哪一条】
name:兴趣点的名字
receiveFunc:接收函数
interestedVisitorList:前述用getInterestedVisitor获取到的访问器的列表
返回值:
无
示例:
v = dataBus.getInterestedVisitor('dt_test.*.all')
dataBus.registerServiceInterest('myDataBusNmae','myName',recvFunc,[v])
说明:
receiveFunc的函数签名为:
receiveFunc(cmd,sendObj,data)
命令为inform时,接收到的就是数据,数据放入:sendObj.data()
其它命令由用户自行约定
unRegisterServiceInterest(cls, serviceName, name):
取消注册一个兴趣点
参数:
serviceName:数据总线名【即上述的多条数据总线中的哪一条】
name:兴趣点的名字
registerServiceSource(cls, serviceName, type, source, siteType=None, siteName=None):
注册一个数据源
参数:
serviceName:数据总线名【即上述的多条数据总线中的哪一条】
type:数据源的类型
source:数据源的名字
siteType:数据源所在站点的类型
siteName:数据源所在站点的名字
返回值:
无
说明:
数据总线目前只有一个使用场景:设备接收到数据后发送到数据总线上,所以其才有站点的信息
unRegisterServiceSource(cls, serviceName, source):
取消注册一个数据源
参数:
serviceName:数据总线名【即上述的多条数据总线中的哪一条】
source:数据源的名字
registerService(cls, serviceName, dataBusObj):
注册一条数据总线
参数:
serviceName:数据总线名【即上述的多条数据总线中的哪一条】
dataBusObj:数据总线对象
getService(cls, serviceName):
获取数据总线
参数:
serviceName:数据总线名【即上述的多条数据总线中的哪一条】
返回值:
数据总线对象
informService(cls, serviceName, sendObj, source, data):
通过某条数据总线广播数据
参数:
serviceName:数据总线名【即上述的多条数据总线中的哪一条】
sendObj:拥有数据的对象,即可以通过sendObj取得数据
source:发送数据者的名字
data:发送的数据
还有三个大写字母开头的类函数:
Inform(self, sendObj, source, data)
通过系统的数据总线广播一条数据
参数:
sendObj:拥有数据的对象,即可以通过sendObj.data()取得数据
source:发送数据者的名字
data:发送的数据
注:广播数据前前需registerSource,接收数据需registerInterest
List(self, ty, st=None, sn=None)
通过系统的数据总线获取查询某类型的所有数据
参数:
ty:数据源的类型
st:数据源所在站点的类型
sn:数据源所在站点的名字
Get(self, ty, source, st=None, sn=None)
通过系统的数据总线获取查询某指定数据源的当前数据
参数:
ty:数据源的类型
source:数据源的名字
st:数据源所在站点的类型
sn:数据源所在站点的名字
即Inform、List、Get就是系统数据总线的inform、list、get。
1、在命令行设置了开关dataBus,就会自动启动系统的数据总线
系统数据总线名为:dataBus。告诉大家只是希望不要重复了,建议大家不要直接使用。
目前系统数据总线主要用来提供系统配置,有:资源、用户、权限、站点、主题订阅、钉钉组共六个配置项会使用到系统数据总线。
2、在设备采集中,使用了名为【deviceData】的数据总线
各设备在接收到数据后,会通过此总线发送数据通知,感兴趣的模块可通过此数据总线来收听并完成自己的数据处理与应用工作。
当然,不建议直接使用【deviceData】的数据总线名,而是通过引用来使用:
from jx.device import serviceName_deviceData
3、如果想使用自己的数据总线,可:
from jx.jxDataBus import dataBus
myDS = dataBus(broadcastAddr='myBroadcastName',serviceName='myServiceName')
dataBus.registerService('myServiceName',myDS)
#之后就可以用上述带Service的api和【myServiceName】来发送通知与挂载监听了
参考资料:
下面的系列文章讲述了如何用jxTMS开发一个实用的业务功能:
下面的系列文章讲述了jxTMS的一些基本开发能力: