• Flink: custom-serialized object Sink/Source


    Flink receiving byte data

    Here RabbitMQ is used as the Source.
    Add the dependency:

    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-rabbitmq_2.12</artifactId>
                <version>1.12.0</version>
            </dependency>
    
    
    

    RabbitMQ serves as the message broker.

    Use the RMQSource that Flink already provides:

    public static RMQSource<Byte[]> buildByteExchangeSource(String queueName, String exchange, String routerKey) {
        RMQConnectionConfig config = new RMQConnectionConfig.Builder()
                .setHost("127.0.0.1")
                .setVirtualHost("/")
                .setPort(5672)   // RabbitMQ's default port (9092 is Kafka's)
                .setUserName("uname")
                .setPassword("pwd")
                .build();
        // ByteArrayDeserializationSchema is the deserialization class we implement ourselves.
        // Note: exchange and routerKey are unused here -- the stock RMQSource has no way to bind an exchange.
        return new RMQSource<>(config, queueName, false, new ByteArrayDeserializationSchema());
    }
    

    At this point you will notice that the stock RMQSource offers no place to bind an exchange.

    Reading the source shows that RMQSource extends the MultipleIdsMessageAcknowledgingSourceBase class and implements the ResultTypeQueryable interface;
    you can model your own Source on RMQSource and simply bind the channel to the desired exchange when the Source is opened.
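    A minimal sketch of such a Source, assuming Flink 1.12's RMQSource, whose protected setupQueue() hook is invoked from open() after the channel exists and whose channel and queueName fields are protected; the "direct" exchange type and the durability flags here are assumptions, not something the original fixes:

    import java.io.IOException;

    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
    import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

    public class ExchangeRMQSource<T> extends RMQSource<T> {

        private final String exchange;
        private final String routerKey;

        public ExchangeRMQSource(RMQConnectionConfig config, String queueName,
                                 String exchange, String routerKey,
                                 DeserializationSchema<T> schema) {
            super(config, queueName, false, schema);
            this.exchange = exchange;
            this.routerKey = routerKey;
        }

        // setupQueue() runs inside RMQSource.open() once the channel has been
        // created, so this is the place to declare the exchange and bind the queue.
        @Override
        protected void setupQueue() throws IOException {
            channel.queueDeclare(queueName, true, false, false, null);
            channel.exchangeDeclare(exchange, "direct", true);
            channel.queueBind(queueName, exchange, routerKey);
        }
    }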
    In addition you need your own deserialization schema; this is my default implementation:

    import java.io.IOException;

    import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

    public class ByteArrayDeserializationSchema extends AbstractDeserializationSchema<Byte[]> {

        @Override
        public Byte[] deserialize(byte[] bytesPrim) throws IOException {
            // Box the primitive byte[] into Byte[] element by element
            Byte[] bytes = new Byte[bytesPrim.length];
            int i = 0;
            for (byte b : bytesPrim) {
                bytes[i++] = b; // autoboxing
            }
            return bytes;
        }
    }
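    Downstream operators usually need the primitive bytes back, so the boxing done in deserialize() and its inverse can be exercised with plain Java, no Flink required:

    ```java
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class ByteBoxingDemo {

        // Same conversion the schema performs: primitive byte[] -> boxed Byte[]
        static Byte[] box(byte[] prim) {
            Byte[] boxed = new Byte[prim.length];
            int i = 0;
            for (byte b : prim) {
                boxed[i++] = b;   // autoboxing
            }
            return boxed;
        }

        // Inverse conversion, e.g. for an operator that needs the raw payload again
        static byte[] unbox(Byte[] boxed) {
            byte[] prim = new byte[boxed.length];
            for (int i = 0; i < boxed.length; i++) {
                prim[i] = boxed[i];   // auto-unboxing (throws NPE if an element is null)
            }
            return prim;
        }

        public static void main(String[] args) {
            byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
            byte[] roundTrip = unbox(box(payload));
            System.out.println(Arrays.equals(payload, roundTrip));   // true
        }
    }
    ```

    Note that Byte[] carries per-element object overhead; if nothing downstream requires the boxed form, staying with plain byte[] is cheaper.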
    
    

    Passing custom objects around inside Flink

    Add the dependency:

            <dependency>
                <groupId>com.google.code.gson</groupId>
                <artifactId>gson</artifactId>
                <version>2.8.6</version>
            </dependency>
    
    

    Kafka serves as the message broker.

    entity

    import lombok.Data;

    import java.io.Serializable;

    // Lombok's @Data generates the getters/setters, equals/hashCode and toString
    @Data
    public class SourceBean implements Serializable {

        private String action;
        private String device_id;
        private String username;
    }
    
    schema

    This class implements the serialization and deserialization methods for the entity:

    
    import com.google.gson.Gson;
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.serialization.SerializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    public class SourceBeanSchema implements DeserializationSchema<SourceBean>, SerializationSchema<SourceBean> {

        private static final Gson gson = new Gson();

        @Override
        public SourceBean deserialize(byte[] bytes) throws IOException {
            // Note: when Gson deserializes JSON numbers into untyped fields it turns
            // integers into doubles (1 becomes 1.0); JSONObject can be used for
            // deserialization instead if that matters.
            return gson.fromJson(new String(bytes, StandardCharsets.UTF_8), SourceBean.class);
        }

        @Override
        public boolean isEndOfStream(SourceBean metricEvent) {
            return false;
        }

        @Override
        public byte[] serialize(SourceBean metricEvent) {
            return gson.toJson(metricEvent).getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public TypeInformation<SourceBean> getProducedType() {
            return TypeInformation.of(SourceBean.class);
        }
    }
    
    kafkaUtil
    
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.connectors.kafka.internals.KafkaSerializationSchemaWrapper;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    import java.util.Properties;

    public class MyKafkaUtil {

        public static Properties buildKafkaPProps() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "127.0.0.1:9092");
            props.put("max.request.size", "104857600");
            props.put("acks", "0");   // producer setting; the key is "acks", not "ack"
            return props;
        }

        public static Properties buildKafkaCProps() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "127.0.0.1:9092");
            props.put("group.id", "groupId");
            // A consumer needs *deserializer* settings (the original used
            // "key.serializer"/"value.serializer" keys pointing at deserializer classes)
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("auto.offset.reset", "latest");
            props.put("max.partition.fetch.bytes", "115343360");
            return props;
        }

        // Build a Flink Kafka sink for SourceBean
        public static FlinkKafkaProducer<SourceBean> getBoxSourceSink(String topic) {
            // Kafka connection properties
            Properties props = buildKafkaPProps();
            return new FlinkKafkaProducer<>(topic,
                    new KafkaSerializationSchemaWrapper<>(topic, null, false, new SourceBeanSchema()),
                    props,
                    FlinkKafkaProducer.Semantic.NONE,
                    200);
        }

        // Build a Flink Kafka source for SourceBean
        public static FlinkKafkaConsumer<SourceBean> getBoxSourceSource(String topic, String groupId) {
            // Kafka connection properties
            Properties props = buildKafkaCProps();
            props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            return new FlinkKafkaConsumer<>(topic, new SourceBeanSchema(), props);
        }
    }
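    With these helpers in place, a job can move SourceBean objects end to end; this is a sketch, and the topic names, group id and job name are assumptions:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SourceBeanJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // SourceBean objects flow through the pipeline directly;
            // SourceBeanSchema does the byte[] <-> SourceBean conversion at the edges
            DataStream<SourceBean> stream =
                    env.addSource(MyKafkaUtil.getBoxSourceSource("in-topic", "demo-group"));

            stream.addSink(MyKafkaUtil.getBoxSourceSink("out-topic"));

            env.execute("SourceBean passthrough");
        }
    }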
    
    
  • Original article: https://blog.csdn.net/xuxie13/article/details/126640155