Kafka包括五个核心api:
Producer API 允许应用程序将数据流发送到 Kafka 集群中的主题。【生产者】Consumer API 允许应用程序从 Kafka 集群中的主题中读取数据流。【消费者】Streams API 允许将数据流从输入主题转换为输出主题。【计算引擎】Connect API 允许实现连接器,这些连接器不断地从某个源系统或应用程序拉入 Kafka,或从 Kafka 推送到某个接收器系统或应用程序。【source与sink】Admin API 允许管理和检查主题、代理和其他 Kafka 对象。Java 客户端接口文档:https://kafka.apache.org/32/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
非 Java 客户端接口文档:https://cwiki.apache.org/confluence/display/KAFKA/Clients
使用java kafka需引入依赖:
<!-- Kafka 客户端库。包含内置的序列化器/反序列化器 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.0</version>
</dependency>
<!-- Kafka Streams 的基础库 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.2.0</version>
</dependency>
<!-- 用于 Scala 库的 Kafka Streams DSL,用于编写 Scala Kafka Streams 应用程序。不使用 SBT 时,您需要在工件 ID 后缀上您的应用程序使用的正确版本的 Scala ( _2.12, _2.13) -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-scala_2.13</artifactId>
<version>3.2.0</version>
</dependency>
<!-- slf4j 依赖包 -->
<!-- https://mvnrepository.com/artifact/log4j/log4j -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
slf4j-api、slf4j-log4j12、log4j三者之间的关系如下图所示:

