• Flink: reading data from Kafka


    一、Add the required dependencies

    
    
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>org.example</groupId>
        <artifactId>flink-proj</artifactId>
        <version>1.0-SNAPSHOT</version>

        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>

        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>1.11.1</version>
            </dependency>

            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.11</artifactId>
                <version>1.11.1</version>
            </dependency>

            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_2.11</artifactId>
                <version>1.11.1</version>
            </dependency>

            <!-- Kafka connector; Scala suffix kept at _2.11 to match the other Flink artifacts -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_2.11</artifactId>
                <version>1.11.1</version>
            </dependency>
        </dependencies>
    </project>
    
        
    
    
    
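    To keep all Flink artifacts on the same version and the same Scala suffix, the values can be centralized in Maven properties and referenced from each dependency. A sketch (the property names `flink.version` and `scala.binary.version` are illustrative conventions, not required names):

```xml
    <properties>
        <flink.version>1.11.1</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <dependencies>
        <!-- every Flink dependency now inherits the same version and Scala suffix -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
```

    Upgrading Flink (or the Scala binary version) then means changing one property instead of editing every dependency.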

    二、Main class:

    package cn.edu.tju.demo;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.util.Properties;

    public class Test03 {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment environment = StreamExecutionEnvironment
                    .getExecutionEnvironment();

            // Kafka consumer configuration: broker address, consumer group and deserializers
            Properties properties = new Properties();
            properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "xx.xx.xx.xx:9092");
            properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
            properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            // Consume the topic "myTopic" as a stream of strings and print each record
            DataStreamSource<String> myTopic = environment.addSource(
                    new FlinkKafkaConsumer<>("myTopic", new SimpleStringSchema(), properties));
            myTopic.print();

            environment.execute("my job");
        }
    }
    
    
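    The `ConsumerConfig` constants used above are just string keys ("bootstrap.servers", "group.id", and so on). A minimal, Flink-free sketch that builds the same `Properties` with the literal key names (the class and method names here are illustrative, not part of the original program):

```java
import java.util.Properties;

// Illustrative helper: builds the same consumer Properties as the main class,
// using the literal keys that the ConsumerConfig constants resolve to.
public class KafkaConsumerProps {
    static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    public static Properties build(String bootstrapServers, String groupId) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers); // ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
        p.setProperty("group.id", groupId);                   // ConsumerConfig.GROUP_ID_CONFIG
        p.setProperty("key.deserializer", STRING_DESERIALIZER);
        p.setProperty("value.deserializer", STRING_DESERIALIZER);
        return p;
    }

    public static void main(String[] args) {
        Properties p = build("xx.xx.xx.xx:9092", "testGroup");
        System.out.println(p.getProperty("group.id"));
    }
}
```

    Because these are plain string keys, the same `Properties` object can be handed to `FlinkKafkaConsumer` or to a plain `KafkaConsumer` unchanged.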

    The Kafka broker used here is version 2.x.

  • Original post: https://blog.csdn.net/amadeus_liu2/article/details/136449127