• Flink standalone 模式下运行WordCount程序过程


    WordCount打包部署到Flink Standalone 集群模式的运行过程演示

    一、实验环境


    本次实验主要是向大家演示了在Flink Standalone 模式下运行WordCount程序的过程,其目的想让大家知道flink程序开发完成后,如何将其打包上传到服务器,并能通过查看webui界面查看flink相关的概念等。

    1. 虚拟机VMWareworkStation
    2. 操作系统:Centos7.5
    3. nc服务:netcat

    集群环境拓扑结构:
    在这里插入图片描述

    二、程序及打包


    1. 源代码如下:
      package com.suben.flink.wc
      
      import org.apache.flink.api.java.utils.ParameterTool
      import org.apache.flink.streaming.api.scala._
      
      /**
       *  flink run -c com.suben.flink.wc.WordCountStream -p 2 flink-basic-1.0-SNAPSHOT.jar --host master --port 6666
       */
      object WordCountStream {
        def main(args: Array[String]): Unit = {
          // 1. 初始化环境
          val env = StreamExecutionEnvironment.getExecutionEnvironment
      
          // 2. 读取数据并进行转换
          // 从外部命令中提取参数,作为socket主机名和端口号
          val paramTool: ParameterTool = ParameterTool.fromArgs(args)
          val host: String = paramTool.get("host")
          val port: Int = paramTool.getInt("port")
      
          // 接收一个socket文本流
          val inputDataStream: DataStream[String] = env.socketTextStream(host, port)
          // 进行转化处理统计
          val resultDataStream: DataStream[(String, Int)] = inputDataStream
            .flatMap(_.split(" "))
            .filter(_.nonEmpty)
            .map((_, 1))
            .keyBy(0)
            .sum(1)
      
          // 3. 为结果好看些,设置并发度为1
          resultDataStream.print().setParallelism(1)
      
          // 启动任务执行
          env.execute("stream word count")
      
        }
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
    2. IDEA中打包:
      在这里插入图片描述
    3. 生成如下图所示程序包:
      在这里插入图片描述

    三、代码上传及运行


    1. 使用ssh工具如mobXstrem或者xshell等上传到虚拟机中,如上传到master主节点中。

    2. 启动flink集群,在master主节点上执行命令:start-cluster,回车即可,如图所示:
      在这里插入图片描述

    3. 验证下flink集群是否成功启动,执行:jps命令即可,如图所示:
      在这里插入图片描述

    4. 启动nc服务,执行: nc -l -p 6666,如图所示:
      在这里插入图片描述

    5. 提交程序包到flinkj集群有两种方式

      1. 命令提交方式,执行如下命令即可
        flink run -c com.suben.flink.wc.WordCountStream -p 2 flink-basic-1.0-SNAPSHOT.jar --host master --port 6666
        
        • 1
      2. flink的集群webui上提交,如下图所示
        在这里插入图片描述
        在这里插入图片描述
    6. 我这里采取的是在webUI上传提交程,程序提交后,会看到如下图所示:
      在这里插入图片描述

    7. 在nc端发送如下数据:

      I love Guizhou
      I love my home
      I love Flink
      I love Bigdata

      如图所示:
      在这里插入图片描述

    8. 我们看看程序运行结果是在slave01从节点上的,如下所示:
      在这里插入图片描述
      点击上图所示【步骤3】的LOG后会跳转到slave01节点界面,具体如下图所示:
      在这里插入图片描述

    四、小结


    该实验的演示主要想向大家展示了Flink程序打包上传服务器的过程及其提交程序到Flink集群中的两种方式,并能在集群中非常明确从节点才是真正负责程序执行的节点,并能看到Flink流式计算是一个有状态的分布式实时计算框架,这个有状态的特点也是它和其他流式计算框架,如Spark、Storm等最显著的区别。另外,通过webui上也看到了taskmanager负责执行程序外还负责分配task slots并将其注册到Jobmanager中。在webui上你才会看到Flink运行时的架构图中所示的TaskManager和JobManager等组成部分。好勒,我们的实验到此结束!Enjoy yourself ~~~~

  • 相关阅读:
    vant的作用及其使用方法
    【计算机毕设之基于Java的贫困生资助管理系统-哔哩哔哩】 https://b23.tv/LrumkKI
    Potplayer通过公网访问群晖WebDav,快速搭建远程办公环境
    bihash总结
    【基础教程】基于Matlab实现多种算法的曲线拟合
    Tuxera NTFS与Paragon NTFS:两款NTFS驱动软件的深度对比 tuxera和paragon NTFS哪个好
    2023 全栈工程师 Node.Js 服务器端 web 框架 Express.js 详细教程(更新中)
    Java版企业电子招标采购系统源码—企业战略布局下的采购寻源
    @Valid和 @Validated
    「MySQL高级篇」SQL优化
  • 原文地址:https://blog.csdn.net/sujiangming/article/details/125434760