Kafka-Eagle 框架可以监控 Kafka 集群的整体运行情况,在生产环境中经常使用。
Kafka-Eagle 的安装依赖于 MySQL,MySQL 主要用来存储可视化展示的数据。
安装步骤参考:P61 尚硅谷 kafka监控_MySQL环境准备
关闭 Kafka 集群
[atguigu@hadoop102 kafka]$ kf.sh stop
修改 /opt/module/kafka/bin/kafka-server-start.sh
[atguigu@hadoop102 kafka]$ vim bin/kafka-server-start.sh
修改如下参数值:
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
为
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="9999"
#export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
初始内存只分配1G,如果要使用 Eagle 功能,我们可以将内存设置为 2G。
注意:修改之后在启动 Kafka 之前要分发至其他节点。
[atguigu@hadoop102 bin]$ xsync kafka-server-start.sh
上传压缩包 kafka-eagle-bin-2.0.8.tar.gz 到集群 /opt/software 目录
解压到本地
[atguigu@hadoop102 software]$ tar -zxvf kafka-eagle-bin-2.0.8.tar.gz
进入刚才解压的目录
[atguigu@hadoop102 kafka-eagle-bin-2.0.8]$ ll
总用量 79164
-rw-rw-r--. 1 atguigu atguigu 81062577 10 月 13 00:00 efak-web-2.0.8-bin.tar.gz
将 efak-web-2.0.8-bin.tar.gz 解压至 /opt/module
[atguigu@hadoop102 kafka-eagle-bin-2.0.8]$ tar -zxvf efak-web-2.0.8-bin.tar.gz -C /opt/module/
修改名称
[atguigu@hadoop102 module]$ mv efak-web-2.0.8/ efak
修改配置文件/opt/module/efak/conf/system-config.properties
[atguigu@hadoop102 conf]$ vim system-config.properties
######################################
# multi zookeeper & kafka cluster list
# Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.'instead
######################################
efak.zk.cluster.alias=cluster1
cluster1.zk.list=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
######################################
# zookeeper enable acl
######################################
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=test
cluster1.zk.acl.password=test123
######################################
# broker size online list
######################################
cluster1.efak.broker.size=20
######################################
# zk client thread limit
######################################
kafka.zk.limit.size=32
######################################
# EFAK webui port
######################################
efak.webui.port=8048
######################################
# kafka jmx acl and ssl authenticate
######################################
cluster1.efak.jmx.acl=false
cluster1.efak.jmx.user=keadmin
cluster1.efak.jmx.password=keadmin123
cluster1.efak.jmx.ssl=false
cluster1.efak.jmx.truststore.location=/data/ssl/certificates/kafka.truststore
cluster1.efak.jmx.truststore.password=ke123456
######################################
# kafka offset storage
######################################
# offset 保存在 kafka
cluster1.efak.offset.storage=kafka
######################################
# kafka jmx uri
######################################
cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi
######################################
# kafka metrics, 15 days by default
######################################
efak.metrics.charts=true
efak.metrics.retain=15
######################################
# kafka sql topic records max
######################################
efak.sql.topic.records.max=5000
efak.sql.topic.preview.records.max=10
######################################
# delete kafka topic token
######################################
efak.topic.token=keadmin
######################################
# kafka sasl authenticate
######################################
cluster1.efak.sasl.enable=false
cluster1.efak.sasl.protocol=SASL_PLAINTEXT
cluster1.efak.sasl.mechanism=SCRAM-SHA-256
cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramL
oginModule required username="kafka" password="kafka-eagle";
cluster1.efak.sasl.client.id=
cluster1.efak.blacklist.topics=
cluster1.efak.sasl.cgroup.enable=false
cluster1.efak.sasl.cgroup.topics=
cluster2.efak.sasl.enable=false
cluster2.efak.sasl.protocol=SASL_PLAINTEXT
cluster2.efak.sasl.mechanism=PLAIN
cluster2.efak.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainL
oginModule required username="kafka" password="kafka-eagle";
cluster2.efak.sasl.client.id=
cluster2.efak.blacklist.topics=
cluster2.efak.sasl.cgroup.enable=false
cluster2.efak.sasl.cgroup.topics=
######################################
# kafka ssl authenticate
######################################
cluster3.efak.ssl.enable=false
cluster3.efak.ssl.protocol=SSL
cluster3.efak.ssl.truststore.location=
cluster3.efak.ssl.truststore.password=
cluster3.efak.ssl.keystore.location=
cluster3.efak.ssl.keystore.password=
cluster3.efak.ssl.key.password=
cluster3.efak.ssl.endpoint.identification.algorithm=https
cluster3.efak.blacklist.topics=
cluster3.efak.ssl.cgroup.enable=false
cluster3.efak.ssl.cgroup.topics=
######################################
# kafka sqlite jdbc driver address
######################################
# 配置 mysql 连接
efak.driver=com.mysql.jdbc.Driver
efak.url=jdbc:mysql://hadoop102:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=000000
######################################
# kafka mysql jdbc driver address
######################################
#efak.driver=com.mysql.cj.jdbc.Driver
#efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
#efak.username=root
#efak.password=123456
添加环境变量
[atguigu@hadoop102 conf]$ sudo vim /etc/profile.d/my_env.sh
# kafkaEFAK
export KE_HOME=/opt/module/efak
export PATH=$PATH:$KE_HOME/bin
注意:source /etc/profile
[atguigu@hadoop102 conf]$ source /etc/profile
启动
注意:启动之前需要先启动 zk 以及 kafka
[atguigu@hadoop102 kafka]$ kf.sh start
启动 efak
[atguigu@hadoop102 efak]$ bin/ke.sh start
Version 2.0.8 -- Copyright 2016-2021
*****************************************************************
* EFAK Service has started success.
* Welcome, Now you can visit 'http://192.168.10.102:8048'
* Account:admin ,Password:123456
*****************************************************************
* <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
* <Usage> https://www.kafka-eagle.org/ </Usage>
*****************************************************************
如果停止 efak,执行命令:
[atguigu@hadoop102 efak]$ bin/ke.sh stop

