• kafka(六):java API消费数据


    说明

    kafka消费消息记录。

    分享

    kafka消费实现

    maven

    <dependency>  
        <groupId>org.apache.kafkagroupId>  
        <artifactId>kafka-clientsartifactId>  
        <version>2.3.0version>  
    dependency>
    
    <dependency>
        <groupId>ch.qos.logbackgroupId>
        <artifactId>logback-classicartifactId>
        <version>1.2.11version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    连接

    • 连接并消费kafka
    private Logger log= LoggerFactory.getLogger(this.getClass());
    	
    	/**
    	 * kafka 连接地址
    	 */
    	public String bootstrapServer ="localhost:9092";
    	
    	/**
    	 * topic
    	 */
    	public String topic="test1";
    	
    	
    	public void kafkaDemo() {
    	       Properties properties = new Properties();
    	        //borker地址
    	        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServer);
    	        //反序列化方式
    	        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
    	        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
    	        
    	        //------------------消费组id必须指定----------------------------
    	        //指定消费者组id
    	        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group1");
    	        
    	        //earliest:offset偏移至最早时候开始消费;latest:偏移到从最新开始消费(默认) earliest 从最早位置消费消息
    	        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");		
    	        
    	        //每批次最小拉取数据大小,默认1byte
    	        properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,1);
    	        
    	        //每批次最大拉取数据大小,默认50M
    	        properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,50 * 1024 * 1024);
    	        
    	        //一批次数据,未达到最小数据大小时候,最大等待时间.默认500ms
    	        properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,500);
    	        
    	        //单次调用 poll() 返回的最大记录数,默认500
    	        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,500);
    	        
    	        KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(properties);
    	        
    	        //A. 设置订阅的topic 列表,从所有数据分区读取数据
    	        List<String> topicList = new ArrayList<>();
    	        topicList.add(topic);
    	        kafkaConsumer.subscribe(topicList);
    	        
    	        //B. 设置订阅topic,并指定分区
    //	        List topicPartitions = new ArrayList<>();
    //	        topicPartitions.add(new TopicPartition(topic,0));
    //	        kafkaConsumer.assign(topicPartitions);
    	        
    	        try {
    		        while (true){
    		            ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(2L));
    		            for (ConsumerRecord<String,String> record : records) {
    		                System.out.println("topic:"+record.topic()+"|partition:"+record.partition()+"|数据:"+record.value());
    		            }
    		        }
    		        
    	        }catch(Exception e){
    	        	log.error(e.toString(),e);
    	        }finally {
    	        	kafkaConsumer.close();
    	        }
    	}
    	
    	
    	public static void main(String[] args) {
    		new KafkaCustomer().kafkaDemo();;
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71

    说明

    • 注意设置消费位置,请按需选择
      • earliest 偏移至最早时候开始消费,每次软件启动会从最开始位置消费,会有重复数据
      • latest:偏移到从最新开始消费(默认),软件启动前kafka中数据不会被接收处理。
    • 设置Topic支持分区指定,默认消费所有分区。
    • 如需kerberos认证,参照 kafka(四):kafka javaAPI入库程序

    总结

    • 无论境遇如何,积累好技术,水滴石穿。
  • 相关阅读:
    【Node.JS 】服务器相关的概念
    buuctf web [极客大挑战 2019]Http
    内部错误: !scandr.cpp@815: eWasOpenForWrite
    thinkphp withJoin 模式下field 无效
    使用 PHP 和 MySQL 的安全注册系统
    维基百科是如何定义联合办公空间的?
    4G通信电子标签
    (未解决)执行git rebase提示:Current branch xxx is up to date.
    Java 环境变量配置
    C++数据结构题:DS 顺序表--连续操作
  • 原文地址:https://blog.csdn.net/qq_22973811/article/details/126891724