• hdfswriter优化之提高写速度


    一般来说我们的数据都是mysql->hive tpg->hive orcle->hive

    之前的文章介绍过优化读的速度。比如创建索引,增加fetchsize 增加并行。

    那么怎么增加hdfs写的速度呢?

    以orc文件为例。目前我们能控制的无非就是并行写,不能根本的解决问题

    举例 1e的数据,mysql分成10channel,51个tasks,那么在hdfswriter上会同时有10个线程去写orc文件,关键问题在于

    比如mysql读的速度是2w Records/s, hdfs写的速度是1w Records/s 那么不管你怎么切分任务,写的速度永远是导数的瓶颈?

    如何解决?

    首先看datax的orc写的源码

    /**
     * 写orcfile类型文件
     *
     * @param lineReceiver
     * @param config
     * @param fileName
     * @param taskPluginCollector
     */
    public void orcFileStartWrite(RecordReceiver lineReceiver, Configuration config, String fileName,
                           TaskPluginCollector taskPluginCollector) {
       List columns = config.getListConfiguration(Key.COLUMN);
       String compress = config.getString(Key.COMPRESS, null);
       List columnNames = getColumnNames(columns);
       List columnTypeInspectors = getColumnTypeInspectors(columns);
       StructObjectInspector inspector = (StructObjectInspector) ObjectInspectorFactory
             .getStandardStructObjectInspector(columnNames, columnTypeInspectors);
    
       OrcSerde orcSerde = new OrcSerde();
    
       FileOutputFormat outFormat = new OrcOutputFormat();
       if (!"NONE".equalsIgnoreCase(compress) && null != compress) {
          Class codecClass = getCompressCodec(compress);
          if (null != codecClass) {
             outFormat.setOutputCompressorClass(conf, codecClass);
          }
       }
       try {
          RecordWriter writer = outFormat.getRecordWriter(fileSystem, conf, fileName, Reporter.NULL);
          Record record = null;
          while ((record = lineReceiver.getFromReader()) != null) {
             MutablePair, Boolean> transportResult = transportOneRecord(record, columns, taskPluginCollector);
             if (!transportResult.getRight()) {
                writer.write(NullWritable.get(), orcSerde.serialize(transportResult.getLeft(), inspector));
             }
          }
          writer.close(Reporter.NULL);
       } catch (Exception e) {
          String message = String.format("写文件文件[%s]时发生IO异常,请检查您的网络是否正常!", fileName);
          LOG.error(message);
          Path path = new Path(fileName);
          deleteDir(path.getParent());
          throw DataXException.asDataXException(HdfsWriterErrorCode.Write_FILE_IO_ERROR, e);
       }
    }

    注意这里它是 

    FileOutputFormat outFormat = new OrcOutputFormat();// 设置 输出格式orc

    RecordWriter writer = outFormat.getRecordWriter(fileSystem, conf, fileName, Reporter.NULL);

    writer.write(xx,row)

    查看源码outFormat.getRecordWriter() 方法 本质

    然后真正开始写的时候writer.write()

     注意这个方法当write为null的时候,会new一个writer。上文有一个,这里又能new一个,

    是否说明这两个writer都能写orc文件

    那这两个有什么区别呢? 

    根据我上一篇文章写过的

    orc文件的读写及整合hive_cclovezbf的博客-CSDN博客_hive读取orc文件

    writer = OrcFile.createWriter(path, options);

    writer.addRowBatch(batch); 这里是可以批量添加然后写入的,默认是1024条数据写入

    而datax的代码里是来一条写一条,这两种效率谁高谁低就不用说了吧。。

    但是改造起来比较麻烦。。有点懒。。需要的时候再写把。

  • 相关阅读:
    javaEE -9(7000字详解TCP/IP协议)
    kubectl别名配置
    go-zero微服务实战系列(八、如何处理每秒上万次的下单请求)
    视错觉与魔术(一)——经典回顾
    代码随想录算法训练营day56||72. 编辑距离||647. 回文子串 ||516.最长回文子序列
    【Unity基础】6.动画状态机
    Mac下Qt设置应用程序名称--多国语言
    Java Lambda表达式:简洁且强大的函数式编程工具
    WebGL学习(一)渲染关系
    使用 Django ORM 进行数据库操作
  • 原文地址:https://blog.csdn.net/cclovezbf/article/details/126937058