• 记录一下flume中因为taildir_position.json因位置不对导致数据无法从kafka被采到hdfs上的问题


    【背景说明】

    我需要用flume将kafka上的数据采集到hdfs上,发现数据怎么到不了hdfs。

    【问题排查】

           1.kafka上已有相应的数据

            2.我的flume配置文档(没问题),

            3.时间拦截器(没问题),

            4.JSONObject.class(flume/lib中也已经导入),

            5.f3也能正常启动

    这是我的flume配置文档:

    vim kafka_to_hdfs_db.conf

    1. a1.sources = r1
    2. a1.channels = c1
    3. a1.sinks = k1
    4. a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    5. a1.sources.r1.batchSize = 5000
    6. a1.sources.r1.batchDurationMillis = 2000
    7. a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
    8. a1.sources.r1.kafka.topics = topic_db
    9. a1.sources.r1.kafka.consumer.group.id = flume
    10. a1.sources.r1.setTopicHeader = true
    11. a1.sources.r1.topicHeader = topic
    12. a1.sources.r1.interceptors = i1
    13. a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.TimestampAndTableNameInterceptor$Builder
    14. a1.channels.c1.type = file
    15. a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior2
    16. a1.channels.c1.dataDirs = /opt/module/flume/data/behavior2/
    17. a1.channels.c1.maxFileSize = 2146435071
    18. a1.channels.c1.capacity = 1000000
    19. a1.channels.c1.keep-alive = 6
    20. ## sink1
    21. a1.sinks.k1.type = hdfs
    22. a1.sinks.k1.hdfs.path = /origin_data/gmall/db/%{tableName}_inc/%Y-%m-%d
    23. a1.sinks.k1.hdfs.filePrefix = db
    24. a1.sinks.k1.hdfs.round = false
    25. a1.sinks.k1.hdfs.rollInterval = 10
    26. a1.sinks.k1.hdfs.rollSize = 134217728
    27. a1.sinks.k1.hdfs.rollCount = 0
    28. a1.sinks.k1.hdfs.fileType = CompressedStream
    29. a1.sinks.k1.hdfs.codeC = gzip
    30. ## 拼装
    31. a1.sources.r1.channels = c1
    32. a1.sinks.k1.channel= c1

    【原因】

    最后发现是我的taildir_position.json之前在/opt/module/flume/jobs/collectionProject路径下(不对),导致flume读不到这个文件的位置,所以无法从kafka将数据读到hdfs

    【解决】

    将这个文件移动到/opt/module/flume的路径下:

    [atguigu@hadoop102 collectionProject]$ mv taildir_position.json /opt/module/flume

    再次启动flume

    [atguigu@hadoop102 flume]$ bin/flume-ng agent -n a1 -c conf/ -f jobs/collectionProject/kafka_to_hdfs_db.conf

    MySQL数据库的增量数据已从kafka成功同步到hdfs~

  • 相关阅读:
    【目标检测论文解读复现NO.34】基于改进 YOLOv5s 的苹果叶片小目标病害轻量化检测方法
    NSSCTF做题(10)
    信息安全实验——口令破解技术
    高等教育学:态度与品德的形成
    Spark获取DataFrame中列的方式--col,$,column,apply
    springboot项目中获取业务功能的导入数据模板文件
    不学51直接学stm32可以吗?学stm32需要哪些基础?
    AndroidStudio 新建工程的基本修改及事件添加
    JVM八股文
    Python代码的编写运行方式简介
  • 原文地址:https://blog.csdn.net/yuanlaishidahuaa/article/details/137978252