• 【实战-08】flink 消费kafka自定义序列化


    目的

    让从kafka消费出来的数据,直接就转换成我们的对象

    mvn pom

    <!--
    Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements.  See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership.  The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License.  You may obtain a copy of the License at
    
      http://www.apache.org/licenses/LICENSE-2.0
    
    Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied.  See the License for the
    specific language governing permissions and limitations
    under the License.
    -->
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    
    	<groupId>com.boke</groupId>
    	<artifactId>Flink1.7.1</artifactId>
    	<version>1.0-SNAPSHOT</version>
    	<packaging>jar</packaging>
    
    	<name>Flink Quickstart Job</name>
    
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    		<flink.version>1.17.1</flink.version>
    		<target.java.version>1.8</target.java.version>
    		<scala.binary.version>2.12</scala.binary.version>
    		<maven.compiler.source>${target.java.version}</maven.compiler.source>
    		<maven.compiler.target>${target.java.version}</maven.compiler.target>
    		<log4j.version>2.17.1</log4j.version>
    	</properties>
    
    	<repositories>
    		<repository>
    			<id>apache.snapshots</id>
    			<name>Apache Development Snapshot Repository</name>
    			<url>https://repository.apache.org/content/repositories/snapshots/</url>
    			<releases>
    				<enabled>false</enabled>
    			</releases>
    			<snapshots>
    				<enabled>true</enabled>
    			</snapshots>
    		</repository>
    	</repositories>
    
    	<dependencies>
    		<!-- Apache Flink dependencies -->
    		<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
    		<dependency>
    			<groupId>org.apache.flink</groupId>
    			<artifactId>flink-streaming-java</artifactId>
    			<version>${flink.version}</version>
    <!--			<scope>provided</scope>-->
    		</dependency>
    		<dependency>
    			<groupId>org.apache.flink</groupId>
    			<artifactId>flink-clients</artifactId>
    			<version>${flink.version}</version>
    <!--			<scope>provided</scope>-->
    		</dependency>
    		<!-- table 环境依赖【connectors 和 formats 和driver】 https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/configuration/overview/		-->
    		<dependency>
    			<groupId>org.apache.flink</groupId>
    			<artifactId>flink-connector-kafka</artifactId>
    			<version>${flink.version}</version>
    		</dependency>
    		<dependency>
    			<groupId>org.apache.flink</groupId>
    			<artifactId>flink-table-api-java</artifactId>
    			<version>${flink.version}</version>
    <!--			<scope>provided</scope>-->
    		</dependency>
    		<dependency>
    			<groupId>org.apache.flink</groupId>
    			<artifactId>flink-table-api-java-bridge</artifactId>
    			<version>${flink.version}</version>
    <!--			<scope>provided</scope>-->
    		</dependency>
    		<dependency>
    			<groupId>org.apache.flink</groupId>
    			<artifactId>flink-connector-jdbc</artifactId>
    			<version>3.1.0-1.17</version>
    		</dependency>
    		<dependency>
    			<groupId>mysql</groupId>
    			<artifactId>mysql-connector-java</artifactId>
    			<version>8.0.18</version>
    		</dependency>
    		<!-- Required when running inside the IDE; without it the job fails with:
    		     "Make sure a planner module is on the classpath" -->
    		<dependency>
    			<groupId>org.apache.flink</groupId>
    			<artifactId>flink-table-planner-loader</artifactId>
    			<version>${flink.version}</version>
    			<!--			<scope>provided</scope>-->
    		</dependency>
    		<dependency>
    			<groupId>org.apache.flink</groupId>
    			<artifactId>flink-table-runtime</artifactId>
    			<version>${flink.version}</version>
    			<!--			<scope>provided</scope>-->
    		</dependency>
    		<!--第三方的包-->
    		<dependency>
    			<groupId>com.alibaba</groupId>
    			<artifactId>fastjson</artifactId>
    			<version>1.2.83</version>
    		</dependency>
    		<!-- Add connector dependencies here. They must be in the default scope (compile). -->
    
    		<!-- Example:
    
    		<dependency>
    			<groupId>org.apache.flink</groupId>
    			<artifactId>flink-connector-kafka</artifactId>
    			<version>${flink.version}</version>
    		</dependency>
    		-->
    
    		<!-- Add logging framework, to produce console output when running in the IDE. -->
    		<!-- These dependencies are excluded from the application JAR by default. -->
    		<dependency>
    			<groupId>org.apache.logging.log4j</groupId>
    			<artifactId>log4j-slf4j-impl</artifactId>
    			<version>${log4j.version}</version>
    			<scope>runtime</scope>
    		</dependency>
    		<dependency>
    			<groupId>org.apache.logging.log4j</groupId>
    			<artifactId>log4j-api</artifactId>
    			<version>${log4j.version}</version>
    			<scope>runtime</scope>
    		</dependency>
    		<dependency>
    			<groupId>org.apache.logging.log4j</groupId>
    			<artifactId>log4j-core</artifactId>
    			<version>${log4j.version}</version>
    			<scope>runtime</scope>
    		</dependency>
    	</dependencies>
    
    	<build>
    		<plugins>
    
    			<!-- Java Compiler -->
    			<plugin>
    				<groupId>org.apache.maven.plugins</groupId>
    				<artifactId>maven-compiler-plugin</artifactId>
    				<version>3.1</version>
    				<configuration>
    					<source>${target.java.version}</source>
    					<target>${target.java.version}</target>
    				</configuration>
    			</plugin>
    
    			<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
    			<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
    			<plugin>
    				<groupId>org.apache.maven.plugins</groupId>
    				<artifactId>maven-shade-plugin</artifactId>
    				<version>3.1.1</version>
    				<executions>
    					<!-- Run shade goal on package phase -->
    					<execution>
    						<phase>package</phase>
    						<goals>
    							<goal>shade</goal>
    						</goals>
    						<configuration>
    							<createDependencyReducedPom>false</createDependencyReducedPom>
    							<artifactSet>
    								<excludes>
    									<exclude>org.apache.flink:flink-shaded-force-shading</exclude>
    									<exclude>com.google.code.findbugs:jsr305</exclude>
    									<exclude>org.slf4j:*</exclude>
    									<exclude>org.apache.logging.log4j:*</exclude>
    								</excludes>
    							</artifactSet>
    							<filters>
    								<filter>
    									<!-- Do not copy the signatures in the META-INF folder.
    									Otherwise, this might cause SecurityExceptions when using the JAR. -->
    									<artifact>*:*</artifact>
    									<excludes>
    										<exclude>META-INF/*.SF</exclude>
    										<exclude>META-INF/*.DSA</exclude>
    										<exclude>META-INF/*.RSA</exclude>
    									</excludes>
    								</filter>
    							</filters>
    							<transformers>
    								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
    									<mainClass>com.boke.DataStreamJob</mainClass>
    								</transformer>
    							</transformers>
    						</configuration>
    					</execution>
    				</executions>
    			</plugin>
    		</plugins>
    
    		<pluginManagement>
    			<plugins>
    				<!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
    				<plugin>
    					<groupId>org.eclipse.m2e</groupId>
    					<artifactId>lifecycle-mapping</artifactId>
    					<version>1.0.0</version>
    					<configuration>
    						<lifecycleMappingMetadata>
    							<pluginExecutions>
    								<pluginExecution>
    									<pluginExecutionFilter>
    										<groupId>org.apache.maven.plugins</groupId>
    										<artifactId>maven-shade-plugin</artifactId>
    										<versionRange>[3.1.1,)</versionRange>
    										<goals>
    											<goal>shade</goal>
    										</goals>
    									</pluginExecutionFilter>
    									<action>
    										<ignore/>
    									</action>
    								</pluginExecution>
    								<pluginExecution>
    									<pluginExecutionFilter>
    										<groupId>org.apache.maven.plugins</groupId>
    										<artifactId>maven-compiler-plugin</artifactId>
    										<versionRange>[3.1,)</versionRange>
    										<goals>
    											<goal>testCompile</goal>
    											<goal>compile</goal>
    										</goals>
    									</pluginExecutionFilter>
    									<action>
    										<ignore/>
    									</action>
    								</pluginExecution>
    							</pluginExecutions>
    						</lifecycleMappingMetadata>
    					</configuration>
    				</plugin>
    			</plugins>
    		</pluginManagement>
    	</build>
    </project>
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256

    核心代码

    package com.boke.kafka;
    
    import com.alibaba.fastjson.JSONObject;
    
    /**
     * Plain data holder for a Kafka message payload: a student's name and age.
     * Fields are intentionally public (Flink POJO style); getters/setters are
     * provided as well for framework compatibility.
     */
    public class Student {
        public String name;
        public Integer age;
    
        public Student(String name, Integer age) {
            this.name = name;
            this.age = age;
        }
    
        /**
         * Parses a JSON string such as {"name":"tom","age":18} into a Student.
         *
         * @param s JSON text containing "name" and "age" keys
         * @return a new Student built from the parsed fields
         */
        public static Student fromJson(String s) {
            JSONObject obj = JSONObject.parseObject(s);
            return new Student(obj.getString("name"), obj.getInteger("age"));
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public Integer getAge() {
            return age;
        }
    
        public void setAge(Integer age) {
            this.age = age;
        }
    }
    
    
    
    //下面是main主函数
    package com.boke.kafka;
    
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    
    import java.nio.charset.StandardCharsets;
    
    /**
     * Entry point: consumes a Kafka topic and deserializes each record directly
     * into a {@link Student} via a custom KafkaRecordDeserializationSchema.
     */
    public class kafkaSource {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            KafkaSource<Student> source = KafkaSource.<Student>builder()
                    .setBootstrapServers("brokers")
                    .setTopics("input-topic")
                    .setGroupId("my-group")
                    // Always start from the earliest offset, ignoring committed offsets.
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    // Alternative starting-offset strategies:
                    //   OffsetsInitializer.latest() — always start from the latest offset
                    //   OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)
                    //       — use the group's committed offset, else fall back to earliest
                    //   OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)
                    //       — use the group's committed offset, else fall back to latest
                    // Other deserializer options (string-valued records):
                    //   KafkaRecordDeserializationSchema.of(new KafkaDeserializationSchemaWrapper<>(new SimpleStringSchema()))
                    //   KafkaRecordDeserializationSchema.valueOnly(...)
                    .setDeserializer(KafkaRecordDeserializationSchema.of(new MyKafkaDeserializationSchema()))
                    .build();
    
            // BUG FIX: the original built the source but attached no sink and never called
            // env.execute(), so the job graph was never submitted and nothing ran.
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
                    .print();
            env.execute("kafka-custom-deserialization");
        }
    }
    /**
     * Custom deserializer that turns raw Kafka records into {@link Student} objects.
     * Per the KafkaDeserializationSchema contract, returning null from deserialize()
     * marks a record as un-deserializable and it is skipped rather than failing the job.
     */
    class MyKafkaDeserializationSchema implements KafkaDeserializationSchema<Student> {
    
        /** Unbounded Kafka stream: never signals end-of-stream. */
        @Override
        public boolean isEndOfStream(Student nextElement) {
            return false;
        }
    
        /**
         * Deserializes one Kafka record.
         *
         * @param record the raw record (key and value as byte arrays)
         * @return the parsed Student, or null when the record has no value or
         *         its payload is not valid JSON ("null if the message cannot be
         *         deserialized", as the interface documents)
         */
        @Override
        public Student deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
            // Record metadata is available here if ever needed:
            // record.topic(), record.timestamp(), record.partition().
    
            // BUG FIX: guard against tombstone/empty records — the original would have
            // thrown an NPE in new String(record.value(), ...).
            if (record.value() == null) {
                return null;
            }
            try {
                String value = new String(record.value(), StandardCharsets.UTF_8);
                return Student.fromJson(value);
            } catch (Exception e) {
                // BUG FIX: the original comment said malformed data should yield null,
                // but parse exceptions were left to propagate and would crash the job.
                // Skip the bad record instead.
                return null;
            }
        }
    
        @Override
        public TypeInformation<Student> getProducedType() {
            return TypeInformation.of(new TypeHint<Student>() {});
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
  • 相关阅读:
    太赞了,300+图解Pandas,超级用心的教程!
    【力扣面试经典150题】(链表)K 个一组翻转链表
    运动的人需要什么装备?运动健身装备推荐分享
    docker搭建mysql环境
    Android图片涂鸦,Kotlin(1)
    python的深浅copy
    4zhou 舵机
    什么是Nginx?
    java面向对象(上)
    webrtc 安卓端多人视频会议
  • 原文地址:https://blog.csdn.net/qq_36066039/article/details/134252277