mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.15.1
curl https://flink.apache.org/q/quickstart.sh | bash -s 1.15.1
// Create a programming entry environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // entry environment of the batch-only DataSet API
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // unified stream/batch entry environment (DataStream API)
// Batch vs. streaming execution for the unified environment is chosen via the runtime mode
// (not via ExecutionConfig.setExecutionMode, which only affects the legacy DataSet data exchange)
env.setRuntimeMode(RuntimeExecutionMode.BATCH); // or RuntimeExecutionMode.STREAMING / AUTOMATIC
// Explicitly create a local execution environment with the web UI enabled
Configuration configuration = new Configuration();
configuration.setInteger("rest.port", 12345);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
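To put these pieces in context, here is a minimal end-to-end job sketch (the class name, host and port are placeholders, not part of the original notes); every snippet below plugs into the middle of such a job, and nothing runs until env.execute() is called:
public class QuickstartJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.socketTextStream("localhost", 9999)  // any source from the next section
           .map(String::toUpperCase)             // any chain of transformation operators
           .print();                             // any sink from the last section
        env.execute("quickstart-job");           // builds and submits the dataflow graph
    }
}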
A source is the operator that reads data from external systems. By the way the data is obtained, sources fall into the following kinds:
DataStreamSource<Integer> fromElements = env.fromElements(1, 2, 3, 4, 5); // from an in-memory collection of elements
SingleOutputStreamOperator<String> source = env.socketTextStream("localhost", 9999); // from a TCP socket
DataStreamSource<String> fileSource = env.readTextFile("data/input/wc.txt", "utf-8"); // from a text file
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
        // topic(s) to subscribe to
        .setTopics("tp01")
        // consumer group id
        .setGroupId("gp01")
        // kafka broker addresses
        .setBootstrapServers("hdp01:9092")
        // where to start consuming:
        //   OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)  start from the previously committed offsets (reset to LATEST if none exist)
        //   OffsetsInitializer.earliest()   start from the earliest offsets
        //   OffsetsInitializer.latest()     start from the latest offsets
        //   OffsetsInitializer.offsets(Map) start from the per-partition offsets passed in the map
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
        // deserializer for the record value
        .setValueOnlyDeserializer(new SimpleStringSchema())
        // enable the underlying kafka consumer's automatic offset commit,
        // which writes the latest consumed offsets to kafka's __consumer_offsets topic.
        // Even with auto commit enabled, KafkaSource does not rely on it:
        // on failure and restart it prefers the offsets stored in Flink's own state, which is more
        // reliable (see the checkpointing note after this example)
        .setProperty("auto.offset.commit", "true")
        // mark this source as BOUNDED:
        // it reads up to the given position, then stops and the job exits;
        // commonly used for backfilling or reprocessing a slice of historical data
        // .setBounded(OffsetsInitializer.committedOffsets())
        // mark this source as UNBOUNDED, but stop reading at the given position without exiting;
        // typical scenario: read a fixed slice of kafka data and join it with another, truly unbounded stream
        //.setUnbounded(OffsetsInitializer.latest())
        .build();
// env.addSource(...); // addSource() takes an implementation of the legacy SourceFunction interface
DataStreamSource<String> streamSource = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kfk-source"); // fromSource() takes an implementation of the new Source interface
streamSource.print();
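As noted in the comments above, offset recovery from Flink's own state presupposes that checkpointing is enabled; a minimal sketch (the 5-second interval is an arbitrary example):
env.enableCheckpointing(5000); // snapshot operator state, including the KafkaSource offsets, every 5 s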
DataStreamSource<EventLog> dataStreamSource = env.addSource(new MySourceFunction());
// A custom source: implement the SourceFunction interface and emit records from run()
class MySourceFunction implements SourceFunction<EventLog> {
    private volatile boolean flag = true;

    @Override
    public void run(SourceContext<EventLog> ctx) throws Exception {
        EventLog eventLog = new EventLog();
        String[] events = {"appLaunch", "pageLoad", "adShow", "adClick", "itemShare", "itemCollect", "putBack", "wakeUp", "appClose"};
        HashMap<String, String> eventInfoMap = new HashMap<>();
        while (flag) {
            // fill the bean with random demo data
            eventLog.setGuid(RandomUtils.nextLong(1, 1000));
            eventLog.setSessionId(RandomStringUtils.randomAlphabetic(12).toUpperCase());
            eventLog.setTimeStamp(System.currentTimeMillis());
            eventLog.setEventId(events[RandomUtils.nextInt(0, events.length)]);
            eventInfoMap.put(RandomStringUtils.randomAlphabetic(1), RandomStringUtils.randomAlphabetic(2));
            eventLog.setEventInfo(eventInfoMap);
            ctx.collect(eventLog); // emit one record downstream
            eventInfoMap.clear();
            Thread.sleep(RandomUtils.nextInt(500, 1500));
        }
    }

    @Override
    public void cancel() {
        // called when the job is cancelled; makes run() exit its loop
        flag = false;
    }
}
From the parallelism perspective, sources are further divided into parallel and non-parallel sources.
For custom sources (see the sketch right after this list):
variants whose name contains Rich provide open(), close() and getRuntimeContext();
variants whose name contains Parallel can run as multiple parallel instances.
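A minimal sketch of such a rich, parallel source (it reuses the EventLog bean from above; the class name and the sleep interval are made up for illustration):
class MyRichParallelSource extends RichParallelSourceFunction<EventLog> {
    private volatile boolean flag = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        // runs once per parallel instance; getRuntimeContext() is available from here on
        int subtask = getRuntimeContext().getIndexOfThisSubtask();
        System.out.println("opening source subtask " + subtask);
    }

    @Override
    public void run(SourceContext<EventLog> ctx) throws Exception {
        while (flag) {
            EventLog eventLog = new EventLog();
            eventLog.setGuid(RandomUtils.nextLong(1, 1000));
            eventLog.setTimeStamp(System.currentTimeMillis());
            ctx.collect(eventLog); // every parallel instance emits its own records
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        flag = false;
    }

    @Override
    public void close() throws Exception {
        // runs once per parallel instance during shutdown
    }
}
// Unlike a plain SourceFunction, a parallel source accepts a parallelism greater than 1:
DataStreamSource<EventLog> parallelSource = env.addSource(new MyRichParallelSource()).setParallelism(2);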
SingleOutputStreamOperator<UserInfo> beanStream = streamSource.map(json -> JSON.parseObject(json, UserInfo.class));
SingleOutputStreamOperator<UserInfo> filtered = beanStream.filter(bean -> bean.getFriends().size() <= 3);
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = upperCased.flatMap((String s, Collector<Tuple2<String, Integer>> collector) -> {
String[] words = s.split("\\s+");
for (String word : words) {
collector.collect(Tuple2.of(word, 1));
}
}).returns(new TypeHint<Tuple2<String, Integer>>() {}); // declare the produced type with a TypeHint, since the lambda's generic type is erased
// total number of friends of users of each gender
KeyedStream<Tuple2<String, Integer>, String> keyedStream = flatten
.map(bean -> Tuple2.of(bean.getGender(), 1))
.returns(new TypeHint<Tuple2<String, Integer>>() {
})
.keyBy(tp -> tp.f0);
SingleOutputStreamOperator<Tuple2<String, Integer>> genderFriendCount = keyedStream.sum(1);
// Difference between max and maxBy lies in how the state is updated: max only updates the field being maximized, while maxBy replaces the whole record
tuple4Stream.keyBy(tp -> tp.f2)
/*.max(3); */ // the maximum friend count within each gender
.maxBy(3); // the user with the most friends within each gender
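As an illustration with made-up tuples of (id, name, gender, friendCount), all sharing the key "male":
// input:            (1, "ua", "male", 3)  then  (2, "ub", "male", 5)
// max(3)   emits:   (1, "ua", "male", 3)  then  (1, "ua", "male", 5)   (only field 3 is updated)
// maxBy(3) emits:   (1, "ua", "male", 3)  then  (2, "ub", "male", 5)   (the whole record is replaced)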
// Requirement: within each gender, find the user with the most friends; if two consecutive records tie on friend count, the output should also carry the uid/name etc. of the later record
SingleOutputStreamOperator<Tuple4<Integer, String, String, Integer>> reduceResult = tuple4Stream.keyBy(tp -> tp.f2)
.reduce(new ReduceFunction<Tuple4<Integer, String, String, Integer>>() {
/**
 * @param value1 the previous aggregation result
 * @param value2 the newly arrived record
 * @return the updated aggregation result
 * @throws Exception
 */
@Override
public Tuple4<Integer, String, String, Integer> reduce(Tuple4<Integer, String, String, Integer> value1, Tuple4<Integer, String, String, Integer> value2) throws Exception {
if (value1 == null || value2.f3 >= value1.f3) {
return value2;
} else {
return value1;
}
}
});
streamSource.print();
// Build a FileSink (row format)
FileSink<String> rowSink = FileSink
        .forRowFormat(new Path("out/filesink/"), new SimpleStringEncoder<String>("utf-8"))
        // rolling policy: roll to a new file every 10 s, or once the current file reaches 5 MB
        .withRollingPolicy(DefaultRollingPolicy.builder()
                .withRolloverInterval(Duration.ofMillis(10000)) // the time-based trigger does not take effect in flink-1.15.0
                .withMaxPartSize(MemorySize.ofMebiBytes(5))
                .build())
        // bucket assigner (how sub-directories are created)
        .withBucketAssigner(new DateTimeBucketAssigner<>())
        .withBucketCheckInterval(5)
        // output file name configuration
        .withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("doitedu").withPartSuffix(".txt").build())
        .build();
streamSource.map(JSON::toJSONString)
        .sinkTo(rowSink); /* implementations of the new Sink API are attached with sinkTo() */
// AvroParquetWriters.forReflectRecord takes a plain JavaBean class and derives the schema via reflection
ParquetWriterFactory<EventLog> writerFactory = AvroParquetWriters.forReflectRecord(EventLog.class);
FileSink<EventLog> parquetSink = FileSink
        .forBulkFormat(new Path("out/bulksink2"), writerFactory) // key point: a bulk format needs a writer factory (here, for parquet files)
        .withBucketAssigner(new DateTimeBucketAssigner<>()) // bucket assigner
        .withBucketCheckInterval(5) // bucket check interval
        .withRollingPolicy(OnCheckpointRollingPolicy.build()) // roll files when a checkpoint happens
        .withOutputFileConfig(OutputFileConfig.builder().withPartSuffix(".parquet").withPartPrefix("log").build())
        .build();
streamSource.sinkTo(parquetSink);
// 1. Build a kafka sink
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
        .setBootstrapServers("hdp01:9092,hdp02:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.<String>builder()
                .setTopic("event-log")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build()
        )
        // .setKafkaProducerConfig(...) // extra producer properties, if needed
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();
// 2. Attach the data stream to the sink
streamSource.map(JSON::toJSONString)
        .sinkTo(kafkaSink);
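If exactly-once delivery were wanted instead, the same builder would need a transactional id prefix (a sketch; the prefix value is arbitrary, and it additionally assumes checkpointing is enabled and the Kafka transaction timeout exceeds the checkpoint interval):
KafkaSink<String> exactlyOnceKafkaSink = KafkaSink.<String>builder()
        .setBootstrapServers("hdp01:9092,hdp02:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.<String>builder()
                .setTopic("event-log")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("event-log-sink") // required for EXACTLY_ONCE; must be unique per job
        .build();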
SinkFunction<EventLog> jdbcSink = JdbcSink.sink(
"insert into t_eventlog values (?,?,?,?,?) on duplicate key update guid=?,sessionId=?,eventId=?,ts=?,eventInfo=? ",
new JdbcStatementBuilder<EventLog>() {
@Override
public void accept(PreparedStatement preparedStatement, EventLog eventLog) throws SQLException {
preparedStatement.setLong(1, eventLog.getGuid());
preparedStatement.setString(2, eventLog.getSessionId());
preparedStatement.setString(3, eventLog.getEventId());
preparedStatement.setLong(4, eventLog.getTimeStamp());
preparedStatement.setString(5, JSON.toJSONString(eventLog.getEventInfo()));
preparedStatement.setString(6, eventLog.getSessionId());
preparedStatement.setString(7, eventLog.getEventId());
preparedStatement.setLong(8, eventLog.getTimeStamp());
preparedStatement.setString(9, JSON.toJSONString(eventLog.getEventInfo()));
}
},
JdbcExecutionOptions.builder()
.withMaxRetries(3)
.withBatchSize(1)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://hdp01:3306/abc?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
.withUsername("root")
.withPassword("123456")
.build()
);
// write the data out (SinkFunction implementations are attached with addSink())
streamSource.addSink(jdbcSink);
SinkFunction<EventLog> exactlyOnceSink = JdbcSink.exactlyOnceSink(
"insert into t_eventlog values (?,?,?,?,?) on duplicate key update guid=?,sessionId=?,eventId=?,ts=?,eventInfo=? ",
new JdbcStatementBuilder<EventLog>() {
@Override
public void accept(PreparedStatement preparedStatement, EventLog eventLog) throws SQLException {
preparedStatement.setLong(1, eventLog.getGuid());
preparedStatement.setString(2, eventLog.getSessionId());
preparedStatement.setString(3, eventLog.getEventId());
preparedStatement.setLong(4, eventLog.getTimeStamp());
preparedStatement.setString(5, JSON.toJSONString(eventLog.getEventInfo()));
preparedStatement.setString(6, eventLog.getSessionId());
preparedStatement.setString(7, eventLog.getEventId());
preparedStatement.setLong(8, eventLog.getTimeStamp());
preparedStatement.setString(9, JSON.toJSONString(eventLog.getEventInfo()));
}
},
JdbcExecutionOptions.builder()
.withMaxRetries(0) // the exactly-once (XA) sink requires maxRetries to be 0
.withBatchSize(1)
.build(),
JdbcExactlyOnceOptions.builder()
// MySQL does not support multiple parallel XA transactions on one connection, so this must be set to true
.withTransactionPerConnection(true)
.build(),
new SerializableSupplier<XADataSource>() {
@Override
public XADataSource get() {
// XADataSource is a JDBC data source that supports distributed (XA) transactions;
// how it is constructed differs from one database vendor to another
MysqlXADataSource xaDataSource = new MysqlXADataSource();
xaDataSource.setUrl("jdbc:mysql://hdp01:3306/abc?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8");
xaDataSource.setUser("root");
xaDataSource.setPassword("123456");
return xaDataSource;
}
}
);
// write the data out
streamSource.addSink(exactlyOnceSink);