• 快速灵敏的 Flink1


    一、flink单机安装

    1、解压
    tar -zxvf ./flink-1.13.2-bin-scala_2.12.tgz -C /opt/soft/
    2、改名字
    mv ./flink-1.13.2/ ./flink1132
    3、profile配置
    1. #FLINK
    2. export FLINK_HOME=/opt/soft/flink1132
    3. export PATH=$FLINK_HOME/bin:$PATH
    4、查看版本
    flink --version
    5、启动关闭flink
    1. start-cluster.sh
    2. stop-cluster.sh
    6、登录网页   http://192.168.91.11:8081

    二、flink开发

    1、步骤

    创建运行环境--> 加载数据源--> 转换--> 下沉

    2、案例

    (1)学习数据源加载
    1. package nj.zb.kb23.source
    2. import org.apache.flink.streaming.api.scala._
    3. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    4. object AA {
    5. def main(args: Array[String]): Unit = {
    6. //1、创建环境变量
    7. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    8. //设置并行步 1
    9. env.setParallelism(1)
    10. //2、加载数据源
    11. val stream: DataStream[Any] = env.fromElements(1,2,3,3,4,"hello",3.1415)
    12. //3、下沉
    13. stream.print()
    14. env.execute("sourcetest")
    15. }
    16. }
    (2)样例类加载数据源
    1. package nj.zb.kb23.source
    2. import org.apache.flink.streaming.api.scala._
    3. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    4. import scala.util.Random
    5. //定义样例类
    6. case class SensorReading(id:String,timestamp:Long,temperature:Double)
    7. object AA {
    8. def main(args: Array[String]): Unit = {
    9. //1、创建环境变量
    10. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    11. //设置并行步 1
    12. env.setParallelism(1)
    13. //2、加载数据源
    14. val stream: DataStream[SensorReading] = env.fromCollection(List(
    15. SensorReading("sensor_1", 1698731530, 26.3),
    16. SensorReading("sensor_2", 1698731530, 26.5),
    17. SensorReading("sensor_3", 1698731531, 26.7),
    18. SensorReading("sensor_4", 1698731530, 26.9),
    19. ))
    20. //3、输出,又叫下沉
    21. stream.print()
    22. env.execute("sourcetest")
    23. }
    24. }

    (3)指定文件加载数据
    1. package nj.zb.kb23.source
    2. import org.apache.flink.streaming.api.scala._
    3. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    4. object AA {
    5. def main(args: Array[String]): Unit = {
    6. //1、创建环境变量
    7. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    8. //设置并行步 1
    9. env.setParallelism(1)
    10. //2、加载数据源
    11. val stream: DataStream[String] = env.readTextFile("D:\\caozuo\\ideal\\flinkstu\\resources\\sensor")
    12. //3、输出,又叫下沉
    13. stream.print()
    14. env.execute("sourcetest")
    15. }
    16. }

    (4)指定端口,实时处理数据源
    1. package nj.zb.kb23.source
    2. import org.apache.flink.streaming.api.scala._
    3. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    4. //定义样例类
    5. case class SensorReading(id:String,timestamp:Long,temperature:Double)
    6. object AA {
    7. def main(args: Array[String]): Unit = {
    8. //1、创建环境变量
    9. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    10. //设置并行步 1
    11. env.setParallelism(1)
    12. //2、加载数据源
    13. //(1)真实时处理 nc -lk 7777
    14. val stream: DataStream[String] = env.socketTextStream("192.168.91.11",7777)
    15. stream.print()
    16. //3、转换拼接
    17. val stream1: DataStream[(String, Int)] = stream
    18. .map(x=>x.split(","))
    19. .flatMap(x=>x)
    20. .map(x=>(x,1))
    21. stream1.print()
    22. //①sum
    23. val value: DataStream[(String, Int)] = stream
    24. .map(x=>x.split(","))
    25. .flatMap(x=>x).map(x=>(x,1))
    26. .keyBy(x=>x._1)
    27. .sum(1)
    28. value.print()
    29. // ⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇相等
    30. //②reduce
    31. val value: DataStream[(String, Int)] = stream
    32. .map(x => x.split(","))
    33. .flatMap(x => x).map(x => (x, 1))
    34. .keyBy(x => x._1)
    35. .reduce((x, y) => (x._1 + "#" + y._1, x._2 + y._2))
    36. value.print()
    37. //4、输出,又叫下沉
    38. env.execute("sourcetest")
    39. }
    40. }
    (5)kafka加载数据
    1. package nj.zb.kb23.source
    2. import java.util.Properties
    3. import org.apache.flink.api.common.serialization.SimpleStringSchema
    4. import org.apache.flink.streaming.api.scala._
    5. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    6. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
    7. import org.apache.kafka.clients.consumer.ConsumerConfig
    8. //定义样例类
    9. case class SensorReading(id:String,timestamp:Long,temperature:Double)
    10. object AA {
    11. def main(args: Array[String]): Unit = {
    12. //1、创建环境变量
    13. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    14. //设置并行步 1
    15. env.setParallelism(1)
    16. //2、加载数据源
    17. val prop = new Properties()
    18. prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.91.11:9092")
    19. prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"sensorgroup1")
    20. prop.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer")
    21. prop.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer")
    22. prop.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest")
    23. val stream: DataStream[String] = env.addSource(
    24. new FlinkKafkaConsumer[String]("sensor", new SimpleStringSchema(), prop)
    25. )
    26. val value: DataStream[(String, Int)] = stream.flatMap(x => x.split(" "))
    27. .map(x => (x, 1))
    28. .keyBy(x => x._1)
    29. .reduce((x: (String, Int), y: (String, Int)) => (x._1, x._2 + y._2))
    30. //4、输出,又叫下沉
    31. stream.print()
    32. env.execute("sourcetest")
    33. }
    34. }
    (6)自定义数据源加载数据
    1. package nj.zb.kb23.source
    2. import org.apache.flink.streaming.api.functions.source.SourceFunction
    3. import org.apache.flink.streaming.api.scala._
    4. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    5. import scala.util.Random
    6. //定义样例类
    7. case class SensorReading(id:String,timestamp:Long,temperature:Double)
    8. object AA {
    9. def main(args: Array[String]): Unit = {
    10. //1、创建环境变量
    11. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    12. //设置并行步 1
    13. env.setParallelism(1)
    14. //2、加载数据源
    15. val stream: DataStream[SensorReading] = env.addSource(new MySensorSource)
    16. //4、输出,又叫下沉
    17. stream.print()
    18. env.execute("sourcetest")
    19. }
    20. }
    21. //模拟自定义数据源
    22. class MySensorSource extends SourceFunction[SensorReading]{
    23. override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    24. //(1)随机数,true一直生成随机数
    25. val random = new Random()
    26. while (true){
    27. val d: Double = Math.random()
    28. ctx.collect(SensorReading("随机数:"+random.nextInt(),System.currentTimeMillis(),d))
    29. Thread.sleep(1000)
    30. }
    31. }
    32. override def cancel(): Unit = {
    33. }
    34. }

    三、flink运行四大组件

    1、作业管理器jobmanager

    应用程序执行的主过程中,执行应用程序会被jobmanager最先接收,这个应用程序会包括:作业图(jobGraph),逻辑数据流图(logical dataflow graph)和打包了所有的类, 库和其他资源的jar包。jobmanager会向资源管理器请求执行任务必要的资源,也就是任务管理器上的插槽(slot)。一旦它获取了足够的资源,就会将执行图分发到真正运行它们的taskmanager上。在实际运行中,由jobmanager负责协调各项中央操作。

    2、任务管理器taskmanager

    taskmanager是指工作进程。Flink中包含了多个taskmanager,每个taskmanager中又存在着一定数量的插槽(slots),插槽的数量限制了TaskManager能够执行的任务数量。开始运行后,taskmanager中的插槽会被注册给资源管理器,在收到指令后,taskmanager会提供多个插槽任jobmanager调用。jobmanager通过给插槽分配tasks来执行。运行同一应用程序的taskmanager可以子啊执行过程中互相交换数据。

    3、资源管理器resourcemanager

    资源管理器在作业管理器申请插槽资源时,会将空闲插槽的任务管理器分配给作业管理器。如果没有足够的插槽来满足作业管理器的请求时,它会向资源提供平台发起会话,以提供启动taskmanager进程的容器。

    4、分发器 dispatcher
    1. 提供了REST接口,在应用提交时可以跨作业运行。
    2. 在应用被提交执行的情况下,分发器启动将应用提交给jobmanager。
    3. Webui会由dispatcher启动,以便展示和监控作业的执行信息。
    4. 这取决于应用提交运行的方式取决于是否需要dispatche
  • 相关阅读:
    电子学会C/C++编程等级考试2021年06月(一级)真题解析
    重庆新壹汽与一汽集团达成新能源项目战略合作,赋能“碳中和”创造“碳财富”
    vue实战-排序
    有关Git(小白一看就懂)入门版
    【PAT甲级】1006 Sign In and Sign Out
    【leetcode】【初级算法】【字符串9】最长公共前缀
    负载均衡群集
    java计算机毕业设计社区卫生预约挂号系统源码+系统+数据库+lw文档+mybatis+运行部署
    Swagger-go学习笔记
    新手学习c语言_第二部分
  • 原文地址:https://blog.csdn.net/berbai/article/details/134189326