• Kafka Integration with Flink: an API Tutorial


    1. Add the dependencies (pom.xml)

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.13.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.13.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.13.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>1.13.6</version>
        </dependency>
    </dependencies>

    2. Create the logging configuration file

    Copy the contents of $FLINK_HOME/conf/log4j.properties into it:

    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.file.ref = MainAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    logger.shaded_zookeeper.name = org.apache.flink.shaded.zookeeper3
    logger.shaded_zookeeper.level = INFO

    # Log all infos in the given file
    appender.main.name = MainAppender
    appender.main.type = RollingFile
    appender.main.append = true
    appender.main.fileName = ${sys:log.file}
    appender.main.filePattern = ${sys:log.file}.%i
    appender.main.layout.type = PatternLayout
    appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.main.policies.type = Policies
    appender.main.policies.size.type = SizeBasedTriggeringPolicy
    appender.main.policies.size.size = 100MB
    appender.main.policies.startup.type = OnStartupTriggeringPolicy
    appender.main.strategy.type = DefaultRolloverStrategy
    appender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF

    3. Flink producer API

    package com.ljr.flink;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;

    import java.util.ArrayList;
    import java.util.Properties;

    public class MyFlinkKafkaProducer {
        // Typing "main" and pressing Tab generates the main method (IDE live template)
        public static void main(String[] args) throws Exception {
            // 1. Get the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Set the parallelism equal to the number of topic partitions
            env.setParallelism(3);

            // 2. Prepare the data source
            ArrayList<String> wordlist = new ArrayList<>();
            wordlist.add("zhangsan");
            wordlist.add("lisi");
            DataStreamSource<String> stream = env.fromCollection(wordlist);

            // Create the Kafka producer
            Properties pros = new Properties();
            pros.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092");
            FlinkKafkaProducer<String> kafkaProducer =
                    new FlinkKafkaProducer<>("customers", new SimpleStringSchema(), pros);

            // 3. Attach the Kafka producer to the stream as a sink
            stream.addSink(kafkaProducer);

            // 4. Execute the job
            env.execute();
        }
    }

    Run the job, then consume from the customers topic with a Kafka consumer to check the result.
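    The quickest check is Kafka's console consumer, but if you prefer to stay in Java, the following minimal sketch reads the topic back with the plain kafka-clients consumer that is already on the classpath via the connector dependency. The class name VerifyCustomersTopic and the group id verify-group are arbitrary choices for this example; the topic name and broker addresses come from the producer above.

    package com.ljr.flink;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    // Minimal verification consumer (plain kafka-clients, not Flink).
    public class VerifyCustomersTopic {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "verify-group");      // arbitrary group id for this check
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // read the topic from the beginning
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("customers"));
                // Poll a few times and print whatever the Flink job produced.
                for (int i = 0; i < 10; i++) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(record.value());
                    }
                }
            }
        }
    }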

    4. Flink consumer API

    package com.ljr.flink;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    import java.util.Properties;

    public class MyFlinkKafkaConsumer {
        public static void main(String[] args) throws Exception {
            // 1. Initialize the Flink environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(3);

            // 2. Create the consumer
            Properties pros = new Properties();
            pros.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092");
            // pros.put(ConsumerConfig.GROUP_ID_CONFIG, "hh");
            FlinkKafkaConsumer<String> flinkKafkaConsumer =
                    new FlinkKafkaConsumer<>("customers", new SimpleStringSchema(), pros);

            // 3. Hook the consumer up to the Flink stream and print the records
            env.addSource(flinkKafkaConsumer).print();

            // 4. Execute the job
            env.execute();
        }
    }

    Run the job, then produce data with the producer from step 3 and watch the consumed output.
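    Besides rerunning MyFlinkKafkaProducer from step 3, you can also push a few ad-hoc messages with a plain kafka-clients producer while the Flink job is running and watch them appear in its print() output. This is only an illustrative sketch: the class name SendTestMessages and the sample values are made up, while the topic and brokers match the code above.

    package com.ljr.flink;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Properties;

    // Sends a few ad-hoc test messages to the customers topic.
    public class SendTestMessages {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                for (String word : new String[]{"wangwu", "zhaoliu"}) {
                    producer.send(new ProducerRecord<>("customers", word));
                }
                producer.flush(); // make sure everything is delivered before exiting
            }
        }
    }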

  • Original article: https://blog.csdn.net/v15220/article/details/139563134