GBase 8a (南大通用数据库) Study Notes 19: Subscribing to a Kafka Topic and Consuming Data in GBase 8a


    I. Test Environment

    Item                   Value
    CPU                    Intel® Core™ i5-1035G1 CPU @ 1.00GHz
    Operating system       CentOS Linux release 7.9.2009 (Core)
    Memory                 4 GB
    Logical cores          3
    GBase 8a node IP       192.168.142.11
    ZooKeeper node IP      192.168.142.10
    Kafka node IP          192.168.142.10
    GBase 8a version       8.6.2.43-R33.132743
    ZooKeeper version      3.4.9
    Kafka version          2.11-0.10.2.1

    II. Installing ZooKeeper

    1. Extract the archive

    [root@czg0 install]# tar -xvf zookeeper-3.4.9.tar.gz
    

    2. Configure zoo.cfg

    [root@czg0 zookeeper-3.4.9]# mkdir log
    
    [root@czg0 zookeeper-3.4.9]# mkdir data
    
    [root@czg0 zookeeper-3.4.9]# cd conf/
    
    [root@czg0 conf]# ll
    总用量 12
    -rw-rw-r-- 1 gbase gbase  535 8月 23 2016 configuration.xsl
    -rw-rw-r-- 1 gbase gbase 2161 8月 23 2016 log4j.properties
    -rw-rw-r-- 1 gbase gbase  922 8月 23 2016 zoo_sample.cfg
    
    [root@czg0 conf]# cp zoo_sample.cfg zoo.cfg 
    
    [root@czg0 conf]# cat zoo.cfg 
    # The number of milliseconds of each tick
    tickTime=2000
    # The number of ticks that the initial 
    # synchronization phase can take
    initLimit=10
    # The number of ticks that can pass between 
    # sending a request and getting an acknowledgement
    syncLimit=5
    # the directory where the snapshot is stored.
    # do not use /tmp for storage, /tmp here is just 
    # example sakes.
    dataDir=/opt/install/zookeeper-3.4.9/data
    # the port at which the clients will connect
    clientPort=2181
    # the maximum number of client connections.
    # increase this if you need to handle more clients
    #maxClientCnxns=60
    #
    # Be sure to read the maintenance section of the 
    # administrator guide before turning on autopurge.
    #
    # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
    #
    # The number of snapshots to retain in dataDir
    #autopurge.snapRetainCount=3
    # Purge task interval in hours
    # Set to "0" to disable auto purge feature
    #autopurge.purgeInterval=1
    dataLogDir=/opt/install/zookeeper-3.4.9/log
    server.0=czg0:2888:3888
    
    Parameter    Description
    dataDir      Directory where ZooKeeper stores its data.
    clientPort   Port on which the ZooKeeper service listens for clients.
    server       Deployment addresses of the ZooKeeper cluster members.
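
    For quick reference, the zoo.cfg settings that matter for this single-node setup (all values taken from the file above):

    dataDir=/opt/install/zookeeper-3.4.9/data
    dataLogDir=/opt/install/zookeeper-3.4.9/log
    clientPort=2181
    server.0=czg0:2888:3888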

    3. myid

    [root@czg0 conf]# cat /opt/install/zookeeper-3.4.9/data/myid 
    0
    
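    The data/myid file is not created by extracting the archive; a minimal sketch of creating it, where the value must match the index in the server.N line of zoo.cfg (server.0 above):

    [root@czg0 zookeeper-3.4.9]# echo 0 > /opt/install/zookeeper-3.4.9/data/myid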

    4. Start the service

    [root@czg0 bin]# ./zkServer.sh start
    
    [root@czg0 bin]# ./zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /opt/install/zookeeper-3.4.9/bin/../conf/zoo.cfg
    Mode: standalone
    
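    As an extra check, ZooKeeper 3.4.x answers the four-letter command ruok on its client port; a sketch, assuming nc (netcat) is available on the host:

    [root@czg0 bin]# echo ruok | nc czg0 2181
    imok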

    III. Installing Kafka

    1. Extract the archive

    [root@czg0 install]# tar -xvf kafka_2.11-0.10.2.1.tgz 
    

    2. Edit the configuration files

    (1) server.properties
    [root@czg0 config]# cat server.properties 
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    # see kafka.server.KafkaConfig for additional details and defaults
    
    ############################# Server Basics #############################
    
    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=0
    
    # Switch to enable topic deletion or not, default value is false
    #delete.topic.enable=true
    
    ############################# Socket Server Settings #############################
    
    # The address the socket server listens on. It will get the value returned from 
    # java.net.InetAddress.getCanonicalHostName() if not configured.
    #   FORMAT:
    #     listeners = listener_name://host_name:port
    #   EXAMPLE:
    #     listeners = PLAINTEXT://your.host.name:9092
    listeners=PLAINTEXT://czg0:9092
    
    # Hostname and port the broker will advertise to producers and consumers. If not set, 
    # it uses the value for "listeners" if configured.  Otherwise, it will use the value
    # returned from java.net.InetAddress.getCanonicalHostName().
    #advertised.listeners=PLAINTEXT://your.host.name:9092
    
    # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
    #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
    
    # The number of threads handling network requests
    num.network.threads=3
    
    # The number of threads doing disk I/O
    num.io.threads=8
    
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400
    
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400
    
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600
    
    
    ############################# Log Basics #############################
    
    # A comma seperated list of directories under which to store log files
    log.dirs=/opt/install/kafka_2.11-0.10.2.1/log
    
    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=1
    
    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    num.recovery.threads.per.data.dir=1
    
    ############################# Log Flush Policy #############################
    
    # Messages are immediately written to the filesystem but by default we only fsync() to sync
    # the OS cache lazily. The following configurations control the flush of data to disk.
    # There are a few important trade-offs here:
    #    1. Durability: Unflushed data may be lost if you are not using replication.
    #    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
    #    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
    # The settings below allow one to configure the flush policy to flush data after a period of time or
    # every N messages (or both). This can be done globally and overridden on a per-topic basis.
    
    # The number of messages to accept before forcing a flush of data to disk
    #log.flush.interval.messages=10000
    
    # The maximum amount of time a message can sit in a log before we force a flush
    #log.flush.interval.ms=1000
    
    ############################# Log Retention Policy #############################
    
    # The following configurations control the disposal of log segments. The policy can
    # be set to delete segments after a period of time, or after a given size has accumulated.
    # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
    # from the end of the log.
    
    # The minimum age of a log file to be eligible for deletion due to age
    log.retention.hours=168
    
    # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
    # segments don't drop below log.retention.bytes. Functions independently of log.retention.hours.
    #log.retention.bytes=1073741824
    
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824
    
    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=300000
    
    ############################# Zookeeper #############################
    
    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    zookeeper.connect=czg0:2181
    
    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=6000
    
    Parameter          Description
    broker.id          ID of this broker within the cluster. With multiple brokers the IDs need not be consecutive, but each one must be unique.
    log.dirs           Directory (or directories) in which the broker stores its data. Multiple directories are separated by commas, with no spaces before or after the commas. For multi-node deployments, prefer a single data directory.
    zookeeper.connect  ZooKeeper connection string for the broker. Multiple ZooKeeper servers are separated by commas, with no spaces between them, otherwise the connection fails. The port must also match the ZooKeeper clientPort configured in the previous chapter.
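
    For quick reference, the server.properties settings specific to this environment (all values taken from the file above):

    broker.id=0
    listeners=PLAINTEXT://czg0:9092
    log.dirs=/opt/install/kafka_2.11-0.10.2.1/log
    zookeeper.connect=czg0:2181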
    (2) zookeeper.properties
    [root@czg0 config]# cat zookeeper.properties
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    # 
    #    http://www.apache.org/licenses/LICENSE-2.0
    # 
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    # the directory where the snapshot is stored.
    dataDir=/opt/install/zookeeper-3.4.9/data
    # the port at which the clients will connect
    clientPort=2181
    # disable the per-ip limit on the number of connections since this is a non-production config
    maxClientCnxns=0
    initLimit=5
    syncLimit=2
    server.1=czg0:2888:3888
    

    3. Start Kafka

    [root@czg0 bin]# ./kafka-server-start.sh -daemon ../config/server.properties  
    

    Check that the process is running:

    [root@czg0 kafka_2.11-0.10.2.1]# ps -ef|grep kafka
    

    Check the server log for errors:

    [root@czg0 kafka_2.11-0.10.2.1]# cat logs/server.log
    

    4. Create a topic

    [root@czg0 bin]# ./kafka-topics.sh --create --zookeeper czg0:2181 -partitions 1 --replication-factor 1 --topic sun
    Created topic "sun".
    

    5. List topics

    [root@czg0 bin]# ./kafka-topics.sh --list --zookeeper czg0:2181
    sun
    
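    The partition and replica layout of the topic can also be inspected with the same tool; a sketch using the standard --describe action of this Kafka version:

    [root@czg0 bin]# ./kafka-topics.sh --describe --zookeeper czg0:2181 --topic sun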

    6. Test publishing and subscribing

    (1) Publish messages

    Session 1

    [root@czg0 bin]# ./kafka-console-producer.sh --broker-list czg0:9092 --topic sun
    Hello !!!
    Sun
    
    (2) Subscribe to the topic

    Session 2

    [root@czg0 bin]# ./kafka-console-consumer.sh --bootstrap-server czg0:9092 --topic sun
    
    Hello !!!
    Sun
    

    To view messages from the very beginning of the topic:

    [root@czg0 bin]# ./kafka-console-consumer.sh --bootstrap-server czg0:9092 --topic sun --from-beginning
    
    czg
    zxj
    haha
    
    Hello!!!
    Sun
    Czg
    Zxj
    Hello !!!
    Sun
    

    IV. Subscribing to the Topic and Consuming Data with LOAD

    1. LOAD syntax

    For the full LOAD syntax tree, see the earlier article in this series: 南大通用数据库-Gbase-8a-学习-14-LOAD加载数据 (loading data with LOAD).

    kafka://broker/topic[?[duration=XX][&][partition=partitionid|offset]][#frombeginning]
    
    Parameter       Description
    broker          Kafka IP and port, for example 10.0.2.201:9092.
    topic           Name of the Kafka topic used as the data source. Use only ordinary letters, digits, and underscores; avoid hyphens and other special characters.
    duration        Commit interval for the fetched data. When this time elapses, the data fetched so far is committed and saved to the database. For ordinary loads, 30-300 seconds is appropriate; an interval that is too short increases the disk load on the database.
    partition       Kafka partition to read: partitionid is the partition number and offset is the starting offset. Offsets initially start at 0, but earlier messages may already have been aged out and deleted.
    #frombeginning  Start the whole topic from the beginning. Note that this is not necessarily offset 0, because of aging. Also note the difference from partition: that option starts a single partition from a given point, whereas this one applies to the entire topic. It must not appear twice, since that would be semantically contradictory.
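
    Putting these parameters together, a sketch of a complete statement. It uses the czg.sunczg table and the moon topic created in the next two subsections, and assumes duration is in milliseconds (as in the later examples), so 30000 gives a 30-second commit interval:

    gbase> load data infile 'kafka://czg0:9092/moon?duration=30000&partition=0|0' into table czg.sunczg DATA_FORMAT 3 FIELDS TERMINATED BY ',' MAX_BAD_RECORDS 0;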

    2. Create a test table

    gbase> create table sunczg (a int, b varchar(10));
    Query OK, 0 rows affected (Elapsed: 00:00:01.85)
    
    gbase> desc sunczg;
    +-------+-------------+------+-----+---------+-------+
    | Field | Type        | Null | Key | Default | Extra |
    +-------+-------------+------+-----+---------+-------+
    | a     | int(11)     | YES  |     | NULL    |       |
    | b     | varchar(10) | YES  |     | NULL    |       |
    +-------+-------------+------+-----+---------+-------+
    2 rows in set (Elapsed: 00:00:00.00)
    

    3. Create a test topic and publish data

    [root@czg0 bin]# ./kafka-topics.sh --create --zookeeper czg0:2181 -partitions 1 --replication-factor 1 --topic moon
    Created topic "moon".
    
    [root@czg0 bin]# ./kafka-console-producer.sh --broker-list czg0:9092 --topic moon
    1,czg
    2,zxj
    3,moon
    

    4. Load data

    (1) Load from the beginning
    gbase> load data infile 'kafka://czg0:9092/moon?duration=1000#frombeginning' into table czg.sunczg DATA_FORMAT 3 FIELDS TERMINATED BY ',' MAX_BAD_RECORDS 0;
    Query OK, 3 rows affected (Elapsed: 00:00:02.81)
    Task 2621747 finished, Loaded 3 records, Skipped 0 records
    
    gbase> select * from sunczg;
    +------+------+
    | a    | b    |
    +------+------+
    |    1 | czg  |
    |    2 | zxj  |
    |    3 | moon |
    +------+------+
    3 rows in set (Elapsed: 00:00:00.00)
    
    (2) Load from a specific partition

    I published one extra record: 4,sun.
    Here I load partition 0, which does contain data.

    gbase> load data infile 'kafka://czg0:9092/moon?duration=10000&partition=0|0' into table czg.sunczg DATA_FORMAT 3 FIELDS TERMINATED BY ',' MAX_BAD_RECORDS 0;
    Query OK, 4 rows affected (Elapsed: 00:00:10.91)
    Task 2621748 finished, Loaded 4 records, Skipped 0 records
    
    gbase> select * from sunczg;
    +------+------+
    | a    | b    |
    +------+------+
    |    1 | czg  |
    |    2 | zxj  |
    |    3 | moon |
    |    1 | czg  |
    |    2 | zxj  |
    |    3 | moon |
    |    4 | sun  |
    +------+------+
    7 rows in set (Elapsed: 00:00:00.12)
    

    Loading partition 1, which contains no data:

    gbase> load data infile 'kafka://czg0:9092/moon?duration=10000&partition=1|0' into table czg.sunczg DATA_FORMAT 3 FIELDS TERMINATED BY ',' MAX_BAD_RECORDS 0;
    Query OK, 0 rows affected (Elapsed: 00:00:10.70)
    Task 2621750 finished, Loaded 0 records, Skipped 0 records
    
    (3) Partition offset test
    gbase> load data infile 'kafka://czg0:9092/moon?duration=10000&partition=0|1' into table czg.sunczg DATA_FORMAT 3 FIELDS TERMINATED BY ',' MAX_BAD_RECORDS 0;
    Query OK, 3 rows affected (Elapsed: 00:00:11.53)
    Task 2621749 finished, Loaded 3 records, Skipped 0 records
    
    gbase> 
    gbase> select * from sunczg;
    +------+------+
    | a    | b    |
    +------+------+
    |    1 | czg  |
    |    2 | zxj  |
    |    3 | moon |
    |    1 | czg  |
    |    2 | zxj  |
    |    3 | moon |
    |    4 | sun  |
    |    2 | zxj  |
    |    3 | moon |
    |    4 | sun  |
    +------+------+
    10 rows in set (Elapsed: 00:00:00.27)
    

    The offset is 1, so the first record is skipped and loading starts from the second record, that is, from 2,zxj.
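
    By the same logic, a sketch (hypothetical offset value, same syntax as the statements above) that would skip the first two records of partition 0 and start loading from 3,moon:

    gbase> load data infile 'kafka://czg0:9092/moon?duration=10000&partition=0|2' into table czg.sunczg DATA_FORMAT 3 FIELDS TERMINATED BY ',' MAX_BAD_RECORDS 0;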

    (4) Incremental loading

    Publish a new record, 8,lulu.
    Note: when no offset is specified, records that have already been consumed are not consumed again.

    [root@czg0 bin]# ./kafka-console-producer.sh --broker-list czg0:9092 --topic moon
    1,czg
    2,zxj
    3,moon
    4,sun
    5,haha
    5,haha
    6,test
    7,happy
    8,lulu
    
    gbase> select * from sunczg;
    +------+-------+
    | a    | b     |
    +------+-------+
    |    1 | czg   |
    |    2 | zxj   |
    |    3 | moon  |
    |    1 | czg   |
    |    2 | zxj   |
    |    3 | moon  |
    |    4 | sun   |
    |    2 | zxj   |
    |    3 | moon  |
    |    4 | sun   |
    |    1 | czg   |
    |    2 | zxj   |
    |    3 | moon  |
    |    4 | sun   |
    |    5 | haha  |
    |    5 | haha  |
    |    6 | test  |
    |    7 | happy |
    +------+-------+
    18 rows in set (Elapsed: 00:00:00.11)
    
    gbase> load data infile 'kafka://czg0:9092/moon?duration=10000' into table czg.sunczg DATA_FORMAT 3 FIELDS TERMINATED BY ',' MAX_BAD_RECORDS 0;
    Query OK, 1 row affected (Elapsed: 00:00:11.14)
    Task 2621765 finished, Loaded 1 records, Skipped 0 records
    
    gbase> select * from sunczg;
    +------+-------+
    | a    | b     |
    +------+-------+
    |    1 | czg   |
    |    2 | zxj   |
    |    3 | moon  |
    |    1 | czg   |
    |    2 | zxj   |
    |    3 | moon  |
    |    4 | sun   |
    |    2 | zxj   |
    |    3 | moon  |
    |    4 | sun   |
    |    1 | czg   |
    |    2 | zxj   |
    |    3 | moon  |
    |    4 | sun   |
    |    5 | haha  |
    |    5 | haha  |
    |    6 | test  |
    |    7 | happy |
    |    8 | lulu  |
    +------+-------+
    19 rows in set (Elapsed: 00:00:00.00)
    

    5. Check the load metadata

    Look at the table named topicname_dbname_tablename in the gclusterdb database.

    gbase> select * from gclusterdb.moon_czg_sunczg;
    +---------+------------------+---------------------+
    | scn     | partition_offset | commit_time         |
    +---------+------------------+---------------------+
    | 2621747 | 0:3              | 2022-08-26 15:04:03 |
    | 2621748 | 0:4              | 2022-08-26 15:06:28 |
    | 2621749 | 0:4              | 2022-08-26 15:07:03 |
    | 2621750 | 1:0              | 2022-08-26 15:07:39 |
    | 2621760 | 0:6              | 2022-08-26 15:25:09 |
    | 2621762 | 0:7              | 2022-08-26 15:27:05 |
    | 2621764 | 0:8              | 2022-08-26 15:28:02 |
    | 2621765 | 0:9              | 2022-08-26 15:29:10 |
    +---------+------------------+---------------------+
    8 rows in set (Elapsed: 00:00:00.00)
    

    Note: when one table subscribes to the same topic name from more than one Kafka source, the metadata table above becomes polluted, so it is recommended that topic names be unique and never reused.
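
    A minimal sketch (assuming MySQL-style ORDER BY ... LIMIT, which GBase 8a generally accepts) for checking the most recently committed position recorded in that metadata table:

    gbase> select scn, partition_offset, commit_time from gclusterdb.moon_czg_sunczg order by scn desc limit 1;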

    6. Consequences of dropping the metadata table (not recommended)

    (1) Drop the table
    gbase> drop table gclusterdb.moon_czg_sunczg;
    Query OK, 0 rows affected (Elapsed: 00:00:00.37)
    
    (2) Load

    All previously published data is loaded again.

    gbase> load data infile 'kafka://czg0:9092/moon?duration=10000' into table czg.sunczg DATA_FORMAT 3 FIELDS TERMINATED BY ',' MAX_BAD_RECORDS 0;
    Query OK, 9 rows affected (Elapsed: 00:00:12.86)
    Task 2621768 finished, Loaded 9 records, Skipped 0 records
    
    (3) Check the metadata table

    The metadata table is recreated automatically, following the naming rule described above.

    gbase> select * from gclusterdb.moon_czg_sunczg;
    +---------+------------------+---------------------+
    | scn     | partition_offset | commit_time         |
    +---------+------------------+---------------------+
    | 2621768 | 0:9              | 2022-08-26 15:40:44 |
    +---------+------------------+---------------------+
    1 row in set (Elapsed: 00:00:00.09)
    

    V. Scheduled Subscription to a Kafka Topic

    1. Create a Kafka consumer task

    (1) Syntax
    create kafka consumer CONSUMER_NAME loader topic TOPIC_NAME brokers  'BROKER[,BROKERn..]' duration DURATION_TIME_MS into table [DBNAME.]TABLENAME;
    
    Parameter         Description
    CONSUMER_NAME     User-defined consumer name.
    TOPIC_NAME        Kafka topic.
    BROKER            Kafka broker IP; separate multiple brokers with commas.
    DURATION_TIME_MS  Interval between Kafka consumption cycles, in milliseconds; 30-300 seconds or more is recommended.
    DBNAME            Database name.
    TABLENAME         Table name.
    (2) Create the consumer
    gbase> create kafka consumer kc_gbase8a loader topic moon brokers  'czg0:9092' duration 3000 into table czg.sunczg;
    Query OK, 0 rows affected (Elapsed: 00:00:03.33)
    create consumer done.
    
    (3) Start the consumer (fails with an error)

    I will share a fix in a follow-up post once I find one.

    gbase> start kafka consumer kc_gbase8a;
    ERROR 1707 (HY000): gcluster command error: leader thread not started.
    
    Original article: https://blog.csdn.net/qq_45111959/article/details/126540015