• kafka初体验


    领导要求把mq切换为kakfa,今天做了下kafka的搭建和测试,在此做下记录。首先说明的是kafka是依赖zookeeper的,只不过kafka3.2版本应该是内置了zookeeper的配置,不需要单独搭建zookeeper。这一点在kafka启动的时候应该就体现了。

    1、搭建kafka环境。

    1.1 准备一个linux环境做服务,我用的是虚拟机。

    1.2 需要安装jdk1.8+。下载jdk1.8,下载地址见后文,解压到/usr/local/目录下。然后vim /etc/profile,添加以下内容:

    export JAVA_HOME=/usr/local/jdk1.8
    export JRE_HOME=${JAVA_HOME}/jre
    export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib:$CLASSPATH
    export JAVA_PATH=${JAVA_HOME}/bin:${JRE_HOME}/bin
    export PATH=$PATH:${JAVA_PATH}
    然后保存,执行source /etc/profile,使配置文件生效。

    执行java -version测试jdk安装成功与否。

    1.3 安装kafka。 下载kafka3.2,解压到/usr/local/。 需要有目录操作权限,如果没有执行chmod -R 777 /usr/local。

    进入到解压后的kafka目录,进入config目录,修改server.properties,

    listeners=PLAINTEXT://服务器ip:9092
    advertised.host.name=服务器ip

    然后保存。如果这里改,会造成外部访问服务器的时候访问不了,会自动跳转访问到127.0.0.1:9092。

    用xshell打开一个终端窗口,进入到kafka根目录,执行:

    bin/zookeeper-server-start.sh config/zookeeper.properties

    然后再打开一个终端窗口,进入到kafka根目录,执行:

    bin/kafka-server-start.sh config/server.properties

    等待服务启动完毕即可(出现类似这样的就说明kafka服务启动成功了。 INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
    )。

    2、kafka的测试。

    2.1 kafka要使用需要创建一个topic

    进入到kafka根目录,然后执行

    bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server 服务器ip:9092

    2.2 执行生产者生产消息

    进入到kafka根目录,然后执行

    bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server 服务器ip:9092

    执行完之后就可以输入内容,打回车,就完成了一个消息的发送。

    2.3 执行消费者消费消息

    进入到kafka根目录,然后执行 

    bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server 服务器ip:9092
    因为执行的是--from-beginning,所以每次执行这个的,会打印所有消息。

    3、java程序访问kafka

    需要依赖:

    1. <dependency>
    2. <groupId>org.apache.kafkagroupId>
    3. <artifactId>kafka-clientsartifactId>
    4. <version>3.2.1version>
    5. dependency>

    测试类

    1. class KafkaTestApplicationTests {
    2. private final static String TOPIC = "quickstart-events";
    3. private static ExecutorService consumerPool = new ThreadPoolExecutor(10, 10,
    4. 0L, TimeUnit.MILLISECONDS,
    5. new LinkedBlockingQueue(100), new ThreadPoolExecutor.CallerRunsPolicy());
    6. ThreadLocalRandom random = ThreadLocalRandom.current();
    7. /**
    8. * 生产者
    9. * @throws Exception
    10. */
    11. @Test
    12. void productor() throws Exception {
    13. Properties props = new Properties();
    14. props.put("bootstrap.servers", "192.168.163.138:9092");
    15. props.put("linger.ms", 1);
    16. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    17. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    18. KafkaProducer producer = new KafkaProducer(props);
    19. ExecutorService productorService = Executors.newFixedThreadPool(10);
    20. for (int i = 0; i < 200; i++){
    21. int finalI = i;
    22. productorService.execute(new Runnable() {
    23. @Override
    24. public void run() {
    25. producer.send(new ProducerRecord(TOPIC, Integer.toString(finalI), Integer.toString(finalI)));
    26. System.out.println("发送消息"+ finalI);
    27. }
    28. });
    29. }
    30. productorService.shutdown();
    31. while (!productorService.awaitTermination(5, TimeUnit.SECONDS)){}
    32. System.out.println("生产结束");
    33. producer.close();
    34. System.in.read();
    35. }
    36. /**
    37. * 自动提交模式的消费者
    38. * @throws Exception
    39. */
    40. @Test
    41. void automicConsumer() throws Exception {
    42. Properties props = new Properties();
    43. props.setProperty("bootstrap.servers", "192.168.163.138:9092");
    44. props.setProperty("group.id", "test");
    45. props.setProperty("enable.auto.commit", "true");
    46. props.setProperty("auto.commit.interval.ms", "1000");
    47. props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    48. props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    49. KafkaConsumer consumer = new KafkaConsumer<>(props);
    50. consumer.subscribe(Arrays.asList(TOPIC));
    51. Semaphore semaphore = new Semaphore(10);
    52. while (true) {
    53. ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
    54. dealRecodes(records);
    55. if (records.count() == 0){
    56. System.out.println("当前已经没有消息消费了!");
    57. }
    58. }
    59. }
    60. private void dealRecodes(ConsumerRecords records) {
    61. for (ConsumerRecord record : records){
    62. consumerPool.execute(() -> dealRecord(record));
    63. }
    64. }
    65. private void dealRecord(ConsumerRecord record){
    66. try {
    67. Thread.sleep(random.nextInt(1000)*3);
    68. } catch (InterruptedException e) {
    69. e.printStackTrace();
    70. }
    71. System.out.printf("ThreadName = %s, offset = %d, key = %s, value = %s%n", Thread.currentThread().getName(), record.offset(), record.key(), record.value());
    72. }
    73. /**
    74. * 手动提交模式的消费者
    75. * @throws Exception
    76. */
    77. @Test
    78. void manualConsumer() throws Exception {
    79. Properties props = new Properties();
    80. props.setProperty("bootstrap.servers", "192.168.163.138:9092");
    81. props.setProperty("group.id", "test");
    82. props.setProperty("enable.auto.commit", "false");
    83. props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    84. props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    85. KafkaConsumer consumer = new KafkaConsumer<>(props);
    86. consumer.subscribe(Arrays.asList(TOPIC));
    87. final int minBatchSize = 200;
    88. List> buffer = new ArrayList<>();
    89. while (true) {
    90. ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
    91. for (ConsumerRecord record : records) {
    92. buffer.add(record);
    93. }
    94. System.out.println("buffer.size:" + buffer.size());
    95. if (buffer.size() >= minBatchSize) {
    96. insertIntoDb(buffer);
    97. consumer.commitSync();
    98. buffer.clear();
    99. }
    100. }
    101. }
    102. public void insertIntoDb(List> buffer){
    103. for (ConsumerRecord record : buffer)
    104. System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    105. }
    106. }

    附:

    jdk1.8和kafka3.2的下载地址:jdk1.8+kafka3.2linux版本-Java文档类资源-CSDN下载

    kafka官网文档:https://kafka.apache.org/documentation/#quickstart

  • 相关阅读:
    CB利用链及无依赖打Shiro
    Spark SQL
    Zookeeper特性与节点数据类型详解
    一文详解Redis键过期策略,最全文档
    ASEMI肖特基二极管MBR20200CT参数及代换
    【数据结构】哈希桶
    微信查券小助手,淘宝客公众号查券返利机器人搭建教程
    难怪大家丢掉了postman而选择 Apifox
    前端面试题:html和css面试题
    Linux学习-67-日志服务器设置和日志分析工具(logwatch)安装及使用
  • 原文地址:https://blog.csdn.net/sdmanooo/article/details/126147785