A simple standalone setup; clusters and more advanced Kafka topics are out of scope.
Kafka website: http://kafka.apache.org/
Version used: kafka_2.12-3.0.0
Prerequisite: JDK 1.8
File: config/server.properties
Changes:
log.dirs=/tmp/kafka-logs → log.dirs=/data/kafka-logs
listeners=PLAINTEXT://0.0.0.0:9092
# the VM's IP; in .properties files comments must sit on their own line, or the # becomes part of the value
advertised.listeners=PLAINTEXT://192.168.62.133:9092
File: config/zookeeper.properties
Change: dataDir=/tmp/zookeeper → dataDir=/data/zookeeper
Linux periodically cleans /tmp by default, so never keep data files under /tmp.
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
./kafka-server-start.sh -daemon ../config/server.properties
Mind the startup order: start ZooKeeper first, then Kafka. (The -daemon flag already backgrounds each process, so no trailing & is needed.)
[root@template ~]# jps
7793 Kafka
24361 QuorumPeerMain
21049 Jps
Create a topic:
./kafka-topics.sh --bootstrap-server localhost:9092 --create --replication-factor 1 --partitions 8 --topic test
Check that the topic was created:
./kafka-topics.sh --bootstrap-server localhost:9092 --list
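The same two steps can also be done programmatically with the AdminClient from kafka-clients. A minimal sketch, reusing the topic name, partition count, and broker address from the commands above (the class name TopicCheck is just for illustration):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class TopicCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            try {
                // Same as the CLI command: 8 partitions, replication factor 1
                admin.createTopics(Collections.singleton(new NewTopic("test", 8, (short) 1)))
                     .all().get();
            } catch (ExecutionException e) {
                // Already created on the CLI? That's fine, carry on
                if (!(e.getCause() instanceof TopicExistsException)) throw e;
            }
            // Equivalent of --list: print every topic the broker knows about
            System.out.println(admin.listTopics().names().get());
        }
    }
}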
Send a test message:
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
Hello World
Start a consumer:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Hello World
<!-- Kafka dependencies -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.1.0</version>
</dependency>
Note that spring-kafka is declared without a version: when you use Spring Boot (and did not create the project via start.spring.io), you can omit the version and Boot will automatically pull in the version compatible with your Boot release.
server.port=8077
spring.application.name=gas-kafka-sync
spring.kafka.bootstrap-servers=192.168.62.133:9092
spring.kafka.producer.retries=3
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.acks=1
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.group-id=default-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual_immediate
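For reference, Spring Boot maps these spring.kafka.producer.* properties onto the corresponding kafka-clients ProducerConfig keys. A sketch of the equivalent plain-Java producer configuration (illustrative only; Spring Boot builds this for you):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerConfigSketch {
    public static KafkaProducer<String, String> build() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.62.133:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);               // spring.kafka.producer.retries
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);        // batch-size: 16 KB per batch
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L); // buffer-memory: 32 MB send buffer
        props.put(ProducerConfig.ACKS_CONFIG, "1");                // leader acknowledgment only
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new KafkaProducer<>(props);
    }
}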
In this project the typical scenario is ingesting third-party data. Since the third party cannot be assumed to have the ability to talk to Kafka directly, the service exposes a common message-sending interface: the caller only has to put the target topic and the data object (Object) into a DataDto.
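The DataDto class itself is not shown in this post; judging from how it is used below, it presumably looks roughly like this (a sketch, not the actual class):

package com.gsafety.bg.kafka.model;

import lombok.Data;

/**
 * Presumed shape of DataDto: a target topic plus an arbitrary payload.
 */
@Data
public class DataDto {
    private String topic; // the Kafka topic to send to
    private Object data;  // arbitrary payload, serialized to JSON before sending
}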
package com.gsafety.bg.kafka.service;

import cn.hutool.json.JSONUtil;
import com.gsafety.bg.kafka.model.DataDto;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * @author Mr.wanter
 * @time 2022-7-18
 * @description
 */
@Service
@AllArgsConstructor
@Slf4j
public class KafkaProducerImpl implements KafkaProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    @Override
    public void sendMsg(DataDto dto) {
        // 1. Build the message payload
        String msg = JSONUtil.toJsonStr(dto.getData());
        // 2. Send the message
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(dto.getTopic(), msg);
        // Callback: log the outcome of the asynchronous send
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("kafka sendMessage error, topic = {}, data = {}", dto.getTopic(), msg, ex);
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("kafka sendMessage success, topic = {}, data = {}", dto.getTopic(), msg);
            }
        });
        // Send without specifying a partition
        // kafkaTemplate.send(TOPIC_NAME, msg);
        // Send to a specific partition with a key
        // kafkaTemplate.send(TOPIC_NAME, 0, String.valueOf(order.getOrderId()), JSON.toJSONString(order));
    }
}
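The post does not show how this interface is exposed over HTTP. A hypothetical controller sketch (class name and path are illustrative), assuming KafkaProducerService declares void sendMsg(DataDto dto):

package com.gsafety.bg.kafka.controller;

import com.gsafety.bg.kafka.model.DataDto;
import com.gsafety.bg.kafka.service.KafkaProducerService;
import lombok.AllArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

/**
 * Hypothetical endpoint: third parties POST a topic plus payload here
 * instead of talking to Kafka directly.
 */
@RestController
@AllArgsConstructor
public class KafkaSyncController {

    private final KafkaProducerService producerService;

    @PostMapping("/kafka/send")
    public void send(@RequestBody DataDto dto) {
        producerService.sendMsg(dto);
    }
}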
On the consuming side, data from a fixed topic is parsed and written to the database according to the business scenario.
package com.gsafety.bg.gas.service.impl;

import cn.hutool.json.JSONUtil;
import com.gsafety.bg.gas.dao.QuakeAlarmDao;
import com.gsafety.bg.gas.model.QuakeAlarmEntity;
import com.gsafety.bg.gas.model.dto.QuakeAlarmDto;
import com.gsafety.bg.gas.service.mapping.QuakeAlarmMapping;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import javax.transaction.Transactional;

/**
 * @author Mr.wanter
 * @time 2022-7-15
 * @description
 */
@Component
@Slf4j
@AllArgsConstructor
@Transactional(rollbackOn = Exception.class)
public class KafkaConsumer {

    private final QuakeAlarmDao dao;
    private final QuakeAlarmMapping mapping;

    // With auto-commit enabled, the Acknowledgment parameter would cause an error
    @KafkaListener(id = "0", topics = "quake_alarm", groupId = "default-group")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
        try {
            String value = record.value();
            QuakeAlarmDto quakeAlarmDto = JSONUtil.toBean(value, QuakeAlarmDto.class);
            QuakeAlarmEntity entity = mapping.convertFrom(quakeAlarmDto);
            QuakeAlarmEntity save = dao.save(entity);
            log.info("kafka mq: {}", record.value());
            log.info("entity: {}", save);
            // Manually commit the offset after a successful consume
            ack.acknowledge();
        } catch (Exception e) {
            ack.acknowledge();
            log.error("consume failed, cause: {}", e.getCause());
            // If the incoming JSON does not match the agreed structure, the record is treated
            // as garbage data and consumed anyway. But what if the database is down at this
            // point, so nothing was persisted while the offset has already been committed?
        }
    }
}
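One common answer to the question raised in that comment, sketched below (not the policy this project actually adopted): acknowledge only records that fail to parse, and let persistence failures propagate so the offset stays uncommitted and the container's error handler redelivers the record.

package com.gsafety.bg.gas.service.impl;

import cn.hutool.json.JSONException;
import cn.hutool.json.JSONUtil;
import com.gsafety.bg.gas.dao.QuakeAlarmDao;
import com.gsafety.bg.gas.model.dto.QuakeAlarmDto;
import com.gsafety.bg.gas.service.mapping.QuakeAlarmMapping;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
@Slf4j
@AllArgsConstructor
public class SafeQuakeAlarmConsumer {

    private final QuakeAlarmDao dao;
    private final QuakaAlarmMappingHolder = null; /* placeholder removed below */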
Check that the server has enough memory (kafka-server-start.sh allocates a 1 GB JVM heap by default via KAFKA_HEAP_OPTS).
Kafka does not expose an externally reachable IP address; fix it in server.properties:
# Allow connections on all interfaces
listeners=PLAINTEXT://0.0.0.0:9092
# Address advertised to external clients
advertised.listeners=PLAINTEXT://192.168.62.133:9092
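A quick way to verify the advertised listener from another machine is to ask the broker what it advertises — a sketch using AdminClient (class name is illustrative):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

import java.util.Properties;

public class ListenerCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.62.133:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Each node's host:port is what the broker advertises; if this hangs or
            // times out from an external machine, advertised.listeners is still wrong
            admin.describeCluster().nodes().get()
                 .forEach(n -> System.out.println(n.host() + ":" + n.port()));
        }
    }
}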
Corresponding changes in the application configuration (these enable the manual acknowledgment used by the listener):
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual_immediate