• kettle从入门到精通 第五十三课 ETL之kettle MQTT/RabbitMQ consumer实战


    1、上一节课我们学习了MQTT producer 生产者步骤,MQTT consumer消费者步骤。该步骤可以从支持MRQTT协议的中间件获取数据,该步骤和kafka consumer 一样可以处理实时数据交互,如下图所示:

     2、双击步骤打开MQTT consumer 配置窗口,如下图所示:

    Step name:自定义步骤名称。

    Transformation:设置子转换,该子转换的作用是从中间件读取流数据,然后将字段返回给MQTT consumer步骤进行使用。

    Connection:指定此步骤将连接的 MQTT 服务器的地址,如127.0.0.1:1883(注意这里的端口是1883,不是5672)

    Client ID:指定 MQTT 客户端的唯一 ID。MQTT 服务器使用此客户端 ID 来识别每个不同的客户端及其当前状态。

    Topics name:在主题名称字段中,输入您希望订阅流数据(消息)的 MQTT 主题的名称。其实这里的topic是RabbitMQ中的routing key(另外这里的routing key 一定不要绑定队列,否则MQTT consumer步骤无法接收数据)。

    Quality of Service:是消息传递的保证级别。选择以下选项之一。
    至多一次(0),这是默认值
    至少一次(1)
    恰好一次(2)

     3、安全验证设置,如下图所示:

    Username:MQTT服务器的用户名,如admin

    Password:MQTT服务器的密码。

    Use secure protocol:选择此选项以定义连接的 SSL 属性,本次不做介绍。

     3、批次设置,使用此选项卡来指定在处理之前要拉取多少消息。您可以指定消息数量和/或特定的时间量。

    Duration(ms):
    请指定一个以毫秒为单位的时间。此值表示在执行转换之前该步骤将花费多少时间来收集记录。
    如果将此选项设置为0,则根据参数Number of records记录数触发消费。要运行转换,持续时间或记录数选项都必须包含一个大于0的值。

    Number of records
    指定一个数字。在每收集到‘X’条记录之后,指定的转换将被执行,并且这些‘X’条记录将被传递给转换过程。
    如果将此选项设置为0,则将参数Duration按持续时间触发消费。为了运行转换,持续时间或记录数选项都必须包含一个大于0的值。

    Maximum concurrent batches

    指定用于同时收集记录的最大批次数。默认值为1,表示使用单个批次来收集记录。仅当您的消费者步骤无法跟上数据流的速度时,才应使用此选项。
    您的计算环境必须具备足够的 CPU 和内存来进行此实现。如果您的环境无法处理指定的最大并发批次数,将会出现错误。


    Message prefetch limit
    请指定此步骤将排队等待处理的传入消息的限制,即从 kfakfa broker接收到的消息。
    设置此值会强制kafka broker处理超过指定限制的消息的背压。默认排队消息的数量是100000条。

     4、字段设置,这里采用默认值就行了,不用调整。

    5、子转换结果字段设置,选择子转换返回数据的步骤。 

     6、同上一节课,本次不再介绍。

     7、子转换配置,将Get records from stream步骤拉到画布,然后填写Message、Topic两个字段,类型都是设置为String即可。

  • 相关阅读:
    [量化投资-学习笔记008]Python+TDengine从零开始搭建量化分析平台-CCI和ATR
    赛后补题L - Non-Prime Factors
    【重铸Java根基】理解Java反射机制
    GBase8a SSL 配置
    ArrayList源码分析
    三大排序(插入排序,选择排序,冒泡排序)
    Java(103):列表转化成map;map和json互换
    xxl-job安装部署
    导数专题解题技巧
    DevEco Studio harmonyOS 模拟器 Unable to install HAXM
  • 原文地址:https://blog.csdn.net/zhangjin1222/article/details/138037025