• flink理论干货笔记(2)


    101. 滴滴的流计算平台有三种开发方式,web ide、本地ide、streamSql ide,降低了用户开发使用流计算的门槛。平台包含任务监控(延迟、吞吐量、自定义指标)和告警(钉钉/电话),任务诊断系统(任务日志采集接入ES) 。滴滴的流计算业务有以下四种:实时ETL、实时数据报表、实时业务监控、CEP在线业务。实时网关日志监控主要用到kafka-flink的streaming-es这三个组件

    102. streamsql是滴滴内部的flink sql项目,目的是支持DDL(多格式多数据源)、DML(比如插入流数据到某一张sink表)、常用核心功能点(如group agg、window agg、join、udf、udtf、udaf),其中join包括双流的no-window join,流和维度表的join

    103. 滴滴的streamsql支持分流,比如同时写入hbase和kafka,其中stream sql join,对于维度表,只支持当前表,不支持历史表。stream sql未来会承担90%的流计算任务,另外CEP也会融入stream sql体系。还将进行算子级别的扩容。

    104. 字节经历了Jstorm任务迁移到flink的过程。借鉴了flink-storm,实现了flink-jstorm,支持将jstorm topology 结构转化为flink job

    105. flink在异常容错方面是很苛刻的,因为要保证exactly-once,因此JobManager可能就是个瓶颈,同时也会是单点问题,所以要做HA。相比而言,storm只需要保证at-least-once,因此重启作业即可

    106. 在flink读取kafka时,由于kafka容易出现节点故障或数据均衡时导致leader切换,因此需要做一些优化 

    107. 还需要做一些容灾,比如一个机房的所有节点都挂掉,虽然概率不大,要做多机房部署,以及流热备 

    108. flink1.0.0加入了state api,即ValueState、ReducingState、ListState等,是一个里程碑,用户能像使用java集合那样使用flink state,自动享受到状态的一致性保证。 

    109. flink1.1.0加入了Session Window,且能正确处理乱序的迟到数据,保证结果正确 

    110. flink1.2.0加入了ProcessFunction,一个lower level api,用于实现高级且复杂的功能,包括注册各种State,以及注册定时器(基于事件时间或处理时间),用于开发基于时间的应用程序 

    111. flink1.3.0加入了side output功能,支持多种输出,即除了主流输出外,还包括如异常数据、迟到数据、侧边流等 

    112. flink1.5.0加入了broadcastState,是对state api的扩展,用来存储上游被广播过来的数据。基于这种state能很好解决CEP中的动态规则功能,以及sql中不等值join的场景 

    113. flink1.6.0加入了state ttl功能,以及datastream interval join功能。state ttl实现在申请某个state时可以指定一个生命周期参数(TTL),指定该state过了多久需要被系统自动清除,而不再需要使用ProcessFunction注册Timer来清除了,基于ttl的方式更原生解决该问题。datastream interval join使区间join成为可能。 

    114. 在高层api方面,flink1.0.0加入了table api和cep api,其中table api支持java和scala,类似spark的dataframe api,同时它与sql很像,可以共用。后来在flink1.1.0中对table模块做了重构,同时支持table api和sql,以及代码共用。 

    115. flink1.2.0在table api和sql上支持丰富的内置窗口操作,如Tumbling window、Sliding Window、Session Window 

    116. flink1.3.0提出Dynamic Table概念,使的流批互相转换。流可以是表,表可以是流。Retraction机制是Dynamic Table的基础之一,通过它能实现多级aggregate、多级join,从而保证流式sql的语义和结果正确。同时该版本支持了cep算子的可伸缩性(即改变并发) 

    117. flink1.5.0在table api和sql上支持了join,包括无限流的join和带窗口的join。还添加了sql cli支持,用于交互式查询。 

    118. 在检查点和故障恢复方面,flink1.0.0提供了RocksDB支持,把状态从内存转移到磁盘,避免了fullgc和oom 

    119. flink1.1.0支持RocksDB快照的异步化,避免了阻塞主数据流的处理,提高了吞吐量 

    120. flink1.2.0引入了KeyGroup机制,支持了KeydState和OperatorState的可扩缩性,支持了对带状态的流计算任务改变并发的功能 

    121. flink1.3.0支持了增量检查点功能。标志着flink流计算正式达到生产就绪状态。同时引入细粒度的故障恢复功能,只恢复失败节点的联通子图,不用对整个job进行恢复 

    122. flink1.5.0引入了本地状态恢复机制。该机制会提前将状态文件在本地也备份一份,故障发生时,可以直接在本地恢复,而不用从远程hdfs重新下载,提高了恢复效率 

    123. 在flink runtime方面,flink1.2.0提供了async i/o功能,是阿里贡献的,为了解决与外部系统交互的网络延迟导致的系统瓶颈。比如需要查询外部hbase,异步能同时发起多个请求,且避免了阻塞,提升了吞吐量,提高了cpu使用率 

    124. flink1.3.0引入了HistoryServer模块,当job结束后,会把job的状态都进行归档,方便后续排查 

    125. flink1.4.0提供了端到端的exactly-once语义保证。在该版本之前,exactly-once只保证flink本身,不包括输出给外部系统的部分。该版本基于两阶段提交协议,实现了该功能。内置支持kafka的端到端保证。同时支持自定义实现外部存储的端到端保证。 

    126. flink1.5.0发布了新的部署模型和处理模型,即FLIP6,对核心代码改动很大,是runtime改动最大的一次,可以在yarn、mesos上更好动态分配资源,以及释放资源,提高了资源利用率,以及作业之间的隔离。

    127. 除了FLIP6改进,还对网络栈做了重构,避免上下游多个task共享一个tcp连接,导致task反压,甚至阻塞。为了改进反压机制,提出了基于credit的流量控制,使得流控粒度精细到task级别,有效缓解了反压对吞吐量的影响 

    128. 已知吞吐量at most once > at least once > exactly once,思考:exactly-once在并发量大的时候吞吐是否会明显下降? 以及flink使用RocksDBStateBackend时吞吐较低,该如何优化? 

    129. 回顾spark,spark中默认是批处理,它的每个算子生成一个RDD,所有算子构成一个DAG,然后有了宽窄依赖、shuffle、stage、job、task、血缘、action、transformation等概念,当然RDD衍生出替代品ds和df

    130. 至于spark streaming,是基于spark的批处理来模拟出的流计算,即微批处理,基本概念是Dstream,即离散流,它是数据流的抽象,本质上是把一系列连续到来的RDD,按照批次(即时间窗)划分,每批数据构成新的RDD,传入批处理的DAG中。即每个Dstream==时间窗内的原始RDDs==新的RDD,所以Dstream也有它自己的血缘关系,即Dstream依赖它的父Dstream,从而构成了DstreamGraph,相当于批处理的DAG。每个时间窗提交一次job。而WindowedDstream是滑动窗口,是在Dstream上允许离散化有重叠,有两个参数,窗口长度(即涵盖多少个RDD),和每次滑动间隔。

    131. 由于spark的批处理有内部容错机制,即通过血缘关系重新计算RDD,因此不需要checkpoint,而spark streaming流计算中,当下一时刻的RDD需要前一时刻RDD,就需要checkpoint来切断依赖链,这点和flink是一样的。也就是说,只有流计算才需要检查点。

    132. flink中默认就是流计算。数据流(即事件序列)就是数据的基本模型,它没有表或者数据块那么直观,但可以证明是完全等效的。流可以是无界或者有界的。 

    133. flink用数据流上的变换,也就是算子,来描述数据处理。算子和DAG的概念和spark相同。flink的节点(vertex)和spark的stage相当,这个怎么理解?所以DAG是由stage或vertex构成的,而不是算子? 是的

    134. 思考:spark用宽窄依赖来划分stage,那flink用什么划分vertex? 

    135. flink和spark的DAG执行有个很大区别,flink中事件在一个节点执行完就可以直接输出到下个节点立即处理,消除了不必要的延迟,且所有节点同时运行。啥意思?如果另一个路没执行完,最后汇合,不还是卡住要等吗?相比之下,spark的机制(无论微批还是批处理),都是上游stage处理完才统一开始下游,更符合常理 

    136. flink的流处理,为了提高效率,也可以把多个事件一起传输或计算,这仅仅是执行时的优化,每个算子可以单独决定,而在spark streaming中是强制的 

    137. spark的RDD模型和机器学习迭代训练非常契合,怎么理解? 

    138. structured streaming出来缓解了spark streaming中用户需要自行加一个中间结果表来记住还没完成的窗口计算结果(因为一个批的结果依赖以前的批),但是它的状态维护能力和flink还是不能比

    139. spark中,dataframe在RDD上加了列,变成结构化数据,而dataset在dataframe的列上加了类型。在spark2.0中,dataframe和dataset做了整合,即dataframe=dataset[Row] 

    140. flink默认使用ProcessingTime处理,因此当需要EventTime时,就需要显式设置,如env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 

    141. watermark用来追踪业务事件,可理解为EventTime世界中的时钟,指示当前处理到什么时刻的数据了。如果数据经过整理,没有乱序,即事件的时间戳是单调递增的,可以将数据的业务时间就当作watermark 

    142. AscendingTimestampExtractor可用来抽取时间戳和watermark的生成,只需传入pojo

    143. 真实业务场景一般都是乱序的数据,要用BoundedOutOfOrdernessTimestampExtractor

    144. 水印和状态在flink应用代码中分别如何体现?
    答:当收到一条数据(如ItemViewCount),就注册一个windowEnd+1的定时器,而当该定时器被触发时,意味着收到了windowEnd+1的水印,即收集了该windowEnd下所有商品的窗口统计值。也就是说,水印是flink底层技术,应用层和定时器触发有关。
    而状态可以体现在ListState中。它用来保存收到的消息,并保证在故障发生时,状态数据不丢失且一致。它是flink提供的类似java的List接口的state api,它集成了框架的checkpoint机制,自动做到了exactly-once语义保证。所以,用到ListState就等于用了flink的状态机制。 

    145. sql api和datastream、dataset的关系?
    A.  sql 基于datastream
    B.  sql 基于dataset
    C.  sql 基于datastream和dataset
    D.  sql 与datastream和dataset并列 

    146. sql api和table api的关系?
    A.  sql基于table
    B.  sql和table并列 

    147. 在老版本中,table&sql同时基于datastream和dataset,而新版本中,table&sql与datastream和dataset并列,即与runtime层之间多了查询处理器层

    148. flink sql job包括source operator、query operator、sink operator这三部分 

    149. select算子作用是对关系进行垂直分割,消去某些列,意思是从tuple或pojo选择某些成员? 

    150. where算子作用是从数据流/集过滤数据,根据某些条件做水平分割,选择符合条件的记录;group by 算子对数据进行分组

    151. union 算子将两个流合并,要求字段类型、顺序完全一致,注意它会去重;union all算子将两个流合并,要求字段类型、顺序完全一致,注意它不会去重 

    152. join算子将两个流的记录联合成宽表,flink支持的join有:join(即inner join)、left join、right join、full join 

    153. flink的开窗方式大的分类有两种,一是overwindow,即每个元素对应一个窗口。二是groupwindow,它包括滑动窗口、滚动窗口、会话窗口。groupwindow除了以上三种bounded window,还支持unbounded window,也就是全局window。 

    154. flink sql既支持overwindow,又支持groupwindow,目前flink sql只能基于时间对groupwindow进行窗口划分? 

    155. overwindow的时间可用processing time或event time,数据集可用bounded 或unbounded,划分方式可用rows或range,这样就有了8种组合类型

    156. bounded rows(属于over window)中between  rowCount  PRECEDING  AND  CURRENT  ROW,当rowCount为2,表示以3个元素作为窗口。而当把rowCount改为UNBOUNDED,即变为unbounded rows 

    157. 注意,在同一时刻到达的元素,可能被分到不同的窗口,rows这点和range不同;另外,按照元素个数划分窗口,的确和按照时间划分(即时间窗),有很大的不同 

    158. bounded range(属于over window)中具有相同时间值的所有元素都视为同一计算行,因此属于同一个窗口。between timeInterval PRECEDING AND CURRENT ROW,当timeInterval为2,表示以3秒的数据作为窗口。即range也是基于时间的,而不是元素个数。当把timeInterval变为UNBOUNDED,即变为unbounded range 

    159. groupwindow的tumble window,即滚动窗口,数据不重叠,写法为group  by  tumble (rowtime, INTERVAL '2' MINUTE),即窗口大小为2分钟 

    160. groupwindow的hop window,即滑动窗口,数据可以有重叠,写法为group by    
    hop(rowtime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE),表示每5分钟统计近10分钟的数据

    161. groupwindow的session window是没有固定大小的窗口,它不重叠,也没有固定的起止时间,当一段时间没有收到元素就会被关闭,通过配置session gap来指定非活跃周期的时长。写法为group  by  session ( rowtime, INTERVAL  '3'  MINUTE),表示统计连续的两个访问用户之间的访问间隔不超过3分钟的访问量。 

    162. 另外,使用辅助函数
    tumble_start/tumble_end
    hop_start/hop_end
    session_start/session_end
    可以分别指定滚动、滑动、会话的开始结束时间 

    163. flink sql可以按不同地域统计每2分钟的淘宝首页的访问量(pv),只需给出一张淘宝页面访问表。可以用bounded  eventTime Tumble window,那么这算批计算还是流计算?

    164. StreamTableSource通过StreamExecutionEnvironment的addSource获取DataStream,为此需要自定义SourceFunction,并产生watermark,即实现DefinedRowtimeAttributes接口

    165. source function需要自定义,其中datalist参数是Either的Seq,重写run方法,对于datalist的每条数据,如果它是Left,就按时间戳收集(collectWithTimestamp),如果它是Right,就发射水印(emitWatermark) 

    166. 自定义table source,需要实现StreamTableSource,以及DefinedRowtimeAttributes特质,然后重写getDataStream方法,然后执行execEnv.addSource,参数是自定义的source function,后者接受Seq类型的data(它是Left和Right交替的序列),然后设置并行度,最后执行returns,参数为RowTypeInfo类型 

    167. 如果要写入flink内置的csvsink,可以自定义CsvTableSink,它是TableSink的子类 

    168. 主程序包括环境定义,source/sink的注册,以及sql查询的执行。具体包括StreamExecutionEnvironment、TableEnvironment、registerTableSource、registerTableSink、sqlQuery、insertInto等。

    169.  sink虽然指定了一张表,但其实写入了磁盘文件 

    170. flink的类型在flink.api.common.typeinfo中,具体包括TypeInfomation、BasicTypeInfo、BasicArrayTypeInfo、CompositeType、NumericTypeInfo、TupleTypeInfoBase、CRowTypeInfo、PojoTypeInfo、AvroTypeInfo、IntegerTypeInfo、FractionTypeInfo、RowTypeInfo、TupleTypeInfo、CaseClassTypeInfo、OptionTypeInfo、EitherTypeInfo、ListTypeInfo、MapTypeInfo、SqlTimeTypeInfo、GenericTypeInfo等等

    171. TypeInformation是一切类型的公共基类,它和它的子类必须可序列化,因为类型会随着flink作业提交,被传到每个执行节点。 

    172. TypeExtractor可以利用方法签名、子类信息等自动提取和恢复类型信息。但由于java的类型擦除,并不总是有效,有时需要手动处理,比如用returns方法声明返回类型。此外ExecutionEnvironment类的registerType方法,可以向flink注册子类信息

    173. flink ml的api有ml.math.DenseVector、ml.math.SparseVector、ml.math.DenseMatrix、
    ml. math. SparseMatrix 等等

    174. TypeInformation的of方法可以创建类型信息对象。对于非泛型类,可以直接传入Class对象。对于泛型类,需要借助TypeHint来保存泛型类型信息。 

    175. BasicTypeInfo定义了一系列常用类型的快捷方式,如String、Boolean、Byte等,可以直接使用。 

    176. flink common中的Types类提供了与BasicTypeInfo等价的各种类型,也更方便。

    177. 注意,flink table也有Types类,要区分flink common的Types;具体是org.apache.flink.api.common.typeinfo.Types 和org.apache.flink.table.api.Types

    178. 自定义类(如MyTuple)上使用@TypeInfo注解,可以提供原生内存管理,令存储更紧凑,效率更高。后面需要创建相应的TypeInfoFactory子类(如MyTupleTypeInfoFactory)并覆盖createTypeInfo方法,该方法返回值是TypeInformation,为此要自定义该类的子类型(如MyTupleTypeInfo)。而MyTupleTypeInfo中的isBasicType、isTupleType等方法,是为了给TypeExtractor提供决策依据

    179. flink自带很多TypeSerializer子类,此外也可以通过继承来实现自己的序列化器。flink无法序列化的类型,会交给Kryo处理,如果它也无法处理(比如Thrift、Protobuf等第三方类),可以强制使用Avro代替Kryo,即enableForceAvro(),或者为Kryo增加自定义的Serializer来增强它,即addDefaultKryoSerializer()以及registerTypeWithKryoSerializer()。而执行disableGenericTypes()可以完全禁用Kryo,但遇到无法处理的类容易导致异常 

    180. flink 内置的类型系统,虽然强大,但也有一些缺点。首先,lambda函数由于比较特殊,是匿名的,也没有相关的类,其类型信息难以获取(即面向对象的容易,函数式编程的难以解决)。其次,kryo的JavaSerializer存在bug,推荐用org路径的版本,而不是com路径的版本 

    181. 流表二象性、流是表的changeLog、动态表(流计算也可以像批处理一样使用sql来描述,逻辑等价)、先使用批处理进行历史数据的计算,再自动转成流计算任务处理最新的实时数据

    182. flink sql层的最新架构,即query processor,就是阿里提出的,即流和批可以做到复用优化层和算子层,当然流和批也可以各自保留自己独特的优化和算子

    183. 另外,阿里对原来flink sql的Row数据结构改进,提出了BinaryRow,虽然都代表关系数据的一行,但后者使用二进制来存储,也就是任意类型都统一用byte[]表示。好处是,使得对象存储更紧凑,省略掉不必要的序列化和反序列化,避免了不必要的装箱和拆箱,对GC也更友好。最终整个sql层执行效率提高了一倍以上。 

    184. 此外,阿里还引入了更广泛的代码生成技术,通过预先知道算子要处理的数据类型,针对性得生成高效代码,包括排序、聚合等算子。 

    185. 在runtime层,阿里改造了flink的资源调度系统,让flink可以跑在yarn集群上,并重构了master架构,让一个job对应一个master,从此master不再是集群瓶颈。Flip-6架构,让flink资源管理变成可插拔的架构,如今flink无缝运行在yarn/mesos/k8s上,离不开这个架构。 

    186. 在可靠性和稳定性方面,阿里改善了flink的failover机制。对master的failover,原生flink会重启所有job,改进后master的失效不会导致job重启。同时还引入region-based的task failover,尽量减少任何task的failover对用户造成的影响。 

    187. 阿里还提出了增量checkpoint机制,降低了cp成本,以及checkpoint小文件合并,使hdfs namenode减轻了压力。此外,对外部状态访问实行了异步访问,提出了AsyncOperator,让用户在flink job中写异步调用和写"hello world"一样简单,提升了job吞吐量。 

    188. 阿里还引入了更灵活的调度机制,能根据任务之间的依赖关系进行高效调度。在数据shuffle方面,阿里提出了yarn shuffle service,同时改造了shuffle架构,让shuffle变成可插拔的架构。 

    189. 如果说spark的基本模型是rdd,那么flink的基本模型应该是Event,在spark的流计算中,一个窗口内的rdd构成dstream,在flink的流计算中,一个窗口内也有多个Event。spark 的rdd像是一个类或结构体,而flink的记录/元素/事件,像是元组。 

    190. spark rdd是分布式的,同时有分区的概念。而flink event又不是分布式,也没有分区。如何类比? 

    191. spark rdd代表多行数据,而flink event表示一行数据,似乎又无法类比?  

    192. spark rdd是分布式弹性数据集,为批处理服务,所以它和flink的dataset可以类比?所以dataset也构成DAG? 

    193. flink的dataset是否也分transformation和action算子? 

    194. flink的dataset是否是分布式的,即各个节点上有部分数据,或者说分区? 

    195. flink支持远程环境下集群执行,需要加入flink-clients依赖项,比如2.11版,然后通过ExecutionEnvironment的createRemoteEnvironment来创建执行环境,需指定名称、端口和udf的jar包,其中jar包是用户自定义代码,比如FilterFunction 

    196. flink有两种本地环境,即LocalExecutionEnvironment和CollectionEnvironment,其中集合环境的开销较低,因为它不会启动完整的flink运行时。而LocalEnvironment会全面运行flink,包括jobmanager和taskmanager,和集群模式很相似 

    197. 集合环境主要用于自动测试、代码调试,仅适用于jvm堆的小数据,且只有单线程 

    198. flink cep是flink之上实现的复杂事件处理库,允许在无休止的事件流中检测事件模式。需要加入flink-cep_2.11或flink-cep-scala_2.11。所以,它只能是基于流计算,而不是批处理 

    199. flink cep的核心api是CEP.pattern(),它接受一个DataStream和一个Pattern,后者允许你定义要从输入流提取的复杂模式序列,比如begin、where、next、subtype、where、followedBy等。其中where中可以定义SimpleCondition类并重写filter方法。而CEP.pattern得到的流可以继续用select,并结合PatternSelectFunction,然后重写select方法

    200. Pattern api中模式可以是单模式或循环模式,比如a b+ c?  d,其中只有b是循环模式 

     

  • 相关阅读:
    String字符串,FastJson常用操作方法
    [前端基础]typescript安装以及类型拓展
    【RV1106/RV1103】RV1103增加RTL8723BS
    vue2修改组件样式
    使用Typora编辑markdown上传CSDN时图片大小调整麻烦问题
    pytorch分布式数据训练结合学习率周期及混合精度
    玩机搞机---脱离电脑 用手机给手机刷机 解锁bl 获取root的方法教程
    cnpm : 无法加载文件 C:\Users\XXX\AppData\Roaming\npm\cnpm.ps1,因为在此系统上禁止运行脚本——解决办法
    【深度学习实验】卷积神经网络(六):自定义卷积神经网络模型(VGG)实现图片多分类任务
    RabbitMQ快速入门
  • 原文地址:https://blog.csdn.net/fearlesslpp/article/details/126331742