• Python 自动化: eip、cen监控数据对接到 grafana


    aee5f7f96bddf6ef6234e681ce476de6.gif

    新钛云服已累计为您分享775篇技术干货

    01dfdfdb72ab2c3c291a81b7c818f59f.gif

    概览

    日常运维中,我们有时需要关注阿里云中 EIP 和 CEN 的监控数据,如果每次登录到平台查看,不太方便。

    可以通过 API 获取监控数据,并输入到 influxDB,然后再到 Grafana 中展示,以便进行实施监控和可视化。

    第一步:准备工作

    在开始之前,我们需要确保已经完成以下准备工作

    准备阿里云的EIP和CEN实例

    这一步省略

    解如何获取EIP和CEN数据

    了解如何获取 EIP 和 CEN 数据

    我的方式是 EIP 通过 EIP 产品的 API 获取的,调试链接如下

    https://next.api.aliyun.com/api/Vpc/2016-04-28/DescribeEipMonitorData?params={"RegionId":"cn-hangzhou"}

    输入 RegionId 和 AllocationId 等必选信息后,复制平台生成的代码,进行更改,下文会介绍如何更改

    295419130a16788a6c73a4a8564c10d3.png

    ‍‍‍

    CEN 的监控数据未找到具体的 API,但可以通过云监控的数据获取,也是很方便的,链接如下

    https://api.aliyun.com/api/Cms/2019-01-01/DescribeMetricData

    获取 CEN 的具体数据时,可以通过 https://cms.console.aliyun.com/metric-meta/acs_cen/cen_tr?spm=a2c4g.11186623.0.0.252476ab1Ldq0T 得到

    实际上,EIP 的数据也可以通过云监控获取

    安装Python和所需的依赖库

    下面示例的版本是截止发文时间最新版本,实际使用时,可以登录到上面的阿里云开放平台查看最新的版本

    1. pip install alibabacloud_vpc20160428==5.1.0
    2. pip install alibabacloud_cms20190101==2.0.11

    ‍‍‍

    安装InfluxDB,并进行初始化配置

    1. 为方便使用,我这里是使用 Docker 运行的 Influxdb

    1. cd /data/influxdb
    2. # 生成初始的配置文件
    3. docker run --rm influxdb:2.7.1 influxd print-config > config.yml
    4. # 启动容器
    5. docker run --name influxdb -d -p 8086:8086 --volume `pwd`/influxdb2:/var/lib/influxdb2 --volume `pwd`/config.yml:/etc/influxdb2/config.yml influxdb:2.7.1

    2. 安装完成后,可通过 http://ip:8086 登录到 Influxdb

    3. 创建 bucket

    只需要创建一个 bucket 就可以了,bucket 类似 MySQL 的 database

    4. 获取 API Token,在 Python 插入数据时会用到

    安装Grafana,并进行基本的配置

    省略

    第二步:获取API访问凭证

    为了能够通过API访问阿里云的 EIP 和 CEN 数据,我们需要获取访问凭证。具体步骤如下

    1. 登录阿里云控制台

    2. 创建 RAM 用户并分配相应的权限

    3. 获取 RAM 用户的 Access Key ID 和 Access Key Secret

    第三步:编写Python脚本

    使用Python编写脚本来获取 EIP 和 CEN 的监控数据,并将其存储到 InfluxDB 中

    本文仅展示部分代码,如需完整的代码,请联系本公众号获取~

    调整从阿里云复制的示例代码

    1. 修改构造函数,可以传如 access_key_id 和 access_key_secret

    1. def __init__(self, access_key_id: str=access_key_id, access_key_secret: str=access_key_secret):
    2. self.access_key_id = access_key_id
    3. self.access_key_secret = access_key_secret

    ‍‍‍

    2. 修改获取 eip 数据的函数

    1. def get_eip_monitor_data(self, region_id, allocation_id, start_time: str, end_time: str):
    2. '''
    3. 参考文档:
    4. https://api.aliyun.com/api/Vpc/2016-04-28/DescribeEipMonitorData?params={%22RegionId%22:%22cn-hangzhou%22}
    5. Args:
    6. region_id (_type_): _description_
    7. allocation_id (_type_): _description_
    8. start_time (str): utc时间
    9. end_time (_type_): utc时间
    10. Yields:
    11. _type_: _description_
    12. eip_tx: 流出的流量。单位: Byte
    13. eip_rx: 流入的流量。单位: Byte
    14. '''
    15. # 请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID 和 ALIBABA_CLOUD_ACCESS_KEY_SECRET。
    16. # 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例使用环境变量获取 AccessKey 的方式进行调用,仅供参考,建议使用更安全的 STS 方式,更多鉴权访问方式请参见:https://help.aliyun.com/document_detail/378659.html
    17. client = self.create_client(endpoint=f'vpc.{region_id}.aliyuncs.com', access_key_id=self.access_key_id, access_key_secret=self.access_key_secret)
    18. describe_eip_monitor_data_request = vpc_20160428_models.DescribeEipMonitorDataRequest(
    19. region_id=region_id,
    20. allocation_id=allocation_id,
    21. start_time=start_time,
    22. end_time=end_time
    23. )
    24. log.debug(msg=describe_eip_monitor_data_request)
    25. runtime = util_models.RuntimeOptions()
    26. log.debug(msg=runtime)
    27. try:
    28. # 复制代码运行请自行打印 API 的返回值
    29. results = client.describe_eip_monitor_data_with_options(describe_eip_monitor_data_request, runtime).body.eip_monitor_datas.eip_monitor_data
    30. for result in results:
    31. yield result
    32. except Exception as error:
    33. log.error(msg=error)
    34. return UtilClient.assert_as_string(error.message)

    ‍‍‍

    3. 修改获取 cen 数据的函数

    1. def get_cen_monitor_data(self, namespace, metric_name, start_time: str, end_time: str):
    2. # 请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID 和 ALIBABA_CLOUD_ACCESS_KEY_SECRET。
    3. # 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例使用环境变量获取 AccessKey 的方式进行调用,仅供参考,建议使用更安全的 STS 方式,更多鉴权访问方式请参见:https://help.aliyun.com/document_detail/378659.html
    4. client = self.create_client(access_key_id=self.access_key_id, access_key_secret=self.access_key_secret)
    5. describe_metric_list_request = cms_20190101_models.DescribeMetricListRequest(
    6. namespace=namespace,
    7. metric_name=metric_name,
    8. start_time=start_time,
    9. end_time=end_time,
    10. )
    11. runtime = util_models.RuntimeOptions()
    12. try:
    13. # 复制代码运行请自行打印 API 的返回值
    14. return client.describe_metric_list_with_options(describe_metric_list_request, runtime).body.datapoints
    15. except Exception as error:
    16. # 如有需要,请打印 error
    17. UtilClient.assert_as_string(error.message)

    ‍‍‍

    编写InfluxDB相关的代码

    将 InfluxDB 的写入代码独立出来可以方便后续其他业务的调用

    下面的代码在获取 token 时,使用了 1password,可视情况进行修改,例如通过环境变量的方式获取 Token

    1. #!/usr/bin/env python3
    2. import influxdb_client, time
    3. import datetime
    4. from influxdb_client import InfluxDBClient, Point, WritePrecision
    5. from influxdb_client.client.write_api import SYNCHRONOUS
    6. from modules.onepassword import OnePassword
    7. my1p = OnePassword()
    8. class InfluxClient:
    9. token = my1p.get_item_by_title(title='my_influxdb')['api']
    10. def __init__(self, url: str='http://10.1.1.1:8086', org: str='tyun', token: str=token):
    11. self.url = url
    12. self.org = org
    13. self.token = token
    14. def create_client(self):
    15. return influxdb_client.InfluxDBClient(url=self.url, token=self.token, org=self.org)
    16. def write_aliyun_eip(self, bucket: str='example', table_name: str='test1', location: str=None, eip_tx: int=None, eip_rx: int=None, time_stamp: str=None):
    17. write_api = self.create_client().write_api(write_options=SYNCHRONOUS)
    18. point = (
    19. Point(table_name)
    20. .tag("location", location)
    21. .field("eip_tx", eip_tx)
    22. .field("eip_rx", eip_rx)
    23. .time(time_stamp)
    24. )
    25. write_api.write(bucket=bucket, org=self.org, record=point)
    26. def write_cen(self, bucket: str='example', table_name: str='test1', location: str=None, tr_instance_id: str=None, value: int=None, time_stamp: str=None):
    27. write_api = self.create_client().write_api(write_options=SYNCHRONOUS)
    28. point = (
    29. Point(table_name)
    30. .tag("location", location)
    31. .tag("tr_instance_id", tr_instance_id)
    32. .field("value", value)
    33. .time(time_stamp)
    34. )
    35. write_api.write(bucket=bucket, org=self.org, record=point)
    36. def main():
    37. influx_client = InfluxClient()
    38. for i in range(5):
    39. influx_client.write_data(bucket='example', table_name='test1', location='hangzhou', EipBandwidth=i, EipFlow=i)
    40. time.sleep(1)
    41. if __name__ == '__main__':
    42. main()

    ‍‍‍

    编写主程序

    1. 获取 eip 并插入到 influxdb

    1. #!/usr/bin/env python3
    2. from collections import namedtuple
    3. from modules.aliyun.eip import Eip
    4. from modules.database.influxdb.write import InfluxClient
    5. from modules.tools.my_time import MyDatetime as my_time
    6. eip = Eip()
    7. influx_client = InfluxClient()
    8. def insert_data(region_id, location, table_name, allocation_id, start_time, end_time):
    9. '''
    10. _summary_
    11. Args:
    12. region_id (_type_): _description_
    13. location (_type_): _description_
    14. table_name (_type_): _description_
    15. allocation_id (_type_): _description_
    16. start_time (_type_): _description_
    17. interval (int, optional): 取值的范围, 默认是5.
    18. '''
    19. eip_datas = eip.get_eip_monitor_data(region_id=region_id, allocation_id=allocation_id, start_time=start_time, end_time=end_time)
    20. for eip_data in eip_datas:
    21. # print(eip_data)
    22. influx_client.write_aliyun_eip(bucket='example',
    23. table_name=table_name,
    24. location=location,
    25. eip_rx=eip_data.eip_rx,
    26. eip_tx=eip_data.eip_tx,
    27. time_stamp=eip_data.time_stamp)
    28. Instance = namedtuple('Instance', ['region_id', 'allocation_id', 'bandwidth', 'env'])
    29. hangzhou = Instance(region_id='hangzhou', allocation_id='eip-xxxxxxxxx', bandwidth='100m', env='prod')
    30. eip_site_list = [hangzhou]
    31. for eip_site in eip_site_list:
    32. insert_data(region_id=f'cn-{eip_site.region_id}',
    33. location=f'cn-{eip_site.region_id}',
    34. table_name='eip',
    35. allocation_id=eip_site.allocation_id,
    36. start_time=my_time.get_utc_now_str_offset(offset=-60*10),
    37. end_time=my_time.get_utc_now_str()
    38. )

    ‍‍‍

    2. 获取 cen 数据并插入到 influxdb

    1. #!/usr/bin/env python3
    2. import ast
    3. from modules.aliyun.metrics import Metrics
    4. from modules.database.influxdb.write import InfluxClient
    5. from modules.tools.my_time import MyDatetime as my_time
    6. from modules.logger.client import LogClient
    7. metrics = Metrics()
    8. influx_client = InfluxClient()
    9. log = LogClient(app='example_traffic')
    10. def tr_instance_id_to_location(tr_instance_id):
    11. if tr_instance_id == 'tr-xxxxxxxxxxxxx':
    12. location = 'hangzhou'
    13. bandwidth = '20m'
    14. else:
    15. location = 'none'
    16. return location, bandwidth
    17. metric_names = ['AttachmentOutRate', 'AttachmentInRate']
    18. for metric_name in metric_names:
    19. results = metrics.get_cen_monitor_data(namespace='acs_cen',
    20. metric_name=metric_name,
    21. start_time=my_time.get_utc_now_str_offset(offset=-60*10),
    22. end_time=my_time.get_utc_now_str())
    23. log.debug(msg=results)
    24. for result in ast.literal_eval(results):
    25. result['metric_name'] = metric_name
    26. trInstanceId = result['trInstanceId']
    27. result['location'] = tr_instance_id_to_location(tr_instance_id=trInstanceId)[0]
    28. result['bandwidth'] = tr_instance_id_to_location(tr_instance_id=trInstanceId)[1]
    29. log.info(msg=metric_name + ' ' + my_time.timestamp_to_str(timestamp=result['timestamp']) + ' ' + ' ' + result['location'] + ' ' + str(result['Value']))
    30. influx_client.write_cen(bucket='example',
    31. table_name=metric_name,
    32. location=result['location'],
    33. tr_instance_id=result['trInstanceId'],
    34. value=result['Value'],
    35. time_stamp=my_time.timestamp_to_str(timestamp=result['timestamp']))

    第四步:配置Grafana

    Grafana中配置 InfluxDB 数据源,并创建相应的仪表盘来展示 EIP 和 CEN 的监控数据。具体步骤如下:

    1. 添加 InfluxDB 数据源,并配置连接信息

      我用的是 Flux 的查询语言,配置数据源时,需要注意以下事项

    • 数据源名字推荐使用:InfluxDB-Flux,注明是 Flux 类型的数据源

    • InfluxDB Details 填写 Organization、Token、Default Bucket 即可

    • 不用填写 HTTP 认证

    创建仪表盘,配置 eip 和 cen 的查询语句

    ‍· EIP 接收方向的流量

    1. from(bucket: "example")
    2. |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    3. |> filter(fn: (r) => r["_measurement"] == "eip")
    4. |> filter(fn: (r) => r["_field"] == "eip_rx")
    5. |> filter(fn: (r) => r["location"] == "cn-hangzhou")
    6. |> aggregateWindow(every: v.windowPeriod, fn: last, createEmpty: false)
    7. |> map(fn: (r) => ({ r with _value: r._value / 8 }))
    8. |> yield(name: "last")

    · EIP 发送方向的流量

    1. from(bucket: "example")
    2. |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    3. |> filter(fn: (r) => r["_measurement"] == "eip")
    4. |> filter(fn: (r) => r["_field"] == "eip_tx")
    5. |> filter(fn: (r) => r["location"] == "cn-hangzhou")
    6. |> aggregateWindow(every: v.windowPeriod, fn: last, createEmpty: false)
    7. |> map(fn: (r) => ({ r with _value: r._value / 8 }))
    8. |> yield(name: "last")

    ‍‍‍

    · CEN 发送方向的流量

    1. from(bucket: "example")
    2. |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    3. |> filter(fn: (r) => r["_measurement"] == "AttachmentOutRate")
    4. |> filter(fn: (r) => r["_field"] == "value")
    5. |> filter(fn: (r) => r["location"] == "hangzhou")
    6. |> aggregateWindow(every: v.windowPeriod, fn: last, createEmpty: false)
    7. |> yield(name: "last")

    ‍‍‍

    · CEN 接收方向流量

    1. from(bucket: "example")
    2. |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    3. |> filter(fn: (r) => r["_measurement"] == "AttachmentInRate")
    4. |> filter(fn: (r) => r["_field"] == "value")
    5. |> filter(fn: (r) => r["location"] == "hangzhou")
    6. |> aggregateWindow(every: v.windowPeriod, fn: last, createEmpty: false)
    7. |> yield(name: "last")

    ‍‍‍

    eip 和 cen 的数据单位都是 bit/sec(SI)

    建议配置 Grafana 面板的 Thresholds

    100M为 100000000,配置后会显示一条红线,可以更直观的看到流量的占用情况

    总结

    通过本文的步骤,我们可以通过API获取阿里云 EIP 和 CEN 的监控数据,将其存储到 InfluxDB,并通过 Grafana 进行实时监控和可视化。这为我们提供了一种自动化的方式来监控和管理阿里云网络资源。

        推荐阅读   

    814b981bafa1181a22fd55abed0a766d.png

    4e3736b1f16a68fb5f675670083ccae2.png

        推荐视频    

  • 相关阅读:
    【洛谷】P3835 【模板】可持久化平衡树
    猿创征文 |【SpringBoot2】基础配置详解
    【类、抽象与继承】
    成长&工作&思考
    使用dlib,OpenCV和Python进行人脸识别—人眼瞌睡识别
    elasticsearch访问9200端口 提示需要登陆
    $set小概率的赋值失败问题都被我碰上了
    MetaObjectHandler的使用
    所有Linux发行版存在shim漏洞;多个国家黑客利用AI进行网络攻击;美国挫败俄方网络间谍活动网络 | 安全周报 0216
    C语言入门 Day_14 for循环
  • 原文地址:https://blog.csdn.net/NewTyun/article/details/134432426