Data format: Arrow defines a format for representing tabular data in memory, optimized specifically for analytical operations. For example, its columnar format lets modern CPUs be exploited fully for vectorized computation. Beyond that, Arrow also defines an IPC format for serializing in-memory data, either to send it over the network or to persist it as files.
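To make the columnar idea concrete, here is a minimal sketch using the Arrow Java library (the column name "age" and its values are made up for illustration): each column lives in its own contiguous, off-heap vector, which is exactly the layout that vectorized, analytical scans benefit from.

```java
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;

public class ColumnarSketch {
    public static void main(String[] args) {
        // One Arrow "vector" is one column stored contiguously in off-heap memory.
        try (BufferAllocator allocator = new RootAllocator();
             IntVector ages = new IntVector("age", allocator)) {
            ages.allocateNew(3);
            ages.set(0, 18);
            ages.set(1, 35);
            ages.set(2, 42);
            ages.setValueCount(3);
            System.out.println(ages); // prints the column values, e.g. [18, 35, 42]
        }
    }
}
```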
Libraries: the format Arrow defines is language-independent, so any language can implement it. The Arrow project ships libraries for nearly all mainstream programming languages.
Arrow is actually quite similar to protobuf, in that protobuf likewise provides an in-memory representation and a serialization scheme, only for structured data. Their use cases are very different, though: protobuf mainly serializes structured data full of key-value pairs and deeply nested structures, whereas what Arrow serializes is primarily tabular data.
In addition, Arrow defines a transport protocol that serializes multiple record batches into a single binary byte stream, and deserializes such a stream back into record batches, so that data can be exchanged between different processes.
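As a rough sketch of what that looks like with the Arrow Java library: the snippet below serializes one record batch into an in-memory byte stream with ArrowStreamWriter and reads it back with ArrowStreamReader. In a real system the bytes would travel over a socket or shared memory rather than a ByteArrayOutputStream, and the single "ts" column is just an illustrative schema.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.channels.Channels;
import static java.util.Arrays.asList;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;

public class IpcRoundTripSketch {
    public static void main(String[] args) throws Exception {
        Schema schema = new Schema(asList(
                new Field("ts", FieldType.nullable(new ArrowType.Int(64, true)), null)));
        try (BufferAllocator allocator = new RootAllocator();
             VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
            BigIntVector ts = (BigIntVector) root.getVector("ts");
            ts.allocateNew(2);
            ts.set(0, 1000L);
            ts.set(1, 2000L);
            root.setRowCount(2);

            // Sender side: serialize the record batch onto a byte stream.
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            try (ArrowStreamWriter writer = new ArrowStreamWriter(root, null, Channels.newChannel(out))) {
                writer.start();
                writer.writeBatch();
                writer.end();
            }

            // Receiver side: only the bytes are needed; the schema travels with the stream.
            try (ArrowStreamReader reader = new ArrowStreamReader(
                    new ByteArrayInputStream(out.toByteArray()), allocator)) {
                while (reader.loadNextBatch()) {
                    System.out.println(reader.getVectorSchemaRoot().contentToTSVString());
                }
            }
        }
    }
}
```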
With Apache Arrow as a standard data interchange format, interoperability between big-data analytics systems and applications improves. Instead of burning CPU cycles serializing and deserializing data to convert between formats, systems and processes can share data seamlessly and without friction. This means these systems can be deployed alongside one another without paying that conversion overhead.
```xml
<dependency>
    <groupId>org.apache.arrow</groupId>
    <artifactId>arrow-memory-netty</artifactId>
    <version>${arrow.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.arrow</groupId>
    <artifactId>arrow-vector</artifactId>
    <version>${arrow.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.arrow.gandiva/arrow-gandiva -->
<dependency>
    <groupId>org.apache.arrow.gandiva</groupId>
    <artifactId>arrow-gandiva</artifactId>
    <version>8.0.0</version>
</dependency>
```
This article uses a DataStream sink to map a Flink DataStream<Event> into the Apache Arrow format and store it as a file, so that it can later be read from Python.
Persisting the data periodically through Flink's checkpoint mechanism is not implemented yet in this article.
```java
import java.io.File;
import java.io.FileOutputStream;
import java.nio.charset.StandardCharsets;

import static java.util.Arrays.asList;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;

import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * Writes a Flink DataStream into an Apache Arrow file.
 * Event is a POJO (defined elsewhere) with public fields: String user, String url, Long timestamp.
 */
public class FLinkWriteArrowFile {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Start a checkpoint every 1000 ms.
        env.enableCheckpointing(1000);

        // Use exactly-once mode (this is the default).
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // Leave at least 500 ms between the end of one checkpoint and the start of the next.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

        // A checkpoint must finish within one minute or it is discarded.
        env.getCheckpointConfig().setCheckpointTimeout(60000);

        // Allow only one checkpoint at a time.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // Keep externalized checkpoints after the job is cancelled.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        SingleOutputStreamOperator<Event> eventStream = env.fromElements(
                new Event("Alice", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=1", 5 * 1000L),
                new Event("Cary", "./home", 60 * 1000L),
                new Event("Bob", "./prod?id=3", 90 * 1000L),
                new Event("Alice", "./prod?id=1", 105 * 1000L)
        );

        eventStream.addSink(new ArrowSink());
        env.execute();
    }

    public static class ArrowSink extends RichSinkFunction<Event> implements CheckpointListener {

        // Flush a record batch after this many buffered rows (arbitrary batch size).
        private static final int BATCH_SIZE = 1024;

        private VectorSchemaRoot root;
        private FileOutputStream fileOutputStream;
        private ArrowFileWriter writer;

        VarCharVector userVector;
        VarCharVector urlVector;
        BigIntVector timestampVector;

        @Override
        public void open(Configuration parameters) throws Exception {
            Field user = new Field("user",
                    FieldType.nullable(new ArrowType.Utf8()), /*children*/ null);

            Field url = new Field("url",
                    FieldType.nullable(new ArrowType.Utf8()), /*children*/ null);

            Field timestamp = new Field("timestamp",
                    FieldType.nullable(new ArrowType.Int(64, true)), /*children*/ null);

            Schema schema = new Schema(asList(user, url, timestamp));
            BufferAllocator allocator = new RootAllocator();
            root = VectorSchemaRoot.create(schema, allocator);
            File file = new File("random_access_file.arrow");
            fileOutputStream = new FileOutputStream(file);

            // One writer for the lifetime of the sink: every flush appends one record batch
            // to the same Arrow file.
            writer = new ArrowFileWriter(root, /*provider*/ null, fileOutputStream.getChannel());
            writer.start();

            userVector = (VarCharVector) root.getVector("user");
            userVector.allocateNew();
            urlVector = (VarCharVector) root.getVector("url");
            urlVector.allocateNew();
            timestampVector = (BigIntVector) root.getVector("timestamp");
            timestampVector.allocateNew();
        }

        @Override
        public void invoke(Event value, Context context) throws Exception {
            // Append the event as the next row of each column vector.
            int index = userVector.getLastSet() + 1;
            userVector.setSafe(index, value.user.getBytes(StandardCharsets.UTF_8));
            urlVector.setSafe(index, value.url.getBytes(StandardCharsets.UTF_8));
            timestampVector.setSafe(index, value.timestamp);
            root.setRowCount(index + 1);

            // TODO: also flush based on a time threshold.
            if (index + 1 >= BATCH_SIZE) {
                flushBatch();
            }
        }

        /** Writes the buffered rows as one record batch and resets the vectors. */
        private void flushBatch() throws Exception {
            writer.writeBatch();
            System.out.println("Record batches written: " + writer.getRecordBlocks().size()
                    + ". Number of rows written: " + root.getRowCount());
            userVector.reset();
            urlVector.reset();
            timestampVector.reset();
            root.setRowCount(0);
        }

        @Override
        public void close() throws Exception {
            super.close();
            // Flush whatever is still buffered, finish the Arrow file footer, then close the stream.
            if (root.getRowCount() > 0) {
                flushBatch();
            }
            writer.end();
            fileOutputStream.close();
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            // TODO: flush the current batch here so the file grows with every completed checkpoint.
        }

        @Override
        public void notifyCheckpointAborted(long checkpointId) throws Exception {
        }
    }
}
```
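To check what the sink produced, the Arrow file can be read back batch by batch. Below is a minimal sketch using the Java ArrowFileReader (the class name ReadArrowFileCheck is only for illustration); from Python, the same random_access_file.arrow can be opened with pyarrow's IPC file reader, e.g. pyarrow.ipc.open_file.

```java
import java.io.File;
import java.io.FileInputStream;

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowFileReader;

public class ReadArrowFileCheck {
    public static void main(String[] args) throws Exception {
        try (RootAllocator allocator = new RootAllocator();
             FileInputStream in = new FileInputStream(new File("random_access_file.arrow"));
             ArrowFileReader reader = new ArrowFileReader(in.getChannel(), allocator)) {
            // The file footer indexes every record batch, so they can be loaded one by one.
            while (reader.loadNextBatch()) {
                VectorSchemaRoot batch = reader.getVectorSchemaRoot();
                System.out.println("rows in batch: " + batch.getRowCount());
                System.out.println(batch.contentToTSVString());
            }
        }
    }
}
```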