• springboot集成flink快速入门demo


    一、flink介绍

    Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。它的最大亮点是流处理,是业界最顶级的开源流处理引擎。Flink最适合的应用场景是低时延的数据处理(Data Processing)场景:高并发pipeline处理数据,时延毫秒级,且兼具可靠性。

    二、环境搭建

    安装flink

    • https://nightlies.apache.org/flink/flink-docs-master/zh/docs/try-flink/local_installation/

    安装Netcat

    Netcat(又称为NC)是一个计算机网络工具,它可以在两台计算机之间建立 TCP/IP 或 UDP 连接。它被广泛用于测试网络中的端口,发送文件等操作。使用 Netcat 可以轻松地进行网络调试和探测,也可以进行加密连接和远程管理等高级网络操作。因为其功能强大而又简单易用,所以在计算机安全领域也有着广泛的应用。

    安装nc命令

     
     
    yum install -y nc

    启动socket端口

     
     
    [root@node01 bin]# nc -lk 8888

    三、代码工程

    实验目的:无界流之读取socket文本流 

    pom.xml

     
     
    1. "1.0" encoding="UTF-8"?>
    2. <project xmlns="http://maven.apache.org/POM/4.0.0"
    3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    5. <parent>
    6. <artifactId>springboot-demoartifactId>
    7. <groupId>com.etgroupId>
    8. <version>1.0-SNAPSHOTversion>
    9. parent>
    10. <modelVersion>4.0.0modelVersion>
    11. <artifactId>flinkartifactId>
    12. <properties>
    13. <maven.compiler.source>8maven.compiler.source>
    14. <maven.compiler.target>8maven.compiler.target>
    15. properties>
    16. <dependencies>
    17. <dependency>
    18. <groupId>org.springframework.bootgroupId>
    19. <artifactId>spring-boot-starter-webartifactId>
    20. dependency>
    21. <dependency>
    22. <groupId>org.springframework.bootgroupId>
    23. <artifactId>spring-boot-autoconfigureartifactId>
    24. dependency>
    25. <dependency>
    26. <groupId>org.springframework.bootgroupId>
    27. <artifactId>spring-boot-starter-testartifactId>
    28. <scope>testscope>
    29. dependency>
    30. <dependency>
    31. <groupId>org.apache.flinkgroupId>
    32. <artifactId>flink-streaming-javaartifactId>
    33. <version>1.17.0version>
    34. dependency>
    35. <dependency>
    36. <groupId>org.apache.flinkgroupId>
    37. <artifactId>flink-javaartifactId>
    38. <version>1.17.0version>
    39. dependency>
    40. <dependency>
    41. <groupId>org.apache.flinkgroupId>
    42. <artifactId>flink-clientsartifactId>
    43. <version>1.17.0version>
    44. dependency>
    45. <dependency>
    46. <groupId>org.apache.flinkgroupId>
    47. <artifactId>flink-connector-baseartifactId>
    48. <version>1.17.0version>
    49. dependency>
    50. <dependency>
    51. <groupId>org.apache.flinkgroupId>
    52. <artifactId>flink-connector-filesartifactId>
    53. <version>1.17.0version>
    54. dependency>
    55. <dependency>
    56. <groupId>org.apache.flinkgroupId>
    57. <artifactId>flink-connector-kafkaartifactId>
    58. <version>1.17.0version>
    59. dependency>
    60. <dependency>
    61. <groupId>org.apache.flinkgroupId>
    62. <artifactId>flink-runtime-webartifactId>
    63. <version>1.17.0version>
    64. dependency>
    65. dependencies>
    66. <build>
    67. <plugins>
    68. <plugin>
    69. <groupId>org.apache.maven.pluginsgroupId>
    70. <artifactId>maven-shade-pluginartifactId>
    71. <executions>
    72. <execution>
    73. <phase>packagephase>
    74. <goals>
    75. <goal>shadegoal>
    76. goals>
    77. <configuration>
    78. <transformers>
    79. <transformer
    80. implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
    81. <resource>META-INF/spring.handlersresource>
    82. transformer>
    83. <transformer
    84. implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
    85. <resource>META-INF/spring.factoriesresource>
    86. transformer>
    87. <transformer
    88. implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
    89. <resource>META-INF/spring.schemasresource>
    90. transformer>
    91. <transformer
    92. implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
    93. <transformer
    94. implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
    95. <mainClass>com.et.flink.job.SocketJobmainClass>
    96. transformer>
    97. transformers>
    98. configuration>
    99. execution>
    100. executions>
    101. plugin>
    102. plugins>
    103. build>
    104. project>

    SocketJob

     
     
    1. package com.et.flink.job;
    2. import org.apache.flink.api.common.typeinfo.TypeHint;
    3. import org.apache.flink.api.java.tuple.Tuple2;
    4. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    5. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    7. import org.apache.flink.util.Collector;
    8. /**
    9. * @author liuhaihua
    10. * @version 1.0
    11. * @ClassName SocketJob
    12. * @Description todo
    13. * @date 2024年02月29日 17:06
    14. */
    15. public class SocketJob {
    16. public static void main(String[] args) throws Exception {
    17. // 创建执行环境
    18. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    19. // 指定并行度,默认电脑线程数
    20. env.setParallelism(3);
    21. // 读取数据socket文本流 指定监听 IP 端口 只有在接收到数据才会执行任务
    22. DataStreamSource<String> socketDS = env.socketTextStream("172.24.4.193", 8888);
    23. // 处理数据: 切换、转换、分组、聚合 得到统计结果
    24. SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS
    25. .flatMap(
    26. (String value, Collector<Tuple2<String, Integer>> out) -> {
    27. String[] words = value.split(" ");
    28. for (String word : words) {
    29. out.collect(Tuple2.of(word, 1));
    30. }
    31. }
    32. )
    33. .setParallelism(2)
    34. // // 显式地提供类型信息:对于flatMap传入Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2。只有显式设置系统当前返回类型,才能正确解析出完整数据
    35. .returns(new TypeHint<Tuple2<String, Integer>>() {
    36. })
    37. // .returns(Types.TUPLE(Types.STRING,Types.INT))
    38. .keyBy(value -> value.f0)
    39. .sum(1);
    40. // 输出
    41. sum.print();
    42. // 执行
    43. env.execute();
    44. }
    45. }

    代码仓库

    • https://github.com/Harries/springboot-demo

    四、测试

    启动socket流

     
     
    [root@cmn-zentao-002 ~]# nc -l 8888

    本地执行

    本地直接在 IDEA 中启动main程序,在socket流中输入

     
     
    1. abc bcd cde
    2. bcd cde fgh
    3. cde fgh hij

    console日志显示

     
     
    1. 3> (abc,1)
    2. 1> (fgh,1)
    3. 3> (bcd,1)
    4. 3> (cde,1)
    5. 3> (bcd,2)
    6. 3> (cde,2)
    7. 3> (cde,3)
    8. 1> (fgh,2)
    9. 2> (hij,1)

    集群执行

    执行maven打包,将打包的jar上传到集群中(见下图:94713cb0980fb13dd88d7785b93deef2.png)。在socket中输入字符,结果和本地一样

    五、引用

    • https://juejin.cn/post/7283311024979066937

    • http://www.liuhaihua.cn/archives/710270.html

  • 相关阅读:
    Android Jetpack系列(一)起始篇:Jetpack 的前世今生
    [Model.py 03]Modification for creating terrain matrix3.
    PostgreSQL 的时间差DATEDIFF
    nc前端合计行、按钮组
    【Java 进阶篇】JDBC ResultSet 类详解
    Python从入门到实践(七)函数
    ue5打包失败与优化项目
    react高阶组件——HOC
    数据结构栈和队列
    2023兰州理工大学计算机考研信息汇总
  • 原文地址:https://blog.csdn.net/dot_life/article/details/136408621