• flume数据无法发送


    关注 码龄 粉丝数 原力等级 -- 被采纳 被点赞 采纳率 废柴太乙 2024-05-12 18:28 采纳率: 0% 浏览 16 首页/ 大数据 / flume数据无法发送 flumekafka大数据 请问FlumeRpcClientExample运行时遇到这种问题该怎么解决? rg.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: 0.0.0.0, port: 44444 }: Failed to send event at org.apache.flume.api.NettyAvroRpcClient.append(NettyAvroRpcClient.java:249) at org.example.readcsv.FlumeRpcClientExample.lambda$sendCSVFile$0(FlumeRpcClientExample.java:50) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: 0.0.0.0, port: 44444 }: Avro RPC call returned Status: FAILED at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:390) at org.apache.flume.api.NettyAvroRpcClient.append(NettyAvroRpcClient.java:296) at org.apache.flume.api.NettyAvroRpcClient.append(NettyAvroRpcClient.java:237) ... 6 more 0 FlumeRpcClientExample的代码,数据无法发送,到while (!executor.isTerminated()) {} // 等待所有任务完成,就不能正常运行,求各位大神帮帮忙。 package org.example.readcsv; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.FlumeException; import org.apache.flume.api.RpcClient; import org.apache.flume.api.RpcClientFactory; import org.apache.flume.event.EventBuilder; import java.io.*; import java.nio.file.*; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class FlumeRpcClientExample { private static final String FLUME_HOST = "0.0.0.0"; private static final int[] FLUME_PORTS = {44444, 44445, 44446}; // 替换为Flume监听的端口 private static final String FLUME_HEADERS = ""; // 添加需要的Flume头部信息 public static void main(String[] args) { String folderPath = "E:\\app\\dist"; sendCSVFile(folderPath); } private static void sendCSVFile(String folderPath) { try { File folder = new File(folderPath); File[] listOfFiles = folder.listFiles(); RpcClient[] clients = new RpcClient[3]; clients[0]=RpcClientFactory.getDefaultInstance(FLUME_HOST, FLUME_PORTS[0]); clients[1]=RpcClientFactory.getDefaultInstance(FLUME_HOST, FLUME_PORTS[1]); clients[2]=RpcClientFactory.getDefaultInstance(FLUME_HOST, FLUME_PORTS[2]); // 首先发送所有已经有的数据 int count; int total=0; ExecutorService executor = Executors.newFixedThreadPool(30); // 创建一个固定大小的线程池 for (File file : listOfFiles) { if (file.isFile() && file.getName().endsWith(".csv")) { executor.submit(() -> { // 在一个新的线程中执行 int count_port = 0; try { BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), "gbk")); String line = reader.readLine(); count_port = 0; while (line != null) { RpcClient client = clients[count_port % FLUME_PORTS.length]; Event flumeEvent = EventBuilder.withBody(line.getBytes()); client.append(flumeEvent); line = reader.readLine(); count_port++; } reader.close(); } catch (Exception e) { e.printStackTrace(); } System.out.println(count_port); }); } } executor.shutdown(); // 关闭线程池 while (!executor.isTerminated()) {} // 等待所有任务完成 System.out.println(total); for (File file : listOfFiles) { if (file.isFile() && file.getName().endsWith(".csv")) { BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), "gbk")); String line = reader.readLine(); count=0; while (line != null) { System.out.println("当前的数据: " + line); RpcClient client = clients[count % FLUME_PORTS.length]; Event flumeEvent = EventBuilder.withBody(line.getBytes()); client.append(flumeEvent); line = reader.readLine(); count++; total++; } reader.close(); } System.out.println(total); } WatchService watchService = FileSystems.getDefault().newWatchService(); // 注册要监视的路径和事件类型到WatchService对象中 Path dirPath = folder.toPath(); dirPath.register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY); // 创建一个Map对象,用于存储每个文件的BufferedReader和上一次长度 Map readers = new HashMap<>(); Map lastLengths = new HashMap<>(); while (true) { WatchKey watchKey = watchService.take(); // 获取一个WatchEventList对象 for (WatchEvent event : watchKey.pollEvents()) { // 判断事件类型 if (event.kind() == StandardWatchEventKinds.ENTRY_MODIFY) { // 如果是文件修改事件,则读取文件的新内容 Path filePath = dirPath.resolve((Path) event.context()); if (filePath.toString().endsWith(".csv")) { File file = filePath.toFile(); long length = file.length(); if (!readers.containsKey(filePath.toString())) { // 如果是新的文件,则创建一个新的BufferedReader对象 RandomAccessFile raf = new RandomAccessFile(file, "r"); readers.put(filePath.toString(), new BufferedReader(new InputStreamReader(new FileInputStream(raf.getFD()), "GBK"))); lastLengths.put(filePath.toString(), 0L); } BufferedReader reader; long lastLength = lastLengths.get(filePath.toString()); if (length > lastLength) { // 文件有新增的数据 RandomAccessFile raf = new RandomAccessFile(file, "r"); raf.seek(lastLength); // 将文件指针移动到上一次读取的位置 reader = new BufferedReader(new InputStreamReader(new FileInputStream(raf.getFD()), "GBK")); // 从上一次读取的位置开始创建一个新的BufferedReader对象 String line; count=0; while ((line = reader.readLine()) != null) { RpcClient client = clients[count % FLUME_PORTS.length]; Event flumeEvent = EventBuilder.withBody(line.getBytes()); client.append(flumeEvent); System.out.println(line); count++; } // 更新文件的上一次长度 lastLengths.put(filePath.toString(), length); } } } } // 重置watchKey状态为ready,以便继续监听事件 if (!watchKey.reset()) { break; } } } catch (IOException | InterruptedException | EventDeliveryException e) { e.printStackTrace(); } } } 展开全部 收起 写回答 好问题 0 提建议 追加酬金 关注问题 微信扫一扫 点击复制链接 分享 邀请回答 编辑 收藏 删除 结题 收藏 举报 追加酬金 (90%的用户在追加酬金后获得了解决方案) 当前问题酬金 ¥ 0 (可追加 ¥500) ¥ 15¥ 20¥ 50¥ 100¥ 200 支付方式 扫码支付 二维码出错 点击刷新 支付金额 15 元 提供问题酬金的用户不参与问题酬金结算和分配 支付即为同意 《付费问题酬金结算规则》 结题 再想想 删除 再等等

    4条回答 默认 最新

    • 笑面虎 2024-05-12 18:28
      关注

      以下内容由CHATGPT及笑面虎共同生成、有用望采纳:


      【您想要解决的问题】:您遇到的问题是在使用 Flume 的 FlumeRpcClientExample 运行时,数据无法通过 NettyAvroRpcClient 发送到配置的主机和端口。

      【图片内容】:您上传的图片包含了错误日志信息,显示了 EventDeliveryException 异常,指出在尝试通过 NettyAvroRpcClient 发送事件时失败了。错误发生在 NettyAvroRpcClient.append 方法调用时,并且是由于 Avro RPC 调用返回了 FAILED 状态。

      【问题出现原因】:这个问题可能由以下几个原因引起:

      1. 主机地址错误:日志中显示的主机地址是 0.0.0.0,这通常是一个通配地址,可能不是您想要连接的实际地址。
      2. 端口不可用:端口 44444 可能没有在目标主机上监听,或者防火墙设置阻止了连接。
      3. 网络问题:可能存在网络连接问题,导致客户端无法到达服务器。
      4. Flume 配置问题:Flume 的配置可能不正确,导致无法建立 RPC 连接。

      【问题解决方案】:为了解决这个问题,您可以尝试以下步骤:

      1. 检查主机地址:确保您使用的是正确的主机地址,而不是 0.0.0.0
      2. 检查端口:确认端口 44444 在目标主机上是开放的,并且Flume的配置文件中指定了正确的端口。
      3. 检查网络连接:确保网络连接没有问题,您可以尝试 ping 目标主机以确认连接。
      4. 检查防火墙设置:确保没有防火墙规则阻止了到端口 44444 的连接。
      5. 检查 Flume 配置:确保 Flume 的配置文件(如 flume.confflume.properties)中的相关配置是正确的。

      【提供代码】:由于这个问题是关于配置和环境设置的,而不是代码本身的问题,所以没有具体的代码可以提供。但是,您可能需要检查和修改您的 Flume 配置文件。

      【代码运行方式】:如果您需要修改配置文件,通常您需要做的是:

      1. 找到您的 Flume 配置文件。
      2. 使用文本编辑器打开它并进行必要的修改。
      3. 保存文件并重新启动 Flume 服务。

      【代码预期运行结果】:如果配置正确,网络连接正常,那么您应该能够看到数据成功发送到 Flume 代理,而不会遇到 EventDeliveryException 异常。

      【推荐相关链接】:

      请根据这些步骤检查和调整您的配置,如果问题仍然存在,您可能需要提供更多的信息或者联系 Flume 社区以获得更专业的帮助。

      展开全部

      评论 编辑记录
    • 专家-皆知 2024-05-12 18:28
      关注

      检查网络连接、服务端口、配置文件是否正确。

      以上回复有部分引自免费微信小程序:皆我百晓生。 如问题尚未解决,还请稍待,如有疑问请回复进一步沟通。

      评论
      专家-皆知 回复 废柴太乙 2024-05-12 18:29

      根据你提供的异常信息,看起来你在尝试使用Flume的NettyAvroRpcClient从一个CSV文件向Flume agent发送事件时遇到了问题。错误消息表明连接失败,并且Avro RPC调用返回了状态FAILED。这可能是由于几个原因,包括网络问题、配置问题或者是目标Flume agent未运行。

      以下是针对这个问题的一些排查步骤和可能的解决方案:

      1. 检查Flume agent: 确保目标Flume agent正在运行并监听在指定的主机(0.0.0.0)和端口(44444)。你可以通过命令行工具或监控工具查看Flume agent的状态。如果你使用的是Apache Flume 1.x,可以通过flume-ng status命令来查看;如果是Flume 2.x及以上版本,可以使用fluent-cat /var/log/flume2/flume.log查看日志。

      2. 检查网络连接: 确认源机器(运行FlumeRpcClientExample的机器)和目标Flume agent机器之间有正确的网络连接。你可以使用ping命令测试它们之间的连通性:ping 0.0.0.0(假设这是正确的目标地址)。

      3. 验证配置: 检查FlumeRpcClientExample中的配置是否正确,确保Flume client指向的主机名和端口号与实际Flume agent配置一致。在Java代码中,这通常是在初始化NettyAvroRpcClient时设置的。

        // 假设这是你的初始化代码片段
        String flumeHost = "your_flume_host"; // 替换为实际的Flume agent主机名
        int flumePort = 44444; // 如果不是默认的9444,替换为实际端口号
        NettyAvroRpcClient client = new NettyAvroRpcClient(flumeHost, flumePort);
        
      4. 检查防火墙和安全组规则: 如果在网络环境中,确认没有防火墙或安全组阻止了从源机器到Flume agent的通信。你需要允许来自源IP的入站流量访问Flume agent监听的端口。

      5. 事件序列化问题: 如果事件序列化有问题,也可能导致此错误。确保使用的序列化格式与Flume agent配置中的序列化器匹配。例如,如果Flume agent配置为使用JSON序列化,而你发送的是Avro格式的数据,那么就会出错。

      6. 检查Flume agent配置: 查看Flume agent的配置文件(通常是conf/flume-conf.properties),确保它正确地定义了接收端点(source)以及对NettyAvroSink的配置。

        # 示例配置
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1
        
        # Source configuration
        a1.sources.r1.type = spooldir
        a1.sources.r1.spoolDir = /path/to/your/csv/files/
        
        # Sink configuration
        a1.sinks.k1.type = nettyavro
        a1.sinks.k1.channel = c1
        a1.sinks.k1.hostname = 0.0.0.0
        a1.sinks.k1.port = 44444
        
        # Channel configuration
        a1.channels.c1.type = memory
        
      7. 调试代码: 使用更详细的日志级别进行调试,以便获取更多关于错误的上下文信息。在Flume agent和客户端代码中增加日志输出,可以帮助定位具体问题所在。

      对于代码修改,请确保你已经添加了相应的日志框架依赖,比如Log4j或SLF4J。然后,在代码中添加适当的日志语句。

      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      public class FlumeRpcClientExample {
          private static final Logger logger = LoggerFactory.getLogger(FlumeRpcClientExample.class);
      
          public void sendCSVFile(String csvFilePath) {
              // ...
              try {
                  // 在关键部分添加日志输出
                  logger.info("Connecting to Flume agent at {}:{}", flumeHost, flumePort);
                  client.connect();
                  // ...
                  logger.debug("Sending event {}", event);
                  client.append(event);
                  // ...
              } catch (IOException e) {
                  logger.error("Error sending events to Flume agent", e);
              }
              // ...
          }
      
          // 其他方法...
      }
      

      完成以上步骤后,重新运行程序并观察日志输出,以找到具体的错误原因。如果问题仍然存在,请提供更多的日志信息以便进一步分析。

      回复
    • 关注

      晚上好🌙🌙🌙
      本答案参考ChatGPT-3.5

      根据错误提示信息来看,Flume无法发送数据,具体原因是Avro RPC调用返回了失败状态。这个问题可能会有多个方面的原因,下面提供一些常见的解决方案:

      1. 检查网络连接是否正常,确保Flume客户端能够连接到Flume代理服务器。
      2. 检查Flume代理服务器的配置是否正确,主要是检查配置文件中的端口号、IP地址等信息是否正确。
      3. 升级Flume版本,可能是当前版本存在某些bug,导致数据无法发送。可以尝试升级到最新版本或者稳定版本。
      4. 检查Flume客户端的配置文件是否正确,主要是检查source、channel、sink等组件的配置是否正确。
      5. 若以上几个方面都检查过了仍然无法解决问题,可以尝试停止Flume代理服务器,然后重启服务并重新启动Flume客户端,看是否能够解决问题。

      可以尝试以上这些解决方案,排查Flume数据无法发送的问题。

      评论
    • 秦问雨nb 2024-05-22 16:15
      关注

      检测你的配置文件是否正常被使用,kafka有没有配置成功,在docker中各个容器能不能互相通讯

      评论
    编辑
    预览

    报告相同问题?

  • 相关阅读:
    网络工程师回顾学习
    CVPR2020:Seeing Through Fog Without Seeing Fog
    IntelliJ_IDEA的下载和安装的准备
    异常检测 | MATLAB实现基于支持向量机和孤立森林的数据异常检测(结合t-SNE降维和DBSCAN聚类)
    IDEA——工程项目的两种窗口开发模式
    Java IO学习笔记(二):字节流与字符流
    电子时钟制作(瑞萨RA)(6)----配置RTC时钟及显示时间
    创新家庭办公室:打造完美工作空间的秘诀
    Python武器库开发-flask篇之URL重定向(二十三)
    Kafka 多种跨 IDC 灾备方案调研对比
  • 原文地址:https://ask.csdn.net/questions/8102308