• hdfswriter优化之提高写速度


    一般来说我们的数据都是mysql->hive tpg->hive orcle->hive

    之前的文章介绍过优化读的速度。比如创建索引,增加fetchsize 增加并行。

    那么怎么增加hdfs写的速度呢?

    以orc文件为例。目前我们能控制的无非就是并行写,不能根本的解决问题

    举例 1e的数据,mysql分成10channel,51个tasks,那么在hdfswriter上会同时有10个线程去写orc文件,关键问题在于

    比如mysql读的速度是2w Records/s, hdfs写的速度是1w Records/s 那么不管你怎么切分任务,写的速度永远是导数的瓶颈?

    如何解决?

    首先看datax的orc写的源码

    /**
     * 写orcfile类型文件
     *
     * @param lineReceiver
     * @param config
     * @param fileName
     * @param taskPluginCollector
     */
    public void orcFileStartWrite(RecordReceiver lineReceiver, Configuration config, String fileName,
                           TaskPluginCollector taskPluginCollector) {
       List columns = config.getListConfiguration(Key.COLUMN);
       String compress = config.getString(Key.COMPRESS, null);
       List columnNames = getColumnNames(columns);
       List columnTypeInspectors = getColumnTypeInspectors(columns);
       StructObjectInspector inspector = (StructObjectInspector) ObjectInspectorFactory
             .getStandardStructObjectInspector(columnNames, columnTypeInspectors);
    
       OrcSerde orcSerde = new OrcSerde();
    
       FileOutputFormat outFormat = new OrcOutputFormat();
       if (!"NONE".equalsIgnoreCase(compress) && null != compress) {
          Class codecClass = getCompressCodec(compress);
          if (null != codecClass) {
             outFormat.setOutputCompressorClass(conf, codecClass);
          }
       }
       try {
          RecordWriter writer = outFormat.getRecordWriter(fileSystem, conf, fileName, Reporter.NULL);
          Record record = null;
          while ((record = lineReceiver.getFromReader()) != null) {
             MutablePair, Boolean> transportResult = transportOneRecord(record, columns, taskPluginCollector);
             if (!transportResult.getRight()) {
                writer.write(NullWritable.get(), orcSerde.serialize(transportResult.getLeft(), inspector));
             }
          }
          writer.close(Reporter.NULL);
       } catch (Exception e) {
          String message = String.format("写文件文件[%s]时发生IO异常,请检查您的网络是否正常!", fileName);
          LOG.error(message);
          Path path = new Path(fileName);
          deleteDir(path.getParent());
          throw DataXException.asDataXException(HdfsWriterErrorCode.Write_FILE_IO_ERROR, e);
       }
    }

    注意这里它是 

    FileOutputFormat outFormat = new OrcOutputFormat();// 设置 输出格式orc

    RecordWriter writer = outFormat.getRecordWriter(fileSystem, conf, fileName, Reporter.NULL);

    writer.write(xx,row)

    查看源码outFormat.getRecordWriter() 方法 本质

    然后真正开始写的时候writer.write()

     注意这个方法当write为null的时候,会new一个writer。上文有一个,这里又能new一个,

    是否说明这两个writer都能写orc文件

    那这两个有什么区别呢? 

    根据我上一篇文章写过的

    orc文件的读写及整合hive_cclovezbf的博客-CSDN博客_hive读取orc文件

    writer = OrcFile.createWriter(path, options);

    writer.addRowBatch(batch); 这里是可以批量添加然后写入的,默认是1024条数据写入

    而datax的代码里是来一条写一条,这两种效率谁高谁低就不用说了吧。。

    但是改造起来比较麻烦。。有点懒。。需要的时候再写把。

  • 相关阅读:
    达梦:开启sql日志记录
    Python版中秋佳节月饼抢购脚本
    热缩膜机效果不好怎么办?
    Unity Ugui 顶点颜色赋值
    【数据结构】单链表
    智能答题功能,CRMEB知识付费系统必须有!
    sqli-labs注入方法总结
    链接装载与库:第六章——可执行文件的装载与进程
    五大步骤实现MapGIS Web 功能服务拓展
    Reggie外卖项目 —— 移动端小程序之手机验证码登录
  • 原文地址:https://blog.csdn.net/cclovezbf/article/details/126937058