主面板

Brokers

Topics

Zookeepers

Consumers

大屏信息


左图为 Kafka 现有架构,元数据在 zookeeper 中,运行时动态选举 controller,由 controller 进行 Kafka 集群管理。
右图为 kraft 模式架构(实验性),不再依赖 zookeeper 集群,而是用三台 controller 节点代替 zookeeper,元数据保存在 controller 中,由 controller 直接进行 Kafka 集群管理。
这样做的好处有以下几个:





在 /home/atguigu/bin 目录下创建文件 kf2.sh 脚本文件
[atguigu@hadoop102 bin]$ vim kf2.sh
脚本如下:
#! /bin/bash
case $1 in
"start"){
for i in hadoop102 hadoop103 hadoop104
do
echo " --------启动 $i Kafka2-------"
ssh $i "/opt/module/kafka2/bin/kafka-server-start.sh -daemon /opt/module/kafka2/config/kraft/server.properties"
done
};;
"stop"){
for i in hadoop102 hadoop103 hadoop104
do
echo " --------停止 $i Kafka2-------"
ssh $i "/opt/module/kafka2/bin/kafka-server-stop.sh "
done
};;
esac
添加执行权限
[atguigu@hadoop102 bin]$ chmod +x kf2.sh
启动集群命令
[atguigu@hadoop102 ~]$ kf2.sh start
停止集群命令
[atguigu@hadoop102 ~]$ kf2.sh stop
Flume 是一个在大数据开发中非常常用的组件,可以用于 Kafka 的生产者,也可以用于 Kafka 的消费者。

