• 【EdgeX(15)】 :在EdgeX环境下配置eKuiper规则引擎服务,配置规则处理device-virtual发送的数据,并转发给HTTP服务


    前言


    相关EdgeX Foundry 全部分类:
    https://blog.csdn.net/freewebsys/category_9437788.html

    本文的原文连接是:
    https://blog.csdn.net/freewebsys/article/details/127585739

    未经博主允许不得转载。
    博主地址是:http://blog.csdn.net/freewebsys

    1,关于 ekuiper 规则引擎


    LF Edge eKuiper 是一个轻量级的物联网数据分析和流处理引擎,运行在资源受限的边缘设备上。
    可以进行配置规则,使用的是golang 进行开发的。
    使用 apache2.0 协议开源的。
    https://ekuiper.org/zh
    github 地址:
    https://github.com/lf-edge/ekuiper

    简单的介绍:
    在这里插入图片描述
    主要就是编写SQL Flow 语句,进行规则配置。
    视频参考:
    https://www.bilibili.com/video/BV11B4y1V7pQ/

    EdgeX 规则引擎 eKuiper 实战

    2,使用 EdgeX 进行相关开发。


    可以增加官方的管理后台界面:

      rulesengine:
        container_name: edgex-kuiper
        depends_on:
        - database
        environment:
          CONNECTION__EDGEX__REDISMSGBUS__PORT: 6379
          CONNECTION__EDGEX__REDISMSGBUS__PROTOCOL: redis
          CONNECTION__EDGEX__REDISMSGBUS__SERVER: edgex-redis
          CONNECTION__EDGEX__REDISMSGBUS__TYPE: redis
          EDGEX__DEFAULT__PORT: 6379
          EDGEX__DEFAULT__PROTOCOL: redis
          EDGEX__DEFAULT__SERVER: edgex-redis
          EDGEX__DEFAULT__TOPIC: rules-events
          EDGEX__DEFAULT__TYPE: redis
          KUIPER__BASIC__CONSOLELOG: "true"
          KUIPER__BASIC__RESTPORT: 59720
        hostname: edgex-kuiper
        image: lfedge/ekuiper:1.4.4-alpine
        networks:
          edgex-network: {}
        ports:
        - 127.0.0.1:59720:59720/tcp
        read_only: true
        restart: always
        security_opt:
        - no-new-privileges:true
        user: kuiper:kuiper
        volumes:
        - kuiper-data:/kuiper/data:z
      rulesengine-manager:
        container_name: edgex-kuiper-manager
        image: emqx/ekuiper-manager:1.6
        networks:
          edgex-network: {}
        ports:
        - 9082:9082/tcp
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    特别注意端口是9082 ,镜像地址是:emqx/ekuiper-manager:1.6,同时放到一个网络里面。就可以直接访问edgex的规则引擎了。

    https://ekuiper.org/docs/zh/latest/operation/manager-ui/overview.html

    # 地址:http://localhost:9082
    用户名:admin
    密码:public
    
    • 1
    • 2
    • 3

    默认进入管理端是空,需要增加服务:http://edgex-kuiper:59720

    在这里插入图片描述
    在这里插入图片描述
    增加成功了,同时相关的配置也读取到了,操作也很方便。
    在这里插入图片描述
    在这里插入图片描述

    3,在edgex 管理端上面进行操作:


    http://127.0.0.1:4000/

    创建一个 edgex 共享的流

    # 简单配置
    CREATE STREAM EdgexStream () WITH (
      FORMAT = "JSON",
      TYPE = "edgex"
    )
    # 或者复杂点的配置也加上
    CREATE STREAM EdgexStream () WITH (
      DATASOURCE = "redis",
      KEY = "",
      FORMAT = "json",
      CONF_KEY = "default",
      TYPE = "edgex",
      STRICT_VALIDATION = "true",
      TIMESTAMP = "",
      TIMESTAMP_FORMAT = "",
      RETAIN_SIZE = "0",
      SHARED = "true"
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    在这里插入图片描述

    SELECT * FROM  EdgexStream
    
    • 1

    创建规则

    在这里插入图片描述
    可以设置一个空消息:
    在这里插入图片描述
    也可以定制一个消息模板,配置好地址和服务:
    在这里插入图片描述
    结果服务没有启动成功:
    在这里插入图片描述

    报错:

    Stopped: read properties map[method:GET retryInterval:1 sendSingle:false url:http://edgex-core-command:59882/edgex/api/getData] fail with error: 2 error(s) decoding: * 'retryInterval' expected type 'int', got unconvertible type 'string', value: '1' * 'sendSingle' expected type 'bool', got unconvertible type 'string', value: 'false'.
    
    • 1

    看样子是界面传递的 false 在 edgex 中转换错误了。

    {
        "triggered": true, 
        "id": "rule1", 
        "sql": "SELECT * FROM  EdgexStream", 
        "actions": [
            {
                "rest": {
                    "retryInterval": 1, 
                    "sendSingle": false, 
                    "url": "http://edgex-core-command:59882/edgex/api/getData", 
                    "method": "GET"
                }
            }
        ], 
        "options": {
            "isEventTime": false, 
            "lateTolerance": 1000, 
            "concurrency": 1, 
            "bufferLength": 1024, 
            "sendMetaToSink": false, 
            "sendError": true, 
            "qos": 0, 
            "checkpointInterval": 300000
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    直接使用 postman 提交数据:
    在这里插入图片描述
    然后可以了,规则服务终于可以了,估计需要升级 ui 和 规则引擎的版本不兼容造成的。
    在这里插入图片描述
    更多详细的配置参考:
    https://ekuiper.org/docs/zh/latest/edgex/edgex_source_tutorial.html

    SELECT * FROM EdgexStream WHERE meta (deviceName) = "device-virtual"
    
    • 1

    或者升级到最新的ui 版本也可以解决这个问题。

    4,或者升级到最新的ui:2.2.0也可以解决问题


    在这里插入图片描述
    要保障 device-virtual 的设备启动成功,就开始往 edgex 当中发送数据。
    配置好了规则引擎之后就可以接受触发的数据了。

    写的假地址,这样就可以算接受数据了。

    2022/10/29 14:00:19.874 [D] [server.go:2836]  |    172.18.0.10| 404 |   1.156644ms| nomatch| GET      /edgex/api/getData
    2022/10/29 14:00:19.876 [D] [server.go:2836]  |    172.18.0.10| 404 |    257.062µs| nomatch| GET      /edgex/api/getData
    2022/10/29 14:00:19.878 [D] [server.go:2836]  |    172.18.0.10| 404 |    297.857µs| nomatch| GET      /edgex/api/getData
    2022/10/29 14:00:19.879 [D] [server.go:2836]  |    172.18.0.10| 404 |    258.943µs| nomatch| GET      /edgex/api/getData
    
    • 1
    • 2
    • 3
    • 4

    也可以配置成发送到edgex的其他服务上:

    在这里插入图片描述
    然后在看处理的消息数量,有所增加。
    在这里插入图片描述

    5,主要是利用redis的topic订阅发布消息


    通过订阅 SUBSCRIBE rules-events 消息是通过这个发布的。格式

    $ docker exec -it edgex-redis sh
    /data # redis-cli 
    127.0.0.1:6379> SUBSCRIBE rules-events
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "rules-events"
    3) (integer) 1
    1) "message"
    2) "rules-events"
    3) "{\"ReceivedTopic\":\"\",\"CorrelationID\":\"9a7a86fd-eac7-43ea-881a-da4672592f1f\",\"Payload\":\"eyJhcGlWZXJzaW9uIjoidjIiLCJpZCI6IjgwZjBjMjgwLTQwNjEtNDRlNy04MjJmLTQ3MGNiZjQ0N2M2OCIsImRldmljZU5hbWUiOiJSYW5kb20tRmxvYXQtRGV2aWNlIiwicHJvZmlsZU5hbWUiOiJSYW5kb20tRmxvYXQtRGV2aWNlIiwic291cmNlTmFtZSI6IkZsb2F0MzIiLCJvcmlnaW4iOjE2NjcwNTU2Nzk3OTM2MzYwNjEsInJlYWRpbmdzIjpbeyJpZCI6Ijg3MzI2ZDE5LWM2NTEtNGE3NC04YWJkLTkxOTkyMjQ0ZTYzZSIsIm9yaWdpbiI6MTY2NzA1NTY3OTc5MzYzNjA2MSwiZGV2aWNlTmFtZSI6IlJhbmRvbS1GbG9hdC1EZXZpY2UiLCJyZXNvdXJjZU5hbWUiOiJGbG9hdDMyIiwicHJvZmlsZU5hbWUiOiJSYW5kb20tRmxvYXQtRGV2aWNlIiwidmFsdWVUeXBlIjoiRmxvYXQzMiIsInZhbHVlIjoiMS42NTkwNzllKzM4In1dfQ==\",\"ContentType\":\"application/json\"}"
    1) "message"
    2) "rules-events"
    3) "{\"ReceivedTopic\":\"\",\"CorrelationID\":\"232d1863-be69-4cbc-971f-29c81e8b3d61\",\"Payload\":\"eyJhcGlWZXJzaW9uIjoidjIiLCJpZCI6IjQ2ZGU0ZGE2LTcwMWEtNGQ3ZC1iZmRiLTNmMGI4OTRjNmVjYiIsImRldmljZU5hbWUiOiJSYW5kb20tRmxvYXQtRGV2aWNlIiwicHJvZmlsZU5hbWUiOiJSYW5kb20tRmxvYXQtRGV2aWNlIiwic291cmNlTmFtZSI6IkZsb2F0NjQiLCJvcmlnaW4iOjE2NjcwNTU2Nzk3OTQwOTg3MDUsInJlYWRpbmdzIjpbeyJpZCI6IjBjYTkyZGVkLTIyNjUtNDYyMC1hMzc1LWUxMDllNDU2MThjNSIsIm9yaWdpbiI6MTY2NzA1NTY3OTc5NDA5ODcwNSwiZGV2aWNlTmFtZSI6IlJhbmRvbS1GbG9hdC1EZXZpY2UiLCJyZXNvdXJjZU5hbWUiOiJGbG9hdDY0IiwicHJvZmlsZU5hbWUiOiJSYW5kb20tRmxvYXQtRGV2aWNlIiwidmFsdWVUeXBlIjoiRmxvYXQ2NCIsInZhbHVlIjoiLTUuMzM1MTI4ZSszMDcifV19\",\"ContentType\":\"application/json\"}"
    1) "message"
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    使用的是base64 加密了下,估计是防止json 数据问题。解析后的数据:
    https://tool.oschina.net/encrypt?type=3

    使用工具转换下,可以看到是virtual设备发送的信息。

    {"apiVersion":"v2","id":"80f0c280-4061-44e7-822f-470cbf447c68","deviceName":"Random-Float-Device","profileName":"Random-Float-Device","sourceName":"Float32","origin":1667055679793636061,"readings":[{"id":"87326d19-c651-4a74-8abd-91992244e63e","origin":1667055679793636061,"deviceName":"Random-Float-Device","resourceName":"Float32","profileName":"Random-Float-Device","valueType":"Float32","value":"1.659079e+38"}]}
    
    • 1

    6,编写beego的http服务接收数据


    配置好规则地址是 POST方法,然后数据格式是json。
    使用beego 的POST 方法接受数据:

    func (c *IndexController) GetEdgexData() {
    	body := c.Ctx.Input.RequestBody
    
    	log.Println("########### GetEdgexData :",string(body)," ###########")
    	defer c.ServeJSON()
    	return
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    本来以为是给json,但是是给数组,数组里面是个json。

    2022/10/29 15:59:30 ########### GetEdgexData : [{"Bool":true}]  ###########
    2022/10/29 15:59:30.385 [D] [server.go:2836]  |     172.17.0.1| 200 |    308.432µs|   match| POST     /edgex/api/getData   r:/edgex/api/getData
    2022/10/29 15:59:35 filterAdmin request url  :  /edgex/api/getData
    2022/10/29 15:59:35 ########### GetEdgexData : [{"Int8":-46}]  ###########
    2022/10/29 15:59:35.163 [D] [server.go:2836]  |     172.17.0.1| 200 |    162.124µs|   match| POST     /edgex/api/getData   r:/edgex/api/getData 
    2022/10/29 15:59:35 ########### GetEdgexData : [{"Int64":5618840669469908222}]  ###########
    2022/10/29 15:59:35.164 [D] [server.go:2836]  |     172.17.0.1| 200 |    196.329µs|   match| POST     /edgex/api/getData   r:/edgex/api/getData
    2022/10/29 15:59:35 ########### GetEdgexData : [{"Int16":-11546}]  ###########
    2022/10/29 15:59:35.165 [D] [server.go:2836]  |     172.17.0.1| 200 |     132.69µs|   match| POST     /edgex/api/getData   r:/edgex/api/getData
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    7,总结EdgeX规则引擎使用


    总体来说 EdgeX的eKuiper 规则引擎使用起来是非常的方便的。
    通过和 EdgeX 深度整合,可以直接转换调用成各种方法,也可以自定义转发到 rest mqtt mq 等地方。
    同时可以接收到device-virtual的数据。正个流程也非常清晰方便。

    本文的原文连接是:
    https://blog.csdn.net/freewebsys/article/details/127585739

    博主地址是:https://blog.csdn.net/freewebsys

  • 相关阅读:
    【无标题】
    高数有多难?AI高数考试正确率81%
    qmt量化交易策略小白学习笔记第46期【qmt编程之期货行情数据--如何获取5档盘口行情、期货结算价与持仓量】
    微信小程序-2
    c++ 正则表达式的若干难查问题
    tf.while_loop
    java-net-php-python-ssmA公司运维管理系统计算机毕业设计程序
    JS运算符
    三十五、【进阶】MySQL性能查看
    【分布式应用】GFS分布式文件系统
  • 原文地址:https://blog.csdn.net/freewebsys/article/details/127585739