• Yarn模式部署Flink集群


    一、环境准备

    1、准备两台服务器server115 和server116安装好hadoop环境,其中server115配置hdfs的namenode,在server116上配置hdfs的SecondaryNameNode,server116配置yarn的 ResourceManager,启动hadoop集群

    2、配置hadoop环境变量

       vim  /etc/profile

    1. export HADOOP_CLASSPATH=`hadoop classpath`

    二、配置FLink集群环境

    关于flink的架构图

     部署配置:

    服务器server115server116
    Flink组件taskmangerjobmanager   taskmanager

    (1) 进入server115节点,编辑flink的配置文件flink-conf.yml

    1. # JobManager runs.
    2. jobmanager.rpc.address: server116
    3. jobmanager.bind-host: server116
    4. taskmanager.bind-host: server115
    5. taskmanager.host: server115
    6. rest.port: 8081
    7. rest.address: server116

    (2) 进入server116节点,编辑flink的配置文件flink-conf.yml

    1. # JobManager runs.
    2. jobmanager.rpc.address: server116
    3. jobmanager.bind-host: server116
    4. taskmanager.bind-host: server116
    5. taskmanager.host:server116
    6. rest.port: 8081
    7. rest.address: server116

    (3)在server115、server116的节点workers配置

    1. server115
    2. server116

    三、启动flink

     ./flink/bin/yarn-session.sh    -d
    

     JobManager Web Interface: http://server116:40617

    四、任务测试

      1、 开启一个tcp服务

    nc  -l  -p  9999

     2、编写java侦听代码

    1. public class SocketFlinkExecute {
    2. public static void main(String[] args) {
    3. StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("server116", 40617);
    4. DataStream<Tuple2<String, Integer>> dataStream = env
    5. .socketTextStream("server116", 9999)
    6. .flatMap(new MySplitter())
    7. .keyBy(value -> value.f0)
    8. .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    9. .sum(1);
    10. dataStream.print();
    11. try {
    12. env.execute("Window WordCount Remote");
    13. } catch (Exception e) {
    14. e.printStackTrace();
    15. }
    16. }
    17. public static class MySplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    18. @Override
    19. public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
    20. for (String word: sentence.split(" ")) {
    21. out.collect(new Tuple2<String, Integer>(word, 1));
    22. }
    23. }
    24. }
    25. }

    运行程序,tcp服务端发送文本数据

     

     

  • 相关阅读:
    OLOv9与YOLOv8性能差别详解
    oracle查询数据库内全部的表名、列明、注释、数据类型、长度、精度等
    电源硬件设计----电源基础知识(3)
    Node面试题总结最全
    Exception_json反序列化失败_JSONException
    【OpenCV】-重映射
    7亿中国男人,今年夏天都在穿什么?
    【MQTT】关于部署含有MQTT协议的程序pod到K8S中出现的问题
    基于语义分割的相机外参标定
    Android学习笔记 67. A部分:首个交互式UI
  • 原文地址:https://blog.csdn.net/fengchengwu2012/article/details/128065230