• Flink学习之旅:(二)构建Flink demo工程并提交到集群执行


    1.创建Maven工程

            在idea中创建一个 名为 MyFlinkFirst 工程

    2.配置pom.xml

    1. <properties>
    2. <flink.version>1.13.0</flink.version>
    3. <java.version>1.8</java.version>
    4. <scala.binary.version>2.12</scala.binary.version>
    5. <slf4j.version>1.7.30</slf4j.version>
    6. </properties>
    7. <dependencies>
    8. <!-- 引入 Flink 相关依赖-->
    9. <dependency>
    10. <groupId>org.apache.flink</groupId>
    11. <artifactId>flink-java</artifactId>
    12. <version>${flink.version}</version>
    13. </dependency>
    14. <dependency>
    15. <groupId>org.apache.flink</groupId>
    16. <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    17. <version>${flink.version}</version>
    18. </dependency>
    19. <dependency>
    20. <groupId>org.apache.flink</groupId>
    21. <artifactId>flink-clients_${scala.binary.version}</artifactId>
    22. <version>${flink.version}</version>
    23. </dependency>
    24. <!-- 引入日志管理相关依赖-->
    25. <dependency>
    26. <groupId>org.slf4j</groupId>
    27. <artifactId>slf4j-api</artifactId>
    28. <version>${slf4j.version}</version>
    29. </dependency>
    30. <dependency>
    31. <groupId>org.slf4j</groupId>
    32. <artifactId>slf4j-log4j12</artifactId>
    33. <version>${slf4j.version}</version>
    34. </dependency>
    35. <dependency>
    36. <groupId>org.apache.logging.log4j</groupId>
    37. <artifactId>log4j-to-slf4j</artifactId>
    38. <version>2.14.0</version>
    39. </dependency>
    40. </dependencies>

    3.配置日志管理

            在目录 src/main/resources 下添加文件:log4j.properties,内容配置如下:

    1. log4j.rootLogger=error, stdout
    2. log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    3. log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    4. log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

    4.编写代码

            编写 StreamWordCount 类,单词汇总

    1. package com.qiyu;
    2. import org.apache.flink.api.common.typeinfo.Types;
    3. import org.apache.flink.api.java.tuple.Tuple2;
    4. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    5. import org.apache.flink.streaming.api.datastream.KeyedStream;
    6. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    8. import org.apache.flink.util.Collector;
    9. import java.util.Arrays;
    10. /**
    11. * @author MR.Liu
    12. * @version 1.0
    13. * @data 2023-10-18 14:45
    14. */
    15. public class StreamWordCount {
    16. public static void main(String[] args) throws Exception {
    17. // 1. 创建流式执行环境
    18. StreamExecutionEnvironment env =
    19. StreamExecutionEnvironment.getExecutionEnvironment();
    20. // 2. 读取文本流
    21. DataStreamSource<String> lineDSS = env.socketTextStream("192.168.220.130",
    22. 7777);
    23. // 3. 转换数据格式
    24. SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS
    25. .flatMap((String line, Collector<String> words) -> {
    26. Arrays.stream(line.split(" ")).forEach(words::collect);
    27. })
    28. .returns(Types.STRING)
    29. .map(word -> Tuple2.of(word, 1L))
    30. .returns(Types.TUPLE(Types.STRING, Types.LONG));
    31. // 4. 分组
    32. KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne
    33. .keyBy(t -> t.f0);
    34. // 5. 求和
    35. SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS
    36. .sum(1);
    37. // 6. 打印
    38. result.print();
    39. // 7. 执行
    40. env.execute();
    41. }
    42. }

    5.测试

    在hadoop102 服务器中 执行:

    nc -lk 7777

    再运行 StreamWordCount java类

    在命令行随意疯狂输出

    idea 控制台 打印结果:

    测试代码正常

    6. 打包程序提交到集群中运行

            在pom.xml添加打包插件

    1. <build>
    2. <plugins>
    3. <plugin>
    4. <groupId>org.apache.maven.plugins</groupId>
    5. <artifactId>maven-assembly-plugin</artifactId>
    6. <version>3.0.0</version>
    7. <configuration>
    8. <descriptorRefs>
    9. <descriptorRef>jar-with-dependencies</descriptorRef>
    10. </descriptorRefs>
    11. </configuration>
    12. <executions>
    13. <execution>
    14. <id>make-assembly</id>
    15. <phase>package</phase>
    16. <goals>
    17. <goal>single</goal>
    18. </goals>
    19. </execution>
    20. </executions>
    21. </plugin>
    22. </plugins>
    23. </build>

    直接使用 maven 中的 package命令,控制台显示 BUILD SUCCESS 就是打包成功!

    选择 MyFlinkFirst-1.0-SNAPSHOT.jar 提交到 web ui 上

    上传 jar 后,点击 jar 包名称 ,填写 主要配置程序入口主类的全类名,任务运行的并行度。完成后 点击 submit 

    查看 任务运行列表 

    点击任务

    点击“Task Managers”,打开 Stdout,并且在 hadoop102 命令行 疯狂输出 

    Stdout 就会显示 结果

  • 相关阅读:
    orangepi香橙派 ubuntu安装RabbitMQ
    PyTorch ConvTranspose2d 的定义与计算过程
    windows使用nginx探索笔记
    1553B总线测试仪
    BufferCache与PageCache
    [网络] 前端大文件上传
    解析java在的debug模式之属性断点调试
    Sentinel学习(2)——sentinel的使用,引入依赖和配置 & 对消费者进行流控 & 对生产者进行熔断降级
    以题为例浅谈SSRF
    C/C++数1的个数 2019年9月电子学会青少年软件编程(C/C++)等级考试一级真题答案解析
  • 原文地址:https://blog.csdn.net/qq_35370485/article/details/133905729