state作用
保留当前key的历史状态。
state用法
ListState
有 ValueState、ListState、MapState。貌似没有 SetState。
state案例
比如起点的小说不能被下载。别人只能通过截屏,提取文字的方式盗版小说。
起点做了防爬措施。防爬措施如下。
1.如果一个用户1s翻1页,或者速度更快,连续10次,那么就认为用户是机器人。
上述两种情况下,用户都会不断地发起点击事件,例如:
userId=1 click_time=2023-09-07 00:00:00
userId=1 click_time=2023-09-07 00:00:01
我们如何判断规则1呢?
lastClickState 保留用户上一次的点击时间。
clickcntState 保留用户1s1页连续点击次数。
来一条数据就与上一次的lastClickState去对比,
如果间隔<1s,clickcntState就+1;
如果间隔>=1s,clickcntState就置为0。
同时判断clickcntState次数是否>10,如果大于就将该userId输出到sink。
来个demo直接说话。
- package com.chenchi.pojo;
-
/**
 * Simple mutable POJO describing one click event of a user.
 *
 * <p>The fields are public and a public no-argument constructor is provided;
 * other code in this file reads {@code userId} directly, so the fields must
 * stay public.
 */
public class User {
    /** Identifier of the user who produced the click. */
    public Integer userId;

    /** VIP-related value of the user; its semantics are not defined in this class. */
    public Integer vip;

    /** Click timestamp in epoch milliseconds; defaults to the instant the object is created. */
    public long clickTime = System.currentTimeMillis();

    /** Creates a user with {@code null} id and vip and the creation time as click time. */
    public User() {
    }

    /**
     * Creates a user whose click time is the creation time of this object.
     *
     * @param userId identifier of the user
     * @param vip    VIP-related value of the user
     */
    public User(Integer userId, Integer vip) {
        this.userId = userId;
        this.vip = vip;
    }

    /** @return the user identifier, possibly {@code null} */
    public Integer getUserId() {
        return userId;
    }

    /** @param userId the user identifier to store */
    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    /** @return the vip value, possibly {@code null} */
    public Integer getVip() {
        return vip;
    }

    /** @param vip the vip value to store */
    public void setVip(Integer vip) {
        this.vip = vip;
    }

    /** @return the click timestamp in epoch milliseconds */
    public long getClickTime() {
        return clickTime;
    }

    /** @param clickTime the click timestamp in epoch milliseconds */
    public void setClickTime(long clickTime) {
        this.clickTime = clickTime;
    }

    /** Renders the object as {@code User{userId=..., vip=..., clickTime=...}}. */
    @Override
    public String toString() {
        return new StringBuilder("User{")
                .append("userId=").append(userId)
                .append(", vip=").append(vip)
                .append(", clickTime=").append(clickTime)
                .append('}')
                .toString();
    }
}
- package com.chenchi.pojo;
-
- import org.apache.flink.streaming.api.functions.source.SourceFunction;
-
- import java.util.Random;
-
- public class UserSource implements SourceFunction
{ - boolean run = true;
- public UserSource(){}
- int randomBound=1000;
- int interval=1000;
- public UserSource(Integer randomBound){
- this.randomBound=randomBound;
- }
- public UserSource(Integer randomBound,int interval){
- this.randomBound=randomBound;
- this.interval=interval;
- }
- @Override
- public void run(SourceContext
sourceContext) throws Exception { - while (true) {
- Integer userId = new Random().nextInt(randomBound);
- Integer vip = new Random().nextInt(10);
- sourceContext.collect(new User(userId, vip));
- Thread.sleep(interval);
- }
- }
-
- @Override
- public void cancel() {
- run = false;
- }
- }
- package com.chenchi.state;
-
- import com.chenchi.pojo.User;
- import com.chenchi.pojo.UserSource;
- import org.apache.flink.api.common.state.ValueState;
- import org.apache.flink.api.common.state.ValueStateDescriptor;
- import org.apache.flink.api.common.typeinfo.TypeInformation;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
- import org.apache.flink.util.Collector;
-
- public class ValueStateDemo {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- CustomProcess customProcess = new CustomProcess();
- DataStreamSource<User> userDataStreamSource = env.addSource(new UserSource(4,100));
- userDataStreamSource.print();
- userDataStreamSource
- .keyBy(user->user.userId)
- .process(customProcess)
- .print();
- env.execute();
- }
-
- static class CustomProcess extends KeyedProcessFunction<Integer,User,String> {
- ValueState<Long> lastClickTime;
- ValueState<Integer> cnt;
- @Override
- public void open(Configuration parameters) throws Exception {
- lastClickTime= getRuntimeContext().getState(new ValueStateDescriptor<Long>("click", TypeInformation.of(Long.class)));
- cnt= getRuntimeContext().getState(new ValueStateDescriptor<Integer>("cnt",Integer.class));
- super.open(parameters);
- }
-
- @Override
- public void processElement(User user, KeyedProcessFunction<Integer, User, String>.Context context, Collector<String> collector) throws Exception {
- Integer userId = user.getUserId();
- long clickTime = user.getClickTime();
- Long last = lastClickTime.value();
- Integer value = cnt.value();
- if (value==null){
- value=0;
- }
- if (last!=null&&clickTime-last<1000){
- //点击太快
- cnt.update(value+1);
- }else {
- //之前可能是不喜欢的页数突然点快了 点击慢就置0
- cnt.update(0);
- }
- //保存当前的listState
- lastClickTime.update(clickTime);
- if (cnt.value()>10){
- collector.collect("userId="+userId+",clickCnt="+cnt.value()+",click太快 注意注意");
- }
- }
- }
- }
打印日志
10> User{userId=0, vip=0, clickTime=1694083167883}
11> User{userId=0, vip=4, clickTime=1694083167994}
12> User{userId=2, vip=4, clickTime=1694083168102}
1> User{userId=2, vip=7, clickTime=1694083168212}
2> User{userId=2, vip=3, clickTime=1694083168320}
3> User{userId=1, vip=0, clickTime=1694083168428}
4> User{userId=0, vip=7, clickTime=1694083168537}
5> User{userId=2, vip=3, clickTime=1694083168646}
6> User{userId=2, vip=0, clickTime=1694083168757}
7> User{userId=2, vip=3, clickTime=1694083168866}
8> User{userId=0, vip=9, clickTime=1694083168975}
9> User{userId=0, vip=3, clickTime=1694083169084}
10> User{userId=2, vip=7, clickTime=1694083169191}
11> User{userId=0, vip=7, clickTime=1694083169298}
12> User{userId=0, vip=6, clickTime=1694083169406}
1> User{userId=3, vip=9, clickTime=1694083169513}
2> User{userId=0, vip=4, clickTime=1694083169618}
3> User{userId=3, vip=3, clickTime=1694083169726}
4> User{userId=1, vip=8, clickTime=1694083169833}
5> User{userId=2, vip=1, clickTime=1694083169942}
6> User{userId=3, vip=2, clickTime=1694083170050}
7> User{userId=2, vip=8, clickTime=1694083170158}
8> User{userId=1, vip=4, clickTime=1694083170267}
9> User{userId=1, vip=2, clickTime=1694083170374}
10> User{userId=0, vip=1, clickTime=1694083170481}
11> User{userId=3, vip=6, clickTime=1694083170589}
12> User{userId=0, vip=9, clickTime=1694083170696}
1> User{userId=3, vip=1, clickTime=1694083170804}
2> User{userId=1, vip=8, clickTime=1694083170911}
3> User{userId=1, vip=3, clickTime=1694083171018}
4> User{userId=0, vip=7, clickTime=1694083171126}
5> User{userId=1, vip=8, clickTime=1694083171233}
6> User{userId=3, vip=5, clickTime=1694083171341}
7> User{userId=0, vip=8, clickTime=1694083171448}
9> userId=0,clickCnt=11,click太快 注意注意
效果符合预期。