首先说明,本人之前没用过zookeeper、kafka等,尚硅谷十几个小时的教程实在没有耐心看,现在我也不知道分区、副本之类的概念。用kafka只是听说他比RabbitMQ快,我也是昨天晚上刚使用,下文中若有讲错的地方或者我的理解与它的本质有偏差的地方请包涵。
此文背景的环境是windows,linux流程也差不多。
1.取消 advertised.listeners 注释,修改kafka地址与端口。如果要集群部署,broker.id不能重复,1号机是0,2号机是1这样的。
2.修改 zookeeper.connect 为你上面zookeeper.properties中配置的地址
来到bin/windows,shift右键在此处打开cmd,输入 zookeeper-server-start.bat ../../config/zookeeper.properties
,等待其启动。(注意你config的路径不要写错)
启动完再开一个cmd,输入 kafka-server-start.bat ../../config/server.properties
。
如果在此环节出现问题,请查看logs中的日志,面向csdn。
我遇到的问题是 他显示什么什么路径太长了,问题的原因是我把他放桌面上了。
新建springboot项目,pom中添加依赖
org.springframework.boot spring-boot-starter-web org.springframework.kafka spring-kafka org.projectlombok lombok true com.alibaba fastjson 1.2.28配置application.yml,写启动类,controller,创建User类,创建consumer
application.yml
spring:
application:
name: kafka
kafka:
bootstrap-servers: localhost:9092 #这个是你server.properties中配置的
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: test-consumer-group #这个去config/consumer.properties中查看和修改
# 不过好像不一样也不影响?
server:
port: 8001
controller
@RestController
public class ProducerController {
?
@Autowired
KafkaTemplate kafka;
?
@RequestMapping("register")
public String register(User user) {
String message = JSON.toJSONString(user);
System.out.println("接收到用户信息:" + message);
kafka.send("register", message);
//kafka.send(String topic, @Nullable V data) {
return "OK";
}
}
user
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
?
private String id;
?
private String name;
?
private Integer age;
}
consumer
@Configuration
public class Consumer {
?
@KafkaListener(topics = "register")
public void consume(String message) {
System.out.println("接收到消息:" + message);
User user = JSON.parseObject(message, User.class);
System.out.println("正在为 " + user.getName() + " 办理注册业务...");
System.out.println("注册成功");
}
}
此时启动springboot,然而报错了
org.springframework.context.ApplicationContextException:
Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry';
nested exception is java.lang.IllegalStateException:
Topic(s) [register] is/are not present and missingTopicsFatal is true
为什么呢?
请检查zookeeper和kafka的cmd上有没有他们启动失败的消息,如果有就重新启动下,记得先开zookeeper再开kafka。
然后确认你的kafka上有没有"register"这个topic,此时我是没有的,而consumer又加了 @KafkaListener(topics = "register")
注解,于是他就启动失败了。
有两种解决方式:
1.比较傻X的方式,先将@KafkaListener注释掉,启动springboot后访问localhost:8001/register,他send的时候就会自行创建topic,再取消注释重新启动就可以了。
2.cmd方式:输入 kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic register
然后我们再启动,已经启动成功了。访问 localhost:8001/register?name=JamesBond&age=55
我们可以看到数据已经成功送到那里了。
然后来测试一下速度
@RequestMapping("test")
public String test() {
System.out.println("发送开始" + System.currentTimeMillis() % 10000);
for (int i = 0; i < 1000; i++) {
kafka.send("test", JSON.toJSONString(new User((1289312+i)+"",
"User" + i, (int)(Math.random() * 100), info)));
}
System.out.println("发送结束" + System.currentTimeMillis() % 10000);
return "OK";
}
@KafkaListener(topics = "test")
public void test(String message) {
System.out.println("--" + System.currentTimeMillis() % 10000 + "--");
}
console:
发送开始1267
--1384--
--1384--
...
--1715--
--1715--
发送结束1715
--1715--
--1715--
...
--1734--
对比RabbitMQ:
发送开始5692
--5766--
--5766--
...
--5973--
--5974--
发送结束5976
--5977--
--5977--
...
--6456--
kafka:
发送结束 - 发送开始=448ms
接收结束 - 接收开始=350ms
整体耗时: 467ms
rabbit:
发送结束 - 发送开始=284ms
接收结束 - 接收开始=690ms
整体耗时: 764ms
OK既然我会用了 我就去学一下kafka基本知识了
先自我介绍一下,小编13年上师交大毕业,曾经在小公司待过,去过华为OPPO等大厂,18年进入阿里,直到现在。深知大多数初中级java工程师,想要升技能,往往是需要自己摸索成长或是报班学习,但对于培训机构动则近万元的学费,着实压力不小。自己不成体系的自学效率很低又漫长,而且容易碰到天花板技术停止不前。因此我收集了一份《java开发全套学习资料》送给大家,初衷也很简单,就是希望帮助到想自学又不知道该从何学起的朋友,同时减轻大家的负担。添加下方名片,即可获取全套学习资料哦