Monitoring a Java Spring Application with Prometheus and Grafana


    Recently I needed to build end-to-end monitoring for some business processes. These processes are composed of several microservices, all written with Java Spring, and we want to see traffic and performance statistics for every module involved: how many business requests were received in total, how many succeeded or failed, how long each step takes, and so on. So I looked into how to expose metrics from a Java Spring application, collect them centrally with Prometheus, and present them in Grafana with different dashboards.

    First let's define a simple business flow. Suppose we have two Spring applications. The first exposes an HTTP endpoint for business requests; when a request comes in, it publishes the information carried in the request to Kafka. The second application subscribes to the Kafka topic, receives the business data sent by application one, and processes it.

    Application one

    Create a new project on start.spring.io with the artifact name kafka-sender-example and select Spring for Apache Kafka, Actuator, and Spring Web as dependencies. Open the generated project and add a class named RemoteCommandController that implements an HTTP endpoint, as follows:

    package cn.roygao.kafkasenderexample;

    import java.util.Collections;
    import java.util.Map;
    import java.util.UUID;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import java.util.logging.Logger;

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.http.ResponseEntity;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RestController;

    import com.alibaba.fastjson.JSONObject;

    @RestController
    public class RemoteCommandController {
        @Autowired
        private KafkaTemplate<Integer, String> template;

        private final static Logger LOGGER = Logger.getLogger(RemoteCommandController.class.getName());

        @PostMapping("/sendcommand")
        public ResponseEntity<Map<String, String>> sendCommand(@RequestBody JSONObject commandMsg) {
            String requestId = UUID.randomUUID().toString();
            String vin = commandMsg.getString("vin");
            String command = commandMsg.getString("command");
            LOGGER.info("Send command to vehicle:" + vin + ", command:" + command);
            Map<String, String> requestIdObj = Collections.singletonMap("requestId", requestId);
            // Publish the command to the "remotecommand" topic, with the integer 1 as the record key
            ProducerRecord<Integer, String> record = new ProducerRecord<>("remotecommand", 1, command);
            try {
                System.out.println(System.currentTimeMillis());
                template.send(record).get(10, TimeUnit.SECONDS);
            }
            catch (ExecutionException e) {
                LOGGER.info("Error");
                LOGGER.info(e.getMessage());
            }
            catch (TimeoutException | InterruptedException e) {
                LOGGER.info("Timeout");
                LOGGER.info(e.getMessage());
            }
            return ResponseEntity.accepted().body(requestIdObj);
        }
    }

    The code is straightforward: it exposes a POST /sendcommand endpoint. The caller provides the vehicle's VIN and the command to send; when the request is received, the business request information is forwarded to a Kafka message topic. KafkaTemplate is used to send the message, so we also define a configuration class named KafkaSender:

    package cn.roygao.kafkasenderexample;

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.IntegerSerializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.TopicBuilder;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;

    @Configuration
    public class KafkaSender {
        // Create the "remotecommand" topic if it does not exist yet
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("remotecommand")
                    .build();
        }

        @Bean
        public ProducerFactory<Integer, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }

        @Bean
        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            // See https://kafka.apache.org/documentation/#producerconfigs for more properties
            return props;
        }

        @Bean
        public KafkaTemplate<Integer, String> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    }

    This class configures the Kafka bootstrap server address, the message topic, and the other producer settings.

    Run ./mvnw clean package to compile and package the application.

    Application two

    Create another project on start.spring.io, this time with the artifact name kafka-receiver-example, and select Spring for Apache Kafka and Actuator as dependencies. (To expose the /actuator/prometheus endpoint used later, the micrometer-registry-prometheus dependency is also needed; on start.spring.io it is listed as Prometheus.) Open the generated project and add a class named RemoteCommandHandler that receives the Kafka messages:

    package cn.roygao.kafkareceiverexample;

    import java.util.concurrent.TimeUnit;

    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata;
    import org.springframework.stereotype.Component;

    import io.micrometer.core.instrument.MeterRegistry;
    import io.micrometer.core.instrument.Timer;

    @Component
    public class RemoteCommandHandler {
        private Timer timer;

        public RemoteCommandHandler(MeterRegistry registry) {
            this.timer = Timer
                    .builder("kafka.process.latency")
                    .publishPercentiles(0.15, 0.5, 0.95)
                    .publishPercentileHistogram()
                    .register(registry);
        }

        @KafkaListener(id = "myId", topics = "remotecommand")
        public void listen(String in, ConsumerRecordMetadata meta) {
            long latency = System.currentTimeMillis() - meta.timestamp();
            timer.record(latency, TimeUnit.MILLISECONDS);
        }
    }

    The constructor of this class takes a MeterRegistry and builds a Timer, one of the basic meter types Micrometer provides, which is used to record durations. publishPercentiles(0.15, 0.5, 0.95) tells the timer which percentiles to publish, and the timer is then registered with the MeterRegistry.

    The listen method subscribes to the Kafka message topic. For each message it reads the production timestamp from the record metadata, compares it with the current time to get the elapsed time from message production to message consumption, and records that latency with the timer. The Timer then reports the distribution at the percentiles defined above.
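
    Besides record(duration, unit), a Micrometer Timer can also time a block of code directly, or via an explicit Timer.Sample when the measured work spans several methods. Below is a minimal sketch of both styles; the class name, the metric name kafka.handle.duration, and the process() method are illustrative only and not part of the example application:

    import io.micrometer.core.instrument.MeterRegistry;
    import io.micrometer.core.instrument.Timer;

    public class ProcessingTimerExample {
        private final MeterRegistry registry;
        private final Timer processTimer;

        public ProcessingTimerExample(MeterRegistry registry) {
            this.registry = registry;
            // Hypothetical metric name, used only for this illustration
            this.processTimer = Timer.builder("kafka.handle.duration").register(registry);
        }

        public void handle(String message) {
            // Style 1: let the Timer wrap the code block and measure it
            processTimer.record(() -> process(message));

            // Style 2: take an explicit sample and stop it when the work is done
            Timer.Sample sample = Timer.start(registry);
            process(message);
            sample.stop(processTimer);
        }

        private void process(String message) {
            // business logic would go here
        }
    }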

    We also need a Kafka configuration class on the consumer side:

    package cn.roygao.kafkareceiverexample;

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

    @Configuration
    @EnableKafka
    public class KafkaConfig {
        @Bean
        KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setConcurrency(3);
            factory.getContainerProperties().setPollTimeout(3000);
            return factory;
        }

        @Bean
        public ConsumerFactory<Integer, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }

        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            return props;
        }
    }

    Add the following to the application.properties file:

    spring.kafka.consumer.auto-offset-reset=earliest
    server.port=7777
    management.endpoints.web.exposure.include=health,info,prometheus
    management.endpoints.enabled-by-default=true
    management.endpoint.health.show-details=always

    Then run ./mvnw clean package to compile and package the application.

    Starting Kafka

    Here I start Kafka with Docker; the compose file is as follows:

    ---
    version: '2'
    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:6.1.0
        hostname: zookeeper
        container_name: zookeeper
        ports:
          - "2181:2181"
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000

      broker:
        image: confluentinc/cp-server:6.1.0
        hostname: broker
        container_name: broker
        depends_on:
          - zookeeper
        ports:
          - "9092:9092"
          - "9101:9101"
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
          KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
          KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
          KAFKA_JMX_PORT: 9101
          KAFKA_JMX_HOSTNAME: localhost
          KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
          CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
          CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
          CONFLUENT_METRICS_ENABLE: 'true'
          CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

      schema-registry:
        image: confluentinc/cp-schema-registry:6.1.0
        hostname: schema-registry
        container_name: schema-registry
        depends_on:
          - broker
        ports:
          - "8081:8081"
        environment:
          SCHEMA_REGISTRY_HOST_NAME: schema-registry
          SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
          SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

      connect:
        image: cnfldemos/cp-server-connect-datagen:0.4.0-6.1.0
        hostname: connect
        container_name: connect
        depends_on:
          - broker
          - schema-registry
        ports:
          - "8083:8083"
        environment:
          CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
          CONNECT_REST_ADVERTISED_HOST_NAME: connect
          CONNECT_REST_PORT: 8083
          CONNECT_GROUP_ID: compose-connect-group
          CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
          CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
          CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
          CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
          CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
          CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
          CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
          CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
          CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
          CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
          # CLASSPATH required due to CC-2422
          CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-6.1.0.jar
          CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
          CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
          CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
          CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR

      control-center:
        image: confluentinc/cp-enterprise-control-center:6.1.0
        hostname: control-center
        container_name: control-center
        depends_on:
          - broker
          - schema-registry
          - connect
          - ksqldb-server
        ports:
          - "9021:9021"
        environment:
          CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
          CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'
          CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
          CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
          CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
          CONTROL_CENTER_REPLICATION_FACTOR: 1
          CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
          CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
          CONFLUENT_METRICS_TOPIC_REPLICATION: 1
          PORT: 9021

      ksqldb-server:
        image: confluentinc/cp-ksqldb-server:6.1.0
        hostname: ksqldb-server
        container_name: ksqldb-server
        depends_on:
          - broker
          - connect
        ports:
          - "8088:8088"
        environment:
          KSQL_CONFIG_DIR: "/etc/ksql"
          KSQL_BOOTSTRAP_SERVERS: "broker:29092"
          KSQL_HOST_NAME: ksqldb-server
          KSQL_LISTENERS: "http://0.0.0.0:8088"
          KSQL_CACHE_MAX_BYTES_BUFFERING: 0
          KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
          KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
          KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
          KSQL_KSQL_CONNECT_URL: "http://connect:8083"
          KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
          KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
          KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'

      ksqldb-cli:
        image: confluentinc/cp-ksqldb-cli:6.1.0
        container_name: ksqldb-cli
        depends_on:
          - broker
          - connect
          - ksqldb-server
        entrypoint: /bin/sh
        tty: true

      ksql-datagen:
        image: confluentinc/ksqldb-examples:6.1.0
        hostname: ksql-datagen
        container_name: ksql-datagen
        depends_on:
          - ksqldb-server
          - broker
          - schema-registry
          - connect
        command: "bash -c 'echo Waiting for Kafka to be ready... && \
                           cub kafka-ready -b broker:29092 1 40 && \
                           echo Waiting for Confluent Schema Registry to be ready... && \
                           cub sr-ready schema-registry 8081 40 && \
                           echo Waiting a few seconds for topic creation to finish... && \
                           sleep 11 && \
                           tail -f /dev/null'"
        environment:
          KSQL_CONFIG_DIR: "/etc/ksql"
          STREAMS_BOOTSTRAP_SERVERS: broker:29092
          STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
          STREAMS_SCHEMA_REGISTRY_PORT: 8081

      rest-proxy:
        image: confluentinc/cp-kafka-rest:6.1.0
        depends_on:
          - broker
          - schema-registry
        ports:
          - 8082:8082
        hostname: rest-proxy
        container_name: rest-proxy
        environment:
          KAFKA_REST_HOST_NAME: rest-proxy
          KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
          KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
          KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

    Run nohup docker compose up > ./kafka.log 2>&1 & to start everything. Open localhost:9021 in a browser to view the Kafka cluster in the Control Center console.

    Start application one and application two, then call the POST http://localhost:8080/sendcommand endpoint to send a business request, for example:

    curl --location --request POST 'http://localhost:8080/sendcommand' \
    --header 'Content-Type: application/json' \
    --data-raw '{
        "vin": "ABC123",
        "command": "engine-start"
    }'

    In the Kafka console you can now see a remotecommand topic, with one message produced and consumed.

    Starting Prometheus and Grafana

    These are also started with docker compose; the compose file is as follows:

    services:
      prometheus:
        image: prom/prometheus-linux-amd64
        #network_mode: host
        container_name: prometheus
        restart: unless-stopped
        volumes:
          - ./config:/etc/prometheus/
        command:
          - '--config.file=/etc/prometheus/prometheus.yaml'
        ports:
          - 9090:9090
      grafana:
        image: grafana/grafana
        user: '472'
        #network_mode: host
        container_name: grafana
        restart: unless-stopped
        links:
          - prometheus:prometheus
        volumes:
          - ./data/grafana:/var/lib/grafana
        environment:
          - GF_SECURITY_ADMIN_PASSWORD=admin
        ports:
          - 3000:3000
        depends_on:
          - prometheus

    In the directory containing this compose file, create a config directory holding the Prometheus configuration file (prometheus.yaml, matching the --config.file option above) with the following contents:

    scrape_configs:
      - job_name: 'Spring Boot Application input'
        metrics_path: '/actuator/prometheus'
        scrape_interval: 2s
        static_configs:
          - targets: ['172.17.0.1:7777']
            labels:
              application: 'My Spring Boot Application'

    The targets entry is the address exposed by application two (172.17.0.1 is the default Docker bridge gateway, i.e. the host machine as seen from inside the Prometheus container), and metrics_path is the path from which the metrics are scraped.

    Also create a data/grafana directory next to the compose file; it is mounted into the Grafana container. Note that you need to chmod 777 this directory (or otherwise grant write access), otherwise Grafana fails with a permission error.

    Run nohup docker compose up > ./prometheus.log 2>&1 & to start them.

    Open localhost:9090 to access the Prometheus UI. Searching for kafka shows the kafka_process_latency metrics reported by application two, with statistics at the 0.15, 0.5 and 0.95 percentiles we configured.

    Open localhost:3000 to access Grafana. Configure a data source: choose Prometheus, point it at the Prometheus container's address, then Save & Test. After that, create a new dashboard and add a panel that charts the kafka_process_latency metric.

    [To be continued] Still to do: add a Counter metric for the HTTP endpoint calls, and define more dashboards in Grafana, including metrics from the other services. A rough sketch of such a counter follows.
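
    As a preview, counting successful and failed business requests with a Micrometer Counter might look roughly like the sketch below. The metric name business.request.count and the result tag are assumptions for illustration, not part of the code above; in application one the MeterRegistry would be injected into RemoteCommandController and the counters incremented inside sendCommand:

    import io.micrometer.core.instrument.Counter;
    import io.micrometer.core.instrument.MeterRegistry;

    public class RequestCounterExample {
        private final Counter successCounter;
        private final Counter failureCounter;

        public RequestCounterExample(MeterRegistry registry) {
            // Hypothetical metric and tag names, used only for this illustration
            this.successCounter = Counter.builder("business.request.count")
                    .tag("result", "success")
                    .register(registry);
            this.failureCounter = Counter.builder("business.request.count")
                    .tag("result", "failure")
                    .register(registry);
        }

        public void recordSuccess() {
            successCounter.increment();
        }

        public void recordFailure() {
            failureCounter.increment();
        }
    }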

    Original article: https://blog.csdn.net/gzroy/article/details/127769341