• 解决flume采集日志使用KafkaChannel写不到hdfs的问题


    问题:

    在hadoop102中执行lg.sh命令(一个脚本,可以生成日志)后,发现日志没有传输到hdfs

    解决:

    第一步:

    首先,先观察一下我们的zookeeper,kafka,flume,hadoop集群是否启动,要把这些集群都打开来。

    第二步:

    检查一下消费flume所在的集群,也就是hadoop104中的/opt/module/flume/conf目录下的kafka-flume-hdfs.conf文件

    这是我的配置文件信息。供大家参考

    1. ## 组件
    2. a1.sources=r1
    3. a1.channels=c1
    4. a1.sinks=k1
    5. #配置kafkasource
    6. ## source1
    7. a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    8. a1.sources.r1.batchSize = 5000
    9. a1.sources.r1.batchDurationMillis = 2000
    10. a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
    11. a1.sources.r1.kafka.topics=topic_log
    12. #配置时间拦截器
    13. a1.sources.r1.interceptors = i1
    14. a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.TimeStampInterceptor$Builder
    15. #配置filechannel
    16. a1.channels.c1.type = file
    17. a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
    18. a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
    19. ##配置sink1
    20. a1.sinks.k1.type = hdfs
    21. a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
    22. a1.sinks.k1.hdfs.filePrefix = log-
    23. a1.sinks.k1.hdfs.round = false
    24. #控制生成的小文件
    25. a1.sinks.k1.hdfs.rollInterval = 10
    26. a1.sinks.k1.hdfs.rollSize = 134217728
    27. a1.sinks.k1.hdfs.rollCount = 0
    28. ## 控制输出文件是原生文件。
    29. a1.sinks.k1.hdfs.fileType = CompressedStream
    30. a1.sinks.k1.hdfs.codeC = lzop
    31. ## 拼装
    32. a1.sources.r1.channels = c1
    33. a1.sinks.k1.channel= c1

    这里注意:在配置时间拦截器的一部分中,我的引用名是com.atguigu.flume.interceptor.TimeStampInterceptor

    也就是我创建后面的maven工程时,用的包名(idea软件包)

    这里,大家的包名可能不一样,根据自己创建的包名进行修改即可。

    第三步:

    创建maven工程,时间拦截器。

    com.atguigu.flume.interceptor包下创建TimeStampInterceptor类,然后实现如下代码

    1. package com.atguigu.flume.interceptor;
    2. import com.alibaba.fastjson.JSONObject;
    3. import org.apache.flume.Context;
    4. import org.apache.flume.Event;
    5. import org.apache.flume.interceptor.Interceptor;
    6. import java.nio.charset.StandardCharsets;
    7. import java.util.ArrayList;
    8. import java.util.List;
    9. import java.util.Map;
    10. public class TimeStampInterceptor implements Interceptor {
    11. private ArrayList events = new ArrayList<>();
    12. @Override
    13. public void initialize() {
    14. }
    15. @Override
    16. public Event intercept(Event event) {
    17. Map headers = event.getHeaders();
    18. String log = new String(event.getBody(), StandardCharsets.UTF_8);
    19. JSONObject jsonObject = JSONObject.parseObject(log);
    20. String ts = jsonObject.getString("ts");
    21. headers.put("timestamp", ts);
    22. return event;
    23. }
    24. @Override
    25. public List intercept(List list) {
    26. events.clear();
    27. for (Event event : list) {
    28. events.add(intercept(event));
    29. }
    30. return events;
    31. }
    32. @Override
    33. public void close() {
    34. }
    35. public static class Builder implements Interceptor.Builder {
    36. @Override
    37. public Interceptor build() {
    38. return new TimeStampInterceptor();
    39. }
    40. @Override
    41. public void configure(Context context) {
    42. }
    43. }
    44. }

    然后将代码打包

    打包过程:

    点击idea右端的maven,双击package,即可打包

     打包后,点击以下步骤

     这时候,会弹出一个目录

    (重点,我就是这里出错了) 然后将这个jar包放到hadoop104的/opt/module/flume/lib文件夹下面,然后分发给hadoop102,hadoop103中。

    最后,重新启动一下集群,就可以了。


    附:(抛出一下异常,说明你的消费flume内存不够,只要修改配置就可以了)

     

     

  • 相关阅读:
    天润融通:AI助手助力Klarna实现多语言客户服务革新
    css中新型的边框设置属性border-inline
    Android12及所有版本解决没有system读写权限(只需要magisk面具)
    Java中Iterator和Iterable的区别
    【SQL报错注入】简介、相关函数、利用方法
    HDFS完全分布式搭建及Hadoop HA集群搭建
    dmp(dump)转储文件
    MindSponge分子动力学模拟——计算单点能(2023.08)
    基于注解的声明式事务
    猿创征文|推荐几款常用开发利器
  • 原文地址:https://blog.csdn.net/JiaXingNashishua/article/details/126114954