• 18.flink kafka使用thrift序列化


    摘要

    最近kafka中有一批被thrift序列化的java对象数据,我用flink去消费。 本来百度找了找发现没有这样的demo,无奈只能去翻官网了,经过三四个小时才跑通,这里记录下详细代码,方便后来人。
    官网
    注意:此模块是datastream 模块的kafka消费方式, 不是Table模块的方式,table模块消费kafka的代码和我这里写的不同。flink DataSet,DataStream,和Tabpe/Sql模块对外部数据的访问都是不同的,假如都访问mysql其代码甚至是mavan依赖可能不同,具体你要去看官网的,百度你看再多也没用。你看下图红色箭头部分。
    在这里插入图片描述

    一.thrift文件

    typedef i16 short
    typedef i32 int
    typedef i64 long
    typedef bool boolean
    typedef string String
    
    struct Person {
        1: optional String username,
        2: optional int age,
        3: optional boolean married
    }
    /*
    * 此文件用于定义序列化存入kafka  topic的数据结构
    */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    二:flink依赖

     
            org.apache.thrift
            libthrift
            0.14.1
        
        
            org.apache.kafka
            kafka-clients
            2.1.0
        
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    三:kafka connector 依赖

    	
    			org.apache.flink
    			flink-connector-kafka_2.11
    			1.13.6
    		
    
    • 1
    • 2
    • 3
    • 4
    • 5

    四.反序列化和序列化代码

    序列化用于kafkaproducer, 反序列化用于flink消费

    package com.pg.flink.dataStreame.kafkaConnect.serializer;
    
    
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Serializer;
    import org.apache.thrift.TException;
    import org.apache.thrift.TSerializer;
    import org.apache.thrift.protocol.TBinaryProtocol;
    
    import java.util.Map;
    //这是序列化
    public class ThriftSerializer implements Serializer {
    
    
    	@Override
    	public void configure(Map map, boolean b) {
    
    	}
    
    	@Override
    	public byte[] serialize(String topic, Person person) {
    		try {
    			TSerializer ser = new TSerializer(new TBinaryProtocol.Factory());
    			return ser.serialize(person);
    		} catch (TException ex) {
    			throw new SerializationException(
    					"Can't serialize data='" + person + "' for topic='" + topic + "'", ex);
    		}
    	}
    
    	@Override
    	public void close() {
    
    	}
    }
    
    //下面是反序列化
    package com.pg.flink.dataStreame.kafkaConnect.serializer;
    
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.thrift.TDeserializer;
    import org.apache.thrift.protocol.TBinaryProtocol;
    
    import java.util.Arrays;
    import java.util.Map;
    
    //Deserializer是kafka提供的拓展接口
    public class ThriftDeserializer implements Deserializer {
    
    
      @Override
      public void configure(Map configs, boolean isKey) {
    
      }
    
      @Override
      public Person deserialize(String topic, byte[] data) {
        try {
        //TDeserializer 是thrift的反序列化类
          TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
          Person person = new Person();
          deserializer.deserialize(person, data);
    
          return person;
        } catch (Exception ex) {
          throw new SerializationException(
                  "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
        }
      }
    
      @Override
      public void close() {
    
      }
    }
    //下面是kafka生产者,
    //生产者代码和flink程序无关,这里方便你测试
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78

    五.flink代码

    
    package com.pg.flink.dataStreame.kafkaConnect;
    import com.pg.flink.dataStreame.kafkaConnect.serializer.Person;
    import com.pg.flink.dataStreame.kafkaConnect.serializer.ThriftDeserializer;
    import com.twitter.chill.thrift.TBaseSerializer;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    public class KafkaMain {
    	public static void main(String[] args) throws Exception {
    		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    		env.getConfig().addDefaultKryoSerializer(Person.class, TBaseSerializer.class);
    		KafkaSource source = KafkaSource.builder()
    				.setBootstrapServers("192.168.134.128:9092")
    				.setTopics("kafka-thrift")
    				.setGroupId("pg10")
    				.setStartingOffsets(OffsetsInitializer.earliest())
    				.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(ThriftDeserializer.class))
    				.build();
    		DataStreamSource  ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
    		ds.print();
    		env.execute("Flink Batch Java API Skeleton");
    	}
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    六.如何退出消费kafka数据

    flink消费kafka的时候保持的是长连接,正常来说该flink程序永远不会退出. 所以说这种情况官方提供了:setBounded(OffsetsInitializer)和setUnbounded(OffsetsInitializer)

    		Map offsets=new HashMap<>();
    		TopicPartition topicPartition = new TopicPartition("kafka-thrift",0);
    		offsets.put(topicPartition,2L);
    			KafkaSource.builder().setUnbounded(OffsetsInitializer.offsets(offsets))
    			流处理模式用:setUnbounded
    			批处理模式用:setBounded
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    你是不是以为这样你的程序就退出了, 我也这么认为实际上并不会,经过我的测试加上这种配置的话只不过source端会停止从kafka获取数据,而flink程序依旧还在。
    所以并不好。

    我觉得官方有点坑了,我最终采用的flag判断抛出异常的方式,比如:

      try {
                ds.map((s)->{
                    if(s.contains("end_flag")){
                        throw new Exception("结束");
                    }else {
                        return s;
                    }
                }).print();
                env.execute("Flink Batch Java API Skeleton");
            }catch (Exception e){
    
                System.out.println("结束");
            }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
  • 相关阅读:
    go语法基础二(结构体,协程,锁,xml,io)
    抖音作品评论id提取工具|视频内容提取软件
    24.讲二叉树基础(下):有了如此高效的散列表,为什么还需要二叉树
    设计模式之单例模式
    用全栈智能,联想如何“零故障”支持亚运会?
    Sublime Text教程
    空间参考简介
    通过C语言实现计算机模拟疫情扩散
    前后端、数据库时间格式化方法
    ES6常问面试题(Promise,async和await 等)
  • 原文地址:https://blog.csdn.net/qq_36066039/article/details/126291165