• flink学习之广播流与合流操作demo


    广播流是什么?

    将每一条数据发送给下游算子的所有并行实例(即所有节点)。使用 `dataStream.broadcast()`。

    广播流使用场景?

    一般用于动态加载配置项。比如 LOL,每天不断有人在投诉举报,客服根本忙不过来。腾讯内部做了一个判断:只有 vip3 以上的客户的投诉才会有人工一对一回复。过了一段时间,大家都发现 vip3 才有人工,都开始充钱到 vip3,此时人还是很多,于是改成只有 vip4 以上的客户才能得到人工回复。

    vip3->vip4 这种判断标准在不断地变化,此时就需要广播流:这类配置数据往往只有 1 条,却需要所有节点都收到它的变化。

    广播流怎么用?

    一般通过 connect 合流来使用:`a.connect(b.broadcast(...))`。a 是主流,也就是数据流;b 是配置变化流。

    不多说直接上demo,开箱即用

    1. package com.chenchi.broadcast;
    2. import org.apache.flink.api.common.state.BroadcastState;
    3. import org.apache.flink.api.common.state.MapStateDescriptor;
    4. import org.apache.flink.api.common.state.ValueState;
    5. import org.apache.flink.api.common.state.ValueStateDescriptor;
    6. import org.apache.flink.api.common.typeinfo.Types;
    7. import org.apache.flink.configuration.Configuration;
    8. import org.apache.flink.streaming.api.datastream.BroadcastStream;
    9. import org.apache.flink.streaming.api.datastream.DataStream;
    10. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    11. import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
    12. import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
    13. import org.apache.flink.streaming.api.functions.source.SourceFunction;
    14. import org.apache.flink.util.Collector;
    15. import java.util.HashMap;
    16. import java.util.Random;
    17. public class BroadCastStreamDemo {
    18. public static void main(String[] args) throws Exception {
    19. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    20. DataStream patternDataStream = env.addSource(new ChangeSource());
    21. DataStream userDataStream = env.addSource(new CustomerSource());
    22. userDataStream.print("user");
    23. patternDataStream.print("pattern");
    24. //test1 直接合流 不广播。只会在一个节点更新。 用于特殊需求?
    25. // userDataStream
    26. // .keyBy(user -> user.userId)
    27. // .connect(patternDataStream)
    28. // .process(new CustomerSimpleProcess())
    29. // .print();
    30. //test2
    31. // 定义广播状态的描述器,创建广播流 如何保存需要的广播数据呢 这个案例是通过map保留变化数据
    32. // userDataStream
    33. // .keyBy(user -> user.userId)
    34. // .connect(patternDataStream.broadcast())
    35. // .process(new CustomerSimpleProcess())
    36. // .print();
    37. //test3
    38. MapStateDescriptor bcStateDescriptor = new MapStateDescriptor<>(
    39. "patterns", Types.VOID, Types.POJO(Pattern.class));
    40. //通过描述器 更新
    41. BroadcastStream broadcast = patternDataStream.broadcast(bcStateDescriptor);
    42. userDataStream
    43. .keyBy(user -> user.userId)
    44. .connect(broadcast)
    45. .process(new CustomerBroadCastProcess())
    46. .print();
    47. env.execute();
    48. }
    49. private static class CustomerBroadCastProcess extends KeyedBroadcastProcessFunction {
    50. @Override
    51. public void processElement(User user, KeyedBroadcastProcessFunction.ReadOnlyContext readOnlyContext, Collector collector) throws Exception {
    52. Integer userVip = user.getVip();
    53. //获取广播流的数据 不是通过map保存
    54. Pattern pattern = readOnlyContext.getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class))).get(null);
    55. if (pattern!=null){
    56. Integer patternVip = pattern.vip;
    57. String result = "当前系统需要的vip等级=" + patternVip + ",用户id=" + user.userId + ",vip=" + userVip;
    58. if (userVip>= patternVip){
    59. result=result+"符合要求";
    60. }else {
    61. result=result+"不符合要求";
    62. }
    63. collector.collect(result);
    64. }else {
    65. System.out.println("pattern is null ");
    66. }
    67. }
    68. @Override
    69. public void processBroadcastElement(Pattern pattern, KeyedBroadcastProcessFunction
    70. User, Pattern, String>.Context context, Collector collector) throws Exception {
    71. BroadcastState bcState = context.getBroadcastState(
    72. new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class)));
    73. // 将广播状态更新为当前的pattern
    74. bcState.put(null, pattern);
    75. }
    76. }
    77. public static class CustomerSimpleProcess extends CoProcessFunction {
    78. ValueState vip; //这个是保留主流的state的。 不是保留广播流的state
    79. HashMap vipMap;
    80. @Override
    81. public void open(Configuration parameters) throws Exception {
    82. vip = getRuntimeContext().getState(new ValueStateDescriptor<>("vip", Integer.class));
    83. vipMap=new HashMap();
    84. super.open(parameters);
    85. }
    86. @Override
    87. public void processElement1(User user, CoProcessFunction.Context context, Collector collector) throws Exception {
    88. Integer userVip = user.getVip();
    89. Integer patternVip = vipMap.getOrDefault("vip", 0);
    90. String result = "当前系统需要的vip等级=" + patternVip + ",用户id=" + user.userId + ",vip=" + userVip;
    91. if (userVip>=patternVip){
    92. result=result+"符合要求";
    93. }else {
    94. result=result+"不符合要求";
    95. }
    96. collector.collect(result);
    97. }
    98. @Override
    99. public void processElement2(Pattern pattern, CoProcessFunction.Context context, Collector collector) throws Exception {
    100. vipMap.put("vip",pattern.vip);
    101. }
    102. }
    103. public static class User {
    104. public Integer userId;
    105. public Integer vip;
    106. public User() {
    107. }
    108. public User(Integer userId, Integer vip) {
    109. this.userId = userId;
    110. this.vip = vip;
    111. }
    112. public Integer getUserId() {
    113. return userId;
    114. }
    115. public void setUserId(Integer userId) {
    116. this.userId = userId;
    117. }
    118. public Integer getVip() {
    119. return vip;
    120. }
    121. public void setVip(Integer vip) {
    122. this.vip = vip;
    123. }
    124. @Override
    125. public String toString() {
    126. return "Action{" +
    127. "userId=" + userId +
    128. ", vip='" + vip + '\'' +
    129. '}';
    130. }
    131. }
    132. // 定义行为模式POJO类,包含先后发生的两个行为
    133. public static class Pattern {
    134. public Integer vip;
    135. public Pattern() {
    136. }
    137. public Pattern(Integer vip) {
    138. this.vip = vip;
    139. }
    140. @Override
    141. public String toString() {
    142. return "Pattern{" +
    143. "vip='" + vip + '\'' +
    144. '}';
    145. }
    146. }
    147. private static class CustomerSource implements SourceFunction {
    148. boolean run = true;
    149. @Override
    150. public void run(SourceContext sourceContext) throws Exception {
    151. while (true) {
    152. Integer userId = new Random().nextInt(1000);
    153. Integer vip = new Random().nextInt(10);
    154. sourceContext.collect(new User(userId, vip));
    155. Thread.sleep(1000);
    156. }
    157. }
    158. @Override
    159. public void cancel() {
    160. run = false;
    161. }
    162. }
    163. private static class ChangeSource implements SourceFunction {
    164. boolean run = true;
    165. @Override
    166. public void run(SourceContext sourceContext) throws Exception {
    167. int i = 1;
    168. while (true) {
    169. sourceContext.collect(new Pattern(i++));
    170. Thread.sleep(5000);
    171. }
    172. }
    173. @Override
    174. public void cancel() {
    175. run = false;
    176. }
    177. }
    178. }

    demo 思路:以上述 vip 为例,不断获取投诉用户的 id 和 vip 等级;另有一份配置(模拟数据库中保存的)记录可以享受人工服务的 vip 等级。该等级可以随时调整,demo 中它随着时间逐渐增大(每 5 秒加 1)。

    test1 不广播

    注意看:pattern:4 打印出了 vip=2 的消息,但这并不代表是 task4 收到了这条配置;我们看到是 1> 输出了 vip=2,

    而 task10、task9 输出的仍然是 vip=0,说明配置流没有被广播(除非此处并行度设置为 1)。

    test2 map保存变化数据

    test3通过描述器获取数据

    效果和 test2 一样。不过要注意,两个流的数据到达有先后:可能 pattern 还没到,user 数据就先来了(此时广播状态为空,代码会打印 "pattern is null")。所以建议先初始化广播状态,或者先添加 pattern 流。

  • 相关阅读:
    认识异常【超详细】
    网络安全(黑客)自学
    find /root -type f -mtime +30 -exec rm {} ;的含义
    Lidar和IMU(INS)外参标定----常用开源项目总结
    STP生成树协议详解
    (数组) 1991. 找到数组的中间位置 ——【Leetcode每日一题】
    SpringBoot【整合Shiro】
    数字化时代,数据仓库究竟是干什么的?
    Docker容器-------安装及优化
    【无标题】
  • 原文地址:https://blog.csdn.net/cclovezbf/article/details/132697877