• 用 Kafka + DolphinDB 实时计算K线


    Kafka 是一个高吞吐量的分布式消息中间件,可用于海量消息的发布和订阅。

    当面对大量的数据写入时,以消息中间件接收数据,然后再批量写入到时序数据库中,这样可以将消息中间件的高并发能力和时序数据库的高吞吐量联合起来,更好地解决海量数据的实时处理和存储问题。

    本篇教程,我们会向大家详细介绍 DolphinDB Kafka 插件的使用方式,并以一个“DolphinDB + Kafka 实时计算k线”的案例,向大家展示 DolphinDB Kafka 插件的最佳实践指南。

    1. DolphinDB Kafka 插件介绍

    DolphinDB Kafka 插件支持把 DolphinDB 中生产的数据推送到 Kafka,也支持从 Kafka订阅数据,并在DolphinDB中消费。用户可以在 DolphinDB 中实例化 Producer 对象,把 DolphinDB 中的数据同步到 Kafka 中指定的 Topic。用户也可以在 DolphinDB 中实例化 Consumer 对象,将 Kafka 中指定 Topic 的数据同步到 DolphinDB。DolphinDB Kafka 插件目前支持以下数据类型的序列化和反序列化:

    • DolphinDB 标量
    • Kafka Java API 的内置类型:String(UTF-8) , Short , Integer , Long , Float , Double , Bytes , byte[] 以及 ByteBuffer
    • 以上数据类型所组成的向量

    Kafka 插件目前支持版本:relsease200release130。本教程基于 Kafka Plugin release200 开发,请使用 DolphinDB 2.00.X 版本 server 进行相关测试。若需要测试其它版本 server,请切换至相应插件分支下载插件包进行测试。

    2. 基本使用介绍

    2.1 安装 DolphinDB Kafka 插件

    用户可根据 DolphinDB server 版本和操作系统下载对应的已经编译好的插件文件,官方下载链接。手动编译安装可以参考官网文档教程:DolphinDB Kafka 插件官方教程

    以 Linux 为例,下载好插件文件后需要添加动态库地址到环境变量中,注意插件安装的路径,需要根据实际环境修改,本例中插件的安装路径为 /DolphinDB/server/plugins/kafka,执行命令如下:

    export LD_LIBRARY_PATH="LD_LIBRARY_PATH:/DolphinDB/server/plugins/kafka"

    2.2 使用 DolphinDB Kafka Producer

    语法

    kafka::producer(config)
    • config:字典类型,表示DolphinDB Kafka Producer 的配置。字典的键是一个字符串,值是一个字符串或布尔值。有关 Kafka 配置的更多信息,请参阅 配置参数列表

    该函数调用后,会根据指定配置创建一个 Kafka Producer 实例,并返回句柄。

    kafka::produce(producer, topic, key, value, json, [partition])
    • producer:Kafka 生产者的句柄
    • topic:Kafka 的主题
    • key:Kafka 生产者配置字典的键
    • value:Kafka 生产者配置字典的值
    • json:表示是否以 json 格式传递数据
    • partition:可选参数,整数,表示 Kafka 的 broker 分区

    该函数调用后会,可以把 DolphinDB 中的数据同步到 Kafka 中指定的 Topic。

    下面通过例子,展示如何实时同步 DolphinDB 流数据表 KafkaTest 中的增量数据到 Kafka 的 dolphindb-producer-test Topic 中。

    DolphinDB 中创建 Producer 实例

    DolphinDB GUI 连接 DolphinDB 节点后执行以下脚本,加载 DolphinDB Kafka 插件:

    try{loadPlugin("/DolphinDB/server/plugins/kafka/PluginKafka.txt")} catch(ex){print(ex)}
    注意:
    本例中插件的安装路径为  /DolphinDB/server/plugins/kafka,用户需要根据自己实际环境进行修改。

    每次启动 DolphinDB 服务后,只需手动加载一次即可。也可以设置为自动加载,参考教程:自动加载插件教程

    DolphinDB GUI 中执行以下脚本, 创建 Producer 实例,注意需要根据实际环境配置 metadata.broker.list 参数:

    1. producerCfg = dict(STRING, ANY)
    2. producerCfg["metadata.broker.list"] = "192.193.168.4:9092"
    3. producer = kafka::producer(producerCfg)

    模拟测试数据生成

    DolphinDB GUI 中执行以下脚本,模拟测试数据生成:

    share streamTable(take(1, 86400) as id, 2020.01.01T00:00:00 + 0..86399 as datetime, rand(1..100, 86400) as val) as `kafkaTest

    测试数据共有 86400 行,包含三列:id (INT 类型), datetime(DATETIME 类型)和 val(INT 类型),如下表所示

    Kafka 创建 Topic : dolphindb-producer-test

    使用 Kafka 集群自带的 kafka-topics.sh 终端命令创建 Topic:

    bin/kafka-topics.sh --create --topic dolphindb-producer-test --bootstrap-server 192.193.168.4:9092

    控制台输出结果:

    Created topic dolphindb-producer-test.

    DolphinDB 流数据表中的数据同步至 Kafka

    DolphinDB GUI 中执行以下脚本,声明自定义流数据表订阅的处理函数:

    1. def sendMsgToKafkaFunc(producer, msg){
    2. try {
    3. kafka::produce(producer, "dolphindb-producer-test", 1, msg, true)
    4. }
    5. catch(ex) {
    6. writeLog("[Kafka Plugin] Failed to send msg to kafka with error:" +ex)
    7. }
    8. }

    DolphinDB GUI 中执行以下脚本,订阅 DolphinDB 的流数据表 kafkaTest,处理函数是 sendMsgToKafkaFunc,将流数据表内的增量数据实时推送到 Kafka 的 dolphindb-producer-test Topic 中:

    subscribeTable(tableName="kafkaTest", actionName="sendMsgToKafka", offset=0, handler=sendMsgToKafkaFunc{producer}, msgAsTable=true, reconnect=true)

    验证数据

    使用 kafka-console-consumer 命令行工具消费 Topic 为 dolphindb-producer-test 中的数据。

    执行下述语句,首先会输出流数据表中的历史数据,往流数据表中插入新的数据后,kafka-console-consumer 会立即输出新增的数据:

    ./bin/kafka-console-consumer.sh --bootstrap-server 192.193.168.4:9092 --from-beginning --topic dolphindb-producer-test

    控制台会打印已消费的数据,输出结果如下:

    ... {"id":[1,1,...],"datetime":["2020.01.01T10:55:12","2020.01.01T10:55:13",...],"val":[73,74,...]} {"id":[1,1,...],"datetime":["2020.01.01T23:55:12","2020.01.01T23:55:13",...],"val":[88,1,...]} ...

    接下来在 DolphinDB GUI 中执行以下脚本,往流数据表 kafkaTest 中新插入两条数据:

    insert into kafkaTest values(2,now(),rand(1..100)) insert into kafkaTest values(2,now(),rand(1..100))

    控制台输出结果:

    1. {"id":[2],"datetime":["2022.08.16T11:08:27"],"val":[23]}
    2. {"id":[2],"datetime":["2022.08.16T11:10:42"],"val":[11]}

    由此验证 DolphinDB Kafka Producer 生产数据的完整性和正确性。

    2.3 使用 DolphinDB Kafka Consumer

    语法

    kafka::consumer(config)
    • config:字典类型,表示 Kafka 消费者的配置。字典的键是一个字符串,值是一个元组。有关 Kafka 配置的更多信息, 请参考 Kafka 使用手册

    该函数调用后,会根据指定配置创建一个 Kafka Consumer 实例,并返回句柄。

    下面通过例子,展示如何在 DolphinDB 中订阅消费 kafka 中 Topic 为 dolphindb-consumer-test 的数据,将其实时同步到流数据表 KafkaTest 中。

    DolphinDB 中创建 Consumer 实例

    DolphinDB GUI 中执行以下脚本, 创建 Consumer 实例:

    1. consumerCfg = dict(string, any)
    2. consumerCfg["metadata.broker.list"] = "192.193.168.4:9092"
    3. consumerCfg["group.id"] = "test"
    4. consumer = kafka::consumer(consumerCfg)

    DolphinDB 中消费数据

    DolphinDB GUI 中执行以下脚本,创建一张共享内存表 kafkaTest

    share table(1:0,`id`timestamp`val,[INT,TIMESTAMP,INT]) as `kafkaTest

    DolphinDB GUI 中执行以下脚本,订阅 Kafka 中的 dolphindb-consumer-test 主题的数据:

    1. topics = ["dolphindb-consumer-test"]
    2. kafka::subscribe(consumer, topics)
    注意:订阅函数支持传入一个string类型的向量,实现同时订阅多个topic

    DolphinDB GUI 中执行以下脚本,定义多线程轮询处理消费队列:

    1. def parse(x) {
    2. dict = parseExpr(x).eval()
    3. return table(dict[`id] as `id, dict[`timestamp] as `datetime, dict[`val] as `val)
    4. }
    5. conn = kafka::createSubJob(consumer, kafkaTest, parse, "kafka consumer")

    DolphinDB GUI 中执行以下脚本,查看订阅状态:

    kafka::getJobStat()

    返回:

    subscriptionIduserdescriptioncreateTimestamp
    80773376adminkafka consumer2022.08.19T06:46:06.072

    验证数据

    使用 kafka-console-producer 终端工具,在控制台输入消息生产到 kafka:

    ./bin/kafka-console-producer.sh --bootstrap-server 192.193.168.4:9092 --topic dolphindb-consumer-test

    控制台输入消息:

    1. {"id":1001,"timestamp":1660920813123,"val":1000}
    2. {"id":1001,"timestamp":1660920814123,"val":2000}
    3. {"id":1001,"timestamp":1660920815123,"val":3000}

    通过 DolphinDB GUI 查看流数据表中的结果,如下图所示:

    由此验证 DolphinDB Kafka Consumer 消费数据的完整性和正确性,消费吞吐量相关的信息见第四章。

    DolphinDB GUI 中执行以下脚本,取消订阅:

    kafka::cancelSubJob(conn)

    3. 通过 Kafka 插件实时计算K线

    3.1 环境准备

    • 部署 DolphinDB 集群,版本为v2.00.7

    3.2 生产数据

    本节通过 DolphinDB 的 replay 历史数据回放工具和 Kafka 插件,把逐笔成交数据实时发送到 Kafka 中。

    Kafka 创建 Topic :topic-message

    使用 Kafka 集群自带的 kafka-topics.sh 终端命令创建 Topic:

    ./bin/kafka-topics.sh --create --topic topic-message --bootstrap-server 192.193.168.4:9092

    控制台输出结果:

    Created topic topic-message.

    加载 Kafka 插件并创建 Kafka Producer

    DolphinDB GUI 连接 DolphinDB 节点后执行以下脚本:

    1. // 加载插件
    2. path = "/DolphinDB/server/plugins/kafka"
    3. loadPlugin(path + "/PluginKafka.txt")
    4. loadPlugin("/DolphinDB/server/plugins/kafka/PluginEncoderDecoder.txt");
    5. // 定义创建 Kafka Producer 的函数
    6. def initKafkaProducerFunc(metadataBrokerList){
    7. producerCfg = dict(STRING, ANY)
    8. producerCfg["metadata.broker.list"] = metadataBrokerList
    9. return kafka::producer(producerCfg)
    10. }
    11. // 创建 Kafka Producer 并返回句柄
    12. producer = initKafkaProducerFunc("192.193.168.5:8992")
    • 本例中插件的安装路径为 /DolphinDB/server/plugins/kafka,用户需要根据自己实际环境进行修改。
    • 推荐 Kafka server 和 DolphinDB Server 在同一网段中。

    推送数据到 Kafka Topic

    DolphinDB GUI 中执行以下脚本:

    1. // 定义推送数据到 KafKa "topic-message" Topic 的函数
    2. def sendMsgToKafkaFunc(dataType, producer, msg){
    3. startTime = now()
    4. try {
    5. for(i in msg){
    6. kafka::produce(producer, "topic-message", 1, i, true)
    7. }
    8. cost = now() - startTime
    9. writeLog("[Kafka Plugin] Successed to send " + dataType + " : " + msg.size() + " rows, " + cost + " ms.")
    10. }
    11. catch(ex) {writeLog("[Kafka Plugin] Failed to send msg to kafka with error: " +ex)}
    12. }
    13. // 创建 DolphinDB 流数据表 tickStream
    14. colName = `SecurityID`TradeTime`TradePrice`TradeQty`TradeAmount`BuyNum`SellNum`TradeIndex`ChannelNo`TradeBSFlag`BizIndex
    15. colType = [SYMBOL, TIMESTAMP, DOUBLE, INT, DOUBLE, INT, INT, INT, INT, SYMBOL, INT]
    16. share(streamTable(35000000:0, colName, colType), `tickStream)
    17. // 订阅 tickStream,处理函数是 sendMsgToKafkaFunc
    18. subscribeTable(tableName="tickStream", actionName="sendMsgToKafka", offset=-1, handler=sendMsgToKafkaFunc{`tick, producer}, msgAsTable=true, reconnect=true, batchSize=10000,throttle=1)
    19. getHomeDir()
    20. // 控速回放 DolphinDB 分布式中的历史数据至 tickStream
    21. dbName = "dfs://SH_TSDB_tick"
    22. tbName = "tick"
    23. replayDay = 2021.12.08
    24. testData = select * from loadTable(dbName, tbName) where date(TradeTime)=replayDay, time(TradeTime)>=09:30:00.000, time(TradeTime)<=10:00:00.000 order by TradeTime, SecurityID
    25. submitJob("replay", "replay", replay, testData, objByName("tickStream"), `TradeTime, `TradeTime, 2000, true, 4)

    kafka::produce 函数会将任意表结构的 msg 以 json 格式发送至指定的 Kafka topic。此处的 writeLog 函数会在 DolphinDB 节点的运行日志中打印每批推送的情况,方便代码调试和运维观察。

    可以使用 kafka-console-consumer 命令行工具消费 Topic 为 topic-message 中的数据,验证数据是否成功写入:

    ./bin/kafka-console-producer.sh --bootstrap-server 192.193.168.4:9092 --from-beginning --topic topic-message

    3.3 消费数据

    创建消费者,主题并进行订阅

    DolphinDB GUI 中执行以下脚本:

    1. // 创建 Kafka Consumer 并返回句柄
    2. consumerCfg = dict(STRING, ANY)
    3. consumerCfg["metadata.broker.list"] = "192.193.168.5:8992"
    4. consumerCfg["group.id"] = "topic-message"
    5. consumer = kafka::consumer(consumerCfg)
    6. // 订阅 Kafka 中的 "topic-message" 主题的数据
    7. topics = ["topic-message"]
    8. kafka::subscribe(consumer, topics);

    DolphinDB 订阅 Kafka消息队列中数据

    DolphinDB GUI 中执行以下脚本:

    1. // 订阅 Kafka 发布消息,写入流表 tickStream_kafka
    2. colName = `SecurityID`TradeTime`TradePrice`TradeQty`TradeAmount`BuyNum`SellNum`TradeIndex`ChannelNo`TradeBSFlag`BizIndex
    3. colType = [SYMBOL, TIMESTAMP, DOUBLE, INT, DOUBLE, INT, INT, INT, INT, SYMBOL, INT]
    4. share(streamTable(35000000:0, colName, colType), `tickStreamkafka)
    5. go
    6. // Kafka 消息解析函数
    7. def parse(mutable dictVar, mutable tickStreamkafka){
    8. try{
    9. t = dictVar
    10. t.replaceColumn!(`TradeTime, temporalParse(dictVar[`TradeTime],"yyyy.MM.ddTHH:mm:ss.SSS"))
    11. tickStreamkafka.append!(t);
    12. }catch(ex){
    13. print("kafka errors : " + ex)
    14. }
    15. }
    16. colType[1] = STRING;
    17. decoder = EncoderDecoder::jsonDecoder(colName, colType, parse{, tickStreamkafka}, 15, 100000, 0.5)
    18. // 创建 subjob 函数
    19. conn = kafka::createSubJob(consumer, , decoder, "topic-message")

    3.4 流计算引擎实时计算 K 线

    使用 DolphinDB 内置流计算引擎计算分钟 K 线,并将结果输出到名为 OHLCVwap的结果表中。

    DolphinDB GUI 中执行以下脚本:

    1. // 创建接收实时计算结果的流数据表
    2. colName = `TradeTime`SecurityID`OpenPrice`HighPrice`LowPrice`ClosePrice`Vwap
    3. colType = [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE]
    4. share(streamTable(2000000:0, colName, colType), `OHLCStream)
    5. // K 线指标计算元表达式
    6. aggrMetrics = <[ first(TradePrice), max(TradePrice), min(TradePrice), last(TradePrice), wavg(TradePrice, TradeQty) ]>
    7. // 创建引擎并将 kafka 中订阅的数据注入流计算引擎
    8. createTimeSeriesEngine(name="OHLCVwap", windowSize=60000, step=60000, metrics=aggrMetrics, dummyTable=objByName("tickStreamkafka"), outputTable=objByName("OHLCStream"), useSystemTime=true, keyColumn=`SecurityID, useWindowStartTime=false)
    9. subscribeTable(tableName="tickStreamkafka", actionName="OHLCVwap", offset=-1, handler=getStreamEngine("OHLCVwap"), msgAsTable=true, batchSize=1000, throttle=1, hash=0)
    • 设置参数 offset 为 - 1,订阅将会从提交订阅时流数据表的当前行开始。
    • 设置 useSystemTime=true,表示时间序列引擎会按照数据注入时间序列引擎的时刻(毫秒精度的本地系统时间,与数据中的时间列无关),每隔固定时间截取固定长度窗口的流数据进行计算。

    4. 性能测试

    4.1 硬件环境

    类型配置
    CPUIntel(R) Xeon(R) Gold 5220R CPU @ 2.20GHz
    内存512 GB
    网络带宽10 Gbps
    硬盘SSD (500 MB/s 读写)

    4.2 软件环境

    • DolphinDB:2.00.7
    • 内核版本: Linux 3.10.0-1160.el7.x86_64
    • 操作系统版本:CentOS Linux 7 (Core)
    • Kafka版本:2.13-3.1.0
    • JDK:1.8

    4.3 测试结果

    测试数据表结构如下:

    列名DolphinDB 数据类型
    deviceIdSYMBOL
    timestampsTIMESTAMP
    deviceTypeSYMBOL
    valueDOUBLE
    pluginSendTimeTIMESTAMP
    pluginReceivedTIMESTAMP

    测试结果如下:

    数据量耗时(s)RPS吞吐(MB/s)
    100W812 万38
    500W45.211 万37
    1000W92.111 万37

    测试结果说明

    • 测试环境为生产级别的常用配置,目的是降低用户选型评估成本
    • 测试结果为执行10次取平均值
    • 指标 RPS 是指每秒消费的记录数

    4.4 测试流程

    相关说明

    • 启动测试前清空所有数据。
    • 每次测试先把所有数据写入 Kafka,再加载 Kafka 插件同步数据到 DolphinDB中。目的是将同步数据的压力全部集中到 Kafka 插件。
    • 以Kafka插件从收到第一批数据到收到最后一批数据的时间差作为同步数据的总耗时。

    测试流程

    • 加载 Kafka 插件并创建 Kafka Producer 发送数据到 Kafka 中(以发送100万条数据为例)

    DolphinDB GUI 连接 DolphinDB 执行以下脚本,本例中插件的安装路径为 /DolphinDB/server/plugins/kafka,用户需要根据自己实际环境进行修改:

    1. // 加载插件
    2. try{
    3. loadPlugin("/DolphinDB/server/plugins/kafka/PluginKafka.txt")
    4. loadPlugin("/DolphinDB/server/plugins/kafka/PluginEncoderDecoder.txt")
    5. } catch(ex){print(ex)}
    6. // 创建 Producer
    7. producerCfg = dict(STRING, ANY);
    8. producerCfg["metadata.broker.list"] = "192.193.168.5:8992";
    9. producer = kafka::producer(producerCfg);
    10. kafka::producerFlush(producer);
    11. //向kafka传100万数据
    12. tbl = table("R5L1B3T1N03D01" as deviceId, "2022-02-22 13:55:47.377" as timestamps, "voltage" as deviceType , 1.5 as value )
    13. // 创建 Consume
    14. consumerCfg = dict(STRING, ANY)
    15. consumerCfg["group.id"] = "test10"
    16. consumerCfg["metadata.broker.list"] = "192.193.168.5:8992";
    17. for(i in 1..1000000) {
    18. aclMsg = select *, string(now()) as pluginSendTime from tbl;
    19. for(i in aclMsg) {
    20. kafka::produce(producer, "test3", "1", i, true);
    21. }
    22. }
    23. consumer = kafka::consumer(consumerCfg)
    24. topics=["test10"];
    25. kafka::subscribe(consumer, topics);
    26. for(i in 1..1000000) {
    27. aclMsg = select *, string(now()) as pluginSendTime from tbl;
    28. for(i in aclMsg) {
    29. kafka::produce(producer, "test10", "1", i, true);
    30. }
    31. }
    • 订阅 Kafka 中数据进行消费
    1. // 创建存储解析完数据的表
    2. colDefName = ["deviceId","timestamps","deviceType","value", "pluginSendTime", "pluginReceived"]
    3. colDefType = [SYMBOL,TIMESTAMP,SYMBOL,DOUBLE,TIMESTAMP,TIMESTAMP]
    4. dest = table(1:0, colDefName, colDefType);
    5. share dest as `streamZd
    6. // 解析函数
    7. def temporalHandle(mutable dictVar, mutable dest){
    8. try{
    9. t = dictVar
    10. t.replaceColumn!(`timestamps, temporalParse(dictVar[`timestamps],"yyyy-MM-dd HH:mm:ss.SSS"))
    11. t.replaceColumn!(`pluginSendTime, timestamp(dictVar[`pluginSendTime]))
    12. t.update!(`received, now());
    13. dest.append!(t);
    14. }catch(ex){
    15. print("kafka errors : " + ex)
    16. }
    17. }
    18. // 创建 decoder
    19. name = colDefName[0:5];
    20. type = colDefType[0:5];
    21. type[1] = STRING;
    22. type[4] = STRING;
    23. decoder = EncoderDecoder::jsonDecoder(name, type, temporalHandle{, dest}, 15, 100000, 0.5)
    24. // 创建subjob函数
    25. kafka::createSubJob(consumer, , decoder, `DecoderKafka)

    此时我们观察共享流表的数据量,当达到 100 万条时说明消费完成,测试结束。

    5. Kerberos 认证

    5.1 什么是 Kerberos ?

    Kerberos 是一种基于加密 Ticket 的身份认证协议,支持双向认证且性能较高。主要有三个组成部分:Kdc, Client 和 Service。

    生产环境的 Kafka 一般需要开启 Kerberos 认证,为 Kafka 提供权限管理,提高安全性。

    5.2 前置条件

    • Java 8+
    • kerberos:包括 Kdc 和 Client
    • keytab 证书

    5.3 认证相关配置说明

    环境相关配置说明

    以下是 Kerberos 认证涉及的关键配置信息,具体配置文件的路径根据实际情况调整

    1. 安装 kdc
    yum install -y krb5-server krb5-libs krb5-workstation krb5-devel krb5-auth-dialog

    2. 配置 /etc/krb5.conf

    1. [realms]
    2. HADOOP.COM = {
    3. kdc = cnserver9:88
    4. admin_server = cnserver9:749
    5. default_domain = HADOOP.COM
    6. }

    3. 配置 /var/kerberos/krb5kdc/kadm5.acl

    */admin@HADOOP.COM	*

    4. 创建生成 kdc 数据库文件

    sudo kdb5_util create -r HADOOP.COM –s

    5. 启动 kerberos 服务

    1. sudo systemctl start krb5kdc
    2. sudo systemctl status krb5kdc

    6. 安装 kerberos 客户端

    yum install -y krb5-devel krb5-workstation krb5-client

    7. 启动 kerberos 客户端

    sudo kadmin.local -q "addprinc hadoop/admin"

    DolphinDB Kafka Plugin 配置说明

    • 关键配置参数说明
      • security.protocol:指定通信协议
      • sasl.kerberos.service.name:指定 service 名称
      • sasl.mechanism:SASL机制,包括 GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER
      • sasl.kerberos.keytab:keytab 文件的路径
      • sasl.kerberos.principal:指定 principal

    • 具体代码实现
    1. // 加载插件
    2. try{loadPlugin("/path/to/DolphinDBPlugin/kafka/bin/linux/PluginKafka.txt")} catch(ex){print(ex)}
    3. // 生产者配置
    4. producerCfg = dict(STRING, ANY);
    5. producerCfg["bootstrap.servers"] = "cnserver9:9992";
    6. producerCfg["security.protocol"] = "SASL_PLAINTEXT";
    7. producerCfg["sasl.kerberos.service.name"] = "kafka";
    8. producerCfg["sasl.mechanism"] = "GSSAPI";
    9. producerCfg["sasl.kerberos.keytab"] = "/home/test/hadoop.keytab";
    10. producerCfg["sasl.kerberos.principal"] = "kafka/cnserver9@HADOOP.COM";
    11. producer = kafka::producer(producerCfg);
    12. // 消费者配置
    13. consumerCfg = dict(STRING, ANY)
    14. consumerCfg["group.id"] = "test"
    15. consumerCfg["bootstrap.servers"] = "cnserver9:9992";
    16. consumerCfg["security.protocol"] = "SASL_PLAINTEXT";
    17. consumerCfg["sasl.kerberos.service.name"] = "kafka";
    18. consumerCfg["sasl.mechanism"] = "GSSAPI";
    19. consumerCfg["sasl.kerberos.keytab"] = "/home/test/hadoop.keytab";
    20. consumerCfg["sasl.kerberos.principal"] = "kafka/cnserver9@HADOOP.COM";
    21. consumer = kafka::consumer(consumerCfg)
    注意:适配 Kerberos 认证只需修改 Kafka 插件有关生产者和消费者的配置即可,其余脚本无需改动。

    6. 其他说明

    本教程展示了 DolphinDB Kafka Plugin 中常用的接口函数,完整的函数支持请参考官网文档:DolphinDB Kafka 插件官方教程

    使用过程中如果遇到任何问题,欢迎大家在项目仓库反馈

    7. 参考链接

  • 相关阅读:
    代码随想录49——动态规划:121买卖股票的最佳时机、122买卖股票的最佳时机II
    SQL中的数据类型和规范化,助力数据存储优化
    前端例程20220815:拟物风格复选按钮
    Android官方推荐 无需向应用授予的照片选择器工具
    Strings数据类型
    LeetCode123
    [山东科技大学OJ]1216 Problem D: 编写函数:字符串的复制 之二 (Append Code)
    项目人力资源管理
    cenos自动启动tomcat
    vue中实现签名画板
  • 原文地址:https://blog.csdn.net/qq_41996852/article/details/127903189