• 【Flink metric(1)】Flink指标系统的系统性知识:获取metric以及注册自己的metric


    本文我们通过官网来整体了解下flink 指标系统的系统性支持

     

    本文主要关注:

    • 如何注册自定义指标,如何进行更新指标数据
    • 指标定义的层级:即指标的scope
    • 简单介绍,指标如何报告给外部系统、有哪些系统指标
    • 指标如何通过REST API获取
    • 在flink UI上创建Dashboard的方法

     

    Flink exposes a metric system that allows gathering and exposing metrics to external systems.

    flink 暴露了一个指标系统,可以收集和暴露指标给外部系统。

    一. Registering metrics:向flink注册新自己的metrics

    1. 注册metrics

    任何继承了RichFunction 的用户函数,都可以通过调用:getRuntimeContext().getMetricGroup() ,来访问flink的metric system。方法返回的MetricGroup可以用来创建和注册新的指标。

     

    2. Metric types:指标类型

    flink支持 Counters, Gauges, Histograms and Meters.等四种指标类型。

    2.1. Counter

    计数器 (Counter) 用于计数某个指标。

    • 可以使用 inc()/inc(long n)dec()/dec(long n) 方法来增加或减少当前值。
    • 可以通过在 MetricGroup 上调用 counter(String name) 来创建并注册一个计数器。
    public class MyMapper extends RichMapFunction<String, String> {
      private transient Counter counter;
    
      @Override
      public void open(Configuration config) {
        this.counter = getRuntimeContext()
          .getMetricGroup()
          .counter("myCounter");
      }
    
      @Override
      public String map(String value) throws Exception {
        this.counter.inc();
        return value;
      }
    }
    

    你也可以自己实现counter。

    public class MyMapper extends RichMapFunction<String, String> {
      private transient Counter counter;
    
      @Override
      public void open(Configuration config) {
        this.counter = getRuntimeContext()
          .getMetricGroup()
          .counter("myCustomCounter", new CustomCounter());
      }
    
      @Override
      public String map(String value) throws Exception {
        this.counter.inc();
        return value;
      }
    }
    

     

    2.2. Gauge

    可以提供任何数据类型,要使用Gauge你必须要实现Gauge接口,可以返回任何类型。

    public class MyMapper extends RichMapFunction<String, String> {
      private transient int valueToExpose = 0;
    
      @Override
      public void open(Configuration config) {
        getRuntimeContext()
          .getMetricGroup()
          .gauge("MyGauge", new Gauge<Integer>() {
            @Override
            public Integer getValue() {
              return valueToExpose;
            }
          });
      }
    
      @Override
      public String map(String value) throws Exception {
        valueToExpose++;
        return value;
      }
    }
    

     

    2.3. Histogram(ing)

    直方图(Histogram)用于测量长整型值的分布情况。

    可以通过在 MetricGroup 上调用 histogram(String name, Histogram histogram) 来注册一个直方图。

    public class MyMapper extends RichMapFunction<Long, Long> {
      private transient Histogram histogram;
    
      @Override
      public void open(Configuration config) {
        this.histogram = getRuntimeContext()
          .getMetricGroup()
          .histogram("myHistogram", new MyHistogram());
      }
    
      @Override
      public Long map(Long value) throws Exception {
        this.histogram.update(value);
        return value;
      }
    }
    

    ing

     

    2.4. Meter

    一个 Meter 用于测量平均吞吐量。

    • 可以使用 markEvent() 方法注册一个事件的发生。同时发生多个事件可以使用 markEvent(long n) 方法注册。
    • 在 MetricGroup 上调用 meter(String name, Meter meter) 来注册一个 Meter。

     

    二. Scope:指标作用域

    每个度量指标都被分配了一个标识符和一组键值对,用于报告该度量指标。
    这个标识符基于三个组件:在注册度量指标时的用户定义名称,一个可选的用户定义作用域,以及一个系统提供的作用域。

    例如,如果 A.B 是系统作用域,C.D 是用户作用域,E 是名称,那么度量指标的标识符将是 A.B.C.D.E。

    你可以通过在 Flink 配置文件中设置 metrics.scope.delimiter 键来配置标识符使用的分隔符(默认为 .)。

     

    1. User Scope

    你可以通过调用 MetricGroup#addGroup(String name),MetricGroup#addGroup(int name),或者 MetricGroup#addGroup(String key, String value) 来定义用户作用域。

    我们通过 MetricGroup#getMetricIdentifier 和 MetricGroup#getScopeComponents 方法返回的内容。

    counter = getRuntimeContext()
      .getMetricGroup()
      .addGroup("MyMetrics")
      .counter("myCounter");
    
    counter = getRuntimeContext()
      .getMetricGroup()
      .addGroup("MyMetricsKey", "MyMetricsValue")
      .counter("myCounter");
    

     

    2. System Scope ing

     

    3. User Variables

    你可以通过调用 MetricGroup#addGroup(String key, String value) 来定义一个用户变量。

    这个方法会影响 MetricGroup#getMetricIdentifier、MetricGroup#getScopeComponents 和 MetricGroup#getAllVariables() 返回的内容。

    counter = getRuntimeContext()
      .getMetricGroup()
      .addGroup("MyMetricsKey", "MyMetricsValue")
      .counter("myCounter");
    

     

    三. Reporter ing

    Flink 支持用户将 Flink 的各项运行时指标发送给外部系统。

     

    四. System metrics ing

    默认情况下,Flink会收集多个度量指标,这些指标能够深入了解当前的状态。

     

    五. REST API integration

    度量指标可以通过监控REST API查询。以下是可用端点列表及其示例JSON响应。

    序号metric类型API
    1特定实体的metric- /jobmanager/metrics
    - /taskmanagers//metrics
    - /jobs//metrics
    - /jobs//vertices//subtasks/
    2实体的聚合metric- /taskmanagers/metrics
    - /jobs/metrics
    - /jobs//vertices//subtasks/metrics
    - /jobs//vertices//jm-operator-metrics
    3实体子集上聚合的metric- /taskmanagers/metrics?taskmanagers=A,B,C
    - /jobs/metrics?jobs=D,E,F
    - /jobs//vertices//subtasks/metrics?subtask=1,2,3

     

    六. Dashboard integration

    可以在仪表板中可视化每个任务或操作符收集的度量指标。在作业的主页面上,选择“Metrics”选项卡。在顶部图表中选择一个任务后,您可以使用“添加度量指标”下拉菜单选择要显示的度量指标。如下图:

    • 任务度量指标列出为<子任务索引>.<度量名称>。
    • 操作符度量指标列出为 <子任务索引>.<操作符名称>.<度量名称>

    在这里插入图片描述

    • 每个度量指标将显示为单独的图表,其中 x 轴代表时间,y 轴表示测量值。
    • 所有图表每隔10秒自动更新一次,在导航到其他页面时仍会继续更新。
    • 可视化的度量指标数量没有限制,但是只有数值型度量指标可以被可视化显示。

     

  • 相关阅读:
    混沌系统在图像加密中的应用(基于量子混沌映射的伪随机数发生器)
    【Leetcode合集】9. 回文数
    十年Python老鸟总结的5条Python开发最佳实践
    单代号网络图
    N-151基于微信小程序校园学生活动管理平台
    这篇文章告诉你时光穿梭机特效从年轻变老制作软件
    基于Python的Linear classification实验报告
    剑指offer---Day5
    Class Activation Mapping(CAM)介绍
    Java中static关键字
  • 原文地址:https://blog.csdn.net/hiliang521/article/details/139907666