• 阿里云IoT流转到postgresql数据库方案


    之前写过一篇如使用阿里云上部署.NET 3.1自定义运行时的文章,吐槽一下,虽然现在已经2022年了,但是阿里云函数计算的支持依然停留在.NET Core 2.1,更新缓慢,由于程序解包大小的限制,也不能放太复杂的东西的上去,虽然现在.NET 6裁剪包能挺好地解决这个问题,但是心里还是不爽。

    需求#

    言归正传,有这么一个情景:发送数据想接入阿里云的IoT平台,然后直接插入postgresQL数据库中。正常来说,只要数据发送到了IoT平台,然后定义转发到RDS就可以了,不过阿里云有几个限制:

    1. 数据流转到RDS数据库,只能支持mysqlsql server
    2. 数据流转只支持json形式的数据流转,如果发送的是透传的数据,那么发送不了(更新:现在新版的数据流转已经支持了。)

    思前想后,可能只能掏出阿里云的函数计算服务了,运用函数计算作为中转,将透传的数据流转给函数计算,然后在函数计算中执行sql语句。

    IoT平台接收设置#

    阿里云的物联网平台,设置了基本的产品和设备之后,如果是物模型的话,那么自行设置好对应的物模型。对于透传就比较简单了,支持MQTT的设备方只需要定义:

    • 透传的消息发送到/{productKey}/{deviceName}/user/update
    • 订阅阿里云的/{productKey}/{deviceName}/user/get
    • 设置阿里云的Mqtt IoT实例终端节点:({YourProductKey}.iot-as-mqtt.{YourRegionId}.aliyuncs.com:1883
    • 设置设备的ProductKey和ProductSecret

    设置好之后,即可传输数据到阿里云IoT端,数据传输过来,看下日志,如果能看到:

    img

    那说明就已经发送OK了,接收到的是普通的字符串(不是json),需要进行进一步解析。

    IoT流转设置#

    云产品流转中,新建解析器,设置好数据源,数据目的选择函数计算:
    img
    解析器脚本比较简单:

    var data = payload(); 
    writeFc(1000, data);  
    

    注意,payload函数payload(textEncoding)是內建的函数:

    • 不传入参数:默认按照UTF-8编码转换为字符串,即payload()等价于payload('utf-8')。
    • 'json':将payload数据转换成Map格式变量。如果payload不是JSON格式,则返回异常。
    • 'binary':将payload数据转换成二进制变量进行透传。
      这里我使用文本透传的形式,将数据转成UTF8文本传输。writeFc指将转换的内容传递给1000编号的目标函数。详情见文档

    当然还可以使用更为复杂的脚本,实现对脚本数据的初步处理,由于我这里后面还有函数计算,我就直接将数据转到下一个节点。

    函数计算配置#

    按照官方文档新建函数,请注意不需要新建触发器!我们这里的函数使用python语言,通过psycopg2实现数据插入到postgres数据库中。

    由于函数计算中,默认并没有该包,需要手动添加引用,官方建议使用Serverless Devs工具安装部署,这个玩意非常不好用,嗯,我不接受他的建议。推荐大家使用vscode,安装阿里云serverless的插件,这样其实更加方便。

    按照插件的文档,自己建立好服务与函数,默认会给一个函数入口:

    # To enable the initializer feature (https://help.aliyun.com/document_detail/158208.html)
    # please implement the initializer function as below:
    # def initializer(context):
    #   logger = logging.getLogger()
    #   logger.info('initializing')
    
    def handler(event, context):
      logger = logging.getLogger()
      logger.info(event)
      return 'hello world'
    

    我们首先在函数上右键,然后选择Install Package,选择pip安装psycopg2,依赖就自动被安装上了,这个非常方便。

    请注意,通过IOT流转过来的字符串,是b'data'这样的形式的形式,需要先decode一下,然后在处理,修改函数为:

    # -*- coding: utf-8 -*-
    import logging
    import psycopg2
    import uuid
    import time
    
    def insert_database(device_id,data):
        timest = int(time.time()*1000)
        guid = str(uuid.uuid1())
        conn = psycopg2.connect(database="", user="", password="", host="", port="")
        cur = conn.cursor()
        sql = 'INSERT INTO "data"("Id","DeviceId","Timestamp", "DataArray") VALUES (\'{id}\', \'{deviceid}\', \'{timestamp}\', array{data})'
        sql = sql.format(id= guid, deviceid= device_id, timestamp= timest, data= data)
        cur.execute(sql)
        conn.commit()
        print(" Records inserted successfully")
        conn.close() 
        
    def extract_string_array(data: bytes):
        arr = data.decode().strip().split(' ')
        # 写自己的逻辑
        return deviceid, resu   
    
    def handler(event, context):
      logger = logging.getLogger()
      logger.info(event)
      device_id, result = extract_string_array(event)
      insert_database(device_id, result)
      return 'OK'
    

    保存,然后在vscode中deploy即可。

    提示:vscode中也可以进行本地的debug,还是比较方便的,不过这些功能依赖docker,所以还是提前装好比较好。

    弄完了之后,应该是能看见这样的画面:

    img

    至此,数据就正常流转成功。

    要点#

    1. 不要设置触发器,当时为了配置这个触发器弄了非常长的时间
    2. 函数计算与数据库的VPC应该相同,并且赋予权限,否则无法访问。
    3. 函数计算默认无法保持状态,如果有这个需求,最好试试别的方案,或者看下函数计算的预留实例(常驻实例)
    4. 提前在本地安装好Docker,要不会有各种各样的问题出现。
    5. Postgresql插入数组格式的数据,需要注意格式,可以参考这篇文档
    6. 如果长时间不用docker,导致docker无法启动,可以参考这篇文章
  • 相关阅读:
    clickhouse中SummingMergeTree
    Python软件编程等级考试五级——20220618
    我的世界Bukkit服务器插件开发教程(十)实体
    远程调试 idea配置remote debug、在远程服务器的程序中,添加JVM启动参数-Xdebug
    qt example plugandpaint 插件 动态库 pnp_extrafiltersd.dll无法加载问题
    SLAM从入门到精通(gmapping建图)
    P6773 [NOI2020] 命运(dp、线段树合并)
    Go语言学习——map
    C#小项目之记事本
    四探循环依赖 → 当循环依赖遇上 BeanPostProcessor,爱情可能就产生了!
  • 原文地址:https://www.cnblogs.com/podolski/p/16178761.html