| Item | Value |
|---|---|
| CPU | Intel® Core™ i5-1035G1 CPU @ 1.00GHz |
| OS | CentOS Linux release 7.9.2009 (Core) |
| Memory | 4 GB |
| Logical cores | 3 |
| GBase 8a node IP | 192.168.142.11 |
| ZooKeeper node IP | 192.168.142.10 |
| Kafka node IP | 192.168.142.10 |
| GBase 8a database version | 8.6.2.43-R33.132743 |
| ZooKeeper version | 3.4.9 |
| Kafka version | 2.11-0.10.2.1 |
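The OS release, memory size, and logical core count listed above can be confirmed on the host with standard commands, for example:
[root@czg0 ~]# cat /etc/redhat-release
[root@czg0 ~]# free -h
[root@czg0 ~]# nproc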
[root@czg0 install]# tar -xvf zookeeper-3.4.9.tar.gz
[root@czg0 zookeeper-3.4.9]# mkdir log
[root@czg0 zookeeper-3.4.9]# mkdir data
[root@czg0 zookeeper-3.4.9]# cd conf/
[root@czg0 conf]# ll
total 12
-rw-rw-r-- 1 gbase gbase 535 Aug 23 2016 configuration.xsl
-rw-rw-r-- 1 gbase gbase 2161 Aug 23 2016 log4j.properties
-rw-rw-r-- 1 gbase gbase 922 Aug 23 2016 zoo_sample.cfg
[root@czg0 conf]# cp zoo_sample.cfg zoo.cfg
[root@czg0 conf]# cat zoo.cfg
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/opt/install/zookeeper-3.4.9/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
dataLogDir=/opt/install/zookeeper-3.4.9/log
server.0=czg0:2888:3888
| Parameter | Description |
|---|---|
| dataDir | Directory where ZooKeeper stores its data. |
| clientPort | Port the ZooKeeper service listens on. |
| server | Addresses of the ZooKeeper cluster nodes. |
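zoo.cfg declares this node as server.0, so the dataDir must contain a myid file holding that same id. A minimal way to create it (run from the zookeeper-3.4.9 directory, using id 0 as configured above):
[root@czg0 zookeeper-3.4.9]# echo 0 > data/myid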
[root@czg0 conf]# cat /opt/install/zookeeper-3.4.9/data/myid
0
[root@czg0 bin]# ./zkServer.sh start
[root@czg0 bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/install/zookeeper-3.4.9/bin/../conf/zoo.cfg
Mode: standalone
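Besides zkServer.sh status, the service can be probed with ZooKeeper's four-letter commands on the client port; a healthy server answers ruok with imok (this assumes nc is installed on the host):
[root@czg0 bin]# echo ruok | nc czg0 2181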
[root@czg0 install]# tar -xvf kafka_2.11-0.10.2.1.tgz
[root@czg0 config]# cat server.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
# Switch to enable topic deletion or not, default value is false
#delete.topic.enable=true
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://czg0:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads handling network requests
num.network.threads=3
# The number of threads doing disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma seperated list of directories under which to store log files
log.dirs=/opt/install/kafka_2.11-0.10.2.1/log
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=czg0:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
| Parameter | Description |
|---|---|
| broker.id | The id of this broker node in the cluster. In a multi-node deployment the ids need not be consecutive, but each must be unique. |
| log.dirs | Directory (or directories) where the broker stores its data. Multiple directories are separated by commas, with no spaces before or after the commas. For multi-node deployments, prefer a single data directory per node. |
| zookeeper.connect | The ZooKeeper connection string used by the broker. Multiple ZooKeeper servers are separated by commas, with no spaces between them, otherwise the connection fails. The port must also match the ZooKeeper clientPort configured in the previous section. |
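For reference, with a three-node ZooKeeper ensemble instead of the single node used here, the connection string would list every server, e.g. (hostnames below are placeholders):
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181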
[root@czg0 config]# cat zookeeper.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=/opt/install/zookeeper-3.4.9/data
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
initLimit=5
syncLimit=2
server.1=czg0:2888:3888
[root@czg0 bin]# ./kafka-server-start.sh -daemon ../config/server.properties
Check that the process is running:
[root@czg0 kafka_2.11-0.10.2.1]# ps -ef|grep kafka
Check the log for errors:
[root@czg0 kafka_2.11-0.10.2.1]# cat logs/server.log
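You can also verify that the broker is listening on port 9092 (netstat is provided by the net-tools package on CentOS 7):
[root@czg0 kafka_2.11-0.10.2.1]# netstat -lntp | grep 9092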
[root@czg0 bin]# ./kafka-topics.sh --create --zookeeper czg0:2181 -partitions 1 --replication-factor 1 --topic sun
Created topic "sun".
[root@czg0 bin]# ./kafka-topics.sh --list --zookeeper czg0:2181
sun
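The partition and replica layout of the new topic can be inspected with --describe:
[root@czg0 bin]# ./kafka-topics.sh --describe --zookeeper czg0:2181 --topic sun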
Session 1:
[root@czg0 bin]# ./kafka-console-producer.sh --broker-list czg0:9092 --topic sun
Hello !!!
Sun
Session 2:
[root@czg0 bin]# ./kafka-console-consumer.sh --bootstrap-server czg0:9092 --topic sun
Hello !!!
Sun
To view messages from the very beginning:
[root@czg0 bin]# ./kafka-console-consumer.sh --bootstrap-server czg0:9092 --topic sun --from-beginning
czg
zxj
haha
Hello!!!
Sun
Czg
Zxj
Hello !!!
Sun
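To see how many messages a topic currently holds (handy later when working with offsets), the GetOffsetShell tool prints each partition's latest offset; a sketch, where --time -1 requests the latest offset:
[root@czg0 bin]# ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list czg0:9092 --topic sun --time -1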
For the full LOAD syntax tree, see the earlier article 南大通用数据库-Gbase-8a-学习-14-LOAD加载数据. The Kafka data source is specified as a URL:
kafka://broker/topic[?[duration=XX][&][partition=partitionid|offset]][#frombeginning]
| Parameter | Description |
|---|---|
| broker | Kafka broker IP and port, e.g. 10.0.2.201:9092. |
| topic | Name of the source Kafka topic. Use only ordinary letters, digits, and underscores; avoid hyphens and other special characters. |
| duration | Commit interval for the fetched data. When this interval elapses, the batch fetched so far is committed and saved to the database. For ordinary loads, 30-300 seconds is a reasonable range; a very short interval increases the database disk load. |
| partition | Kafka partition to read: partitionid is the partition number and offset is the starting offset. Offsets originally start at 0, but earlier messages may already have been aged out by retention. |
| #frombeginning | Read the whole topic from the beginning. Note that the beginning is not necessarily offset 0, again because of retention. Also note the difference from partition: that option starts a single partition at a given offset, whereas #frombeginning starts the whole topic from the beginning. It must not appear twice, since that would be semantically contradictory. |
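Putting the parameters together, a URL that reads partition 0 of a topic starting at offset 5 and commits every 60 seconds might look like the following (broker address and topic name are placeholders; judging from the examples below, duration is given in milliseconds):
kafka://10.0.2.201:9092/mytopic?duration=60000&partition=0|5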
gbase> create table sunczg (a int, b varchar(10));
Query OK, 0 rows affected (Elapsed: 00:00:01.85)
gbase> desc sunczg;
+-------+-------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+-------------+------+-----+---------+-------+
| a | int(11) | YES | | NULL | |
| b | varchar(10) | YES | | NULL | |
+-------+-------------+------+-----+---------+-------+
2 rows in set (Elapsed: 00:00:00.00)
[root@czg0 bin]# ./kafka-topics.sh --create --zookeeper czg0:2181 -partitions 1 --replication-factor 1 --topic moon
Created topic "moon".
[root@czg0 bin]# ./kafka-console-producer.sh --broker-list czg0:9092 --topic moon
1,czg
2,zxj
3,moon
gbase> load data infile 'kafka://czg0:9092/moon?duration=1000#frombeginning' into table czg.sunczg DATA_FORMAT 3 FIELDS TERMINATED BY ',' MAX_BAD_RECORDS 0;
Query OK, 3 rows affected (Elapsed: 00:00:02.81)
Task 2621747 finished, Loaded 3 records, Skipped 0 records
gbase> select * from sunczg;
+------+------+
| a | b |
+------+------+
| 1 | czg |
| 2 | zxj |
| 3 | moon |
+------+------+
3 rows in set (Elapsed: 00:00:00.00)
I published one more message: 4,sun.
Here I load partition 0, which does contain data.
gbase> load data infile 'kafka://czg0:9092/moon?duration=10000&partition=0|0' into table czg.sunczg DATA_FORMAT 3 FIELDS TERMINATED BY ',' MAX_BAD_RECORDS 0;
Query OK, 4 rows affected (Elapsed: 00:00:10.91)
Task 2621748 finished, Loaded 4 records, Skipped 0 records
gbase> select * from sunczg;
+------+------+
| a | b |
+------+------+
| 1 | czg |
| 2 | zxj |
| 3 | moon |
| 1 | czg |
| 2 | zxj |
| 3 | moon |
| 4 | sun |
+------+------+
7 rows in set (Elapsed: 00:00:00.12)
Loading partition 1, which contains no data:
gbase> load data infile 'kafka://czg0:9092/moon?duration=10000&partition=1|0' into table czg.sunczg DATA_FORMAT 3 FIELDS TERMINATED BY ',' MAX_BAD_RECORDS 0;
Query OK, 0 rows affected (Elapsed: 00:00:10.70)
Task 2621750 finished, Loaded 0 records, Skipped 0 records
gbase> load data infile 'kafka://czg0:9092/moon?duration=10000&partition=0|1' into table czg.sunczg DATA_FORMAT 3 FIELDS TERMINATED BY ',' MAX_BAD_RECORDS 0;
Query OK, 3 rows affected (Elapsed: 00:00:11.53)
Task 2621749 finished, Loaded 3 records, Skipped 0 records
gbase>
gbase> select * from sunczg;
+------+------+
| a | b |
+------+------+
| 1 | czg |
| 2 | zxj |
| 3 | moon |
| 1 | czg |
| 2 | zxj |
| 3 | moon |
| 4 | sun |
| 2 | zxj |
| 3 | moon |
| 4 | sun |
+------+------+
10 rows in set (Elapsed: 00:00:00.27)
With offset 1, the first message is skipped and loading starts from the second one, i.e. from 2,zxj.
Next, publish a new message, 8,lulu.
Note: when no offset is specified, previously consumed messages are not consumed again.
[root@czg0 bin]# ./kafka-console-producer.sh --broker-list czg0:9092 --topic moon
1,czg
2,zxj
3,moon
4,sun
5,haha
5,haha
6,test
7,happy
8,lulu
gbase> select * from sunczg;
+------+-------+
| a | b |
+------+-------+
| 1 | czg |
| 2 | zxj |
| 3 | moon |
| 1 | czg |
| 2 | zxj |
| 3 | moon |
| 4 | sun |
| 2 | zxj |
| 3 | moon |
| 4 | sun |
| 1 | czg |
| 2 | zxj |
| 3 | moon |
| 4 | sun |
| 5 | haha |
| 5 | haha |
| 6 | test |
| 7 | happy |
+------+-------+
18 rows in set (Elapsed: 00:00:00.11)
gbase> load data infile 'kafka://czg0:9092/moon?duration=10000' into table czg.sunczg DATA_FORMAT 3 FIELDS TERMINATED BY ',' MAX_BAD_RECORDS 0;
Query OK, 1 row affected (Elapsed: 00:00:11.14)
Task 2621765 finished, Loaded 1 records, Skipped 0 records
gbase> select * from sunczg;
+------+-------+
| a | b |
+------+-------+
| 1 | czg |
| 2 | zxj |
| 3 | moon |
| 1 | czg |
| 2 | zxj |
| 3 | moon |
| 4 | sun |
| 2 | zxj |
| 3 | moon |
| 4 | sun |
| 1 | czg |
| 2 | zxj |
| 3 | moon |
| 4 | sun |
| 5 | haha |
| 5 | haha |
| 6 | test |
| 7 | happy |
| 8 | lulu |
+------+-------+
19 rows in set (Elapsed: 00:00:00.00)
Check the metadata table in the gclusterdb database, named <topic>_<database>_<table>, here moon_czg_sunczg:
gbase> select * from gclusterdb.moon_czg_sunczg;
+---------+------------------+---------------------+
| scn | partition_offset | commit_time |
+---------+------------------+---------------------+
| 2621747 | 0:3 | 2022-08-26 15:04:03 |
| 2621748 | 0:4 | 2022-08-26 15:06:28 |
| 2621749 | 0:4 | 2022-08-26 15:07:03 |
| 2621750 | 1:0 | 2022-08-26 15:07:39 |
| 2621760 | 0:6 | 2022-08-26 15:25:09 |
| 2621762 | 0:7 | 2022-08-26 15:27:05 |
| 2621764 | 0:8 | 2022-08-26 15:28:02 |
| 2621765 | 0:9 | 2022-08-26 15:29:10 |
+---------+------------------+---------------------+
8 rows in set (Elapsed: 00:00:00.00)
Note:
If a single table subscribes to identically named topics from more than one Kafka source, the metadata table above gets polluted, so it is recommended to keep topic names unique.
gbase> drop table gclusterdb.moon_czg_sunczg;
Query OK, 0 rows affected (Elapsed: 00:00:00.37)
After the metadata table is dropped, the next load re-loads all of the published data.
gbase> load data infile 'kafka://czg0:9092/moon?duration=10000' into table czg.sunczg DATA_FORMAT 3 FIELDS TERMINATED BY ',' MAX_BAD_RECORDS 0;
Query OK, 9 rows affected (Elapsed: 00:00:12.86)
Task 2621768 finished, Loaded 9 records, Skipped 0 records
The metadata table is then recreated automatically according to the naming rule described above.
gbase> select * from gclusterdb.moon_czg_sunczg;
+---------+------------------+---------------------+
| scn | partition_offset | commit_time |
+---------+------------------+---------------------+
| 2621768 | 0:9 | 2022-08-26 15:40:44 |
+---------+------------------+---------------------+
1 row in set (Elapsed: 00:00:00.09)
create kafka consumer CONSUMER_NAME loader topic TOPIC_NAME brokers 'BROKER[,BROKERn..]' duration DURATION_TIME_MS into table [DBNAME.]TABLENAME;
| Parameter | Description |
|---|---|
| CONSUMER_NAME | User-defined consumer name |
| TOPIC_NAME | Kafka topic |
| BROKER | Kafka broker address; separate multiple brokers with commas |
| DURATION_TIME_MS | Interval between commits of consumed Kafka data, in milliseconds. 30-300 seconds or more is recommended. |
| DBNAME | Database name |
| TABLENAME | Table name |
gbase> create kafka consumer kc_gbase8a loader topic moon brokers 'czg0:9092' duration 3000 into table czg.sunczg;
Query OK, 0 rows affected (Elapsed: 00:00:03.33)
create consumer done.
gbase> start kafka consumer kc_gbase8a;
ERROR 1707 (HY000): gcluster command error: leader thread not started.
Starting the consumer fails with the error above; if I find a solution later, I will share it in a follow-up.