• Kafka 插件并创建 Kafka Producer 发送


    相关说明

    • 启动测试前清空所有数据。
    • 每次测试先把所有数据写入 Kafka,再加载 Kafka 插件同步数据到 DolphinDB 中。目的是将同步数据的压力全部集中到 Kafka 插件。
    • 以 Kafka 插件从收到第一批数据到收到最后一批数据的时间差作为同步数据的总耗时。

    测试流程

    • 加载 Kafka 插件并创建 Kafka Producer 发送数据到 Kafka 中(以发送 100 万条数据为例)

    DolphinDB GUI 连接 DolphinDB 执行以下脚本,本例中插件的安装路径为 /DolphinDB/server/plugins/kafka,用户需要根据自己实际环境进行修改:

    1. // 加载插件
    2. try{
    3. loadPlugin("/DolphinDB/server/plugins/kafka/PluginKafka.txt")
    4. loadPlugin("/DolphinDB/server/plugins/kafka/PluginEncoderDecoder.txt")
    5. } catch(ex){print(ex)}
    6. // 创建 Producer
    7. producerCfg = dict(STRING, ANY);
    8. producerCfg["metadata.broker.list"] = "192.193.168.5:8992";
    9. producer = kafka::producer(producerCfg);
    10. kafka::producerFlush(producer);
    11. //向kafka传100万数据
    12. tbl = table("R5L1B3T1N03D01" as deviceId, "2022-02-22 13:55:47.377" as timestamps, "voltage" as deviceType , 1.5 as value )
    13. // 创建 Consume
    14. consumerCfg = dict(STRING, ANY)
    15. consumerCfg["group.id"] = "test10"
    16. consumerCfg["metadata.broker.list"] = "192.193.168.5:8992";
    17. for(i in 1..1000000) {
    18. aclMsg = select *, string(now()) as pluginSendTime from tbl;
    19. for(i in aclMsg) {
    20. kafka::produce(producer, "test3", "1", i, true);
    21. }
    22. }
    23. consumer = kafka::consumer(consumerCfg)
    24. topics=["test10"];
    25. kafka::subscribe(consumer, topics);
    26. for(i in 1..1000000) {
    27. aclMsg = select *, string(now()) as pluginSendTime from tbl;
    28. for(i in aclMsg) {
    29. kafka::produce(producer, "test10", "1", i, true);
    30. }
    31. }
    • 订阅 Kafka 中数据进行消费
    1. // 创建存储解析完数据的表
    2. colDefName = ["deviceId","timestamps","deviceType","value", "pluginSendTime", "pluginReceived"]
    3. colDefType = [SYMBOL,TIMESTAMP,SYMBOL,DOUBLE,TIMESTAMP,TIMESTAMP]
    4. dest = table(1:0, colDefName, colDefType);
    5. share dest as `streamZd
    6. // 解析函数
    7. def temporalHandle(mutable dictVar, mutable dest){
    8. try{
    9. t = dictVar
    10. t.replaceColumn!(`timestamps, temporalParse(dictVar[`timestamps],"yyyy-MM-dd HH:mm:ss.SSS"))
    11. t.replaceColumn!(`pluginSendTime, timestamp(dictVar[`pluginSendTime]))
    12. t.update!(`received, now());
    13. dest.append!(t);
    14. }catch(ex){
    15. print("kafka errors : " + ex)
    16. }
    17. }
    18. // 创建 decoder
    19. name = colDefName[0:5];
    20. type = colDefType[0:5];
    21. type[1] = STRING;
    22. type[4] = STRING;
    23. decoder = EncoderDecoder::jsonDecoder(name, type, temporalHandle{, dest}, 15, 100000, 0.5)
    24. // 创建subjob函数
    25. kafka::createSubJob(consumer, , decoder, `DecoderKafka)

    此时我们观察共享流表的数据量,当达到 100 万条时说明消费完成,测试结束。

    5. Kerberos 认证

    5.1 什么是 Kerberos ?

    Kerberos 是一种基于加密 Ticket 的身份认证协议,支持双向认证且性能较高。主要有三个组成部分:Kdc, Client 和 Service。

    生产环境的 Kafka 一般需要开启 Kerberos 认证,为 Kafka 提供权限管理,提高安全性。

    5.2 前置条件

    • Java 8+
    • kerberos:包括 Kdc 和 Client
    • keytab 证书

    5.3 认证相关配置说明

    环境相关配置说明

    以下是 Kerberos 认证涉及的关键配置信息,具体配置文件的路径根据实际情况调整

    1. 安装 kdc
    yum install -y krb5-server krb5-libs krb5-workstation krb5-devel krb5-auth-dialog

    2. 配置 /etc/krb5.conf

    1. [realms]
    2. HADOOP.COM = {
    3. kdc = cnserver9:88
    4. admin_server = cnserver9:749
    5. default_domain = HADOOP.COM
    6. }

    3. 配置 /var/kerberos/krb5kdc/kadm5.acl

    */admin@HADOOP.COM	*

    4. 创建生成 kdc 数据库文件

    sudo kdb5_util create -r HADOOP.COM –s

    5. 启动 kerberos 服务

    1. sudo systemctl start krb5kdc
    2. sudo systemctl status krb5kdc

    6. 安装 kerberos 客户端

    yum install -y krb5-devel krb5-workstation krb5-client

    7. 启动 kerberos 客户端

    sudo kadmin.local -q "addprinc hadoop/admin"

    DolphinDB Kafka Plugin 配置说明

    • 关键配置参数说明
      • security.protocol:指定通信协议
      • sasl.kerberos.service.name:指定 service 名称
      • sasl.mechanism:SASL 机制,包括 GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER
      • sasl.kerberos.keytab:keytab 文件的路径
      • sasl.kerberos.principal:指定 principal

    • 具体代码实现
    1. // 加载插件
    2. try{loadPlugin("/path/to/DolphinDBPlugin/kafka/bin/linux/PluginKafka.txt")} catch(ex){print(ex)}
    3. // 生产者配置
    4. producerCfg = dict(STRING, ANY);
    5. producerCfg["bootstrap.servers"] = "cnserver9:9992";
    6. producerCfg["security.protocol"] = "SASL_PLAINTEXT";
    7. producerCfg["sasl.kerberos.service.name"] = "kafka";
    8. producerCfg["sasl.mechanism"] = "GSSAPI";
    9. producerCfg["sasl.kerberos.keytab"] = "/home/test/hadoop.keytab";
    10. producerCfg["sasl.kerberos.principal"] = "kafka/cnserver9@HADOOP.COM";
    11. producer = kafka::producer(producerCfg);
    12. // 消费者配置
    13. consumerCfg = dict(STRING, ANY)
    14. consumerCfg["group.id"] = "test"
    15. consumerCfg["bootstrap.servers"] = "cnserver9:9992";
    16. consumerCfg["security.protocol"] = "SASL_PLAINTEXT";
    17. consumerCfg["sasl.kerberos.service.name"] = "kafka";
    18. consumerCfg["sasl.mechanism"] = "GSSAPI";
    19. consumerCfg["sasl.kerberos.keytab"] = "/home/test/hadoop.keytab";
    20. consumerCfg["sasl.kerberos.principal"] = "kafka/cnserver9@HADOOP.COM";
    21. consumer = kafka::consumer(consumerCfg)
    注意:适配 Kerberos 认证只需修改 Kafka 插件有关生产者和消费者的配置即可,其余脚本无需改动。

    6. 其他说明

    本教程展示了 DolphinDB Kafka Plugin 中常用的接口函数,完整的函数支持请参考官网文档:DolphinDB Kafka 插件官方教程

    使用过程中如果遇到任何问题,欢迎大家在项目仓库反馈

     

  • 相关阅读:
    《妃梦千年》第十四章-第十五章:重重困局,风云再起
    已知 list 数组请写出一段代码,实现功能: 数组内type 相同的元素只保留 votes 最大的元素,并且元素需要按照 votes 从大到小排序。
    2022 前端面试题
    241.为运算表达式设计优先级
    (一)shell编程
    WebRTC Native M96 基础Base模块介绍之网络相关的封装
    MySQL第一弹
    2022华为中央媒体技术院AI算法工程师FindStar一面+二面+主管面
    redis 分布式锁
    如何设置让vs 在生成程序错误的情况下不去执行上一个可以执行的程序?
  • 原文地址:https://blog.csdn.net/feidododekefu/article/details/127921026