• Flink 流处理API


    目录

     

    一、环境

    1.1getExecutionEnvironment

    1.2createLocalEnvironment

    1.3createRemoteEnvironment

    二、从集合中读取数据

    三、从文件中读取数据

    四、从KafKa中读取数据

    1.导入依赖

    2.启动KafKa

    3.java代码


     

    一、环境

    1.1getExecutionEnvironment

    创建一个执行环境,表示当前执行程序的上下文。如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。

    1. #批处理环境
    2. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    3. #流处理环境
    4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    设置并行度:如果没有设置并行度,会以flink-conf.yaml中的配置为准,默认为1

    1. //设置并行度为8
    2. env.setParallelism(8);

    1.2createLocalEnvironment

    返回本地执行环境,需要在调用时指定默认的并行度

    LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1); 

    1.3createRemoteEnvironment

    返回集群执行环境,将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包

    StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("IP",端口号,jar包路径)

    二、从集合中读取数据

    1. import org.apache.flink.streaming.api.datastream.DataStream;
    2. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    3. import java.util.Arrays;
    4. /**
    5. * @author : Ashiamd email: ashiamd@foxmail.com
    6. * @date : 2021/1/31 5:13 PM
    7. * 测试Flink从集合中获取数据
    8. */
    9. public class SourceTest1_Collection {
    10. public static void main(String[] args) throws Exception {
    11. // 创建执行环境
    12. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    13. // 设置env并行度1,使得整个任务抢占同一个线程执行
    14. env.setParallelism(1);
    15. // Source: 从集合Collection中获取数据
    16. DataStream dataStream = env.fromCollection(
    17. Arrays.asList(
    18. new SensorReading("sensor_1", 1547718199L, 35.8),
    19. new SensorReading("sensor_6", 1547718201L, 15.4),
    20. new SensorReading("sensor_7", 1547718202L, 6.7),
    21. new SensorReading("sensor_10", 1547718205L, 38.1)
    22. )
    23. );
    24. DataStream intStream = env.fromElements(1,2,3,4,5,6,7,8,9);
    25. // 打印输出
    26. dataStream.print("SENSOR");
    27. intStream.print("INT");
    28. // 执行
    29. env.execute("JobName");
    30. }
    31. }

    三、从文件中读取数据

    文件由自己创建一个txt文件

    1. import org.apache.flink.streaming.api.datastream.DataStream;
    2. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    3. /**
    4. * @author : Ashiamd email: ashiamd@foxmail.com
    5. * @date : 2021/1/31 5:26 PM
    6. * Flink从文件中获取数据
    7. */
    8. public class SourceTest2_File {
    9. public static void main(String[] args) throws Exception {
    10. // 创建执行环境
    11. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    12. // 使得任务抢占同一个线程
    13. env.setParallelism(1);
    14. // 从文件中获取数据输出
    15. DataStream dataStream = env.readTextFile("/tmp/Flink_Tutorial/src/main/resources/sensor.txt");
    16. dataStream.print();
    17. env.execute();
    18. }
    19. }

    四、从KafKa中读取数据

    1.导入依赖

    1. junit
    2. junit
    3. 4.11
    4. test
    5. org.apache.flink
    6. flink-java
    7. 1.10.1
    8. org.apache.flink
    9. flink-streaming-java_2.12
    10. 1.10.1
    11. org.apache.flink
    12. flink-clients_2.12
    13. 1.10.1
    14. org.apache.flink
    15. flink-connector-kafka_2.11
    16. 1.12.1

    2.启动KafKa

    启动Zookeeper

    ./bin/zookeeper-server-start.sh [config/zookeeper.properties]

    启动KafKa服务

    ./bin/kafka-server-start.sh -daemon ./config/server.properties

    启动KafKa生产者

    ./bin/kafka-console-producer.sh --broker-list localhost:9092  --topic sensor

    3.java代码

    1. import org.apache.flink.api.common.serialization.SimpleStringSchema;
    2. import org.apache.flink.streaming.api.datastream.DataStream;
    3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    4. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    5. import java.util.Properties;
    6. /**
    7. * @author : Ashiamd email: ashiamd@foxmail.com
    8. * @date : 2021/1/31 5:44 PM
    9. */
    10. public class SourceTest3_Kafka {
    11. public static void main(String[] args) throws Exception {
    12. // 创建执行环境
    13. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    14. // 设置并行度1
    15. env.setParallelism(1);
    16. Properties properties = new Properties();
    17. //监听的kafka端口
    18. properties.setProperty("bootstrap.servers", "localhost:9092");
    19. // 下面这些次要参数
    20. properties.setProperty("group.id", "consumer-group");
    21. properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    22. properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    23. properties.setProperty("auto.offset.reset", "latest");
    24. // flink添加外部数据源
    25. DataStream dataStream = env.addSource(new FlinkKafkaConsumer("sensor", new SimpleStringSchema(),properties));
    26. // 打印输出
    27. dataStream.print();
    28. env.execute();
    29. }
    30. }

     

     

  • 相关阅读:
    简单的人脸识别实战
    超详细的MySQL三万字总结
    HDLbits: Fsm serial
    基于PHP+MySQL的物流配送管理系统平台
    新零售SaaS架构:组织管理的底层逻辑与架构设计
    CSS 伪类选择器<a> 的四个伪类选择器
    2.ClickHouse系列之特点介绍
    C++&QT day8
    【JQuery插件】手把手教你如何白瓢一个网站的全部付费资源!前端狂喜
    通信接口五种主要的类型是什么?RS-232、485、CAN、USB
  • 原文地址:https://blog.csdn.net/weixin_64443786/article/details/130792753