Heh, recently there has been a series of environment-setup requirements, so let me write this one down.
Flink cluster with three nodes: 192.168.110.150, 192.168.110.151, 192.168.110.152
150 is master, 151 is slave01, 152 is slave02
Passwordless SSH (trusted shell) is set up between all three machines
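A minimal sketch of that trusted-shell setup, run on each of the three machines; it assumes the hostnames master, slave01 and slave02 already resolve to the three IPs (for example via /etc/hosts):
- # generate a key pair (accept the defaults), then push the public key to every node
- ssh-keygen -t rsa
- ssh-copy-id master
- ssh-copy-id slave01
- ssh-copy-id slave02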
Install the JDK on 192.168.110.150, 192.168.110.151 and 192.168.110.152, and upload the Flink installation package; the package comes from the Apache Downloads page.
Update the flink-conf.yaml configuration file on master, slave01 and slave02 as follows; the three files are identical except for taskmanager.host, which points at the local machine.
master:
- jobmanager.rpc.address: master
- jobmanager.rpc.port: 6123
- jobmanager.bind-host: 0.0.0.0
- jobmanager.memory.process.size: 1600m
- jobmanager.execution.failover-strategy: region
-
- taskmanager.bind-host: 0.0.0.0
- taskmanager.host: master
- taskmanager.memory.process.size: 1728m
- taskmanager.numberOfTaskSlots: 1
- parallelism.default: 1
-
- rest.port: 8081
- rest.address: master
- rest.bind-address: 0.0.0.0
slave01:
- jobmanager.rpc.address: master
- jobmanager.rpc.port: 6123
- jobmanager.bind-host: 0.0.0.0
- jobmanager.memory.process.size: 1600m
- jobmanager.execution.failover-strategy: region
-
- taskmanager.bind-host: 0.0.0.0
- taskmanager.host: slave01
- taskmanager.memory.process.size: 1728m
- taskmanager.numberOfTaskSlots: 1
- parallelism.default: 1
-
- rest.port: 8081
- rest.address: master
- rest.bind-address: 0.0.0.0
slave02:
- jobmanager.rpc.address: master
- jobmanager.rpc.port: 6123
- jobmanager.bind-host: 0.0.0.0
- jobmanager.memory.process.size: 1600m
- jobmanager.execution.failover-strategy: region
-
- taskmanager.bind-host: 0.0.0.0
- taskmanager.host: slave02
- taskmanager.memory.process.size: 1728m
- taskmanager.numberOfTaskSlots: 1
- parallelism.default: 1
-
- rest.port: 8081
- rest.address: master
- rest.bind-address: 0.0.0.0
Start the JobManager on master and a TaskManager on slave01 and slave02; the relevant scripts live under the bin directory of the Flink home directory, as shown below.
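Concretely, using the standalone scripts that ship with Flink, run from the Flink home directory:
- # on master
- ./bin/jobmanager.sh start
- # on slave01 and slave02
- ./bin/taskmanager.sh start
With a populated workers file, ./bin/start-cluster.sh on master would start the whole cluster in one shot instead.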
The test Flink driver program is as follows; just package it into a jar.
- package com.hx.test;
-
-
- import org.apache.flink.api.common.functions.FlatMapFunction;
- import org.apache.flink.api.common.functions.MapFunction;
- import org.apache.flink.api.java.functions.KeySelector;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.client.deployment.executors.RemoteExecutor;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.configuration.DeploymentOptions;
- import org.apache.flink.configuration.JobManagerOptions;
- import org.apache.flink.configuration.RestOptions;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.sink.SinkFunction;
- import org.apache.flink.streaming.api.functions.source.SourceFunction;
- import org.apache.flink.util.Collector;
-
- /**
- * Test03StreamingWordCount
- *
- * @author Jerry.X.He <970655147@qq.com>
- * @version 1.0
- * @date 2021-04-02 18:07
- */
- public class Test03StreamingWordCount {
-
- public static void main(String[] args) throws Exception {
-
- // create a streaming execution environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
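- // Optional: the commented-out block below builds the environment from an explicit
- // Configuration whose deployment target is the remote executor, so the job is
- // submitted straight to the standalone cluster from the IDE instead of via a jar.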
- // Configuration conf = new Configuration();
- // conf.setString(JobManagerOptions.ADDRESS, "192.168.110.150");
- // conf.setInteger(JobManagerOptions.PORT, 6123);
- // conf.setInteger(RestOptions.PORT, 8081);
- // conf.setString(DeploymentOptions.TARGET.key(), RemoteExecutor.NAME);
- // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
- env.setParallelism(1);
-
- DataStreamSource<String> inputDs = env.addSource(new MySource());
- inputDs
- .flatMap(new FlatMapFunction<String, String>() {
- @Override
- public void flatMap(String s, Collector<String> collector) throws Exception {
- String[] words = s.split("\\s+");
- for(String str : words) {
- collector.collect(str);
- }
- }
- })
- .map(new MapFunction<String, Tuple2<String, Integer>>() {
- @Override
- public Tuple2<String, Integer> map(String s) throws Exception {
- return new Tuple2<>(s, 1);
- }
- })
- .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
-
- @Override
- public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
- return stringIntegerTuple2.f0;
- }
- })
- .sum(1)
- .addSink(new MySink());
-
- // the operators above are only declared; execute() actually submits the job
- env.execute();
-
- }
-
- }
-
- // MySource : an endless source that emits two lines of words every five seconds
- class MySource implements SourceFunction<String> {
-
- private volatile boolean running = true;
-
- @Override
- public void cancel() {
- running = false;
- }
-
- @Override
- public void run(SourceFunction.SourceContext<String> sourceContext) {
- while (running) {
- sourceContext.collect("234 345 123 346 234");
- sourceContext.collect("123 124");
-
- try {
- Thread.sleep(5000);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
-
- // MySink : prints each (word, count) pair to the stdout of its task manager
- class MySink implements SinkFunction<Tuple2<String, Integer>> {
-
- @Override
- public void invoke(Tuple2<String, Integer> value, SinkFunction.Context context) {
- System.out.println(value.f0 + " - " + value.f1);
- }
-
- }
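Packaging is an ordinary build; assuming the project is a standard Maven project (the pom is not shown in this post), something like this produces the jar to upload:
- mvn clean package -DskipTests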
Upload the driver program's jar through the Flink web UI, and then you can directly submitJob.
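Submitting also works from the command line on master via the flink CLI; the jar file name below is hypothetical:
- ./bin/flink run -c com.hx.test.Test03StreamingWordCount FlinkTest-1.0.jar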
After the job is submitted, its monitoring shows up on the job page of the web UI.
The standard output produced by the program is as follows.
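With parallelism 1 the running counts are deterministic, so the first five-second round from MySource should print roughly the following (later rounds keep incrementing the same counters):
- 234 - 1
- 345 - 1
- 123 - 1
- 346 - 1
- 234 - 2
- 123 - 2
- 124 - 1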
Done.