在上一节中我们安装了Kafka单机环境和集群环境,这一节来测试下Linux环境安装Kafka后的命令行操作。
我们之前在用Windows环境安装Kafka Kafka应用场景|基础架构|Windows安装|命令行操作 和命令行操作时,讲到主题命令行参数如下:
1. 创建主题
[root@localhost kafka-01]# bin/kafka-topics.sh --bootstrape-server localhost:9092 --create --topic test1 --partitions 3 --replication-factor 3
注意:这里之所以无法识别 --bootstrape-server 参数是因为kafka的版本低于2.2,我安装的kafka版本为kafka_2.12-2.2.1.tgz,应该使用 --zookeeper localhost:2181参数:
[root@localhost kafka-01]# bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 3 --replication-factor 3
–zookeeper:指定了Kafka所连接的Zookeeper服务地址
–topic:指定了所要创建主题的名称
–partitions:指定了分区个数
–replication-factor:指定了副本因子
–create:创建主题的动作指令
2. 查看主题详情
[root@localhost kafka-01]# bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test
3. 查看所有主题
[root@localhost kafka-01]# bin/kafka-topics.sh --zookeeper localhost:2181 --list
[root@localhost kafka-01]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
–bootstrap-server 指定了连接Kafka集群的地址
–topic 指定了消费端订阅的主题
[root@localhost kafka-01]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
–broker-list 指定了连接的Kafka集群的地址
–topic 指定了发送消息时的主题
生产者发送消息:
消费者接收消息:
① 创建kafka项目并引入依赖:
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0modelVersion>
<parent>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-parentartifactId>
<version>2.2.4.RELEASEversion>
<relativePath/>
parent>
<groupId>com.hhgroupId>
<artifactId>kafkaartifactId>
<version>1.0-SNAPSHOTversion>
<properties>
<maven.compiler.source>8maven.compiler.source>
<maven.compiler.target>8maven.compiler.target>
properties>
<dependencies>
<dependency>
<groupId>org.apache.kafkagroupId>
<artifactId>kafka-clientsartifactId>
<version>3.0.0version>
dependency>
<dependency>
<groupId>org.projectlombokgroupId>
<artifactId>lombokartifactId>
<optional>trueoptional>
dependency>
<dependency>
<groupId>cn.hutoolgroupId>
<artifactId>hutool-allartifactId>
<version>5.7.20version>
dependency>
<dependency>
<groupId>com.alibabagroupId>
<artifactId>fastjsonartifactId>
<version>1.2.83version>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-webartifactId>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-testartifactId>
<scope>testscope>
dependency>
dependencies>
project>
② kafka生产者发送消息:
public class CustomProducer01 {
public static void main(String[] args) {
// kafka生产者属性配置
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.22:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// kafka生产者发送消息,默认是异步发送方式
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", "hello,kafka");
try{
// 发送消息
kafkaProducer.send(producerRecord);
}catch (Exception e){
e.printStackTrace();
}
// 关闭资源
kafkaProducer.close();
}
}
③ 查看kafka消费者有没有消费消息:
④ kafka消费者消费消息:
查看kafka安装目录config/consumer.properties文件中的group.id:
public class CustomConsumer01 {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.38.22:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test-consumer-group");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
ArrayList<String> topics = new ArrayList<>();
topics.add("test");
consumer.subscribe(topics);
while (true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
}
}
⑤ 启动消费者程序后,再启动生产者程序发送消息,查看消费者控制台: