Recently a batch of Thrift-serialized Java objects landed in one of our Kafka topics, and I needed to consume them with Flink. Searching around turned up no demo for this combination, so I had to dig through the official docs; it took three or four hours to get everything running. Here is the complete code, to save the next person the trouble.
Official documentation
Note: this post uses the DataStream API's Kafka connector, not the Table API's; the Table module's code for consuming Kafka is different from what is shown here. Flink's DataSet, DataStream, and Table/SQL modules each access external systems differently: even for the same MySQL source, the code and sometimes the Maven dependencies differ. For your specific case, go to the official documentation; no amount of blog searching will replace it.
typedef i16 short
typedef i32 int
typedef i64 long
typedef bool boolean
typedef string String
struct Person {
1: optional String username,
2: optional int age,
3: optional boolean married
}
/*
 * This file defines the data structure that is serialized into the Kafka topic.
 */
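Running the Thrift compiler on this IDL (for example thrift --gen java person.thrift; the file name is just an assumption) produces the Person class used throughout the rest of the code. A minimal round-trip sketch, assuming the accessor names generated by the standard Thrift Java code generator, looks like this:

import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;

// Assumes this class sits in the same package as the generated Person class.
public class PersonRoundTrip {
    public static void main(String[] args) throws TException {
        Person person = new Person();
        person.setUsername("tom");
        person.setAge(20);
        person.setMarried(false);
        // Serialize to the same binary format the Kafka producer will write.
        byte[] bytes = new TSerializer(new TBinaryProtocol.Factory()).serialize(person);
        // Deserialize into a fresh object and check that the fields survived.
        Person copy = new Person();
        new TDeserializer(new TBinaryProtocol.Factory()).deserialize(copy, bytes);
        System.out.println(copy.getUsername() + ", " + copy.getAge() + ", " + copy.isMarried());
    }
}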
<dependency>
    <groupId>org.apache.thrift</groupId>
    <artifactId>libthrift</artifactId>
    <version>0.14.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.13.6</version>
</dependency>
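The Flink consumer further down also imports com.twitter.chill.thrift.TBaseSerializer, which none of the dependencies above provide. Flink's serialization documentation suggests adding chill-thrift for this; the version below is an assumption, so match it to your own Flink/Kryo setup:

<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>chill-thrift</artifactId>
    <version>0.7.6</version>
    <!-- exclude Kryo so it does not clash with the Kryo version bundled with Flink -->
    <exclusions>
        <exclusion>
            <groupId>com.esotericsoftware.kryo</groupId>
            <artifactId>kryo</artifactId>
        </exclusion>
    </exclusions>
</dependency>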
The serializer below is used by the Kafka producer; the deserializer is used by the Flink consumer.
package com.pg.flink.dataStreame.kafkaConnect.serializer;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;
import java.util.Map;
// The serializer: turns a Person into Thrift's binary wire format for the Kafka producer.
public class ThriftSerializer implements Serializer<Person> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }
    @Override
    public byte[] serialize(String topic, Person person) {
        try {
            TSerializer ser = new TSerializer(new TBinaryProtocol.Factory());
            return ser.serialize(person);
        } catch (TException ex) {
            throw new SerializationException(
                    "Can't serialize data='" + person + "' for topic='" + topic + "'", ex);
        }
    }
    @Override
    public void close() {
    }
}
// Below is the deserializer
package com.pg.flink.dataStreame.kafkaConnect.serializer;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.protocol.TBinaryProtocol;
import java.util.Arrays;
import java.util.Map;
// Deserializer is an extension interface provided by the Kafka client.
public class ThriftDeserializer implements Deserializer<Person> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }
    @Override
    public Person deserialize(String topic, byte[] data) {
        try {
            // TDeserializer is Thrift's deserialization class
            TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
            Person person = new Person();
            deserializer.deserialize(person, data);
            return person;
        } catch (Exception ex) {
            throw new SerializationException(
                    "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
        }
    }
    @Override
    public void close() {
    }
}
// Below is a Kafka producer for writing test data.
// The producer code has nothing to do with the Flink program; it is only here to make testing easier.
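A minimal producer sketch, assuming the same broker (192.168.134.128:9092) and topic (kafka-thrift) as the Flink consumer below; the class name ProducerMain and the Person field values are made up, and the setter names follow the standard Thrift Java codegen:

package com.pg.flink.dataStreame.kafkaConnect;

import com.pg.flink.dataStreame.kafkaConnect.serializer.Person;
import com.pg.flink.dataStreame.kafkaConnect.serializer.ThriftSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

// Hypothetical test producer; adjust broker address and field values to your setup.
public class ProducerMain {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.134.128:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        // value.serializer points at the ThriftSerializer defined above
        props.put("value.serializer", ThriftSerializer.class.getName());
        try (KafkaProducer<String, Person> producer = new KafkaProducer<>(props)) {
            Person person = new Person();
            person.setUsername("tom");
            person.setAge(20);
            person.setMarried(false);
            producer.send(new ProducerRecord<>("kafka-thrift", person));
            producer.flush();
        }
    }
}

And here is the Flink consumer that reads these records: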
package com.pg.flink.dataStreame.kafkaConnect;
import com.pg.flink.dataStreame.kafkaConnect.serializer.Person;
import com.pg.flink.dataStreame.kafkaConnect.serializer.ThriftDeserializer;
import com.twitter.chill.thrift.TBaseSerializer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class KafkaMain {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Register chill-thrift's TBaseSerializer so Kryo can handle the Thrift-generated Person class
        // when Flink ships it between operators (requires the com.twitter:chill-thrift dependency).
        env.getConfig().addDefaultKryoSerializer(Person.class, TBaseSerializer.class);
        KafkaSource<Person> source = KafkaSource.<Person>builder()
                .setBootstrapServers("192.168.134.128:9092")
                .setTopics("kafka-thrift")
                .setGroupId("pg10")
                .setStartingOffsets(OffsetsInitializer.earliest())
                // valueOnly() wraps the Kafka Deserializer defined above, ignoring the record key
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(ThriftDeserializer.class))
                .build();
        DataStreamSource<Person> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        ds.print();
        env.execute("Flink Batch Java API Skeleton");
    }
}
When Flink consumes from Kafka it holds a long-lived connection, so normally this Flink program will never exit. For that situation the official API provides setBounded(OffsetsInitializer) and setUnbounded(OffsetsInitializer):
// Stop offsets per partition: here partition 0 of kafka-thrift, stopping at offset 2.
Map<TopicPartition, Long> offsets = new HashMap<>();
TopicPartition topicPartition = new TopicPartition("kafka-thrift", 0);
offsets.put(topicPartition, 2L);
KafkaSource.builder().setUnbounded(OffsetsInitializer.offsets(offsets))
In streaming mode, use setUnbounded; in batch mode, use setBounded.
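As a sketch reusing the builder settings from KafkaMain above (choosing RuntimeExecutionMode.BATCH and OffsetsInitializer.latest() here is my own choice, not part of the original setup), a bounded read of the topic would look like:

// needs: import org.apache.flink.api.common.RuntimeExecutionMode;
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// Bounded read: the source stops at whatever the latest offsets are when the job starts.
KafkaSource<Person> boundedSource = KafkaSource.<Person>builder()
        .setBootstrapServers("192.168.134.128:9092")
        .setTopics("kafka-thrift")
        .setGroupId("pg10")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setBounded(OffsetsInitializer.latest())
        .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(ThriftDeserializer.class))
        .build();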
You might think that with this in place the program will exit. I thought so too, but it does not: in my tests this configuration only makes the source stop fetching from Kafka, while the Flink job itself keeps running. So it is not a real solution.
I think the official API is a bit of a trap here. In the end I went with a sentinel-flag check that throws an exception, for example:
try {
    // Here ds is assumed to be a DataStream<String>; with the Person stream above you would
    // check a field instead, e.g. a sentinel username that marks the end of the data.
    ds.map((s) -> {
        if (s.contains("end_flag")) {
            throw new Exception("end flag reached");
        } else {
            return s;
        }
    }).print();
    env.execute("Flink Batch Java API Skeleton");
} catch (Exception e) {
    System.out.println("finished");
}