• Solving several problems encountered during Flume data collection


    Problem 1:

    22/06/23 12:08:58 ERROR hdfs.HDFSEventSink: process failed
    java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
        at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
        at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:251)
        at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:460)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:379)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
        at java.lang.Thread.run(Thread.java:748)
    22/06/23 12:08:58 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
    org.apache.flume.EventDeliveryException: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:464)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
        at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
        at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:251)
        at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:460)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:379)
        ... 3 more
    22/06/23 12:08:58 WARN source.SpoolDirectorySource: The channel is full, and cannot write data now. The source will try again after 4000 milliseconds
    22/06/23 12:09:02 INFO avro.ReliableSpoolingFileEventReader: Last read was never committed - resetting mark position.

    Reading through the log above:

    1. 22/06/23 12:08:58 ERROR hdfs.HDFSEventSink: process failed
       The sink failed while processing events.

    2. java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
       The Flume event headers are missing the timestamp entry.

    3. 22/06/23 12:08:58 ERROR flume.SinkRunner: Unable to deliver event.
       The SinkRunner component could not deliver the event.

    4. 22/06/23 12:08:58 WARN source.SpoolDirectorySource: The channel is full, and cannot write data now. The source will try again after 4000 milliseconds
       The channel has filled up with undelivered events, so the source cannot write to it and will retry after 4 seconds.

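    Conclusion: the fault is in the sink and is related to the timestamp configuration. The source and channel themselves are fine; the full channel is only a consequence of the sink not draining it.

    In the data collection directory, some files are already marked COMPLETED, which confirms the source is working. The last few files have not been renamed to COMPLETED; combined with the log analysis above, this is because the channel was full and the source could not keep reading.

    There is not a single file in HDFS, which confirms that the sink failed.

    Here is my configuration file: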

    a3.sources = r3
    a3.sinks = k3
    a3.channels = c3
    # Describe/configure the source
    a3.sources.r3.type = spooldir
    a3.sources.r3.spoolDir = /tmp/upload
    a3.sources.r3.fileSuffix = .COMPLETED
    a3.sources.r3.fileHeader = true
    #a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
    # Describe the sink
    a3.sinks.k3.type = hdfs
    a3.sinks.k3.hdfs.path = = hdfs://node1:8020/flume/upload/%Y%m%d/%H
    # Prefix for uploaded files
    a3.sinks.k3.hdfs.filePrefix = upload-
    # Whether to round the timestamp down when building the directory path
    a3.sinks.k3.hdfs.round = true
    # How many time units per new directory
    a3.sinks.k3.hdfs.roundValue = 10
    # Time unit used for rounding
    a3.sinks.k3.hdfs.roundUnit = minute
    # Whether to use the local timestamp
    #a3.sinks.k3.hdfs.useLocalTimeStamp = true # the key point: this line is commented out
    # Use a channel which buffers events in memory
    a3.channels.c3.type = memory
    a3.channels.c3.capacity = 1000
    a3.channels.c3.transactionCapacity = 100
    # Bind the source and sink to the channel
    a3.sources.r3.channels = c3
    a3.sinks.k3.channel = c3

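    The line #a3.sinks.k3.hdfs.useLocalTimeStamp = true is commented out. The %Y%m%d/%H escapes in hdfs.path are resolved from the timestamp header of each event, and with this option off nothing sets that header, so the sink throws the NullPointerException shown above. Uncommenting the line fixes it.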
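
    The missing header can also be supplied on the source side. A minimal sketch of both options, assuming an interceptor named i1 (a hypothetical name that is not in the original configuration; any identifier works):

    ```properties
    # Option A: let the HDFS sink use the agent's local time when resolving the path escapes
    a3.sinks.k3.hdfs.useLocalTimeStamp = true

    # Option B: have the source add a "timestamp" header to every event
    # (i1 is an illustrative interceptor name, not from the original config)
    a3.sources.r3.interceptors = i1
    a3.sources.r3.interceptors.i1.type = timestamp
    ```

    Either option should be enough for the %Y%m%d/%H escapes in hdfs.path. The fix used in this post is Option A.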

    Problem 2: after that fix, running the agent produced another error:

     java.net.URISyntaxException: Illegal character in scheme name at index 0: = hdfs://node1:8020/flume/upload/20220623/12/upload-.1655957677405.tmp

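    This says that the character at index 0 of the configured HDFS path, "=", is illegal. Checking the configuration file showed that an extra "=" had been typed on the hdfs.path line.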
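
    In a properties file only the first "=" separates the key from the value, so the second one becomes the first character of the path. The correction, sketched as a before/after pair:

    ```properties
    # Before: the value read for hdfs.path is "= hdfs://node1:8020/..."
    #a3.sinks.k3.hdfs.path = = hdfs://node1:8020/flume/upload/%Y%m%d/%H

    # After: a single "=" separator, so the value starts with the hdfs scheme
    a3.sinks.k3.hdfs.path = hdfs://node1:8020/flume/upload/%Y%m%d/%H
    ```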

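    With the extra "=" removed, the agent ran, but it generated a large number of upload files. This is because the file roll settings were not configured, so the HDFS sink fell back to its defaults and rolled to a new file very frequently. The configuration was then changed as follows: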

    a3.sources = r3
    a3.sinks = k3
    a3.channels = c3
    # Describe/configure the source
    a3.sources.r3.type = spooldir
    a3.sources.r3.spoolDir = /tmp/upload
    a3.sources.r3.fileSuffix = .COMPLETED
    a3.sources.r3.fileHeader = true
    #a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
    # Describe the sink
    a3.sinks.k3.type = hdfs
    a3.sinks.k3.hdfs.path = hdfs://node1:8020/flume/upload/%Y%m%d/%H
    # Prefix for uploaded files
    a3.sinks.k3.hdfs.filePrefix = upload-
    # Whether to round the timestamp down when building the directory path
    a3.sinks.k3.hdfs.round = true
    # How many time units per new directory
    a3.sinks.k3.hdfs.roundValue = 10
    # Time unit used for rounding
    a3.sinks.k3.hdfs.roundUnit = minute
    # Whether to use the local timestamp
    a3.sinks.k3.hdfs.useLocalTimeStamp = true
    # Number of events to accumulate before flushing to HDFS
    a3.sinks.k3.hdfs.batchSize = 1000
    # File type (DataStream writes uncompressed data; compressed types are also supported)
    a3.sinks.k3.hdfs.fileType = DataStream
    # How often to roll to a new file, in seconds
    a3.sinks.k3.hdfs.rollInterval = 600
    # Roll each file at roughly 128 MB
    a3.sinks.k3.hdfs.rollSize = 134217700
    # Do not roll files based on event count
    a3.sinks.k3.hdfs.rollCount = 0
    # Minimum number of block replicas
    a3.sinks.k3.hdfs.minBlockReplicas = 1
    # Use a channel which buffers events in memory
    a3.channels.c3.type = memory
    a3.channels.c3.capacity = 1000
    a3.channels.c3.transactionCapacity = 1000
    # Bind the source and sink to the channel
    a3.sources.r3.channels = c3
    a3.sinks.k3.channel = c3

    With these settings the number of generated files is under control.
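
    For context, the flood of small files seen earlier is what the HDFS sink's default roll settings tend to produce. To my understanding of the Flume User Guide, leaving the three roll properties unset is equivalent to the following (the values are quoted from memory and worth checking against the guide for your Flume version):

    ```properties
    # Assumed defaults when the roll properties are not set
    # Roll to a new file every 30 seconds
    a3.sinks.k3.hdfs.rollInterval = 30
    # Roll once the file reaches 1024 bytes
    a3.sinks.k3.hdfs.rollSize = 1024
    # Roll after 10 events
    a3.sinks.k3.hdfs.rollCount = 10
    ```

    Setting rollCount = 0 turns off count-based rolling, so only the 600-second interval and the roughly 128 MB size limit start a new file. The channel's transactionCapacity is also 1000 in the final configuration, matching the sink's batchSize of 1000, which is presumably intended so that one sink batch fits in a single channel transaction.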

     

     

  • Original article: https://blog.csdn.net/zhangyingchengqi/article/details/125424768