本次实验主要是向大家演示了在Flink Standalone 模式下运行WordCount程序的过程,其目的想让大家知道flink程序开发完成后,如何将其打包上传到服务器,并能通过查看webui界面查看flink相关的概念等。
- 虚拟机VMWareworkStation
- 操作系统:Centos7.5
- nc服务:netcat
集群环境拓扑结构:
package com.suben.flink.wc
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
/**
* flink run -c com.suben.flink.wc.WordCountStream -p 2 flink-basic-1.0-SNAPSHOT.jar --host master --port 6666
*/
object WordCountStream {
def main(args: Array[String]): Unit = {
// 1. 初始化环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 2. 读取数据并进行转换
// 从外部命令中提取参数,作为socket主机名和端口号
val paramTool: ParameterTool = ParameterTool.fromArgs(args)
val host: String = paramTool.get("host")
val port: Int = paramTool.getInt("port")
// 接收一个socket文本流
val inputDataStream: DataStream[String] = env.socketTextStream(host, port)
// 进行转化处理统计
val resultDataStream: DataStream[(String, Int)] = inputDataStream
.flatMap(_.split(" "))
.filter(_.nonEmpty)
.map((_, 1))
.keyBy(0)
.sum(1)
// 3. 为结果好看些,设置并发度为1
resultDataStream.print().setParallelism(1)
// 启动任务执行
env.execute("stream word count")
}
}
使用ssh工具如mobXstrem或者xshell等上传到虚拟机中,如上传到master主节点中。
启动flink集群,在master主节点上执行命令:start-cluster
,回车即可,如图所示:
验证下flink集群是否成功启动,执行:jps
命令即可,如图所示:
启动nc服务,执行: nc -l -p 6666
,如图所示:
提交程序包到flinkj集群有两种方式
:
flink run -c com.suben.flink.wc.WordCountStream -p 2 flink-basic-1.0-SNAPSHOT.jar --host master --port 6666
我这里采取的是在webUI上传提交程,程序提交后,会看到如下图所示:
在nc端发送如下数据:
I love Guizhou
I love my home
I love Flink
I love Bigdata
如图所示:
我们看看程序运行结果是在slave01从节点上的,如下所示:
点击上图所示【步骤3】的LOG后会跳转到slave01节点界面,具体如下图所示:
该实验的演示主要想向大家展示了Flink程序打包上传服务器的过程及其提交程序到Flink集群中的两种方式,并能在集群中非常明确从节点才是真正负责程序执行的节点,并能看到Flink流式计算是一个有状态的分布式实时计算框架,这个有状态的特点也是它和其他流式计算框架,如Spark、Storm等最显著的区别。另外,通过webui上也看到了taskmanager负责执行程序外还负责分配task slots并将其注册到Jobmanager中。在webui上你才会看到Flink运行时的架构图中所示的TaskManager和JobManager等组成部分。好勒,我们的实验到此结束!Enjoy yourself
~~~~