• Flink-提交job


    目录

    一、Flink 流处理扩展及说明

    二、Flink部署

    三、Standalone模式

    四、在命令行提交job:

    五、在网页中提交flink job


    一、Flink 流处理扩展及说明

    涉及:自定义线程优先级

     =socket流中读取数据并行度只能是 1

    1、特定的算子设定了并行度最优先

    2、算子没有设定并行度就是用整体运行环境设置的并行度

    3、环境的并行度没有设置就使用提交时候提交参数设置的并行度

    4、都没有设置就遵循 flink的配置文件

    增加:

    1、自定义线程

    2、从外部命令中提取参数 

    1. package com.atguigu.wc
    2. import org.apache.flink.api.java.utils.ParameterTool
    3. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    4. import org.apache.flink.streaming.api.scala._
    5. object StreamWordCount_02 {
    6. def main(args: Array[String]): Unit = {
    7. //创建流处理的执行环境
    8. val env = StreamExecutionEnvironment.getExecutionEnvironment
    9. //env.setParallelism(8) //自定义线程,设置几就有几个线程
    10. //从外部命令中提取参数,作为socket主机名和端口号
    11. val paramTool:ParameterTool = ParameterTool.fromArgs(args)
    12. val host:String = paramTool.get("host")
    13. val port:Int = paramTool.getInt("port")
    14. //接收一个socket文本流
    15. val inputDataStream:DataStream[String] = env.socketTextStream(host,port)
    16. //进行转化处理统计
    17. val resultDataStream:DataStream[(String,Int)] = inputDataStream
    18. .flatMap(_.split(" "))
    19. .filter(_.nonEmpty)
    20. .map((_,1))
    21. .keyBy(0)
    22. .sum(1)
    23. resultDataStream.print().setParallelism(1) //自定义线程为1;所以线程是一致的,并行度
    24. //启动任务执行
    25. env.execute("stream word count")
    26. }
    27. }

    二、Flink部署

    三、Standalone模式

    首先:安装

    1、将文件flink-1.10.1-bin-scala_2.12.tgz上传到software目录下

    2、解压缩flink-1.10.1-bin-scala_2.12.tgz到module

    解压缩命令:tar -zxvf -C flink-1.10.1-bin-scala_2.12.tgz /opt/module/

    3、在conf目录下修改flink-conf.yaml文件

    [atguigu@hadoop102 conf]$ vi flink-conf.yaml

    修改内容如下:

     jobmanager.rpc.address:hadoop102

     4、在conf目录下修改slaves文件

    修改内容如下:

    hadoop102

    hadoop103

    hadoop104

     5、使用xsync命令分发flink文件

    [atguigu@hadoop102 module]$ xsync flink-1.10.0 

    6、启动flink

    在bin目录下启动flink

    [atguigu@hadoop102 bin]$ ./start-cluster.sh

    关闭flink命令

    [atguigu@hadoop102 bin]$ ./stop-cluster.sh
    

    访问以下网址对flink集群和任务进行监控管理  

    新建标签页icon-default.png?t=M7J4http://hadoop102:8081/在命令行提交job

    如果没有安装netcat则需要安装

    命令:yum install -y nc

    1、首先需要打包jar

    Maven》生命周期》package 

    在target目录下可以查看到打包好的文件,或者在本机上也可以查看

    在flink文件下创建一个目录data

    将打包好的文件FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar上传到flink下的data文件 

    [atguigu@hadoop102 flink]$ mkdir data 

    四、在命令行提交job:

    前提要先启动flink

    [atguigu@hadoop102 bin]$ ./flink sun -c com.atguigu.wc.StreamWordCount -p 2 ../data/FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar --host hadoop102 --port 7777

    参数解析:

    -c :指定main方法的全类名

    -p : 并行度

    --host hadoop102 --port 7777 :外部设置的参数

    3、打开一个新页面,打开netcat

    命令:yum install -y nc

    4、在网页中即可查看到正在运行的job

    可以在命令行上上传数据,在Task Managers》点击运行的集群》Stdout上查看任务运行

     

     5、取消job(需要的话可以)

    第一种方法:在网页中取消

    第二种方法:在命令行取消

    [atguigu@hadoop102 bin]$ ./flink cancel + 你的job id

     id获取方法:

    第一种:在网页中获取

     第二种:在命令行输入命令

    在bin目录下 ./flink list -a    即可查看提交的job id

    第三种:在提交的命令行中也可以获取

    查看所有运行或者重启中的flink job

    命令: . /flink list 

    五、在网页中提交flink job

    网址: http://hadoop102:8081/

     首先需要打包好jar》进入网页》Submit New Job》Add New》选择上传的jar》单击选择的作业》配置对应参数

    参数解读:

     可show plan 查看

     最后点击submit提交job即可

    测试:

    可以 在命令行输入数据。在网页上面查看 在Task Managers》点击运行的集群》Stdout上查看任务运行

     

  • 相关阅读:
    如何把大的‘tar‘存档文件分割成特定大小的多个文件
    C语言实验四 循环结构程序设计(一)
    第五章:人工智能深度学习教程-人工神经网络(第一节-人工神经网络及其应用)
    ​kali渗透测试环境搭建
    JavaScript(Array,String,window对象)入门
    Pandas数据分析15——pandas数据透视表和交叉表
    如何完美解决Sqoop导入导出MySQL数据错位问题
    开源软件的影响力及未来发展趋势
    基于jsp的学生培训管理系统
    一份超预期的期中成绩,拨开百果园“高价值迷雾”
  • 原文地址:https://blog.csdn.net/qq_70085330/article/details/126455451