启动 kafka 集群
[atguigu@hadoop102 ~]$ zk.sh start
[atguigu@hadoop102 ~]$ kf.sh start
启动 kafka 消费者
[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
Flume 安装步骤
app.log 文件数据的变化
配置 Flume
在 hadoop102 节点的 Flume 的 job 目录下创建 file_to_kafka.conf
[atguigu@hadoop102 flume]$ mkdir jobs
[atguigu@hadoop102 flume]$ vim jobs/file_to_kafka.conf
配置文件内容如下:
# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2 配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/app.* # 监控文件目录
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json # offset文件 支持断点续传
# 3 配置channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 4 配置sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# 5 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动 Flume
[atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f jobs/file_to_kafka.conf &
向 /opt/module/applog/app.log 里追加数据,查看 kafka 消费者消费情况
[atguigu@hadoop102 module]$ mkdir applog
[atguigu@hadoop102 applog]$ echo hello >> /opt/module/applog/app.log
观察 kafka 消费者,能够看到消费的 hello 数据


配置 Flume
在 hadoop102 节点的 Flume 的 /opt/module/flume/jobs 目录下创建 kafka_to_file.conf
[atguigu@hadoop102 jobs]$ vim kafka_to_file.conf
配置文件内容如下:
# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2 配置source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 50
a1.sources.r1.batchDurationMillis = 200
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092
a1.sources.r1.kafka.topics = first
a1.sources.r1.kafka.consumer.group.id = custom.g.id
# 3 配置channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 4 配置sink
a1.sinks.k1.type = logger
# 5 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动 Flume
[atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f jobs/kafka_to_file.conf -Dflume.root.logger=INFO,console
启动 kafka 生产者
[atguigu@hadoop103 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
并输入数据,例如:hello
观察控制台输出的日志

Flink是一个在大数据开发中非常常用的组件,可以用于 Kafka 的生产者,也可以用于 Kafka 的消费者。

创建一个 maven 项目 flink-kafka
添加配置文件
<dependencies>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-javaartifactId>
<version>1.13.0version>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-streaming-java_2.12artifactId>
<version>1.13.0version>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-clients_2.12artifactId>
<version>1.13.0version>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-connector-kafka_2.12artifactId>
<version>1.13.0version>
dependency>
dependencies>
将 log4j.properties 文件添加到 resources 里面,就能更改打印日志的级别为 error
log4j.rootLogger=error, stdout,R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=../log/agent.log
log4j.appender.R.MaxFileSize=1024KB
log4j.appender.R.MaxBackupIndex=1
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n
在 java 文件夹下创建包名为 com.atguigu.flink
在 com.atguigu.flink 包下创建 java 类:FlinkKafkaProducer1(系统也有一个 FlinkKafkaProducer,会重名,所以这里命名为 1)。
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.ArrayList;
import java.util.Properties;
public class FlinkKafkaProducer1 {
public static void main(String[] args) throws Exception {
// 0 初始化flink环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3); // 3个槽 对应kafka主题题的3个分区
// 1 准备数据源 读取集合中数据
ArrayList<String> wordsList = new ArrayList<>();
wordsList.add("hello");
wordsList.add("atguigu");
DataStream<String> stream = env.fromCollection(wordsList);
// 2 kafka生产者配置信息
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
// 3 创建kafka生产者
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
"first",
new SimpleStringSchema(), // 序列化和反序列化模板类 string类型
properties
);
// 4 生产者和flink流关联
stream.addSink(kafkaProducer);
// 5 执行
env.execute();
}
}
启动Kafka消费者
[atguigu@hadoop104 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
执行 FlinkKafkaProducer1 程序,观察 kafka 消费者控制台情况

Q:
- 为什么先接收到 atguigu,然后才是 hello 呢?
A:
- 在 Flink 中,对于并行度大于 1 的情况,不同的算子实例是并行运行的,也就是说当你的
env.setParallelism(3)时,会有3个线程同时运行。在你的例子中,"hello"和"atguigu"可能由不同的线程处理,并且处理的顺序是不确定的。- 如果你希望严格按照顺序处理,你可以将并行度设置为
1,即env.setParallelism(1)。但是这样可能会影响处理速度。此外,Flink 也提供了一些方法来保证在并行处理时的顺序,可以查阅相关资料来了解更多。
在 com.atguigu.flink 包下创建 java 类:FlinkKafkaConsumer1
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Properties;
public class FlinkKafkaConsumer1 {
public static void main(String[] args) throws Exception {
// 0 初始化flink环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
// 1 kafka消费者配置信息
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
// group.id可选,不配置不会报错
// 2 创建kafka消费者
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
"first",
new SimpleStringSchema(),
properties
);
// 3 消费者和flink流关联
env.addSource(kafkaConsumer).print();
// 4 执行
env.execute();
}
}
启动 FlinkKafkaConsumer1 消费者
启动 kafka 生产者
[atguigu@hadoop103 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
观察 IDEA 控制台数据打印

有 3 个消费者并行消费,因为只发了两条消息,所以这里只有 1 和 3。
SpringBoot 是一个在 JavaEE 开发中非常常用的组件。可以用于 Kafka 的生产者,也可以用于 SpringBoot 的消费者。

跟之前不太一样的是,外部数据是通过接口的方式发送到 SpringBoot 程序,然后 SpringBoot 接收到这个接口的数据,然后再发送到 kafka 集群。
在 IDEA 中安装 lombok 插件
在 Plugins 下搜索 lombok 然后在线安装即可,安装后注意重启

创建一个 Spring Initializr

注意:有时候SpringBoot官方脚手架不稳定,我们切换国内地址:https://start.aliyun.com
项目名称 springboot

添加项目依赖




检查自动生成的配置文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0modelVersion>
<parent>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-parentartifactId>
<version>2.6.1version>
<relativePath/>
parent>
<groupId>com.atguigugroupId>
<artifactId>springbootartifactId>
<version>0.0.1-SNAPSHOTversion>
<name>springbootname>
<description>Demo project for Spring Bootdescription>
<properties>
<java.version>1.8java.version>
properties>
<dependencies>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-webartifactId>
dependency>
<dependency>
<groupId>org.springframework.kafkagroupId>
<artifactId>spring-kafkaartifactId>
dependency>
<dependency>
<groupId>org.projectlombokgroupId>
<artifactId>lombokartifactId>
<optional>trueoptional>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-testartifactId>
<scope>testscope>
dependency>
<dependency>
<groupId>org.springframework.kafkagroupId>
<artifactId>spring-kafka-testartifactId>
<scope>testscope>
dependency>
dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-maven-pluginartifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombokgroupId>
<artifactId>lombokartifactId>
exclude>
excludes>
configuration>
plugin>
plugins>
build>
project>
修改 SpringBoot 核心配置文件 application.propeties,添加生产者相关信息
# 应用名称
spring.application.name=atguigu_springboot_kafka
# 指定kafka的地址
spring.kafka.bootstrap-servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
# 指定key和value的序列化器
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
创建 controller 从浏览器接收数据,并写入指定的 topic
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ProducerController {
// Kafka模板用来向kafka发送数据
@Autowired
KafkaTemplate<String, String> kafka;
@RequestMapping("/atguigu")
public String data(String msg) {
kafka.send("first", msg);
return "ok";
}
}
在浏览器中给 /atguigu 接口发送数据
http://localhost:8080/atguigu?msg=hello
kafka 消费者接收到数据

修改 SpringBoot 核心配置文件 application.propeties
# =========消费者配置开始=========
# 指定kafka的地址
spring.kafka.bootstrap-servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
# 指定key和value的反序列化器
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 指定消费者组的group_id
spring.kafka.consumer.group-id=atguigu
# =========消费者配置结束=========
创建类消费 Kafka 中指定 topic 的数据
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
@Configuration
public class KafkaConsumer {
// 指定要监听的topic
@KafkaListener(topics = "first")
public void consumeTopic(String msg) { // 参数: 收到的value
System.out.println("收到的信息: " + msg);
}
}
向 first 主题发送数据
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
> atguigu
SpringBoot 消费者接收到数据

Spark 是一个在大数据开发中非常常用的组件,可以用于 Kafka 的生产者,也可以用于 Kafka 的消费者。

Scala 环境准备
Spark 的底层源码是用 Scala 编写的。
创建一个 maven 项目 spark-kafka
在项目 spark-kafka 上点击右键,Add Framework Support => 勾选 scala
在 main 下创建 scala 文件夹,并右键 Mark Directory as Sources Root => 在 scala 下创建包名为 com.atguigu.spark
添加配置文件
<dependencies>
<dependency>
<groupId>org.apache.sparkgroupId>
<artifactId>spark-streaming-kafka-0-10_2.12artifactId>
<version>3.0.0version>
dependency>
dependencies>
将 log4j.properties 文件添加到 resources 里面,就能更改打印日志的级别为 error
log4j.rootLogger=error, stdout,R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=../log/agent.log
log4j.appender.R.MaxFileSize=1024KB
log4j.appender.R.MaxBackupIndex=1
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n
在 com.atguigu.spark 包下创建 scala Object:SparkKafkaProducer

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
object SparkKafkaProducer {
def main(args: Array[String]): Unit = {
// 0 kafka配置信息
val properties = new Properties()
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092")
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
// 1 创建kafka生产者
var producer = new KafkaProducer[String, String](properties)
// 2 发送数据
for (i <- 1 to 5) {
producer.send(new ProducerRecord[String, String]("first", "atguigu" + i))
}
// 3 关闭资源
producer.close()
}
}
启动 Kafka 消费者
[atguigu@hadoop104 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
执行 SparkKafkaProducer 程序,观察 kafka 消费者控制台情况

添加配置文件
<dependencies>
<dependency>
<groupId>org.apache.sparkgroupId>
<artifactId>spark-streaming-kafka-0-10_2.12artifactId>
<version>3.0.0version>
dependency>
<dependency>
<groupId>org.apache.sparkgroupId>
<artifactId>spark-core_2.12artifactId>
<version>3.0.0version>
dependency>
<dependency>
<groupId>org.apache.sparkgroupId>
<artifactId>spark-streaming_2.12artifactId>
<version>3.0.0version>
dependency>
dependencies>
在 com.atguigu.spark 包下创建 scala Object:SparkKafkaConsumer
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
object SparkKafkaConsumer {
def main(args: Array[String]): Unit = {
// 1.创建SparkConf
val sparkConf: SparkConf = new SparkConf().setAppName("sparkstreaming").setMaster("local[*]")
// 2.创建StreamingContext 初始化上下文环境
// Seconds(3):时间窗口,批处理间隔,表示每隔3秒钟,Spark Streaming就会收集一次数据进行处理。
val ssc = new StreamingContext(sparkConf, Seconds(3))
// 3.定义Kafka参数:kafka集群地址、消费者组名称、key序列化、value序列化
val kafkaPara: Map[String, Object] = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.GROUP_ID_CONFIG -> "atguiguGroup"
)
// 4.读取Kafka数据创建DStream
val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc, // 上下文环境
LocationStrategies.PreferConsistent, // 数据存储位置 优先位置
ConsumerStrategies.Subscribe[String, String](Set("first"), kafkaPara) // 消费策略:(订阅多个主题,配置参数)
)
// 5.将每条消息的KV取出
val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())
// 6.计算WordCount
valueDStream.print()
// 7.开启任务 并阻塞(使程序一直执行)
ssc.start()
ssc.awaitTermination()
}
}
启动 SparkKafkaConsumer 消费者
启动 kafka 生产者
[atguigu@hadoop103 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
观察IDEA控制台数据打印

笔记整理自b站尚硅谷视频教程:【尚硅谷】Kafka3.x教程(从入门到调优,深入全面)