阅读目标:
本文为入门级别文章,即阅读完下文你需要简单的知道 flink 是做什么用的,他的主要特点是什么。工欲善其事必先利其器更深入的了解,待熟练后再回头看看。
简而言之flink就是一个框架,你在框架里面编写代码(接收从某处来的数据->数据处理/转换->将处理好的数据输出到某地),将编写好的代码交给flink集群,由集群取调度任务去处理
阅读并实践本文可能会存在某些问题,你还需要阅读其他文章/博客加深对flink的理解(如下文中提到的某些概念:有界、无界等等
实际是因为我懒得写了。。。
Flink 起源于一个叫作 Stratosphere 的项目,它是由 3 所地处柏林的大学和欧洲其他一些大 学在 2010~2014 年共同进行的研究项目,由柏林理工大学的教授沃克尔·马尔科(Volker Markl) 领衔开发。2014 年 4 月,Stratosphere 的代码被复制并捐赠给了 Apache 软件基金会,Flink 就 是在此基础上被重新设计出来的。 在德语中,“flink”一词表示“快速、灵巧”。项目的 logo 是一只彩色的松鼠,当然了, 这不仅是因为 Apache 大数据项目对动物的喜好(是否联想到了 Hadoop、Hive?),更是因为 松鼠这种小动物完美地体现了“快速、灵巧”的特点。关于 logo 的颜色,还一个有趣的缘由: 柏林当地的松鼠非常漂亮,颜色是迷人的红棕色;而 Apache 软件基金会的 logo,刚好也是一 根以红棕色为主的渐变色羽毛。于是,Flink 的松鼠 Logo 就设计成了红棕色,而且拥有一个漂 亮的渐变色尾巴,尾巴的配色与 Apache 软件基金会的 logo 一致。这只松鼠色彩炫目,既呼应 了 Apache 的风格,似乎也预示着 Flink 未来将要大放异彩。
Flink 的官网主页地址:https://flink.apache.org/ 在 Flink 官网主页的顶部可以看到,项目的核心目标,是“数据流上的有状态计算”(Stateful Computations over Data Streams)。
很多专业词汇,我们从中至少可以提炼出一些容易理解的信息:Flink 是一个“框 架”,是一个数据处理的“引擎”;既然是“分布式”,当然是为了应付大规模数据的应用场景 了;另外,Flink 处理的是数据流。所以,Flink 是一个流式大数据处理引擎。 而“内存执行速度”和“任意规模”,突出了 Flink 的两个特点:速度快、可扩展性强— —这说的自然就是小松鼠的“快速”和“灵巧”了。
以下案例为环境jdk1.8,且以下案例均为展示使用,目的是为了明白这两种方式的区别以及基本使用
- jdk 1.8
- maven
- win10
- flink 1.15.2
以下示例代码仅做入门级别使用,非生产可用。
pom文件
UTF-8
1.8
1.15.2
1.8
2.12
2.17.1
org.apache.flink
flink-streaming-java
${flink.version}
provided
org.apache.flink
flink-clients
${flink.version}
provided
org.apache.flink
flink-connector-kafka
${flink.version}
org.ansj
ansj_seg
5.1.6
org.apache.logging.log4j
log4j-slf4j-impl
${log4j.version}
runtime
org.apache.logging.log4j
log4j-api
${log4j.version}
runtime
org.apache.logging.log4j
log4j-core
${log4j.version}
runtime
这里可以简单的理解为源源不断的数据,需要不断监听某个消息队列(kafka)或者其他来源。
public static final String HOST = "192.168.20.127";
public static final Integer PORT = 8888;
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource source = environment.socketTextStream(HOST, PORT);
SingleOutputStreamOperator> wordsCollector = source.flatMap((String line, Collector> collector) -> {
String[] words = line.split(" ");
for (String word : words) {
collector.collect(new Tuple2(word, 1L));
}
}).returns(Types.TUPLE(Types.STRING, Types.LONG));
SingleOutputStreamOperator> sum = wordsCollector.keyBy(0).sum(1);
sum.print();
environment.execute();
}
这里可以简单的理解为批量数据处理。
运行类
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
//new 一个实例!
Properties properties = new Properties();
//告诉程序我们要接收那台机器上生产的数据
properties.setProperty("bootstrap.servers", "master:9092");
//告诉程序开启分区,已经分区名称
properties.setProperty("group.id", "temp-1");
//属性key.serializer和value.serializer就是key和value指定的序列化方式。
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//读取kafka数据的时候需要指定消费策略,如果不指定会使用auto.offset.reset设置
//earliest当各分区下有已提交的offset时,从提交的offset开始消费;
//无提交的offset时,从头开始消费;
//latest,当各分区下有已提交的offset时,从提交的offset开始消费;
//无提交的offset时,消费新产生的该分区下的数据;
//none,topic各分区都存在已提交的offset时,从offset后开始消费;
//只要有一个分区不存在已提交的offset,则抛出异常
properties.setProperty("auto.offset.reset", "earliest");
//enable.auto.commit 的默认值是 true;就是默认采用自动提交的机制。
properties.setProperty("enable.auto.commit", "false");
//如果FlinkKafkaConsumer没有开启checkpoint功能,为了不重复读取
//这种方式无法实现Exactly-Once(只执行一次)
FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer("test_topic", new SimpleStringSchema(), properties);
DataStreamSource lines = environment.addSource(flinkKafkaConsumer);
SingleOutputStreamOperator> sum = lines.flatMap((String line, Collector> collector) -> {
List terms = ToAnalysis.parse(line).getTerms();
terms.forEach(item -> {
collector.collect(new Tuple2<>(item.getName(), 1L));
});
}).returns(Types.TUPLE(Types.STRING, Types.LONG)).keyBy(0).sum(1);
sum.print();
environment.execute("word-coun-kafka");
}
提交有两种方式
web-ui界面
访问部署服务器 ip:8081
点击 Submit new Job
点击Add new
编辑Entry class与Parallelism等
main()
函数的类的全限定名点击Submit
点击Jobs -> Running Jobs 查看
命令行
如果要把job提交到jobmanager,应该在jobmanager服务器上提交
Flink的安装和部署主要分为本地模式和集群模式,其中本地模式只需直接解压就可以使用,不以修改任何参数,一般在做一些简单测试的时候使用。
集群模式包含Standalone、Flink on Yarn等模式,适合在生产环境下面使用,且需要修改对应的配置 参数。
## 官方版本(可能下载速度慢)
curl -O https://dlcdn.apache.org/flink/flink-1.15.2/flink-1.15.2-bin-scala_2.12.tgz
## 腾讯云镜像(推荐,国内速度快)
curl -O http://mirrors.cloud.tencent.com/apache/flink/flink-1.15.2/flink-1.15.2-bin-scala_2.12.tgz
下载完成解压,解压后目录如下
系统环境
以下均基于 Kernel
- CentOS Linux release 7.9.2009 (Core)
- Linux version 3.10.0-1160.el7.x86_64
- gcc version 4.8.5 20150623 (Red Hat 4.8.5-44) (GCC)
- open-jdk 11
- 大部分过程中使用root用户。请在生产环境或特殊环境注意用户切换。本文不在linux用户做过多赘述。
自己是jobmanager也是taskmanager(会话模式)
配置文件详解
conf/flink-conf.yaml
cd conf
vim flink-conf.yaml
# 此处修改集群时需要修改
jobmanager.rpc.address: localhost
# 默认1623
jobmanager.rpc.port: 6123
# 任务管理默认
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
# 任务槽 资源(并行执行 相当于 组)
taskmanager.numberOfTaskSlots: 1
# 默认并行度
parallelism.default: 1
# web界面默认端口 需要修改时 解开注释
#rest.port: 8081
master
当前jobmanager(默认localhost)以及webui端口(默认8081)
works
单节点启动默认这里面没有东西
启动脚本
# 进入flink bin目录
cd bin
# 单节点集群启动
./start-cluster.sh
访问服务器ip加8081(默认)
停止服务
# 进入flink bin目录
cd bin
# 单节点集群启动
./stop-cluster.sh
至少需要三台服务器。一台jobmanager
,两台taskmanager
,三台服务器之间需要配置免密登录,这里为了方便,我修改了hosts文件,三台服务器分别为
master
、slave0
、slave1
。(会话模式)
修改hosts(ip地址 主机名/域名 (主机别名))
自己的服务器IP-1 master
自己的服务器IP-2 slave0
自己的服务器IP-3 slave1
使配置文件生效请参考 CentOS修改hosts
服务器之间免密登录
请自行百度/google(master 最好也将自身产生的秘钥导入自身,不导也可以会导致每次启动flink需要输入本机密码)
修改配置文件
master 服务器
flink-conf.yaml
# 用于节点间通信
jobmanager.rpc.address: 0.0.0.0
master
master:8081
works
# 另外两台机器
slave0
slave1
slave0 服务器
flink-conf.yaml
jobmanager.rpc.address: master
# 不改此处 集群运行后 solt为0
jobmanager.bind-host: 0.0.0.0
master
master:8081
works
slave0
slave1
slave1 服务器
flink-conf.yaml
jobmanager.rpc.address: master
# 不改此处 集群运行后 solt为0
jobmanager.bind-host: 0.0.0.0
master
master:8081
works
slave0
slave1
修改环境变量
master/slave0/slave1 分别执行以下操作(因文件都是由master分发,所以目录位置应都一致,当然可自行修改)
## 修改环境变量
vim /etc/profile
## 新增以下内容
export FLINK_HOME=/software/flink-cluster/flink/
export PATH=$PATH:$FLINK_HOME/bin
## 使环境变量生效
source /etc/profile
运行集群
在master
bin目录下执行,看到以下几截图后集群启动成功,即可访问webUI界面
./start-cluster.sh
且执行jps
命令后
且slave0
与slave1
执行jps
后
web ui 界面
## 下载openjdkjdk
curl -O https://download.java.net/openjdk/jdk11/ri/openjdk-11+28_linux-x64_bin.tar.gz
## 解压
tar zxf openjdk-11+28_linux-x64_bin.tar.gz
## 添加jdk11 /opt/openjdk11/jdk-11/ 应为压缩包实际解压路径
sudo update-alternatives --install /usr/bin/java java /home/flink/opt/jdk-11/bin/java 1
## 添加jdk11 /opt/openjdk11/jdk-11/ 应为压缩包实际解压路径
sudo update-alternatives --install /usr/bin/javac javac /home/flink/opt/jdk-11/bin/javac 1
## 切换
sudo update-alternatives --config java
sudo update-alternatives --config javac
windows 10 专业版 21H2 WSL2
运行Docker Desktop Installer.exe
su
chmod -v u+w /etc/sudoers
vim /etc/sudoers
root ALL=(ALL) ALL
chmod -v u-w /etc/sudoers
exit