• Learning Flink: State


    What state is for

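    State keeps history for the current key, so each incoming record can be processed with knowledge of earlier records that had the same key.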
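As a rough mental model of "history per key", here is a plain-Java sketch. It is not Flink code: the class KeyedValueStateModel and its methods are hypothetical names chosen to mirror ValueState.value()/update(). It assumes, as Flink does for keyed state, that the runtime sets the current key before each record, so reads and writes only touch that key's slot:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical plain-Java model of keyed ValueState (not the Flink API):
 * one value per key, and value()/update() always act on the current key.
 */
class KeyedValueStateModel<K, V> {
    private final Map<K, V> store = new HashMap<>();
    private K currentKey;

    // In Flink the runtime does this before each processElement call.
    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    // Like ValueState.value(): null if nothing was stored for this key yet.
    V value() {
        return store.get(currentKey);
    }

    // Like ValueState.update(): overwrites the value for the current key only.
    void update(V value) {
        store.put(currentKey, value);
    }
}
```

Switching to a key that has never been seen returns null, which is why every user's first event has to handle a missing value.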

    How to use state

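    State handles are obtained from the runtime context of a rich function (for example in open() of a KeyedProcessFunction), and keyed state like this can only be used on a keyed stream:

    ListState<Integer> vipList = getRuntimeContext().getListState(new ListStateDescriptor<>("vipList", TypeInformation.of(Integer.class)));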

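    The keyed state types include ValueState, ListState and MapState (Flink also has ReducingState and AggregatingState). Flink does not provide a SetState.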
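Since there is no SetState, one workaround is a MapState whose map keys are the set elements and whose values are a dummy such as true, using MapState.put/contains/keys. The sketch below models that idea in plain Java so it runs without Flink; KeyedSetViaMap is a hypothetical name, and the outer map only stands in for Flink's per-key scoping:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical plain-Java model of the "MapState as a set" workaround:
 * for each stream key keep a Map<T, Boolean> and only use its keys.
 */
class KeyedSetViaMap<K, T> {
    private final Map<K, Map<T, Boolean>> perKey = new HashMap<>();

    // Mirrors MapState.put(element, true) for the given stream key.
    void add(K key, T element) {
        perKey.computeIfAbsent(key, k -> new HashMap<>()).put(element, Boolean.TRUE);
    }

    // Mirrors MapState.contains(element).
    boolean contains(K key, T element) {
        Map<T, Boolean> m = perKey.get(key);
        return m != null && m.containsKey(element);
    }

    // Mirrors iterating MapState.keys(): the distinct members for this key.
    Set<T> members(K key) {
        Map<T, Boolean> m = perKey.get(key);
        return m == null ? new HashSet<>() : new HashSet<>(m.keySet());
    }
}
```

For example, adding vip levels 3, 3 and 7 for userId 1 leaves two distinct members, and userId 2 sees none of them.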

    A state example

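    Take Qidian (a Chinese web-novel site) as an example: its novels cannot be downloaded, so pirates can only copy a novel by taking screenshots and extracting the text.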

    Qidian therefore has anti-scraping measures, as follows.

    1. If a user turns one page per second or faster, 10 times in a row, the user is considered a bot.

    In both cases (one page per second, or faster), the user keeps firing click events:

    userId=1 click_time=2023-09-07 00:00:00

    userId=1 click_time=2023-09-07 00:00:01
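The two sample events are exactly one second apart. A small plain-Java helper (hypothetical ClickGap class; the yyyy-MM-dd HH:mm:ss format is inferred from the sample click_time values) shows the gap in milliseconds and how a strict "interval < 1 s" comparison treats that boundary:

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * Hypothetical helper: parses click_time strings like the samples above
 * and computes the gap between two clicks in milliseconds.
 */
class ClickGap {
    // Format assumed from the sample events, e.g. "2023-09-07 00:00:01".
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static long gapMillis(String previous, String current) {
        LocalDateTime prev = LocalDateTime.parse(previous, FMT);
        LocalDateTime cur = LocalDateTime.parse(current, FMT);
        return Duration.between(prev, cur).toMillis();
    }

    // A click only counts as "fast" when the gap is strictly under 1 s.
    static boolean isFast(long gapMillis) {
        return gapMillis < 1000;
    }
}
```

The sample gap is 1000 ms, so with the strict comparison these two clicks do not count as fast; if exactly one page per second should count, the check would need to be <= 1000 ms.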

    How do we detect rule 1?

    lastClickState stores the user's previous click time.

    clickcntState stores the number of consecutive clicks the user made less than 1 s apart.

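    For every incoming event, compare its click time with lastClickState:

    if the interval is < 1 s, increment clickcntState by 1;

    otherwise (1 s or more), reset clickcntState to 0.

    Then check clickcntState: once it is greater than 10, output that userId to the sink (the demo below uses a strict > 10 check).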
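The rule above can be sketched in plain Java without Flink. FastClickDetector is a hypothetical class: its two HashMaps, keyed by userId, stand in for lastClickState and clickcntState, and the alert fires once the count exceeds the threshold, matching the demo's strict > 10 check:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical plain-Java sketch of the fast-click rule (no Flink).
 * Per user: last click time and the streak of clicks closer than maxGapMillis.
 */
class FastClickDetector {
    private final Map<Integer, Long> lastClick = new HashMap<>();   // ~ lastClickState
    private final Map<Integer, Integer> clickCnt = new HashMap<>(); // ~ clickcntState
    private final long maxGapMillis;
    private final int threshold;

    FastClickDetector(long maxGapMillis, int threshold) {
        this.maxGapMillis = maxGapMillis;
        this.threshold = threshold;
    }

    /** Processes one click; returns true if the user should be flagged. */
    boolean onClick(int userId, long clickTime) {
        Long last = lastClick.get(userId);
        int cnt = clickCnt.getOrDefault(userId, 0);
        if (last != null && clickTime - last < maxGapMillis) {
            cnt++;      // fast click: extend the streak
        } else {
            cnt = 0;    // first or slow click: reset the streak
        }
        clickCnt.put(userId, cnt);
        lastClick.put(userId, clickTime);
        return cnt > threshold;
    }
}
```

With new FastClickDetector(1000, 10), one click followed by eleven clicks 500 ms apart returns false for the first ten fast clicks and true on the eleventh; any gap of 1 s or more resets the streak. In the Flink job the maps are unnecessary, because keyBy plus ValueState already scopes both values to the current userId.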

    Let the demo speak for itself.

    package com.chenchi.pojo;

    public class User {
        public Integer userId;
        public Integer vip;
        public long clickTime = System.currentTimeMillis();

        public User() {
        }

        public User(Integer userId, Integer vip) {
            this.userId = userId;
            this.vip = vip;
        }

        public Integer getUserId() {
            return userId;
        }

        public void setUserId(Integer userId) {
            this.userId = userId;
        }

        public Integer getVip() {
            return vip;
        }

        public void setVip(Integer vip) {
            this.vip = vip;
        }

        @Override
        public String toString() {
            return "User{" +
                    "userId=" + userId +
                    ", vip=" + vip +
                    ", clickTime=" + clickTime +
                    '}';
        }

        public long getClickTime() {
            return clickTime;
        }

        public void setClickTime(long clickTime) {
            this.clickTime = clickTime;
        }
    }

    package com.chenchi.pojo;

    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    import java.util.Random;

    // Emits a random User every `interval` ms; userId is drawn from [0, randomBound)
    public class UserSource implements SourceFunction<User> {
        // volatile: cancel() is called from a different thread than run()
        private volatile boolean run = true;
        private int randomBound = 1000;
        private int interval = 1000;
        private final Random random = new Random();

        public UserSource() {}

        public UserSource(Integer randomBound) {
            this.randomBound = randomBound;
        }

        public UserSource(Integer randomBound, int interval) {
            this.randomBound = randomBound;
            this.interval = interval;
        }

        @Override
        public void run(SourceContext<User> sourceContext) throws Exception {
            // Loop on the flag instead of while (true) so that cancel() actually stops the source
            while (run) {
                Integer userId = random.nextInt(randomBound);
                Integer vip = random.nextInt(10);
                sourceContext.collect(new User(userId, vip));
                Thread.sleep(interval);
            }
        }

        @Override
        public void cancel() {
            run = false;
        }
    }

    package com.chenchi.state;

    import com.chenchi.pojo.User;
    import com.chenchi.pojo.UserSource;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class ValueStateDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 4 possible userIds and one event every 100 ms, so the same user often clicks within 1 s
            DataStreamSource<User> userDataStreamSource = env.addSource(new UserSource(4, 100));
            userDataStreamSource.print();
            userDataStreamSource
                    .keyBy(user -> user.userId)   // the state below is scoped per userId
                    .process(new CustomProcess())
                    .print();
            env.execute();
        }

        static class CustomProcess extends KeyedProcessFunction<Integer, User, String> {
            // Time of this user's previous click (lastClickState)
            private transient ValueState<Long> lastClickTime;
            // Consecutive clicks less than 1 s apart (clickcntState)
            private transient ValueState<Integer> cnt;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                lastClickTime = getRuntimeContext().getState(
                        new ValueStateDescriptor<Long>("click", TypeInformation.of(Long.class)));
                cnt = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("cnt", Integer.class));
            }

            @Override
            public void processElement(User user, Context context, Collector<String> collector) throws Exception {
                Integer userId = user.getUserId();
                long clickTime = user.getClickTime();
                Long last = lastClickTime.value();   // null on this user's first click
                Integer value = cnt.value();
                if (value == null) {
                    value = 0;
                }
                if (last != null && clickTime - last < 1000) {
                    // Clicked too fast: extend the streak
                    value = value + 1;
                } else {
                    // Earlier fast clicks may just have been skipping dull pages; a slow click resets to 0
                    value = 0;
                }
                cnt.update(value);
                // Store this click time in ValueState for the next event of the same user
                lastClickTime.update(clickTime);
                if (value > 10) {
                    // Message means "clicking too fast, attention"
                    collector.collect("userId=" + userId + ",clickCnt=" + value + ",click太快 注意注意");
                }
            }
        }
    }

    Console output

    10> User{userId=0, vip=0, clickTime=1694083167883}
    11> User{userId=0, vip=4, clickTime=1694083167994}
    12> User{userId=2, vip=4, clickTime=1694083168102}
    1> User{userId=2, vip=7, clickTime=1694083168212}
    2> User{userId=2, vip=3, clickTime=1694083168320}
    3> User{userId=1, vip=0, clickTime=1694083168428}
    4> User{userId=0, vip=7, clickTime=1694083168537}
    5> User{userId=2, vip=3, clickTime=1694083168646}
    6> User{userId=2, vip=0, clickTime=1694083168757}
    7> User{userId=2, vip=3, clickTime=1694083168866}
    8> User{userId=0, vip=9, clickTime=1694083168975}
    9> User{userId=0, vip=3, clickTime=1694083169084}
    10> User{userId=2, vip=7, clickTime=1694083169191}
    11> User{userId=0, vip=7, clickTime=1694083169298}
    12> User{userId=0, vip=6, clickTime=1694083169406}
    1> User{userId=3, vip=9, clickTime=1694083169513}
    2> User{userId=0, vip=4, clickTime=1694083169618}
    3> User{userId=3, vip=3, clickTime=1694083169726}
    4> User{userId=1, vip=8, clickTime=1694083169833}
    5> User{userId=2, vip=1, clickTime=1694083169942}
    6> User{userId=3, vip=2, clickTime=1694083170050}
    7> User{userId=2, vip=8, clickTime=1694083170158}
    8> User{userId=1, vip=4, clickTime=1694083170267}
    9> User{userId=1, vip=2, clickTime=1694083170374}
    10> User{userId=0, vip=1, clickTime=1694083170481}
    11> User{userId=3, vip=6, clickTime=1694083170589}
    12> User{userId=0, vip=9, clickTime=1694083170696}
    1> User{userId=3, vip=1, clickTime=1694083170804}
    2> User{userId=1, vip=8, clickTime=1694083170911}
    3> User{userId=1, vip=3, clickTime=1694083171018}
    4> User{userId=0, vip=7, clickTime=1694083171126}
    5> User{userId=1, vip=8, clickTime=1694083171233}
    6> User{userId=3, vip=5, clickTime=1694083171341}
    7> User{userId=0, vip=8, clickTime=1694083171448}
    9> userId=0,clickCnt=11,click太快 注意注意

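    The result is as expected: in the log above, every click of userId=0 came less than 1 s after the previous one, so its counter reaches 11 on its 12th click and the alert is printed.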
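The numbers can be double-checked by replaying the clickTime values of userId=0, copied from the log above, through the same counting rule (a gap under 1000 ms increments the counter, otherwise it resets; an alert fires when the counter is greater than 10). ReplayUser0 is a hypothetical plain-Java helper, not part of the demo:

```java
/**
 * Hypothetical helper: replays userId=0's clickTime values from the log
 * through the demo's counting rule.
 */
class ReplayUser0 {
    static final long[] USER0_CLICKS = {
        1694083167883L, 1694083167994L, 1694083168537L, 1694083168975L,
        1694083169084L, 1694083169298L, 1694083169406L, 1694083169618L,
        1694083170481L, 1694083170696L, 1694083171126L, 1694083171448L
    };

    /** Counter value after each click: +1 if the gap is < 1000 ms, else reset to 0. */
    static int[] counts(long[] clicks) {
        int[] result = new int[clicks.length];
        int cnt = 0;
        for (int i = 0; i < clicks.length; i++) {
            if (i > 0 && clicks[i] - clicks[i - 1] < 1000) {
                cnt++;
            } else {
                cnt = 0;
            }
            result[i] = cnt;
        }
        return result;
    }

    /** Index of the first click whose counter exceeds the threshold, or -1. */
    static int firstAlert(long[] clicks, int threshold) {
        int[] c = counts(clicks);
        for (int i = 0; i < c.length; i++) {
            if (c[i] > threshold) {
                return i;
            }
        }
        return -1;
    }
}
```

counts(USER0_CLICKS) yields 0, 1, ..., 11 and firstAlert(USER0_CLICKS, 10) is 11, i.e. the 12th click, which matches the single alert line in the log.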

  • Original article: https://blog.csdn.net/cclovezbf/article/details/132743540