1. First, add the Kafka connector dependency to the project:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.15.2</version>
</dependency>
2. Source: reading data from Kafka with Flink
public class Demo01KafkaSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read string values from the "bigdata" topic, starting from the earliest offset
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("master:9092,node1:9092,node2:9092")
                .setTopics("bigdata")
                .setGroupId("demo01") // consumer group id; "demo01" is a placeholder name
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        DataStreamSource<String> kafkaSourceDS = env
                .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        kafkaSourceDS.print();
        env.execute();
    }
}
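Besides earliest(), OffsetsInitializer supports other starting positions. The sketch below is a drop-in replacement for the source definition above; the timestamp value is only an illustrative placeholder, and OffsetResetStrategy comes from the Kafka client library (org.apache.kafka.clients.consumer):

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("master:9092,node1:9092,node2:9092")
        .setTopics("bigdata")
        .setGroupId("demo01")
        // pick one starting position:
        .setStartingOffsets(OffsetsInitializer.latest())     // only records produced from now on
        // .setStartingOffsets(OffsetsInitializer.committedOffsets(
        //         OffsetResetStrategy.EARLIEST))            // resume from the group's committed offsets
        // .setStartingOffsets(OffsetsInitializer.timestamp(1660000000000L)) // first record at/after this epoch-millis value
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();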
Start a console producer to feed the bigdata topic:
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bigdata
3. Sink: writing data to Kafka with Flink
public class Demo02KafkaSink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> studentDS = env.readTextFile("flink/data/students.txt");
        // Write each line of the file to the "student" topic as a plain string value
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("master:9092,node1:9092,node2:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("student")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
        studentDS.sinkTo(sink);
        env.execute();
    }
}
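KafkaSink also supports exactly-once delivery, which requires enabling checkpointing and setting a transactional id prefix. A minimal sketch, not production settings; the checkpoint interval and prefix are illustrative placeholders:

// Exactly-once writes are committed on Flink checkpoints (interval in ms)
env.enableCheckpointing(5000);
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("master:9092,node1:9092,node2:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("student")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        // must be unique per application; "demo02-" is a placeholder
        .setTransactionalIdPrefix("demo02-")
        .build();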
Start a console consumer to verify the output:
kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic student