• nowcoder----kafka极速入门体验


    1. 阻塞队列

    image-20220625165153394

    package com.zs.kafka;
    
    import java.util.Random;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    
    public class BlockingQueueTests {
        public static void main(String[] args) {
            // Bounded queue (capacity 10): put() blocks when full, take() blocks when empty.
            // Parameterized type — the original used the raw BlockingQueue/ArrayBlockingQueue,
            // which produced unchecked-conversion warnings when handed to Producer/Consumer.
            BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
            // One producer competing with three consumers over the same shared queue.
            new Thread(new Producer(queue)).start();
            new Thread(new Consumer(queue)).start();
            new Thread(new Consumer(queue)).start();
            new Thread(new Consumer(queue)).start();
        }
    }
    
    
    class Producer implements Runnable {
        /** Shared bounded queue this producer feeds; put() blocks while it is full. */
        private final BlockingQueue<Integer> queue;

        public Producer(BlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        /**
         * Produces the integers 0..99, pausing 20 ms before each put so the
         * (on average slower) consumers visibly lag behind. Stops early if
         * interrupted while sleeping or while blocked on a full queue.
         */
        @Override
        public void run() {
            try {
                for (int i = 0; i < 100; i++) {
                    Thread.sleep(20);
                    queue.put(i);
                    System.out.println(Thread.currentThread().getName() + "生产:" + queue.size());
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing the exception
                // (the original caught Exception broadly and only printed the trace).
                Thread.currentThread().interrupt();
            }
        }
    }
    
    
    class Consumer implements Runnable {

        /** Shared bounded queue this consumer drains; take() blocks while it is empty. */
        private final BlockingQueue<Integer> queue;

        /** Reused RNG — the original allocated a fresh Random on every iteration. */
        private final Random random = new Random();

        public Consumer(BlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        /**
         * Consumes elements forever, sleeping a random 0-199 ms before each take
         * to simulate variable processing time. Exits when interrupted while
         * sleeping or while blocked on an empty queue.
         */
        @Override
        public void run() {
            try {
                while (true) {
                    Thread.sleep(random.nextInt(200));
                    queue.take();
                    System.out.println(Thread.currentThread().getName() + "消费:" + queue.size());
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing the exception
                // (the original caught Exception broadly and only printed the trace).
                Thread.currentThread().interrupt();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    Thread-0生产:1
    Thread-2消费:0
    Thread-0生产:1
    Thread-0生产:2
    Thread-3消费:1
    Thread-0生产:2
    Thread-1消费:1
    Thread-0生产:2
    Thread-2消费:1
    Thread-3消费:0
    Thread-0生产:1
    Thread-0生产:2
    Thread-1消费:1
    Thread-0生产:2
    Thread-2消费:1
    Thread-0生产:2
    Thread-0生产:3
    Thread-0生产:4
    Thread-3消费:3
    Thread-0生产:4
    Thread-2消费:3
    Thread-0生产:4
    Thread-1消费:3
    Thread-0生产:4
    Thread-0生产:5
    Thread-2消费:4
    Thread-0生产:5
    Thread-1消费:4
    Thread-3消费:3
    Thread-0生产:4
    Thread-1消费:3
    Thread-3消费:3
    Thread-1消费:2
    Thread-0生产:3
    Thread-0生产:3
    Thread-3消费:2
    Thread-1消费:1
    Thread-0生产:2
    Thread-3消费:1
    Thread-0生产:2
    Thread-0生产:3
    Thread-0生产:4
    Thread-0生产:5
    Thread-2消费:4
    Thread-1消费:3
    Thread-0生产:4
    Thread-2消费:3
    Thread-3消费:2
    Thread-1消费:1
    Thread-0生产:2
    Thread-2消费:1
    Thread-1消费:1
    Thread-0生产:1
    Thread-3消费:1
    Thread-0生产:1
    Thread-0生产:2
    Thread-0生产:3
    Thread-0生产:4
    Thread-1消费:3
    Thread-0生产:4
    Thread-2消费:3
    Thread-0生产:4
    Thread-3消费:3
    Thread-0生产:4
    Thread-2消费:3
    Thread-0生产:4
    Thread-0生产:5
    Thread-1消费:4
    Thread-0生产:5
    Thread-2消费:4
    Thread-0生产:5
    Thread-0生产:6
    Thread-3消费:5
    Thread-0生产:6
    Thread-0生产:7
    Thread-3消费:6
    Thread-0生产:7
    Thread-0生产:8
    Thread-0生产:9
    Thread-1消费:8
    Thread-0生产:9
    Thread-2消费:8
    Thread-0生产:9
    Thread-0生产:10
    Thread-2消费:9
    Thread-0生产:10
    Thread-2消费:9
    Thread-3消费:8
    Thread-0生产:9
    Thread-1消费:8
    Thread-0生产:9
    Thread-2消费:8
    Thread-1消费:7
    Thread-3消费:6
    Thread-0生产:7
    Thread-3消费:6
    Thread-0生产:7
    Thread-0生产:8
    Thread-0生产:9
    Thread-3消费:8
    Thread-0生产:9
    Thread-2消费:8
    Thread-0生产:9
    Thread-0生产:10
    Thread-1消费:9
    Thread-0生产:10
    Thread-2消费:9
    Thread-0生产:10
    Thread-1消费:9
    Thread-0生产:10
    Thread-3消费:9
    Thread-0生产:10
    Thread-1消费:9
    Thread-0生产:10
    Thread-2消费:9
    Thread-0生产:10
    Thread-3消费:9
    Thread-0生产:10
    Thread-2消费:9
    Thread-0生产:10
    Thread-1消费:9
    Thread-0生产:10
    Thread-1消费:9
    Thread-0生产:10
    Thread-2消费:9
    Thread-0生产:10
    Thread-3消费:9
    Thread-0生产:10
    Thread-3消费:9
    Thread-0生产:10
    Thread-1消费:9
    Thread-0生产:10
    Thread-2消费:9
    Thread-0生产:10
    Thread-3消费:9
    Thread-3消费:8
    Thread-0生产:9
    Thread-0生产:10
    Thread-2消费:9
    Thread-0生产:10
    Thread-3消费:9
    Thread-0生产:10
    Thread-1消费:9
    Thread-0生产:10
    Thread-2消费:9
    Thread-0生产:10
    Thread-1消费:9
    Thread-0生产:10
    Thread-2消费:9
    Thread-0生产:10
    Thread-3消费:9
    Thread-0生产:10
    Thread-2消费:9
    Thread-0生产:10
    Thread-1消费:9
    Thread-2消费:8
    Thread-0生产:9
    Thread-0生产:10
    Thread-3消费:9
    Thread-0生产:10
    Thread-3消费:9
    Thread-0生产:10
    Thread-1消费:9
    Thread-0生产:10
    Thread-2消费:9
    Thread-0生产:10
    Thread-2消费:9
    Thread-0生产:10
    Thread-1消费:9
    Thread-0生产:10
    Thread-3消费:9
    Thread-0生产:10
    Thread-3消费:9
    Thread-0生产:10
    Thread-1消费:9
    Thread-0生产:10
    Thread-2消费:9
    Thread-0生产:10
    Thread-3消费:9
    Thread-0生产:10
    Thread-1消费:9
    Thread-0生产:10
    Thread-1消费:9
    Thread-2消费:8
    Thread-0生产:9
    Thread-3消费:8
    Thread-2消费:7
    Thread-0生产:8
    Thread-0生产:9
    Thread-1消费:8
    Thread-0生产:9
    Thread-2消费:8
    Thread-1消费:7
    Thread-3消费:6
    Thread-2消费:5
    Thread-3消费:4
    Thread-1消费:3
    Thread-1消费:2
    Thread-2消费:1
    Thread-3消费:0

    2. kafka

    image-20220625172206424

    消息会被持久化到硬盘上,所以可以处理海量数据;虽然读写的是硬盘,但 Kafka 采用顺序读写,其速度甚至可以高于对内存的随机读取

    分布式的集群

    Broker:kafka服务器

    Zookeeper:单独的软件

    Topic:消息主题

    Partition:分区,对主题中的消息进行物理上的分区存储,以便多个消费者并行读写

    Offset:分区中的索引

    Leader Replica:主副本

    Follower Replica:从副本

    2.1 下载

    kafka不分操作系统

    image-20220625173653670

    image-20220625173718714

    2.2 安装

    **1)**解压

    **2)**配置zookeeper

    在config/zookeeper.properties

    # the directory where the snapshot is stored.
    dataDir=/Users/miyufeng/Downloads/kafka/datas/zookeeper
    # the port at which the clients will connect
    clientPort=2181
    # disable the per-ip limit on the number of connections since this is a non-production config
    maxClientCnxns=0
    # Disable the adminserver by default to avoid port conflicts.
    # Set the port to something non-conflicting if choosing to enable this
    admin.enableServer=false
    # admin.serverPort=8080
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    **3)**配置kafka服务配置

    #broker 的全局唯一编号,不能重复,只能是数字。
    broker.id=0
    #处理网络请求的线程数量
    num.network.threads=3
    #用来处理磁盘 IO 的线程数量
    num.io.threads=8
    #发送套接字的缓冲区大小
    socket.send.buffer.bytes=102400
    #接收套接字的缓冲区大小
    socket.receive.buffer.bytes=102400
    #请求套接字的缓冲区大小
    socket.request.max.bytes=104857600
    #kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
    log.dirs=/Users/miyufeng/Downloads/kafka/datas

    **4)**启动zookeeper

    sh bin/zookeeper-server-start.sh config/zookeeper.properties
    
    • 1

    **5)**启动kafka

    sh bin/kafka-server-start.sh config/server.properties
    
    • 1

    **6)**指令了解

    (1)创建主题

    sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
    
    • 1

    ​ (2) 查询主题

    sh kafka-topics.sh --list --bootstrap-server localhost:9092
    
    • 1

    ​ (3) 生产数据

    sh kafka-console-producer.sh --broker-list localhost:9092 --topic test
    
    • 1

    ​ (4) 消费数据

    sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    
    • 1

    **注意:**停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper集群。因为 Zookeeper 集群当中记录着 Kafka 集群相关信息,Zookeeper 集群一旦先停止,Kafka 集群就没有办法再获取停止进程的信息,只能手动杀死 Kafka 进程了。

    3. springboot简单使用

    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    spring.kafka.bootstrap-servers=localhost:9092
    spring.kafka.consumer.group-id=test-consumer-group
    #是否自动提交偏移量
    spring.kafka.consumer.enable-auto-commit=true
    #自动提交的频率
    spring.kafka.consumer.auto-commit-interval=3000
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    package com.zs.kafka;
    
    import com.zs.MySpringApplication;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.SpringBootConfiguration;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    @ContextConfiguration(classes= MySpringApplication.class)
    public class test {

        // Field-injected wrapper bean around KafkaTemplate (declared later in this file).
        @Autowired
        private KafkaProducer kafkaProducer;

        // Sends two messages to the "test" topic, then sleeps 20 s so the
        // asynchronous @KafkaListener consumer has time to receive and print them
        // before the test JVM exits.
        @Test
        public void test01() throws InterruptedException {
            kafkaProducer.sendMessage("test","你好");
            kafkaProducer.sendMessage("test","在吗");

            Thread.sleep(2000*10);
        }


    }
    
    
    @Component
    class KafkaProducer{
        // Parameterized template — the original used the raw KafkaTemplate type,
        // which triggers unchecked warnings on send().
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;

        /**
         * Sends {@code content} to the given Kafka topic (fire-and-forget; the
         * returned future from KafkaTemplate.send is intentionally ignored here).
         */
        public void sendMessage(String topic,String content){
            kafkaTemplate.send(topic,content);
        }

    }
    
    
    @Component
    class KafkaConsumer{
        /**
         * Invoked by Spring for each record arriving on the "test" topic;
         * prints the message payload. Parameterized ConsumerRecord replaces
         * the original raw type (same erased signature, so the listener
         * registration is unaffected).
         */
        @KafkaListener(topics={"test"})
        public void handleMessage(ConsumerRecord<String, String> record){
            System.out.println(record.value());
        }

    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
  • 相关阅读:
    当社恐成为技术面试官
    问题随记 —— Cannot create directory /tmp/hive. Name node is in safe mode.
    修身养性 - 阿纳托利: 健身指导
    Pro Git日常学习记录-Git基础-10.Git别名
    酒水行业的痛点以及解决方式
    企业电子招投标采购系统源码之电子招投标的组成
    ES-索引管理
    当LCC画龙时,新老车企分别在想什么?
    数据结构:链表
    LVS集群-NAT模式
  • 原文地址:https://blog.csdn.net/weixin_44235759/article/details/125462471