• 04 Flink cluster setup


    Preface

     

    Heh, there has been a series of environment-setup requirements recently.

    Recording them here.

    Flink has three nodes: 192.168.110.150, 192.168.110.151, 192.168.110.152

    150 is the master, 151 is slave01, 152 is slave02

    Passwordless SSH (trusted shell) has been configured among the three machines.
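    The passwordless SSH setup can be sketched roughly as follows (the user account is an assumption; the IPs follow the node list above):

```shell
# on each node: generate a key pair if one does not exist yet
ssh-keygen -t rsa -N "" -f ~/.ssh/id_rsa

# copy the public key to every node (including the node itself)
ssh-copy-id root@192.168.110.150
ssh-copy-id root@192.168.110.151
ssh-copy-id root@192.168.110.152
```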
     

    Flink cluster setup

    1. Basic environment preparation

    Install the JDK on 192.168.110.150, 192.168.110.151, and 192.168.110.152, and upload the Flink installation package.

    The installation package comes from the Apache Downloads page.
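    A rough sketch of the preparation on each node (the Flink version and target directory are assumed examples, not from the original):

```shell
# verify that the JDK is installed and on the PATH
java -version

# unpack the uploaded Flink tarball (file name and /opt target are assumptions)
tar -zxvf flink-1.14.4-bin-scala_2.12.tgz -C /opt
```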

    2. Flink configuration adjustments

    Update the flink-conf.yaml configuration file on master, slave01, and slave02 as follows

    On master:

    jobmanager.rpc.address: master
    jobmanager.rpc.port: 6123
    jobmanager.bind-host: 0.0.0.0
    jobmanager.memory.process.size: 1600m
    jobmanager.execution.failover-strategy: region
    taskmanager.bind-host: 0.0.0.0
    taskmanager.host: master
    taskmanager.memory.process.size: 1728m
    taskmanager.numberOfTaskSlots: 1
    parallelism.default: 1
    rest.port: 8081
    rest.address: master
    rest.bind-address: 0.0.0.0

    On slave01:

    jobmanager.rpc.address: master
    jobmanager.rpc.port: 6123
    jobmanager.bind-host: 0.0.0.0
    jobmanager.memory.process.size: 1600m
    jobmanager.execution.failover-strategy: region
    taskmanager.bind-host: 0.0.0.0
    taskmanager.host: slave01
    taskmanager.memory.process.size: 1728m
    taskmanager.numberOfTaskSlots: 1
    parallelism.default: 1
    rest.port: 8081
    rest.address: master
    rest.bind-address: 0.0.0.0

    On slave02:

    jobmanager.rpc.address: master
    jobmanager.rpc.port: 6123
    jobmanager.bind-host: 0.0.0.0
    jobmanager.memory.process.size: 1600m
    jobmanager.execution.failover-strategy: region
    taskmanager.bind-host: 0.0.0.0
    taskmanager.host: slave02
    taskmanager.memory.process.size: 1728m
    taskmanager.numberOfTaskSlots: 1
    parallelism.default: 1
    rest.port: 8081
    rest.address: master
    rest.bind-address: 0.0.0.0
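    Since the configuration refers to the nodes by hostname, every node also needs hostname resolution, e.g. via /etc/hosts (a sketch based on the node list above):

```
192.168.110.150 master
192.168.110.151 slave01
192.168.110.152 slave02
```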

    3. Start the cluster

    On the three machines: master starts the jobmanager; slave01 and slave02 start the taskmanager.

    The relevant scripts are in the bin directory under the Flink home directory.
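    Using the standard standalone scripts under bin/, the startup can be sketched as (run from the Flink home directory):

```shell
# on master
./bin/jobmanager.sh start

# on slave01 and slave02
./bin/taskmanager.sh start

# check the JVM processes on each node
jps
```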

    Testing the cluster

    The test Flink driver program is as follows; afterwards, package it directly.
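    Assuming a standard Maven project layout (an assumption; the original does not state the build tool), the packaging step can be sketched as:

```shell
# build the driver jar, skipping tests; the jar lands under target/
mvn clean package -DskipTests
```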

    package com.hx.test;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.client.deployment.executors.RemoteExecutor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.DeploymentOptions;
    import org.apache.flink.configuration.JobManagerOptions;
    import org.apache.flink.configuration.RestOptions;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.util.Collector;

    /**
     * Test03SteamingWordCount
     *
     * @author Jerry.X.He <970655147@qq.com>
     * @version 1.0
     * @date 2021-04-02 18:07
     */
    public class Test03SteamingWordCount {

        public static void main(String[] args) throws Exception {
            // create a streaming execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Configuration conf = new Configuration();
            // conf.setString(JobManagerOptions.ADDRESS, "192.168.110.150");
            // conf.setInteger(JobManagerOptions.PORT, 6123);
            // conf.setInteger(RestOptions.PORT, 8081);
            // conf.setString(DeploymentOptions.TARGET.key(), RemoteExecutor.NAME);
            // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
            env.setParallelism(1);

            DataStreamSource<String> inputDs = env.addSource(new MySource());
            inputDs
                    .flatMap(new FlatMapFunction<String, String>() {
                        @Override
                        public void flatMap(String s, Collector<String> collector) throws Exception {
                            String[] splited = s.split("\\s+");
                            for (String str : splited) {
                                collector.collect(str);
                            }
                        }
                    })
                    .map(new MapFunction<String, Tuple2<String, Integer>>() {
                        @Override
                        public Tuple2<String, Integer> map(String s) throws Exception {
                            return new Tuple2<>(s, 1);
                        }
                    })
                    .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                        @Override
                        public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                            return stringIntegerTuple2.f0;
                        }
                    })
                    .sum(1)
                    .addSink(new MySink());
            env.execute();
        }
    }

    // an unbounded source that emits two fixed lines every five seconds
    class MySource implements SourceFunction<String> {
        private volatile boolean running = true;

        @Override
        public void cancel() {
            running = false;
        }

        @Override
        public void run(SourceFunction.SourceContext<String> sourceContext) {
            while (running) {
                sourceContext.collect("234 345 123 346 234");
                sourceContext.collect("123 124");
                try {
                    Thread.sleep(5000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // a sink that prints each (word, count) pair to stdout
    class MySink implements SinkFunction<Tuple2<String, Integer>> {
        @Override
        public void invoke(Tuple2<String, Integer> value, SinkFunction.Context context) {
            System.out.println(value.f0 + " - " + value.f1);
        }
    }
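    As a quick sanity check of what the pipeline computes, the same word counting over the two sample lines can be done in plain Java (a standalone sketch, independent of Flink; the class name is made up for illustration):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WordCountSketch {
    // count whitespace-separated tokens, mirroring the flatMap/map/keyBy/sum pipeline above
    static Map<String, Integer> count(String... lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String token : line.split("\\s+")) {
                counts.merge(token, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = count("234 345 123 346 234", "123 124");
        counts.forEach((w, c) -> System.out.println(w + " - " + c));
    }
}
```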

    Upload the driver program through the Flink web UI, then the job can be submitted directly.

    After the job is submitted, the job monitoring looks as follows

    The standard output of the driver program is as follows
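    Besides the web UI, the job can also be submitted with the flink CLI (the main class follows the example above; the jar path is a hypothetical placeholder):

```shell
# submit against the standalone cluster's REST endpoint
./bin/flink run -m master:8081 -c com.hx.test.Test03SteamingWordCount /path/to/flink-test.jar
```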

  • Original link: https://blog.csdn.net/u011039332/article/details/124786339