• Kafka核心原理


    一、kafka安装步骤

    (1)配置profile文件
    1. vim /etc/profile
    2. // KAFKA
    3. export KAFKA_HOME=/opt/soft/kafka212
    4. export PATH=$KAFKA_HOME/bin:$PATH
    5. source /etc/profile
    (2)创建kfkdata目录

    cd /opt/soft/kafka212/

    mkdir kfkdata

    (3)进入config目录配置server.properties文件
    1. // Kafka服务器id号
    2. 21 broker.id=0
    3. // 设置主机IP地址和端口号
    4. 36 advertised.listeners=PLAINTEXT://192.168.91.11:9092
    5. // 存储日志文件的目录
    6. 60 log.dirs=/opt/soft/kafka212/kfkdata
    7. 123 zookeeper.connect=192.168.91.11:2181
    8. // 设置连接zk的超时时间(毫秒)
    9. 126 zookeeper.connection.timeout.ms=18000
    10. // 主题启用
    11. 137 delete.topic.enable=true
    (4)进入config目录启动Kafka

    先启动zookeeper后启动Kafka

    1. // 先启动zookeeper
    2. zhServer.sh start
    3. // 后启动kafka
    4. nohup kafka-server-start.sh /opt/soft/kafka212/config/server.properties &

    二、kafka常用命令

    1. // partitions 分区 replication-factor 副本数
    2. // 创建主题
    3. kafka-topics.sh --create --zookeeper 192.168.91.128:2181 --topic kb23 --partitions 1 --replication-factor 1
    4. // 查看主题内容
    5. kafka-topics.sh --zookeeper 192.168.91.11:2181 --list
    6. // 生产者,生产消息
    7. kafka-console-producer.sh --topic kb23 --broker-list 192.168.78.131:9092
    8. // 消费者
    9. kafka-console-consumer.sh --bootstrap-server 192.168.78.131:9092 --topic bigdate --from-beginning
    10. // 查看队列详情
    11. kafka-topics.sh --zookeeper 192.168.91.128:2181 --describe --topic kb23
    12. // 查看队列消息数量
    13. kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.91.11:9092 --topic bigdate2
    14. // 删除topic
    15. kafka-topics.sh --zookeeper 192.168.91.11:2181 --delete --topic bigdate

    三、Kafka架构

    Topic:维护一个主题中的消息,可视为消息分类

    Producer:向Kafka主题发布(生产)消息

    Consumer:订阅(消费)主题并处理消息

    Broker:Kafka集群中的服务器

    四、Kafka

    (1)topic

    主题是已发布消息的类别名称

    发布和订阅数据必须指定主题

    主题副本数量不大于Brokers个数

    (2)partition

    一个主题包含多个分区,默认按Key Hash分区

    每个Partition对应一个文件夹-

    每个Partition被视为一个有序的日志文件(LogSegment)

    Replication策略是基于Partition,而不是Topic

    每个Partition都有一个Leader,0或多个Followers

    (3)Kafka Message header(消息头,固定长度)

    offset:唯一确定每条消息在分区内的位置

    CRC32:用crc32校验消息

    "magic":表示本次发布Kafka服务程序协议版本号

    "attributes":表示为独立版本、或标识压缩类型、或编码类型

    (4)Kafka Message body(消息体)

    key:表示消息键,可选

    value bytes payload:表示实际消息数据

    (5)Kafka Producer

    生产者将消息写入到Broker

    Producer直接发送消息到Broker上的Leader Partition

    Producer客户端自己控制着消息被推送到哪些Partition
            随机分配、自定义分区算法等

    Batch推送提高效率

    (6)Kafka Consumer
    ①消费者通过订阅消费消息

    offset的管理是基于消费组(group.id)的级别

    每个Partition只能由同一消费组内的一个Consumer来消费

    每个Consumer可以消费多个分区

    消费过的数据仍会保留在Kafka中

    消费者不能超过分区数量

    ②消费模式

    队列:所有消费者在一个消费组内

    发布/订阅:所有消费者被分配到不同的消费组

    (7)ZooKeeper在Kafka中的作用
    ①Broker注册并监控状态

    /brokers/ids

    ②Topic注册

    /brokers/topics

    ③生产者负载均衡

    每个Broker启动时,都会完成Broker注册过程,生产者会通过该节点的变化来动态地感知到Broker服务器列表的变更

    ④offset维护

    Kafka早期版本使用ZooKeeper为每个消费者存储offset,由于ZooKeeper写入性能较差,从0.10版本后,Kafka使用自己的内部主题维护offset

    (8)Kafka数据流
    ①副本同步 ISR(In-Sync Replica)
    ②容灾 Leader Partition
    ③高并发 读写性能 Consumer Group
    ④负载均衡

    (9)Kafka API
    ①Producer API
    ②Consumer API
    ③Streams API
    ④Connector API

    五、kafka生产消费者模式

    1. package nj.zb.kb23.kafka;
    2. import org.apache.kafka.clients.producer.KafkaProducer;
    3. import org.apache.kafka.clients.producer.ProducerConfig;
    4. import org.apache.kafka.clients.producer.ProducerRecord;
    5. import org.apache.kafka.common.serialization.StringSerializer;
    6. import java.util.Properties;
    7. import java.util.Scanner;
    8. /*
    9. 生产者消费模式
    10. */
    11. public class MyProducer {
    12. public static void main(String[] args) {
    13. Properties properties = new Properties();
    14. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.91.11:9092");
    15. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    16. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
    17. /*
    18. 应答机制
    19. 0:不需要等待broker任何响应,无法确保数据正确送到broke中
    20. 1:只需要得到分区副本中leader的确认就OK,可能会丢失数据(极端情况下)
    21. -1:等到所有副本确认收到信息,响应时间最长,数据最安全,不会丢失数据,(极端情况下,可能会重复)
    22. */
    23. properties.put(ProducerConfig.ACKS_CONFIG,"-1");
    24. KafkaProducer<String,String> producer = new KafkaProducer<>(properties);
    25. Scanner scanner = new Scanner(System.in);
    26. while (true){
    27. System.out.println("请输入内容:");
    28. String msg=scanner.nextLine();
    29. if (msg.equals("tt")) {
    30. break;
    31. }
    32. ProducerRecord<String, String> record = new ProducerRecord<>("bigdate",msg);
    33. producer.send(record);
    34. }
    35. }
    36. }

     

    1. // 创建表bigdate
    2. kafka-topics.sh --create --zookeeper 192.168.91.11:2181 --topic bigdate --partitions 1 --replication-factor 1
    3. // 查看列队信息
    4. kafka-console-consumer.sh --bootstrap-server 192.168.91.11:9092 --topic bigdate --from-beginning

    六、kafka生产者生产消息

    1. package nj.zb.kb23.kafka;
    2. import org.apache.kafka.clients.producer.KafkaProducer;
    3. import org.apache.kafka.clients.producer.ProducerConfig;
    4. import org.apache.kafka.clients.producer.ProducerRecord;
    5. import org.apache.kafka.common.serialization.StringSerializer;
    6. import java.util.Properties;
    7. import java.util.concurrent.ExecutorService;
    8. import java.util.concurrent.Executors;
    9. /*
    10. 生产者生产消息
    11. */
    12. public class MyProducer2 {
    13. public static void main(String[] args) throws InterruptedException {
    14. Properties properties = new Properties();
    15. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.91.11:9092");
    16. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    17. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
    18. /*
    19. * 开启重试,如果发送消息失败,默认是0,只发送一次,可以设置重置3次(共4次)
    20. * 每次重试的间隔是100ms,可以手动设置10000
    21. */
    22. properties.put(ProducerConfig.RETRIES_CONFIG,3);//properties.put(ProducerConfig."retries",3);
    23. properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG,10000);//每个分区10000条信息
    24. /*
    25. BUFFER_MEMORY_CONFIG 缓存内存 默认值 32M
    26. BATCH_SIZE_CONFIG 批大小配置 默认值 16KB
    27. SEND_BUFFER_CONFIG 每次批量发送的值
    28. */
    29. properties.put(ProducerConfig.BATCH_SIZE_CONFIG,102400); //1024 byte kb
    30. properties.put(ProducerConfig.SEND_BUFFER_CONFIG,102400);
    31. properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,67108864); //64M
    32. //应答机制:-1 all
    33. properties.put(ProducerConfig.ACKS_CONFIG,"-1");
    34. //多线程
    35. ExecutorService executorService = Executors.newCachedThreadPool();
    36. //模拟10个线程,同时向Kafka传递数据
    37. for (int i = 0; i < 10; i++) {
    38. Thread thread = new Thread(new Runnable() {
    39. @Override
    40. public void run() {
    41. //一个线程代表一个人
    42. KafkaProducer<String,String> producer = new KafkaProducer<>(properties);
    43. String threadName = Thread.currentThread().getName();
    44. System.out.println(threadName);
    45. //每个线程传递1000条数据
    46. for (int j = 0; j < 10000; j++) {
    47. ProducerRecord<String, String> record = new ProducerRecord<>("bigdate1", threadName + " " + j);
    48. try {
    49. //让主机休息10毫微秒
    50. Thread.currentThread().sleep(10);
    51. } catch (Exception e) {
    52. e.printStackTrace();
    53. }
    54. producer.send(record);
    55. }
    56. }
    57. });
    58. executorService.execute(thread);
    59. }
    60. executorService.shutdown();
    61. while (true){
    62. //让主线程多休息10秒,主线程关闭,子线程没有跟上,所有数据缺失
    63. Thread.sleep(10000);
    64. if (executorService.isTerminated()){
    65. System.out.println("game over!");
    66. break;
    67. }
    68. }
    69. }
    70. }

    1. // 查看进度
    2. [root@kb23 ~]# kafka-console-consumer.sh --bootstrap-server 192.168.91.11:9092 --topic bigdate1 --from-beginning
    3. pool-1-thread-4 9997
    4. pool-1-thread-9 9997
    5. pool-1-thread-1 9997
    6. pool-1-thread-8 9998
    7. pool-1-thread-4 9999
    8. pool-1-thread-7 9999
    9. pool-1-thread-3 9999
    10. pool-1-thread-5 9999
    11. pool-1-thread-2 9999
    12. ^CProcessed a total of 100000 messages
  • 相关阅读:
    c++ primer plus 笔记 第十六章 string类和标准模板库
    typeHandlers标签和plugins标签
    计算机网络复习-第三章数据链路层
    SRE方法论之拥抱风险
    Django CreateView视图
    C# OpenVINO Cls 图像分类
    应用场景不同,使用的“代码”也不同
    Java使用BouncyCastle进行基于ECDSA算法的椭圆曲线secp256r1证书自签名
    矩阵分析:特征值分解都在这里了
    LQ0272 矩形运算【计算几何】
  • 原文地址:https://blog.csdn.net/berbai/article/details/133041812