Flink is a unified computing framework for both batch and stream processing; its core is a streaming dataflow engine that provides data distribution and parallel computation. Its biggest strength is stream processing, and it is one of the leading open-source stream processing engines in the industry. Flink is best suited to low-latency data processing scenarios: high-concurrency pipelines with millisecond-level latency and strong reliability guarantees.
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/try-flink/local_installation/
Netcat (nc) is a networking utility that can establish TCP or UDP connections between two machines. It is widely used to test ports, transfer files, and perform quick network debugging and probing; some variants also support more advanced operations such as encrypted connections and remote administration. Because it is both powerful and simple to use, it is also common in the computer security field.
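As a quick illustration of the file-transfer use mentioned above (host name and port 9999 are illustrative, not part of this article's setup), you can pipe a file through nc between two machines:
nc -l 9999 > received.txt        # on the receiving machine
nc node01 9999 < file.txt        # on the sending machine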
Install the nc command:
yum install -y nc
Start a listening socket (-l listens on the port, -k keeps the listener open after a client disconnects):
[root@node01 bin]# nc -lk 8888
Objective: read a socket text stream as an unbounded stream.
pom.xml
- "1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>springboot-demoartifactId>
- <groupId>com.etgroupId>
- <version>1.0-SNAPSHOTversion>
- parent>
- <modelVersion>4.0.0modelVersion>
-
-
- <artifactId>flinkartifactId>
-
-
- <properties>
- <maven.compiler.source>8maven.compiler.source>
- <maven.compiler.target>8maven.compiler.target>
- properties>
- <dependencies>
-
-
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-webartifactId>
- dependency>
-
-
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-autoconfigureartifactId>
- dependency>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-testartifactId>
- <scope>testscope>
- dependency>
-
- <dependency>
- <groupId>org.apache.flinkgroupId>
- <artifactId>flink-streaming-javaartifactId>
- <version>1.17.0version>
- dependency>
-
- <dependency>
- <groupId>org.apache.flinkgroupId>
- <artifactId>flink-javaartifactId>
- <version>1.17.0version>
- dependency>
-
-
-
- <dependency>
- <groupId>org.apache.flinkgroupId>
- <artifactId>flink-clientsartifactId>
- <version>1.17.0version>
- dependency>
-
-
-
- <dependency>
- <groupId>org.apache.flinkgroupId>
- <artifactId>flink-connector-baseartifactId>
- <version>1.17.0version>
- dependency>
-
-
- <dependency>
- <groupId>org.apache.flinkgroupId>
- <artifactId>flink-connector-filesartifactId>
- <version>1.17.0version>
- dependency>
-
- <dependency>
- <groupId>org.apache.flinkgroupId>
- <artifactId>flink-connector-kafkaartifactId>
- <version>1.17.0version>
- dependency>
- <dependency>
- <groupId>org.apache.flinkgroupId>
- <artifactId>flink-runtime-webartifactId>
- <version>1.17.0version>
- dependency>
-
-
-
-
- dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.pluginsgroupId>
- <artifactId>maven-shade-pluginartifactId>
- <executions>
- <execution>
- <phase>packagephase>
- <goals>
- <goal>shadegoal>
- goals>
- <configuration>
- <transformers>
- <transformer
- implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
- <resource>META-INF/spring.handlersresource>
- transformer>
- <transformer
- implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
- <resource>META-INF/spring.factoriesresource>
- transformer>
- <transformer
- implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
- <resource>META-INF/spring.schemasresource>
- transformer>
- <transformer
- implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
- <transformer
- implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- <mainClass>com.et.flink.job.SocketJobmainClass>
- transformer>
- transformers>
- configuration>
- execution>
- executions>
- plugin>
- plugins>
- build>
-
-
-
-
-
-
- project>
SocketJob
package com.et.flink.job;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author liuhaihua
 * @version 1.0
 * @ClassName SocketJob
 * @Description Word count over an unbounded socket text stream
 * @date 2024-02-29 17:06
 */
public class SocketJob {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism explicitly; the default is the number of CPU cores
        env.setParallelism(3);
        // Read the socket text stream from the given host and port; the job only does work once data arrives
        DataStreamSource<String> socketDS = env.socketTextStream("172.24.4.193", 8888);

        // Process the data: split, map to (word, 1), key by word, and aggregate to get the counts
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS
                .flatMap(
                        (String value, Collector<Tuple2<String, Integer>> out) -> {
                            String[] words = value.split(" ");
                            for (String word : words) {
                                out.collect(Tuple2.of(word, 1));
                            }
                        }
                )
                .setParallelism(2)
                // Provide the type information explicitly: with a lambda passed to flatMap, Flink can only infer
                // that a Tuple2 is returned, not its field types. The return type must be declared explicitly
                // for the records to be resolved correctly.
                .returns(new TypeHint<Tuple2<String, Integer>>() {
                })
                // .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(value -> value.f0)
                .sum(1);

        // Print the result
        sum.print();

        // Execute the job
        env.execute();
    }
}
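In SocketJob the host and port are hard-coded. A minimal sketch of making them configurable with Flink's ParameterTool is shown below; the class name SocketJobWithArgs and the --host/--port argument names are illustrative assumptions, not part of the original example.

package com.et.flink.job;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SocketJobWithArgs {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Hypothetical variant: read host/port from the command line, e.g. --host 172.24.4.193 --port 8888,
        // falling back to the values used in SocketJob
        ParameterTool params = ParameterTool.fromArgs(args);
        String host = params.get("host", "172.24.4.193");
        int port = params.getInt("port", 8888);
        DataStreamSource<String> socketDS = env.socketTextStream(host, port);
        // The flatMap/keyBy/sum pipeline from SocketJob would follow here unchanged
        socketDS.print();
        env.execute();
    }
}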
https://github.com/Harries/springboot-demo
[root@cmn-zentao-002 ~]# nc -l 8888
Start the main program directly from IDEA on your local machine, then type the following into the socket stream:
abc bcd cde
bcd cde fgh
cde fgh hij
The console log shows the following (the number before > is the index of the parallel subtask that produced the record; because of keyBy, all counts for the same word are emitted by the same subtask):
3> (abc,1)
1> (fgh,1)
3> (bcd,1)
3> (cde,1)
3> (bcd,2)
3> (cde,2)
3> (cde,3)
1> (fgh,2)
2> (hij,1)
Run the Maven package build, upload the resulting jar to the cluster, and type characters into the socket; the result is the same as when running locally.
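A minimal submission sketch, assuming the shaded jar produced by mvn package is named flink-1.0-SNAPSHOT.jar (the actual file name depends on your module version and build settings):

flink run -c com.et.flink.job.SocketJob flink-1.0-SNAPSHOT.jar

The -c flag selects the entry class; it can be omitted here because the shade plugin's ManifestResourceTransformer already writes com.et.flink.job.SocketJob into the jar manifest.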
https://juejin.cn/post/7283311024979066937
http://www.liuhaihua.cn/archives/710270.html