• 使用Flink AggregateFunction计算股票的平均值报错


    关注 码龄 粉丝数 原力等级 -- 被采纳 被点赞 采纳率 苏默曦 2024-06-18 17:32 采纳率: 50% 浏览 23 首页/ 大数据 / 使用Flink AggregateFunction计算股票的平均值报错 flink大数据 使用Flink AggregateFunction计算股票的平均值股票: AggregateFunction计算idea运行出现报错如下; 收起 写回答 好问题 0 提建议 追加酬金 关注问题 微信扫一扫 点击复制链接 分享 邀请回答 编辑 收藏 删除 结题 收藏 举报 追加酬金 (90%的用户在追加酬金后获得了解决方案) 当前问题酬金 ¥ 0 (可追加 ¥500) ¥ 15¥ 20¥ 50¥ 100¥ 200 支付方式 扫码支付 二维码出错 点击刷新 支付金额 15 元 提供问题酬金的用户不参与问题酬金结算和分配 支付即为同意 《付费问题酬金结算规则》 结题 再想想 删除 再等等

    2条回答 默认 最新

    • 专家-赤兔[在线] 全栈领域优质创作者 2024-06-18 17:32
      关注

      引自免费微信小程序:皆我百晓生

      从你提供的信息来看,主要的问题在于使用Flink的AggregateFunction时遇到了异常。异常信息显示为NullPointerException,这通常意味着在某个方法调用中,尝试操作了一个空对象。在Flink的上下文中,这可能发生在AggregateFunction的实现中,尤其是在createAccumulatoraccumulatemerge等方法中处理了空值。

      针对你的问题,首先检查AggregateFunction的实现,确保所有必要的方法都正确实现了,并且在这些方法中处理了所有可能的边界情况,尤其是对于可能为null的对象。以下是一个可能的AverageAggregate实现示例,用于计算每只股票的平均价格:

      import org.apache.flink.api.common.functions.AggregateFunction;
      import org.apache.flink.api.common.typeinfo.TypeInformation;
      import org.apache.flink.api.common.typeinfo.Types;
      import org.apache.flink.api.java.tuple.Tuple2;
      import org.apache.flink.api.java.tuple.Tuple3;
      
      public class AverageAggregate implements AggregateFunction<StockPrice, Tuple3>, Tuple2<String, Double>> {
      
          @Override
          public Tuple3Double, Integer> createAccumulator() {
              return new Tuple3<>("", 0.0, 0);
          }
      
          @Override
          public Tuple3Double, Integer> add(StockPrice stockPrice, Tuple3Double, Integer> accumulator) {
              if (accumulator.f0.equals("")) {
                  accumulator.f0 = stockPrice.symbol;
              }
              accumulator.f1 += stockPrice.price;
              accumulator.f2++;
              return accumulator;
          }
      
          @Override
          public Tuple3Double, Integer> merge(Tuple3Double, Integer> a, Tuple3Double, Integer> b) {
              a.f1 += b.f1;
              a.f2 += b.f2;
              return a;
          }
      
          @Override
          public Tuple2Double> getResult(Tuple3Double, Integer> accumulator) {
              return new Tuple2<>(accumulator.f0, accumulator.f1 / accumulator.f2);
          }
      
          @Override
          public TypeInformationDouble, Integer>> getAccumulatorType() {
              return Types.TUPLE(Types.STRING, Types.DOUBLE, Types.INT);
          }
      
          @Override
          public TypeInformationDouble>> getResultType() {
              return Types.TUPLE(Types.STRING, Types.DOUBLE);
          }
      }
      

      此外,从你提供的日志中,还看到了关于SLF4J的警告,这可能是因为你的项目中缺少相应的日志框架依赖。尽管这个警告与NullPointerException没有直接关系,但最好还是解决它,以避免未来的潜在问题。你可以在pom.xmlbuild.gradle中添加对应的SLF4J绑定依赖,例如logback-classiclog4j-over-slf4j

      最后,检查StockPrice类的构造函数中的错误,即:

      public static StockPrice of(String symbol, double price, Long ts, int volume) {
          return new StockPrice(symbol, price, ts, volume, mediaStatus:"); 
      }
      

      这里mediaStatus未被赋值,应该传递一个默认值或从其他地方获取其值,否则会抛出编译错误。

      确保所有的类和方法都被正确地导入和定义,且无语法错误。如果问题仍然存在,建议仔细检查数据源和数据流处理过程中的每一个步骤,确保数据的正确性和完整性。

      展开全部

      评论 编辑记录
    • 关注

      下午好🌅🌅🌅
      本答案参考ChatGPT-3.5

      根据您提供的代码,出现了一些报错。首先,根据报错信息,看到了java.lang.NullPointerException。这种错误通常是因为在程序执行过程中某个对象为null,而后对该对象进行了操作。

      根据代码,我们可以看到这个错误出现在AggregateFunctionExample类的测试方法中。那么首先要检查的是测试方法中的代码。确保传入的参数不为null,并且正确使用了AggregateFunction进行聚合计算。

      在对AggregateFunction进行聚合操作时,需要实现createAccumulatoraccumulate等方法。您可以检查一下这些方法的实现是否正确,例如确保聚合器的初始值正确设置。

      另外,还要注意检查我们引用的包是否正确,特别是Slf4j相关的包。可以确认是否正确引入了Slf4j相关的依赖,并且这些依赖版本是否与Flink版本兼容。

      综上所述,您可以按照以下方式来解决问题:

      1. 检查测试方法中的代码是否正确,确保传入的参数不为null,并正确使用了AggregateFunction进行聚合操作。
      2. 检查AggregateFunction的实现,确保createAccumulatoraccumulate等方法的实现正确。
      3. 确保正确引入了Slf4j相关的依赖,并且这些依赖版本与Flink版本兼容。

      希望这些解决方案对您有所帮助,如果问题仍然存在,请提供更多的错误日志以及相关代码,以便更好地帮助您解决问题。

      评论
    编辑
    预览

    报告相同问题?

  • 相关阅读:
    XXL-Job学习笔记
    12-IO流1-File类、绝(相)对路径、
    【无标题】清空吗
    HTML静态网页成品作业(HTML+CSS+JS)——我的家乡福州介绍网页(3个页面)
    Java线程池理解与学习
    idea添加文档注释
    四川省大学生金融科技建模大赛-模型复现和点评
    JVM之强软弱虚引用
    windows11录屏功能详解,记录你的精彩时刻
    JMX监控Kafka offset,kafka流量监控,kafka metrics
  • 原文地址:https://ask.csdn.net/questions/8120526