Problem 1:
- 22/06/23 12:08:58 ERROR hdfs.HDFSEventSink: process failed
- java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
- at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
- at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:251)
- at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:460)
- at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:379)
- at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
- at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
- at java.lang.Thread.run(Thread.java:748)
- 22/06/23 12:08:58 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
- org.apache.flume.EventDeliveryException: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
- at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:464)
- at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
- at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
- at java.lang.Thread.run(Thread.java:748)
- Caused by: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
- at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
- at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:251)
- at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:460)
- at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:379)
- ... 3 more
- 22/06/23 12:08:58 WARN source.SpoolDirectorySource: The channel is full, and cannot write data now. The source will try again after 4000 milliseconds
- 22/06/23 12:09:02 INFO avro.ReliableSpoolingFileEventReader: Last read was never committed - resetting mark position.
Reading the log above:
1. "ERROR hdfs.HDFSEventSink: process failed": the sink failed while processing events.
2. "java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null": the Flume event headers are missing a timestamp.
3. "ERROR flume.SinkRunner: Unable to deliver event.": the SinkRunner component cannot deliver events.
4. "WARN source.SpoolDirectorySource: The channel is full, and cannot write data now. The source will try again after 4000 milliseconds": the channel has filled up with backed-up events, so the source cannot write and will retry after 4 seconds.
Conclusion: the problem is in the sink and is related to the timestamp configuration. The source and channel themselves are fine; the full channel is just a downstream symptom of the sink failing to drain it.
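The failure point is easy to reproduce in miniature: the %Y%m%d/%H escape sequences in hdfs.path are resolved from the event's timestamp header, and BucketPath throws exactly this NullPointerException when that header is absent. A minimal Python sketch of the mechanism (illustrative only, not Flume's actual code; escape_bucket_path is a made-up name):

```python
import datetime

def escape_bucket_path(path, headers):
    """Sketch of Flume BucketPath-style escaping: date escapes such as
    %Y/%m/%d/%H are filled in from the 'timestamp' header (epoch millis)."""
    if any(tok in path for tok in ("%Y", "%m", "%d", "%H")):
        ts = headers.get("timestamp")
        if ts is None:
            # This is the condition the stack trace reports
            raise ValueError(
                "Expected timestamp in the Flume event headers, but it was null")
        # UTC here for a deterministic example; Flume uses the local zone
        dt = datetime.datetime.fromtimestamp(int(ts) / 1000,
                                             tz=datetime.timezone.utc)
        path = (path.replace("%Y", f"{dt.year:04d}")
                    .replace("%m", f"{dt.month:02d}")
                    .replace("%d", f"{dt.day:02d}")
                    .replace("%H", f"{dt.hour:02d}"))
    return path
```

With a timestamp header the path buckets cleanly; without one it fails just like the log above. Setting hdfs.useLocalTimeStamp = true makes the sink stamp each event with the agent's own clock, which is why that option matters here.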

The listing above shows the spooling directory being collected. Some files have already been renamed with the .COMPLETED suffix, so the source itself is working, but the last few files never got the suffix. Combined with the analysis above, that is because the channel filled up and the source could not keep reading data into it.

HDFS contains no files at all, confirming that the sink is the failing component.
Checking my configuration file:
- a3.sources = r3
- a3.sinks = k3
- a3.channels = c3
-
- # Describe/configure the source
- a3.sources.r3.type = spooldir
- a3.sources.r3.spoolDir = /tmp/upload
- a3.sources.r3.fileSuffix = .COMPLETED
- a3.sources.r3.fileHeader = true
- #a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
-
- # Describe the sink
- a3.sinks.k3.type = hdfs
- a3.sinks.k3.hdfs.path = = hdfs://node1:8020/flume/upload/%Y%m%d/%H
- #prefix for uploaded files
- a3.sinks.k3.hdfs.filePrefix = upload-
- #whether to roll directories by time
- a3.sinks.k3.hdfs.round = true
- #how many time units before creating a new directory
- a3.sinks.k3.hdfs.roundValue = 10
- #redefine the time unit
- a3.sinks.k3.hdfs.roundUnit = minute
- #whether to use the local timestamp
- #a3.sinks.k3.hdfs.useLocalTimeStamp = true   #the key point: this line is commented out
-
-
- # Use a channel which buffers events in memory
- a3.channels.c3.type = memory
- a3.channels.c3.capacity = 1000
- a3.channels.c3.transactionCapacity = 100
-
- # Bind the source and sink to the channel
- a3.sources.r3.channels = c3
- a3.sinks.k3.channel = c3
The line #a3.sinks.k3.hdfs.useLocalTimeStamp = true is commented out, so no timestamp header is ever generated for the %Y%m%d/%H escapes.
2. After fixing this and rerunning, another error appeared:
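Besides uncommenting hdfs.useLocalTimeStamp, another standard fix is to have the source stamp every event itself with a timestamp interceptor, which sets the header at the source side instead of at the sink:

```properties
# Alternative fix: add a timestamp interceptor on the source
a3.sources.r3.interceptors = i1
a3.sources.r3.interceptors.i1.type = timestamp
```

Either approach gives the HDFS sink the timestamp header it needs; the interceptor version also works when several sinks share the same events.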

java.net.URISyntaxException: Illegal character in scheme name at index 0: = hdfs://node1:8020/flume/upload/20220623/12/upload-.1655957677405.tmp
This says the character at index 0 of the URI scheme, the =, is illegal. A look at the configuration file shows an extra = was written in a3.sinks.k3.hdfs.path.
After removing it, the run produced the following result.
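The index in that message comes straight from RFC 3986: a URI scheme must start with a letter and may contain only letters, digits, "+", "-", and ".". A small sketch (the helper name is made up) that finds the first offending character the same way:

```python
import string

# RFC 3986: scheme = ALPHA *( ALPHA / DIGIT / "+" / "-" / "." )
FIRST = set(string.ascii_letters)
REST = FIRST | set(string.digits) | set("+-.")

def first_illegal_scheme_index(uri):
    """Return the index of the first character that breaks the RFC 3986
    scheme rule, or -1 if the scheme (text before the first ':') is legal."""
    scheme = uri.split(":", 1)[0]
    for i, ch in enumerate(scheme):
        if ch not in (FIRST if i == 0 else REST):
            return i
    return -1
```

The stray "= " at the start of the misconfigured path makes index 0 illegal, which is exactly what java.net.URI reports.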

A large number of small files were uploaded to HDFS, because no roll interval had been configured. The sink configuration is revised again as follows:
- a3.sources = r3
- a3.sinks = k3
- a3.channels = c3
-
- # Describe/configure the source
- a3.sources.r3.type = spooldir
- a3.sources.r3.spoolDir = /tmp/upload
- a3.sources.r3.fileSuffix = .COMPLETED
- a3.sources.r3.fileHeader = true
- #a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
-
- # Describe the sink
- a3.sinks.k3.type = hdfs
- a3.sinks.k3.hdfs.path = hdfs://node1:8020/flume/upload/%Y%m%d/%H
- #prefix for uploaded files
- a3.sinks.k3.hdfs.filePrefix = upload-
- #whether to roll directories by time
- a3.sinks.k3.hdfs.round = true
- #how many time units before creating a new directory
- a3.sinks.k3.hdfs.roundValue = 10
- #redefine the time unit
- a3.sinks.k3.hdfs.roundUnit = minute
- #whether to use the local timestamp
- a3.sinks.k3.hdfs.useLocalTimeStamp = true
- #how many events to accumulate before flushing to HDFS
- a3.sinks.k3.hdfs.batchSize = 1000
- #file type; compression is supported
- a3.sinks.k3.hdfs.fileType = DataStream
- #how long (seconds) before rolling a new file
- a3.sinks.k3.hdfs.rollInterval = 600
- #roll size per file, just under 128 MB
- a3.sinks.k3.hdfs.rollSize = 134217700
- #rolling is independent of the number of events
- a3.sinks.k3.hdfs.rollCount = 0
- #minimum number of block replicas
- a3.sinks.k3.hdfs.minBlockReplicas = 1
-
- # Use a channel which buffers events in memory
- a3.channels.c3.type = memory
- a3.channels.c3.capacity = 1000
- a3.channels.c3.transactionCapacity = 1000
-
- # Bind the source and sink to the channel
- a3.sources.r3.channels = c3
- a3.sinks.k3.channel = c3
With these roll settings, the number of generated files is under control.
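With rollCount = 0 disabling event-count rolls, a file closes either every rollInterval seconds or once it reaches rollSize bytes (134217700 sits just under 128 MiB, presumably so each rolled file fits in a single HDFS block). A rough, purely illustrative back-of-the-envelope helper:

```python
def estimated_files(duration_s, total_bytes,
                    roll_interval=600, roll_size=134217700):
    """Crude upper bound on the number of files this config rolls:
    rollCount = 0 disables event-count rolls, so only elapsed time
    and written bytes can trigger a roll."""
    by_time = duration_s // roll_interval
    by_size = total_bytes // roll_size
    return max(1, by_time + by_size)

# The configured rollSize sits just under a 128 MiB HDFS block:
# 134217700 < 134217728 (= 128 * 1024 * 1024)
```

For a light load, roughly one hour of uptime yields at most about six time-rolled files, instead of the flood of tiny files seen before.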
