The team lead asked us to switch our MQ to Kafka, so today I set up and tested Kafka, and this post records the process. First, note that Kafka depends on ZooKeeper; however, the Kafka 3.2 distribution bundles ZooKeeper (both a startup script and a default configuration file), so there is no need to install ZooKeeper separately for a single-node test. You will see this reflected in the startup steps below. (Kafka 3.x can also run without ZooKeeper entirely in KRaft mode, but this guide uses the bundled ZooKeeper.)
1.1 Prepare a Linux environment to act as the server; I used a virtual machine.
1.2 Install JDK 1.8 or later. Download JDK 1.8 (download link at the end of this post) and extract it to /usr/local/. Then run vim /etc/profile and append the following:
export JAVA_HOME=/usr/local/jdk1.8
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib:$CLASSPATH
export JAVA_PATH=${JAVA_HOME}/bin:${JRE_HOME}/bin
export PATH=$PATH:${JAVA_PATH}
Save the file and run source /etc/profile to make the configuration take effect.
Run java -version to verify that the JDK was installed successfully.
1.3 Install Kafka. Download Kafka 3.2 and extract it to /usr/local/. You need write permission on that directory; chmod -R 777 /usr/local works in a pinch, though taking ownership with chown is the safer choice.
Enter the extracted Kafka directory, go into the config directory, and edit server.properties:
listeners=PLAINTEXT://<server-ip>:9092
advertised.listeners=PLAINTEXT://<server-ip>:9092
Then save the file. (The older advertised.host.name property was deprecated and removed in Kafka 3.x; advertised.listeners replaces it.) If you do not change these settings, external clients will be unable to connect: the broker advertises localhost, so clients get redirected to 127.0.0.1:9092.
Open a terminal window (I used Xshell), cd into the Kafka root directory, and run:
bin/zookeeper-server-start.sh config/zookeeper.properties
Then open another terminal window, cd into the Kafka root directory, and run:
bin/kafka-server-start.sh config/server.properties
Wait for the service to finish starting. A log line like the following means the Kafka broker started successfully:
INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
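If you prefer not to keep two terminal windows open, both startup scripts also accept a -daemon flag that runs them in the background (their output then goes to the logs/ directory under the Kafka root):

```shell
# Start ZooKeeper and the broker as background daemons
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
# Check the broker log for the "started" line
grep "started (kafka.server.KafkaServer)" logs/server.log
```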
2.1 Before Kafka can be used, a topic must be created.
Enter the Kafka root directory and run:
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server <server-ip>:9092
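You can confirm the topic was created and inspect its partition layout with the same tool (replace <server-ip> with your server's IP, as above):

```shell
# List all topics known to the broker
bin/kafka-topics.sh --list --bootstrap-server <server-ip>:9092
# Show partition count, replication factor, and partition leaders for the new topic
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server <server-ip>:9092
```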
2.2 Produce messages with the console producer.
Enter the Kafka root directory and run:
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server <server-ip>:9092
Once it starts, type some content and press Enter; each line sends one message.
2.3 Consume messages with the console consumer.
Enter the Kafka root directory and run:
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server <server-ip>:9092
Because --from-beginning is specified, every run prints all messages in the topic from the start.
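The console consumer joins an auto-generated consumer group. To inspect committed offsets and lag for a named group (for example the "test" group used by the Java consumers below), the bundled consumer-groups tool can help:

```shell
# List the consumer groups known to the broker
bin/kafka-consumer-groups.sh --bootstrap-server <server-ip>:9092 --list
# Show per-partition current offset, log-end offset, and lag for group "test"
bin/kafka-consumer-groups.sh --bootstrap-server <server-ip>:9092 --describe --group test
```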
3. Accessing Kafka from a Java program
Required Maven dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.2.1</version>
</dependency>
Test class:
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;

class KafkaTestApplicationTests {

    private static final String TOPIC = "quickstart-events";
    // Pool for processing consumed records; CallerRunsPolicy makes the polling
    // thread run the task itself when the queue is full, throttling consumption.
    private static final ExecutorService consumerPool = new ThreadPoolExecutor(10, 10,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy());

    /**
     * Producer: sends 200 messages from a 10-thread pool.
     */
    @Test
    void producerTest() throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.163.138:9092");
        props.put("linger.ms", 1);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ExecutorService producerService = Executors.newFixedThreadPool(10);

        for (int i = 0; i < 200; i++) {
            int finalI = i;
            producerService.execute(() -> {
                producer.send(new ProducerRecord<>(TOPIC, Integer.toString(finalI), Integer.toString(finalI)));
                System.out.println("sent message " + finalI);
            });
        }
        producerService.shutdown();
        while (!producerService.awaitTermination(5, TimeUnit.SECONDS)) {
        }
        System.out.println("production finished");
        producer.close();
    }

    /**
     * Consumer with auto-commit enabled.
     */
    @Test
    void autoCommitConsumer() throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.163.138:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(TOPIC));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            dealRecords(records);
            if (records.count() == 0) {
                System.out.println("no messages left to consume");
            }
        }
    }

    private void dealRecords(ConsumerRecords<String, String> records) {
        for (ConsumerRecord<String, String> record : records) {
            consumerPool.execute(() -> dealRecord(record));
        }
    }

    private void dealRecord(ConsumerRecord<String, String> record) {
        try {
            // Simulate a processing cost of up to ~3 seconds.
            Thread.sleep(ThreadLocalRandom.current().nextInt(1000) * 3);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.printf("ThreadName = %s, offset = %d, key = %s, value = %s%n",
                Thread.currentThread().getName(), record.offset(), record.key(), record.value());
    }

    /**
     * Consumer with manual offset commit: commits only after a full batch has been processed.
     */
    @Test
    void manualConsumer() throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.163.138:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(TOPIC));
        final int minBatchSize = 200;
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                buffer.add(record);
            }
            System.out.println("buffer.size: " + buffer.size());
            if (buffer.size() >= minBatchSize) {
                insertIntoDb(buffer);
                consumer.commitSync();
                buffer.clear();
            }
        }
    }

    public void insertIntoDb(List<ConsumerRecord<String, String>> buffer) {
        for (ConsumerRecord<String, String> record : buffer) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}
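The consumerPool above uses CallerRunsPolicy: when the work queue fills up, the thread submitting tasks (the poll loop) runs the rejected task itself, which throttles consumption instead of dropping records. A minimal, Kafka-free sketch of that behavior, with the pool shrunk so the overflow is easy to trigger (class name and sizes are illustrative, not from the original):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CallerRunsDemo {
    // Counts tasks that ended up running on the submitting thread.
    static final AtomicInteger ranOnCaller = new AtomicInteger();

    public static void main(String[] args) throws Exception {
        // Same shape as consumerPool above, but tiny:
        // 2 workers, queue capacity 2, CallerRunsPolicy on rejection.
        ExecutorService pool = new ThreadPoolExecutor(2, 2,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(2),
                new ThreadPoolExecutor.CallerRunsPolicy());

        String caller = Thread.currentThread().getName();
        for (int i = 0; i < 20; i++) {
            pool.execute(() -> {
                // A task rejected by the full queue is run by the submitting thread itself.
                if (Thread.currentThread().getName().equals(caller)) {
                    ranOnCaller.incrementAndGet();
                }
                try {
                    Thread.sleep(10); // simulate slow record handling
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        // The submit loop was throttled: some of the 20 tasks ran on the caller thread.
        System.out.println("tasks run on caller thread: " + ranOnCaller.get());
    }
}
```

The practical consequence in the consumer is that poll() is simply called less often under load; records back up in the broker rather than in an unbounded in-memory queue.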
Appendix:
JDK 1.8 and Kafka 3.2 download: jdk1.8+kafka3.2 Linux bundle (CSDN download)
Kafka official documentation: https://kafka.apache.org/documentation/#quickstart