• Flink 写入Apache Arrow


    Apache Arrow是什么


    数据格式:arrow 定义了一种在内存中表示tabular data的格式。这种格式特别为数据分析型操作(analytical operation)进行了优化。比如说列式格式(columnar format),能充分利用现代cpu的优势,进行向量化计算(vectorization)。不仅如此,Arrow还定义了IPC格式,序列化内存中的数据,进行网络传输,或者把数据以文件的方式持久化。

     


    开发库:arrow定义的格式是与语言无关的,所以任何语言都能实现Arrow定义的格式。arrow项目为几乎所有的主流编程语言提供了SDK
    Arrow其实和protobuf很像,只不过protobuf是为了structured data提供内存表示方式和序列化方案。可是两者的使用场景却很不一样。protobuf主要是序列化structured data,有很多的键值对和非常深的nested structure。arrow序列化的对象主要还是表格状数据。

    另外,Arrow定义了一个传输协议,能把多个record batch序列化成一个二进制的字节流,并且把这些字节流反序列化成record batch,从让数据能在不同的进程之间进行交换。

    通过将Apache Arrow作为标准的数据交换格式,各种大数据分析系统和应用程序之间的互操作性水平提高了。 标准格式无需花费大量的CPU周期对数据进行序列化和反序列化以在各种格式之间转换数据,而是可以在系统和进程之间以无缝,无摩擦的方式共享数据。 这意味着客户能够相互结合部署这些系统,而不会产生任何开销。

    Flink简单读写Apache Arrow

     代码依赖

    1. <dependency>
    2. <groupId>org.apache.arrow</groupId>
    3. <artifactId>arrow-memory-netty</artifactId>
    4. <version>${arrow.version}</version>
    5. </dependency>
    6. <dependency>
    7. <groupId>org.apache.arrow</groupId>
    8. <artifactId>arrow-vector</artifactId>
    9. <version>${arrow.version}</version>
    10. </dependency>
    11. <!-- https://mvnrepository.com/artifact/org.apache.arrow.gandiva/arrow-gandiva -->
    12. <dependency>
    13. <groupId>org.apache.arrow.gandiva</groupId>
    14. <artifactId>arrow-gandiva</artifactId>
    15. <version>8.0.0</version>
    16. </dependency>

    测试代码 

    本文通过DataStream ,将Flink DataStream<Event>映射到Apache Arrow数据格式,并通过文件存储,方便后续Python读写。

    数据的存储方式通过Flink CheckPoint 机制,周期性存储,本文暂未实现。

    1. **
    2. * Flink DataStream写入数据到Apache Arrow
    3. */
    4. public class FLinkWriteArrowFile {
    5. public static void main(String[] args) throws Exception {
    6. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    7. env.setParallelism(1);
    8. // 每 1000ms 开始一次 checkpoint
    9. env.enableCheckpointing(1000);
    10. // 设置模式为精确一次 (这是默认值)
    11. env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    12. // 确认 checkpoints 之间的时间会进行 500 ms
    13. env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    14. // Checkpoint 必须在一分钟内完成,否则就会被抛弃
    15. env.getCheckpointConfig().setCheckpointTimeout(60000);
    16. // 同一时间只允许一个 checkpoint 进行
    17. env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    18. // 开启在 job 中止后仍然保留的 externalized checkpoints
    19. env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    20. SingleOutputStreamOperator<Event> eventStream = env.fromElements(
    21. new Event("Alice", "./home", 1000L),
    22. new Event("Bob", "./cart", 2000L),
    23. new Event("Alice", "./prod?id=1", 5 * 1000L),
    24. new Event("Cary", "./home", 60 * 1000L),
    25. new Event("Bob", "./prod?id=3", 90 * 1000L),
    26. new Event("Alice", "./prod?id=1", 105 * 1000L)
    27. );
    28. eventStream.addSink(new ArrowSink());
    29. env.execute();
    30. // while (true) ;
    31. }
    32. public static class ArrowSink extends RichSinkFunction<Event> implements CheckpointListener {
    33. private VectorSchemaRoot root;
    34. private FileOutputStream fileOutputStream;
    35. VarCharVector userVector;
    36. VarCharVector urlVector;
    37. BigIntVector timestampVector;
    38. @Override
    39. public void open(Configuration parameters) throws Exception {
    40. Field user = new Field("user",
    41. FieldType.nullable(new ArrowType.Utf8()),/*children*/ null);
    42. Field url = new Field("url",
    43. FieldType.nullable(new ArrowType.Utf8()),/*children*/ null);
    44. Field timestamp = new Field("timestamp",
    45. FieldType.nullable(new ArrowType.Int(64, true)),/*children*/ null);
    46. Schema schema = new Schema(asList(user, url, timestamp));
    47. BufferAllocator allocator = new RootAllocator();
    48. root = VectorSchemaRoot.create(schema, allocator);
    49. File file = new File("random_access_file.arrow");
    50. fileOutputStream = new FileOutputStream(file);
    51. userVector = (VarCharVector) root.getVector("user");
    52. userVector.allocateNew();
    53. urlVector = (VarCharVector) root.getVector("url");
    54. urlVector.allocateNew();
    55. timestampVector = (BigIntVector) root.getVector("timestamp");
    56. timestampVector.allocateNew();
    57. }
    58. @Override
    59. public void invoke(Event value, Context context) throws Exception {
    60. int index = userVector.getLastSet() + 1;
    61. userVector.set(index, value.user.getBytes(StandardCharsets.UTF_8));
    62. urlVector.set(index, value.url.getBytes(StandardCharsets.UTF_8));
    63. timestampVector.set(index, value.timestamp);
    64. //todo:增加时间判断
    65. if (index >= urlVector.getByteCapacity()) {
    66. ArrowFileWriter writer = new ArrowFileWriter(root, /*provider*/ null, fileOutputStream.getChannel());
    67. writer.start();
    68. writer.writeBatch();
    69. writer.end();
    70. System.out.println("Record batches written: " + writer.getRecordBlocks().size()
    71. + ". Number of rows written: " + root.getRowCount());
    72. }
    73. }
    74. @Override
    75. public void close() throws Exception {
    76. super.close();
    77. fileOutputStream.close();
    78. }
    79. @Override
    80. public void notifyCheckpointComplete(long checkpointId) throws Exception {
    81. //todo
    82. // ArrowFileWriter writer = new ArrowFileWriter(root, /*provider*/ null, fileOutputStream.getChannel());
    83. // writer.start();
    84. // writer.writeBatch();
    85. // writer.end();
    86. // System.out.println("Record batches written: " + writer.getRecordBlocks().size()
    87. // + ". Number of rows written: " + root.getRowCount());
    88. }
    89. @Override
    90. public void notifyCheckpointAborted(long checkpointId) throws Exception {
    91. // CheckpointListener.super.notifyCheckpointAborted(checkpointId);
    92. }
    93. }
    94. }
  • 相关阅读:
    通过HatchBush对象的()属性可设置HatchBush对象的阴影样式。
    单调栈理论基础 及 力扣:739. 每日温度
    1.8 信息系统服务管理、1.9 信息系统规划
    算法笔记-第七章-链表(未完成)
    如何将项目部署到服务器上(全套教程)
    力扣--第三大的数
    Rust教程6:并发编程和线程通信
    【02】Camunda系列-扩展案例-用户任务、网关、决策自动化
    Apache shiro 漏洞总结
    Axure绘制数字加减器
  • 原文地址:https://blog.csdn.net/wank1259162/article/details/125509197