1. Environment preparation
1) Prepare two servers, server115 and server116, with Hadoop installed. Configure the HDFS NameNode on server115 and the HDFS SecondaryNameNode on server116; the YARN ResourceManager also runs on server116. Then start the Hadoop cluster, as sketched below.
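A minimal start-up sketch, assuming the standard scripts under $HADOOP_HOME/sbin and passwordless SSH between the nodes:

```bash
# on server115 (NameNode): start HDFS (NameNode, DataNodes, SecondaryNameNode)
$HADOOP_HOME/sbin/start-dfs.sh

# on server116 (ResourceManager): start YARN (ResourceManager, NodeManagers)
$HADOOP_HOME/sbin/start-yarn.sh

# verify the running daemons on each node
jps
```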

2) Configure the Hadoop environment variable. Flink's YARN integration picks up the Hadoop dependencies from HADOOP_CLASSPATH, so add it to /etc/profile:

```bash
vim /etc/profile

# add the following line
export HADOOP_CLASSPATH=`hadoop classpath`
```
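After saving, the variable can be loaded and checked in the current shell:

```bash
source /etc/profile
echo $HADOOP_CLASSPATH   # should print the Hadoop jar and conf paths
```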
2. Configure the Flink cluster
Flink architecture diagram:

Deployment layout:
| Server | server115 | server116 |
|---|---|---|
| Flink components | TaskManager | JobManager, TaskManager |
(1) On the server115 node, edit the Flink configuration file flink-conf.yaml:

```yaml
# The JobManager runs on server116.
jobmanager.rpc.address: server116
jobmanager.bind-host: server116
taskmanager.bind-host: server115
taskmanager.host: server115
rest.port: 8081
rest.address: server116
```
(2) On the server116 node, edit the Flink configuration file flink-conf.yaml:

```yaml
# The JobManager runs on server116.
jobmanager.rpc.address: server116
jobmanager.bind-host: server116
taskmanager.bind-host: server116
taskmanager.host: server116
rest.port: 8081
rest.address: server116
```
(3) On both server115 and server116, list the TaskManager nodes in the workers file:

```
server115
server116
```
3. Start Flink

Start a Flink session on YARN in detached mode (-d):

```bash
./flink/bin/yarn-session.sh -d
```

The end of the startup log prints the JobManager web UI address; in YARN session mode the REST port is typically assigned dynamically (here 40617):

JobManager Web Interface: http://server116:40617
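To confirm the session is running, the YARN CLI can be used (a quick check, assuming yarn is on the PATH):

```bash
# the Flink session should show up as a RUNNING application of type Apache Flink
yarn application -list
```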

4. Job test

1) Start a TCP server on server116 (the job below reads from server116:9999; some netcat variants use `nc -l 9999` instead):

```bash
nc -l -p 9999
```

2) Write the Flink job in Java, connecting to the remote JobManager:

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketFlinkExecute {

    public static void main(String[] args) {
        // Connect to the JobManager REST endpoint printed by yarn-session.sh
        // (host server116, port 40617 here); jar files containing the user
        // classes can be passed as additional arguments if needed.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createRemoteEnvironment("server116", 40617);

        // Read lines from the TCP socket, split them into words, and count
        // each word in 5-second tumbling processing-time windows.
        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("server116", 9999)
                .flatMap(new MySplitter())
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(1);
        dataStream.print();

        try {
            env.execute("Window WordCount Remote");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Splits each input line into (word, 1) tuples.
    public static class MySplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
```
Run the program, then send text data from the TCP server.
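For example, typing a line of words into the nc session on server116 should produce per-word counts once the 5-second window fires. Note that with a remote environment, print() writes to the TaskManager's stdout (visible in the Flink web UI or the taskmanager .out log), not to the local console. The input and counts below are just an illustration:

```bash
# typed into the nc -l -p 9999 session on server116
hello world hello

# expected in the TaskManager stdout after the window fires
# (possibly prefixed with a subtask index such as "1> "):
# (hello,2)
# (world,1)
```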

