Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。
最大亮点是流处理,最适合的应用场景是低时延的数据处理。
场景:高并发pipeline处理数据,时延毫秒级,且兼具可靠性。
环境搭建:
①、安装flink
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/try-flink/local_installation/
②、安装Netcat
Netcat(又称为NC)是一个计算机网络工具,它可以在两台计算机之间建立 TCP/IP 或 UDP 连接。
用于测试网络中的端口,发送文件等操作。
进行网络调试和探测,也可以进行加密连接和远程管理等高级网络操作
yum install -y nc # 安装nc命令
nc -lk 8888 # 启动socket端口
一、依赖
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>springboot-demoartifactId>
<groupId>com.etgroupId>
<version>1.0-SNAPSHOTversion>
parent>
<modelVersion>4.0.0modelVersion>
<artifactId>flinkartifactId>
<properties>
<maven.compiler.source>8maven.compiler.source>
<maven.compiler.target>8maven.compiler.target>
properties>
<dependencies>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-webartifactId>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-autoconfigureartifactId>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-testartifactId>
<scope>testscope>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-streaming-javaartifactId>
<version>1.17.0version>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-javaartifactId>
<version>1.17.0version>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-clientsartifactId>
<version>1.17.0version>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-connector-baseartifactId>
<version>1.17.0version>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-connector-filesartifactId>
<version>1.17.0version>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-connector-kafkaartifactId>
<version>1.17.0version>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-runtime-webartifactId>
<version>1.17.0version>
dependency>
dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.pluginsgroupId>
<artifactId>maven-shade-pluginartifactId>
<executions>
<execution>
<phase>packagephase>
<goals>
<goal>shadegoal>
goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.handlersresource>
transformer>
<transformer
implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
<resource>META-INF/spring.factoriesresource>
transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemasresource>
transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.et.flink.job.SocketJobmainClass>
transformer>
transformers>
configuration>
execution>
executions>
plugin>
plugins>
build>
project>
二、SoketJob
public class SocketJob{
public static void main(String[] args)throws Exception{
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 指定并行度,默认电脑线程数
env.setParallelism(3);
// 读取数据socket文本流 指定监听 IP 端口 只有在接收到数据才会执行任务
DataStreamSource<String> socketDS = env.socketTextStream("172.24.4.193", 8888);
// 处理数据: 切换、转换、分组、聚合 得到统计结果
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS
.flatMap(
(String value, Collector<Tuple2<String, Integer>> out) -> {
String[] words = value.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1));
}
}
)
.setParallelism(2)
// // 显式地提供类型信息:对于flatMap传入Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2。只有显式设置系统当前返回类型,才能正确解析出完整数据
.returns(new TypeHint<Tuple2<String, Integer>>() {
})
// .returns(Types.TUPLE(Types.STRING,Types.INT))
.keyBy(value -> value.f0)
.sum(1);
// 输出
sum.print();
// 执行
env.execute();
}
}
测试:
启动socket流:
nc -l 8888
本地执行:直接ideal启动main程序,在socket流中输入
abc bcd cde
bcd cde fgh
cde fgh hij
集群执行:
执行maven打包,将打包的jar上传到集群中