关于配置可以参考我这篇文章:大数据Hadoop之——Kafka 图形化工具 EFAK(EFAK环境部署)
$ cd $KAFKA_HOME
# 启动zookeeper
$ ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
# 启动zookeeper客户端验证
$ ./bin/zookeeper-shell.sh hadoop-node1:12181
# 启动kafka
$ ./bin/kafka-server-start.sh -daemon ./config/server.properties
# 查看kafka topic列表
$ ./bin/kafka-topics.sh --bootstrap-server hadoop-node1:19092 --list
# 停止服务
$ cd $KAFKA_HOME; ./bin/kafka-server-stop.sh ; ./bin/zookeeper-server-stop.sh
package bigdata.kafka.com;
import java.time.Duration;
import java.util.*;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.concurrent.ExecutionException;
public class KafkaTest001 {
public static final String TOPIC = "plaintext";
static Properties getBaseConfig() {
Properties props = new Properties();
// Kafka 服务端的主机名和端口号
props.put("bootstrap.servers", "hadoop-node1:19092,hadoop-node2:19092,hadoop-node3:19092");
// 也可以使用下面方式定义配置
// props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop-node1:19092,hadoop-node2:19092,hadoop-node3:19092");
return props;
}
/**
* 创建topic
*/
public static void createTopic(){
final Properties props = getBaseConfig();
KafkaAdminClient adminClient= (KafkaAdminClient) KafkaAdminClient.create(props);
//创建需要新建的topic
//1.topic名,2.分区数:1, 3.副本数:1
NewTopic newTopic = new NewTopic(TOPIC, 3,(short)3);
// CreateTopicsResult res = adminClient.createTopics(Arrays.asList("topic1", "topic3","topic3"));
CreateTopicsResult res = adminClient.createTopics(Collections.singletonList(newTopic));
//等待创建,成功不会有任何报错,如果创建失败和超时会报错。
try {
res.all().get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
/**
* topic list
*/
public static void topicList() throws ExecutionException, InterruptedException {
final Properties props = getBaseConfig();
KafkaAdminClient adminClient= (KafkaAdminClient) KafkaAdminClient.create(props);
// 查看所有topic列表
Set<String> topiclist = adminClient.listTopics().names().get();
System.out.println(topiclist);
//遍历数组中的元素
for (String topicname:topiclist){
System.out.println(topicname);
}
//查看Topic详情
DescribeTopicsResult describeTopics = adminClient.describeTopics(topiclist);
Map<String, TopicDescription> tdm = describeTopics.allTopicNames().get();
for (Map.Entry<String, TopicDescription> entry : tdm.entrySet()) {
System.out.println(entry.getKey()+"\t"+entry.getValue());
}
}
/**
* 删除topic
* @param topicName
*/
public static void deleteTopic(String topicName){
final Properties props = getBaseConfig();
KafkaAdminClient adminClient= (KafkaAdminClient) KafkaAdminClient.create(props);
DeleteTopicsResult res = adminClient.deleteTopics(Collections.singleton(topicName));
try {
System.out.println(res.all().get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
/**
* 生产者
*/
public static void MyProducer() throws InterruptedException {
/**
* 1.创建链接参数
*/
final Properties props = getBaseConfig();
// ack:-1 需要 ISR 中所有的 Replica(副本)给予接收确认,速度最慢,安全性最高,但是由于 ISR 可能会缩小到仅包含一个 Replica,所以设置参数为 all 时并不能一定避免数据丢失。
props.put("acks", "-1");
// 消息重试次数
props.put("retries", 3);
// 设置请求的数据的最大字节数,为了防止发生较大的数据包影响到吞吐量,默认值为1MB。超过 1MB 将会报错。这里设置1M
props.put("max.request.size",1048576);
// 每一批次的消息大小,16kb
props.put("batch.size", 16384);
// 消息延迟发送时间,默认0,只有满足一批次才会发送。这里设置为1s,一个批次的消息不满足 16kb,也会发送
props.put("linger.ms", 1);
// 缓冲池大小,默认32M
props.put("buffer.memory", 33554432);
// key的序列化方式
props.put("key.serializer", StringSerializer.class);
// value的序列化方式
props.put("value.serializer", StringSerializer.class);
/**
* 2.创建生产者
*/
KafkaProducer<String,String> producer=new KafkaProducer<String, String>(props);
/**
* 3.调用 send 方法,发送消息(普通异步发送)
*/
/*for (int i = 0; i < 50; i++) {
// 异步发送 默认
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "key" + i, "value" + i);
producer.send(record);
// 同步发送
record.send(record).get();
}*/
/**
* 3. 带回调函数的异步发送
*/
for (int i = 0; i < 5; i++) {
// 添加回调
producer.send(new ProducerRecord<>(TOPIC,
"key " + i, "value" + i), new Callback() {
// 该方法在 Producer 收到 ack 时调用,为异步调用
@Override
public void onCompletion(RecordMetadata metadata,
Exception exception) {
if (exception == null) {
// 没有异常,输出信息到控制台
System.out.println(" 主题: " +
metadata.topic() + "->" + "分区:" + metadata.partition());
} else {
// 出现异常打印
exception.printStackTrace();
}
}
});
// 延迟一会会看到数据发往不同分区
Thread.sleep(2);
}
/**
* 4.关闭生产者
*/
producer.close();
}
/**
* 消费者
*/
public static void MyConsumer(){
// 1.创建链接参数
final Properties props = getBaseConfig();
// 制定consumer group
props.put("group.id", "test");
// 是否自动确认offset
props.put("enable.auto.commit", "false");
// 自动确认offset的时间间隔
// props.put("auto.commit.interval.ms", "1000");
// 使用消费者组管理时,调用poll()之间的最大延迟。这对消费者在获取更多记录之前可以空闲的时间量设置了一个上限。
// 如果在此超时过期之前未调用poll(),则认为使用者失败,组将重新平衡,以便将分区重新分配给其他成员
// 默认值 300s
props.put("max.poll.interval.ms",300000);
// offset 重置,默认:latest,从最新的开始消费,这里设置为 earliest,重最早的提交的 offset 开始消费
props.put("auto.offset.reset","earliest");
// 拉取的消息的最大条数,默认 500
props.put("max.poll.records",500);
// 这个参数就是为消息的 key 做反序列化的 ;consumer 代码从 broker 端获取的任何消息都是宇节数组的格式,因此消息的每个组件都要执行相应的解序列化操作才能“还原”成原来的对象格式 。
props.put("key.deserializer", StringDeserializer.class);
props.put("value.deserializer", StringDeserializer.class);
// 定义consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 消费者订阅的topic, 可同时订阅多个
// consumer.subscribe(Arrays.asList("topic1", "topic3","topic3"));
consumer.subscribe(Collections.singletonList(TOPIC));
while (true) {
// 读取数据,读取超时时间为 10ms
Duration d = Duration.ofSeconds(10);
ConsumerRecords<String, String> records = consumer.poll(d);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
// 同步提交 offset
// consumer.commitSync();
// 异步提交 offset
consumer.commitAsync();
}
}
/**
* 主函数入口
* @param args
* @throws ExecutionException
* @throws InterruptedException
*/
public static void main(String[] args) throws ExecutionException, InterruptedException {
// deleteTopic(TOPIC);
// createTopic();
// topicList();
MyProducer();
MyConsumer();
}
}

关于配置可以参考我这篇文章:大数据Hadoop之——Kafka鉴权认证(Kafka kerberos认证+kafka账号密码认证+CDH Kerberos认证)
$ cd $KAFKA_HOME
# 启动zookeeper
$ ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
# 启动zookeeper客户端验证
$ ./bin/zookeeper-shell.sh hadoop-node1:12181
# 启动kafka
$ ./bin/kafka-server-start-sasl.sh -daemon config/server-sasl.properties
# 查看kafka topic列表
$ ./bin/kafka-topics-sasl.sh --list --bootstrap-server hadoop-node1:19092 --command-config config/kerberos/client.properties
# 停止服务
$ cd $KAFKA_HOME; ./bin/kafka-server-stop.sh ; ./bin/zookeeper-server-stop.sh
package bigdata.kafka.com;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
public class KafkaTest002 {
public static final String TOPIC = "KafkaTest002";
static Properties getBaseConfig() {
Properties properties = System.getProperties();
String krb5_config = (String) properties.get("java.security.krb5.conf");
String login_config = (String) properties.get("java.security.auth.login.config");
System.setProperty("java.security.krb5.conf", krb5_config);
System.setProperty("java.security.auth.login.config", login_config);
Properties props = new Properties();
// Kafka 服务端的主机名和端口号
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "hadoop-node1:19092,hadoop-node2:19092,hadoop-node3:19092");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
// 默认就是GSSAPI,kerberos
props.put(SaslConfigs.SASL_MECHANISM, "GSSAPI");
props.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka-server");
return props;
}
/**
* 创建topic
*/
public static void createTopic(){
final Properties props = getBaseConfig();
KafkaAdminClient adminClient= (KafkaAdminClient) KafkaAdminClient.create(props);
//创建需要新建的topic
//1.topic名,2.分区数:1, 3.副本数:1
NewTopic newTopic = new NewTopic(TOPIC, 3,(short)3);
// CreateTopicsResult res = adminClient.createTopics(Arrays.asList("topic1", "topic3","topic3"));
CreateTopicsResult res = adminClient.createTopics(Collections.singletonList(newTopic));
//等待创建,成功不会有任何报错,如果创建失败和超时会报错。
try {
res.all().get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
/**
* topic list
*/
public static void topicList() throws ExecutionException, InterruptedException {
final Properties props = getBaseConfig();
KafkaAdminClient adminClient= (KafkaAdminClient) KafkaAdminClient.create(props);
// 查看所有topic列表
Set<String> topiclist = adminClient.listTopics().names().get();
System.out.println(topiclist);
//遍历数组中的元素
for (String topicname:topiclist){
System.out.println(topicname);
}
//查看Topic详情
DescribeTopicsResult describeTopics = adminClient.describeTopics(topiclist);
Map<String, TopicDescription> tdm = describeTopics.allTopicNames().get();
for (Map.Entry<String, TopicDescription> entry : tdm.entrySet()) {
System.out.println(entry.getKey()+"\t"+entry.getValue());
}
}
/**
* 删除topic
* @param topicName
*/
public static void deleteTopic(String topicName){
final Properties props = getBaseConfig();
KafkaAdminClient adminClient= (KafkaAdminClient) KafkaAdminClient.create(props);
DeleteTopicsResult res = adminClient.deleteTopics(Collections.singleton(topicName));
try {
System.out.println(res.all().get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
/**
* 生产者
*/
public static void MyProducer(String[] args) throws InterruptedException {
/**
* 1.创建链接参数
*/
final Properties props = getBaseConfig();
// ack:-1 需要 ISR 中所有的 Replica(副本)给予接收确认,速度最慢,安全性最高,但是由于 ISR 可能会缩小到仅包含一个 Replica,所以设置参数为 all 时并不能一定避免数据丢失。
props.put("acks", "-1");
// 消息重试次数
props.put("retries", 3);
// 设置请求的数据的最大字节数,为了防止发生较大的数据包影响到吞吐量,默认值为1MB。超过 1MB 将会报错。这里设置1M
props.put("max.request.size",1048576);
// 每一批次的消息大小,16kb
props.put("batch.size", 16384);
// 消息延迟发送时间,默认0,只有满足一批次才会发送。这里设置为1s,一个批次的消息不满足 16kb,也会发送
props.put("linger.ms", 1);
// 缓冲池大小,默认32M
props.put("buffer.memory", 33554432);
// key的序列化方式
props.put("key.serializer", StringSerializer.class);
// value的序列化方式
props.put("value.serializer", StringSerializer.class);
/**
* 2.创建生产者
*/
KafkaProducer<String,String> producer=new KafkaProducer<String, String>(props);
/**
* 3.调用 send 方法,发送消息(普通异步发送)
*/
/*for (int i = 0; i < 50; i++) {
// 异步发送 默认
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "key" + i, "value" + i);
producer.send(record);
// 同步发送
record.send(record).get();
}*/
/**
* 3. 带回调函数的异步发送
*/
for (int i = 0; i < 5; i++) {
// 添加回调
producer.send(new ProducerRecord<>(TOPIC,
"key " + i, "value" + i), new Callback() {
// 该方法在 Producer 收到 ack 时调用,为异步调用
@Override
public void onCompletion(RecordMetadata metadata,
Exception exception) {
if (exception == null) {
// 没有异常,输出信息到控制台
System.out.println(" 主题: " +
metadata.topic() + "->" + "分区:" + metadata.partition());
} else {
// 出现异常打印
exception.printStackTrace();
}
}
});
// 延迟一会会看到数据发往不同分区
Thread.sleep(2);
}
/**
* 4.关闭生产者
*/
producer.close();
}
/**
* 消费者
*/
public static void MyConsumer(String[] args){
// 1.创建链接参数
final Properties props = getBaseConfig();
// 制定consumer group
props.put("group.id", "test");
// 是否自动确认offset
props.put("enable.auto.commit", "false");
// 自动确认offset的时间间隔
// props.put("auto.commit.interval.ms", "1000");
// 使用消费者组管理时,调用poll()之间的最大延迟。这对消费者在获取更多记录之前可以空闲的时间量设置了一个上限。
// 如果在此超时过期之前未调用poll(),则认为使用者失败,组将重新平衡,以便将分区重新分配给其他成员
// 默认值 300s
props.put("max.poll.interval.ms",300000);
// offset 重置,默认:latest,从最新的开始消费,这里设置为 earliest,重最早的提交的 offset 开始消费
props.put("auto.offset.reset","earliest");
// 拉取的消息的最大条数,默认 500
props.put("max.poll.records",500);
// 这个参数就是为消息的 key 做反序列化的 ;consumer 代码从 broker 端获取的任何消息都是宇节数组的格式,因此消息的每个组件都要执行相应的解序列化操作才能“还原”成原来的对象格式 。
props.put("key.deserializer", StringDeserializer.class);
props.put("value.deserializer", StringDeserializer.class);
// 定义consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 消费者订阅的topic, 可同时订阅多个
// consumer.subscribe(Arrays.asList("topic1", "topic3","topic3"));
consumer.subscribe(Collections.singletonList(TOPIC));
while (true) {
// 读取数据,读取超时时间为 10ms
Duration d = Duration.ofSeconds(10);
ConsumerRecords<String, String> records = consumer.poll(d);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
// 同步提交 offset
// consumer.commitSync();
// 异步提交 offset
consumer.commitAsync();
}
}
/**
* 主函数入口
* @param args
* @throws ExecutionException
* @throws InterruptedException
*/
public static void main(String[] args) throws ExecutionException, InterruptedException {
try {
Properties properties = System.getProperties();
String action = (String) properties.get("action");
System.out.println(action);
for (String arg : args) {
System.out.println("main方法参数为:" + arg);
}
if ( action.equals("deleteTopic") ) {
deleteTopic(TOPIC);
}
if ( action.equals("createTopic") ) {
createTopic();
}
if ( action.equals("topicList") ) {
topicList();
}
if ( action.equals("deleteTopic") ) {
deleteTopic(TOPIC);
}
if ( action.equals("Producer_Consumer") ) {
MyProducer(args);
MyConsumer(args);
}
}catch (Exception e) {
e.printStackTrace();
System.out.println("please input args:[-Daction=deleteTopic,-Daction=createTopic,-Daction=topicList,,-Daction=Producer_Consumer]");
}
}
}
Kerberos认证比较特殊,最好在Linux机器上测试,因为krb5.conf引用的文件计较多。这里我也是先打包成jar,放在Linux机器测试。IDEA打包成jar包如下所示:

【温馨提示】Directory for META-INF/MANIFEST.MF选择resources路径,否则运行jar包时会抛
no main manifest attribute,in xxx.jar。



执行
$ java -jar -Daction=topicList -Djava.security.krb5.conf=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1/config/kerberos/krb5.conf -Djava.security.auth.login.config=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1/config/kerberos/kafka-client-jaas.conf kafka.jar bigdata.kafka.com.KafkaTest002
【温馨提示】参数必须放在-jar后面

$ cd $KAFKA_HOME
# 启动zookeeper
$ ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
# 启动zookeeper客户端验证
$ ./bin/zookeeper-shell.sh hadoop-node1:12181
# 启动kafka
$ ./bin/kafka-server-start-pwd.sh -daemon config/server-pwd.properties
# 查看kafka topic列表
$ ./bin/kafka-topics-pwd.sh --list --bootstrap-server hadoop-node1:19092 --command-config config/userpwd/client.properties
# 停止服务
$ cd $KAFKA_HOME; ./bin/kafka-server-stop.sh ; ./bin/zookeeper-server-stop.sh
package bigdata.kafka.com;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.config.SaslConfigs;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
public class KafkaTest003 {
public static final String TOPIC = "KafkaTest003";
static Properties getBaseConfig() {
Properties props = new Properties();
// Kafka 服务端的主机名和端口号
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "hadoop-node1:19092,hadoop-node2:19092,hadoop-node3:19092");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka-server");
String saslJaasConfig = String.format("org.apache.kafka.common.security.plain.PlainLoginModule required \nusername=\"%s\" \npassword=\"%s\";", "kafka", "123456");
props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig);
return props;
}
/**
* 创建topic
*/
public static void createTopic(){
final Properties props = getBaseConfig();
KafkaAdminClient adminClient= (KafkaAdminClient) KafkaAdminClient.create(props);
//创建需要新建的topic
//1.topic名,2.分区数:1, 3.副本数:1
NewTopic newTopic = new NewTopic(TOPIC, 3,(short)3);
// CreateTopicsResult res = adminClient.createTopics(Arrays.asList("topic1", "topic3","topic3"));
CreateTopicsResult res = adminClient.createTopics(Collections.singletonList(newTopic));
//等待创建,成功不会有任何报错,如果创建失败和超时会报错。
try {
res.all().get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
/**
* topic list
*/
public static void topicList() throws ExecutionException, InterruptedException {
final Properties props = getBaseConfig();
KafkaAdminClient adminClient= (KafkaAdminClient) KafkaAdminClient.create(props);
// 查看所有topic列表
Set<String> topiclist = adminClient.listTopics().names().get();
System.out.println(topiclist);
//遍历数组中的元素
for (String topicname:topiclist){
System.out.println(topicname);
}
//查看Topic详情
DescribeTopicsResult describeTopics = adminClient.describeTopics(topiclist);
Map<String, TopicDescription> tdm = describeTopics.allTopicNames().get();
for (Map.Entry<String, TopicDescription> entry : tdm.entrySet()) {
System.out.println(entry.getKey()+"\t"+entry.getValue());
}
}
/**
* 删除topic
* @param topicName
*/
public static void deleteTopic(String topicName){
final Properties props = getBaseConfig();
KafkaAdminClient adminClient= (KafkaAdminClient) KafkaAdminClient.create(props);
DeleteTopicsResult res = adminClient.deleteTopics(Collections.singleton(topicName));
try {
System.out.println(res.all().get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
/**
* 生产者
*/
public static void MyProducer() throws InterruptedException {
/**
* 1.创建链接参数
*/
final Properties props = getBaseConfig();
// ack:-1 需要 ISR 中所有的 Replica(副本)给予接收确认,速度最慢,安全性最高,但是由于 ISR 可能会缩小到仅包含一个 Replica,所以设置参数为 all 时并不能一定避免数据丢失。
props.put("acks", "-1");
// 消息重试次数
props.put("retries", 3);
// 设置请求的数据的最大字节数,为了防止发生较大的数据包影响到吞吐量,默认值为1MB。超过 1MB 将会报错。这里设置1M
props.put("max.request.size",1048576);
// 每一批次的消息大小,16kb
props.put("batch.size", 16384);
// 消息延迟发送时间,默认0,只有满足一批次才会发送。这里设置为1s,一个批次的消息不满足 16kb,也会发送
props.put("linger.ms", 1);
// 缓冲池大小,默认32M
props.put("buffer.memory", 33554432);
// key的序列化方式
props.put("key.serializer", StringSerializer.class);
// value的序列化方式
props.put("value.serializer", StringSerializer.class);
/**
* 2.创建生产者
*/
KafkaProducer<String,String> producer=new KafkaProducer<String, String>(props);
/**
* 3.调用 send 方法,发送消息(普通异步发送)
*/
/*for (int i = 0; i < 50; i++) {
// 异步发送 默认
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "key" + i, "value" + i);
producer.send(record);
// 同步发送
record.send(record).get();
}*/
/**
* 3. 带回调函数的异步发送
*/
for (int i = 0; i < 5; i++) {
// 添加回调
producer.send(new ProducerRecord<>(TOPIC,
"key " + i, "value" + i), new Callback() {
// 该方法在 Producer 收到 ack 时调用,为异步调用
@Override
public void onCompletion(RecordMetadata metadata,
Exception exception) {
if (exception == null) {
// 没有异常,输出信息到控制台
System.out.println(" 主题: " +
metadata.topic() + "->" + "分区:" + metadata.partition());
} else {
// 出现异常打印
exception.printStackTrace();
}
}
});
// 延迟一会会看到数据发往不同分区
Thread.sleep(2);
}
/**
* 4.关闭生产者
*/
producer.close();
}
/**
* 消费者
*/
public static void MyConsumer(){
// 1.创建链接参数
final Properties props = getBaseConfig();
// 制定consumer group
props.put("group.id", "test");
// 是否自动确认offset
props.put("enable.auto.commit", "false");
// 自动确认offset的时间间隔
// props.put("auto.commit.interval.ms", "1000");
// 使用消费者组管理时,调用poll()之间的最大延迟。这对消费者在获取更多记录之前可以空闲的时间量设置了一个上限。
// 如果在此超时过期之前未调用poll(),则认为使用者失败,组将重新平衡,以便将分区重新分配给其他成员
// 默认值 300s
props.put("max.poll.interval.ms",300000);
// offset 重置,默认:latest,从最新的开始消费,这里设置为 earliest,重最早的提交的 offset 开始消费
props.put("auto.offset.reset","earliest");
// 拉取的消息的最大条数,默认 500
props.put("max.poll.records",500);
// 这个参数就是为消息的 key 做反序列化的 ;consumer 代码从 broker 端获取的任何消息都是宇节数组的格式,因此消息的每个组件都要执行相应的解序列化操作才能“还原”成原来的对象格式 。
props.put("key.deserializer", StringDeserializer.class);
props.put("value.deserializer", StringDeserializer.class);
// 定义consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 消费者订阅的topic, 可同时订阅多个
// consumer.subscribe(Arrays.asList("topic1", "topic3","topic3"));
consumer.subscribe(Collections.singletonList(TOPIC));
while (true) {
// 读取数据,读取超时时间为 10ms
Duration d = Duration.ofSeconds(10);
ConsumerRecords<String, String> records = consumer.poll(d);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
// 同步提交 offset
// consumer.commitSync();
// 异步提交 offset
consumer.commitAsync();
}
}
/**
* 主函数入口
* @param args
* @throws ExecutionException
* @throws InterruptedException
*/
public static void main(String[] args) throws ExecutionException, InterruptedException {
// deleteTopic(TOPIC);
// createTopic();
// topicList();
MyProducer();
MyConsumer();
}
}

关于配置可以参考我这篇文章:大数据Hadoop之——Zookeeper鉴权认证(Kerberos认证+账号密码认证)
$ cd $KAFKA_HOME
# 启动zookeeper
$ ./bin/zookeeper-server-start-kerberos.sh -daemon ./config/zookeeper-kerberos.properties
# 启动zookeeper客户端验证
$ ./bin/zookeeper-shell-kerberos.sh hadoop-node1:12181
# 启动kafka
$ ./bin/kafka-server-start-zkcli-kerberos.sh -daemon ./config/server-zkcli-kerberos.properties
# 查看kafka topic列表
$ ./bin/kafka-topics-sasl.sh --list --bootstrap-server hadoop-node1:19092 --command-config config/kerberos/client.properties
# 停止服务
$ cd $KAFKA_HOME; ./bin/kafka-server-stop.sh ; ./bin/zookeeper-server-stop.sh
$ cd $KAFKA_HOME
# 启动zookeeper
$ ./bin/zookeeper-server-start-userpwd.sh -daemon ./config/zookeeper-userpwd.properties
# 启动zookeeper客户端验证
$ ./bin/zookeeper-shell-userpwd.sh hadoop-node1:12181
# 启动kafka
$ ./bin/kafka-server-start-zkcli-userpwd.sh -daemon ./config/server-zkcli-userpwd.properties
# 查看kafka topic列表
$ ./bin/kafka-topics-pwd.sh --list --bootstrap-server hadoop-node1:19092 --command-config config/userpwd/client.properties
# 停止服务
$ cd $KAFKA_HOME; ./bin/kafka-server-stop.sh ; ./bin/zookeeper-server-stop.sh
同时开启认证的方式用的不多,其实代码类似,小伙伴感兴趣的话,可以根据我之前配置的把我上面的代码改一下应该就可以了,难度不大。有疑问的小伙伴欢迎给我留言哦!!!!