• 笨方法学Python


    1.Python执行脚本从本地文件中读取docker镜像列表时,执行拉取docker镜像

    要在Python中执行拉取Docker镜像的操作,可以使用subprocess模块来执行命令行命令。

    以下是一个示例代码,它从本地文件中读取Docker仓库镜像列表,并使用docker pull命令拉取每个镜像:

    1. import subprocess
    2. def pull_docker_images(filename):
    3. with open(filename, 'r') as file:
    4. for line in file:
    5. image = line.strip()
    6. command = ['docker', 'pull', image]
    7. try:
    8. subprocess.run(command, check=True)
    9. print(f'Successfully pulled image: {image}')
    10. except subprocess.CalledProcessError as e:
    11. print(f'Error pulling image: {image}')
    12. print(e)
    13. # 从文件中读取镜像列表并拉取
    14. pull_docker_images('image_list.txt')

    上述代码中,pull_docker_images函数接受一个文件名作为参数,并使用with open语句打开文件。然后,对于文件中的每一行,它使用strip方法去除行尾的空白字符,并将镜像名称存储在image变量中。

    接下来,它构建一个命令列表,其中第一个元素是docker命令,第二个元素是pull命令,第三个元素是镜像名称。

    然后,使用subprocess.run函数执行命令,并使用check=True参数来检查命令执行是否成功。如果命令执行成功,将打印成功消息;如果命令执行失败,将打印错误消息和异常。

    最后,通过调用pull_docker_images函数,并传递要读取的镜像列表文件的名称,可以拉取所有镜像。请确保在运行此代码之前已经安装了Docker并配置了正确的环境。

    docker批量修改tag

    1. docker images | grep registry | sed 's/registry.baidu-int.com/registry.can6.com:5000\/baidu/g' | awk '{print "docker tag"" " $3" "$1":"$2}'|sh
    2. #push镜像
    3. for i in `docker images|grep registry.can6.com:5000/baidu |awk '{print $1":"$2}'`
    4. do
    5. docker push $i
    6. done

    批量修改

    docker images|grep regis|awk '{print $1":"$2}'|while read image; do new_images=$(echo $image|sed s#registry.baidu-int.com#test.harbor.icbccs.com.cn:10443/baidu#g); docker tag $image $new_images; done

    2.编写python脚本读取文本中的镜像列表

    以下是一个简单的Python脚本,用于读取文本文件中的镜像列表:

    1. with open('image_list.txt', 'r') as f:
    2. for line in f:
    3. if line.startswith('image:'):
    4. image = line.strip().split(': ')[1]
    5. print(image)

    假设镜像列表文件名为image_list.txt,该脚本将打开文件并逐行读取其内容。对于每一行,它会检查是否以image:开头,如果是,则提取出镜像名称并打印出来。请注意,此脚本假设每个镜像的名称都在以image:开头的行中,且名称是该行的第二个字段。如果实际情况不同,需要相应地修改代码。

    3.Python脚本执行拉取docker镜像

    可以使用Python的subprocess模块来执行命令行操作,比如拉取docker镜像。具体操作如下:

    1. import subprocess
    2. # 定义要拉取的镜像名称和标签
    3. image_name = "nginx"
    4. image_tag = "latest"
    5. # 拉取镜像
    6. process = subprocess.Popen(f"docker pull {image_name}:{image_tag}", shell=True, stdout=subprocess.PIPE)
    7. output, error = process.communicate()
    8. # 打印输出结果
    9. if error:
    10. print(f"拉取镜像失败:{error}")
    11. else:
    12. print(f"拉取镜像成功:{output.decode('utf-8')}")

    在上述代码中,使用subprocess.Popen()方法执行命令行操作,其中shell参数需要设置为True,表示执行的是命令行脚本。stdout参数设置为subprocess.PIPE,表示将命令行输出结果保存到output变量中。最后使用communicate()方法获取命令行的输出结果,并将结果打印到控制台。

    4.Python执行脚本从本地文件中读取docker仓库镜像列表时,执行拉取 docker 镜像

    要在Python中执行拉取Docker镜像的操作,可以使用subprocess模块来执行命令行命令。

    以下是一个示例代码,它从本地文件中读取Docker仓库镜像列表,并使用docker pull命令拉取每个镜像:

    1. import subprocess
    2. def pull_docker_images(filename):
    3. with open(filename, 'r') as file:
    4. for line in file:
    5. image = line.strip()
    6. command = ['docker', 'pull', image]
    7. try:
    8. subprocess.run(command, check=True)
    9. print(f'Successfully pulled image: {image}')
    10. except subprocess.CalledProcessError as e:
    11. print(f'Error pulling image: {image}')
    12. print(e)
    13. # 从文件中读取镜像列表并拉取
    14. pull_docker_images('image_list.txt')

    上述代码中,pull_docker_images函数接受一个文件名作为参数,并使用with open语句打开文件。然后,对于文件中的每一行,它使用strip方法去除行尾的空白字符,并将镜像名称存储在image变量中。

    接下来,它构建一个命令列表,其中第一个元素是docker命令,第二个元素是pull命令,第三个元素是镜像名称。

    然后,使用subprocess.run函数执行命令,并使用check=True参数来检查命令执行是否成功。如果命令执行成功,将打印成功消息;如果命令执行失败,将打印错误消息和异常。

    最后,通过调用pull_docker_images函数,并传递要读取的镜像列表文件的名称,可以拉取所有镜像。请确保在运行此代码之前已经安装了Docker并配置了正确的环境。

    5.Python执行脚本从本地文件中读取docker镜像列表时,调用search接口查询结果,curl -X 'GET' \ 'https://harbor.com.cn/api/v2.0/search?q=python' \ -H 'accept: application/json'。将结果打印输出

    你可以使用Python的requests库来发送HTTP请求并获取查询结果。以下是一个示例代码:

    1. import requests
    2. url = 'https://harbor.com.cn/api/v2.0/search?q=python'
    3. headers = {'accept': 'application/json'}
    4. response = requests.get(url, headers=headers)
    5. result = response.json()
    6. print(result)

    这段代码发送一个GET请求到指定的URL,并使用指定的请求头。然后,将返回的响应解析为JSON格式,并将结果打印输出。请确保你已经安装了requests库,如果没有安装可以使用以下命令进行安装:

    pip3 install requests
    

    注意,你需要替换URL中的buddy为你要查询的镜像名称。

    6.Python执行脚本从本地文件中读取docker镜像列表image_list时,调用search接口查询image,curl -X 'GET' \ 'https://harbor.com.cn/api/v2.0/search?q=buddy' \ -H 'accept: application/json'。将结果打印输出

    你可以使用Python的requests库来发送HTTP请求并获取查询结果。以下是一个示例代码:

    1. import requests
    2. # 从文件中读取镜像列表
    3. with open('image_list.txt', 'r') as file:
    4. image_list = file.readlines()
    5. # 去除每个镜像名称中的换行符
    6. image_list = [image.strip() for image in image_list]
    7. # 遍历镜像列表,查询每个镜像的信息
    8. for image in image_list:
    9. url = f'https://harbor.com.cn/api/v2.0/search?q={image}'
    10. headers = {'accept': 'application/json'}
    11. response = requests.get(url, headers=headers)
    12. result = response.json()
    13. print(f'Result for image {image}:')
    14. print(result)
    15. print()

    这段代码首先从文件中读取镜像列表,并将每个镜像的名称存储在一个列表中。然后,遍历该列表,为每个镜像构建请求URL,并发送GET请求。最后,将返回的响应解析为JSON格式,并将结果打印输出。

    请确保你已经安装了requests库,如果没有安装可以使用以下命令进行安装:

    pip3 install requests
    

    同时,请将代码中的image_list.txt替换为你存储镜像列表的文件路径。

    7.python3 docker镜像地址 截取仓库和镜像名

    请使用以下代码截取仓库和镜像名:

    1. docker_image = "docker镜像地址"
    2. repository, image_name = docker_image.split("/")

    这将把仓库和镜像名分别存储在repositoryimage_name变量中。

    8.python3 docker镜像地址截去镜像地址,只留仓库镜像名

    如果你想要截取仓库和镜像名,可以使用以下代码:

    1. docker_image = "docker镜像地址"
    2. repository, image_name = docker_image.split("/", 1)[-1].split(":")[0]

    这将把仓库和镜像名分别存储在repositoryimage_name变量中。这里使用了split("/", 1)来限制分割次数为1,以确保只截取第一个斜杠之后的内容。然后使用split(":")[0]来截取冒号之前的内容,以去除标签信息。

    9.Python执行脚本从本地文件中读取docker镜像列表image_list时,截去镜像地址,只留镜像名,调用search接口查询镜像image,curl -X 'GET' \ 'https://harbor.com.cn/api/v2.0/search?q=buddy' \ -H 'accept: application/json'。将结果打印输出

    你可以使用以下代码从本地文件中读取docker镜像列表,并调用search接口查询镜像信息,并将结果打印输出:

    1. import requests
    2. # 读取docker镜像列表
    3. with open("image_list.txt", "r") as file:
    4. image_list = file.readlines()
    5. # 去除镜像地址,只保留镜像名
    6. image_names = [image.strip().split("/")[-1] for image in image_list]
    7. # 查询镜像信息并打印输出
    8. for image_name in image_names:
    9. url = f"https://harbor.com.cn/api/v2.0/search?q={image_name}"
    10. headers = {
    11. "accept": "application/json"
    12. }
    13. response = requests.get(url, headers=headers)
    14. result = response.json()
    15. print(result)

    这段代码假设你将docker镜像列表保存在名为image_list.txt的文件中,每行一个镜像地址。首先,它读取文件内容并将每个镜像地址截取为镜像名。然后,对于每个镜像名,它构建查询URL并发送GET请求到Harbor的search接口。最后,它将返回的结果打印输出。

    请注意,你需要安装requests库,可以通过pip3 install requests进行安装。另外,根据你的实际情况,你可能需要根据需要修改代码中的URL和headers。

    9.使用python获取Apollo所有配置

    1. # -*- coding: utf-8 -*-
    2. import json
    3. import logging
    4. import sys
    5. import threading
    6. import time
    7. import requests
    8. # 获取Apollo配置
    9. class ApolloClient(object):
    10. def __init__(self, app_id, cluster='default', config_server_url='http://localhost:8080', timeout=35, ip=None):
    11. self.config_server_url = config_server_url
    12. self.appId = app_id
    13. self.cluster = cluster
    14. self.timeout = timeout
    15. self.stopped = False
    16. self.init_ip(ip)
    17. self._stopping = False
    18. self._cache = {}
    19. self._notification_map = {'application': -1}
    20. def init_ip(self, ip):
    21. if ip:
    22. self.ip = ip
    23. else:
    24. import socket
    25. try:
    26. s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    27. s.connect(('8.8.8.8', 53))
    28. ip = s.getsockname()[0]
    29. finally:
    30. s.close()
    31. self.ip = ip
    32. # Main method
    33. def get_value(self, key, default_val=None, namespace='application', auto_fetch_on_cache_miss=False):
    34. if namespace not in self._notification_map:
    35. self._notification_map[namespace] = -1
    36. logging.getLogger(__name__).info("Add namespace '%s' to local notification map", namespace)
    37. if namespace not in self._cache:
    38. self._cache[namespace] = {}
    39. logging.getLogger(__name__).info("Add namespace '%s' to local cache", namespace)
    40. # This is a new namespace, need to do a blocking fetch to populate the local cache
    41. self._long_poll()
    42. if key in self._cache[namespace]:
    43. return self._cache[namespace][key]
    44. else:
    45. if auto_fetch_on_cache_miss:
    46. return self._cached_http_get(key, default_val, namespace)
    47. else:
    48. return default_val
    49. # Start the long polling loop. Two modes are provided:
    50. # 1: thread mode (default), create a worker thread to do the loop. Call self.stop() to quit the loop
    51. # 2: eventlet mode (recommended), no need to call the .stop() since it is async
    52. def start(self, use_eventlet=False, eventlet_monkey_patch=False, catch_signals=True):
    53. # First do a blocking long poll to populate the local cache, otherwise we may get racing problems
    54. if len(self._cache) == 0:
    55. self._long_poll()
    56. if use_eventlet:
    57. import eventlet
    58. if eventlet_monkey_patch:
    59. eventlet.monkey_patch()
    60. eventlet.spawn(self._listener)
    61. else:
    62. if catch_signals:
    63. import signal
    64. signal.signal(signal.SIGINT, self._signal_handler)
    65. signal.signal(signal.SIGTERM, self._signal_handler)
    66. signal.signal(signal.SIGABRT, self._signal_handler)
    67. t = threading.Thread(target=self._listener)
    68. t.start()
    69. def stop(self):
    70. self._stopping = True
    71. logging.getLogger(__name__).info("Stopping listener...")
    72. def _cached_http_get(self, key, default_val, namespace='application'):
    73. url = '{}/configfiles/json/{}/{}/{}?ip={}'.format(self.config_server_url, self.appId, self.cluster, namespace,
    74. self.ip)
    75. r = requests.get(url)
    76. if r.ok:
    77. data = r.json()
    78. self._cache[namespace] = data
    79. logging.getLogger(__name__).info('Updated local cache for namespace %s', namespace)
    80. else:
    81. data = self._cache[namespace]
    82. if key in data:
    83. return data[key]
    84. else:
    85. return default_val
    86. def _uncached_http_get(self, namespace='application'):
    87. url = '{}/configs/{}/{}/{}?ip={}'.format(self.config_server_url, self.appId, self.cluster, namespace, self.ip)
    88. r = requests.get(url)
    89. if r.status_code == 200:
    90. data = r.json()
    91. self._cache[namespace] = data['configurations']
    92. logging.getLogger(__name__).info('Updated local cache for namespace %s release key %s: %s',
    93. namespace, data['releaseKey'],
    94. repr(self._cache[namespace]))
    95. return data['configurations']
    96. def _signal_handler(self, signal, frame):
    97. logging.getLogger(__name__).info('You pressed Ctrl+C!')
    98. self._stopping = True
    99. def _long_poll(self):
    100. url = '{}/notifications/v2'.format(self.config_server_url)
    101. notifications = []
    102. for key in self._notification_map:
    103. notification_id = self._notification_map[key]
    104. notifications.append({
    105. 'namespaceName': key,
    106. 'notificationId': notification_id
    107. })
    108. r = requests.get(url=url, params={
    109. 'appId': self.appId,
    110. 'cluster': self.cluster,
    111. 'notifications': json.dumps(notifications, ensure_ascii=False)
    112. }, timeout=self.timeout)
    113. logging.getLogger(__name__).debug('Long polling returns %d: url=%s', r.status_code, r.request.url)
    114. if r.status_code == 304:
    115. # no change, loop
    116. logging.getLogger(__name__).debug('No change, loop...')
    117. return
    118. if r.status_code == 200:
    119. data = r.json()
    120. for entry in data:
    121. ns = entry['namespaceName']
    122. nid = entry['notificationId']
    123. logging.getLogger(__name__).info("%s has changes: notificationId=%d", ns, nid)
    124. self._uncached_http_get(ns)
    125. self._notification_map[ns] = nid
    126. else:
    127. logging.getLogger(__name__).warn('Sleep...')
    128. time.sleep(self.timeout)
    129. def _listener(self):
    130. logging.getLogger(__name__).info('Entering listener loop...')
    131. while not self._stopping:
    132. self._long_poll()
    133. logging.getLogger(__name__).info("Listener stopped!")
    134. self.stopped = True
    135. def getConfData(config_server_url, appId, cluster, namespace):
    136. url = '{}/configs/{}/{}/{}'.format(config_server_url, appId, cluster, namespace)
    137. r = requests.get(url)
    138. if r.status_code == 200:
    139. data = r.json()
    140. return data['configurations']
    141. if __name__ == '__main__':
    142. appids = []
    143. with open("./appid.txt", 'r') as f:
    144. appids = f.readlines()
    145. appids = [appid.strip() for appid in appids]
    146. config_server_url = "http://1.1.1.1:8080"
    147. cluster = "DEV"
    148. namespace = "application"
    149. ip = "none"
    150. for appid in appids:
    151. if appid.strip() != "":
    152. # print(appid)
    153. data=getConfData(config_server_url,appid,cluster,namespace)
    154. # data=json.loads(str(data))
    155. # print(type(data))
    156. # properties_data = "\n".join([f"{key}={value}" for key, value in data.items()])
    157. #
    158. with open("D:/pythonProject/"+appid+".txt","w+") as f:
    159. f.write(str(data))

    10.在Apollo中批量给新创建的用户授可编辑权限

    1. # !/usr/bin/env python
    2. # -*-coding:utf-8 -*-
    3. """
    4. # File : apollo.py
    5. # Time :2023/09/19/001 16:55
    6. # Author :theo.wu
    7. # version :python 3.8
    8. # Description:模拟Apollo登陆,获取所有envs/appid添加wutao账号编辑权限
    9. """
    10. import requests
    11. import json
    12. class apollo:
    13. def __init__(self):
    14. self.username = "apollo"
    15. self.password = " "
    16. self.apollourl = "https://apollo-azero.cn"
    17. self.addauth = " "
    18. self.session = requests.session()
    19. self.headers = self.setheaders()
    20. def login(self):
    21. url = '{}/signin'.format(self.apollourl)
    22. payload = {
    23. "username": self.username,
    24. "password": self.password,
    25. "login-submit": "登录"
    26. }
    27. self.session.post(url=url,data=payload)
    28. res = self.session.cookies
    29. return (res.get_dict()['JSESSIONID'])
    30. def setheaders(self):
    31. session = self.login()
    32. myheaders = {
    33. "Accept": "application/json, text/plain, */*",
    34. "Cookie": " ",
    35. "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/95.0.4638.69 Safari/537.36"
    36. }
    37. return (myheaders)
    38. def getenvs(self):
    39. url = 'http://{}/envs'.format(self.apollourl)
    40. head = self.headers
    41. res = requests.get(url=url,headers=head)
    42. return (res.text)
    43. def getappid(self):
    44. url = 'http://{}/apps'.format(self.apollourl)
    45. params = {'page': 0}
    46. res = requests.get(url, params=params,headers=self.headers)
    47. date = json.loads(res.text)
    48. allapp = [d['appId'] for d in date]
    49. return (allapp)
    50. def addauth(self,appid):
    51. uri = 'http://{0}/apps/{1}/namespaces/application/roles/ModifyNamespace'.format(self.apollourl,appid)
    52. playload = self.addauth
    53. try:
    54. response = requests.post(url=uri,data=playload,headers=self.headers)
    55. response.raise_for_status()
    56. except requests.exceptions.HTTPError as error:
    57. json_data = error.response.json()
    58. error_message = json_data['message']
    59. print(appid,error_message)
    60. if __name__ == '__main__':
    61. apollo_obj = apollo()
    62. for i in apollo_obj.getappid():
    63. apollo_obj.addauth(i)

    11.在Apollo中批量给新创建的用户授于发布权限

    12.需求

    批量解析域名的dns,获取负载均衡器地址

    13.

    业务组真实金额消费
    413420
    00215
    408101.5
    41315

    大概就是读取业务组列表和金额两列,然后按照业务维度来叠加成本。

    输出一个5行20列的excel表格,其中第一列是业务组,第20列是真实金额消费。

    以下是一个包含 5 行和 20 列的 Excel 表格,用于展示不同业务组的真实金额消费。您可以根据需要进行调整。
    ```lua
    | 业务组 | 消费项1 | 消费项2 | 消费项3 | 消费项4 | 消费项5 | 消费项6 | 消费项7 | 消费项8 | 消费项9 | 消费项10 | 消费项11 | 消费项12 | 消费项13 | 消费项14 | 消费项15 | 消费项16 | 消费项17 | 消费项18 | 消费项19 | 真实金额 |
    | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
    | A | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 10000 |
    | B | 0 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 5000 |
    | A | 0 | 0 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 20000 |
    | B | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 3000 |
    | C | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 15000 |
    ```

    使用python读取上面excel表格的业务组和真实消费金额两列,然后按照业务组来叠加成本。

    这个表格中的每个业务组都有一个唯一的颜色,让它们更容易被识别。请根据需要对这个示例表格进行调整。

     要使用 Python 读取 Excel 表格的业务组和真实消费金额两列,并按照业务组叠加成本,您可以使用 `pandas` 库。首先,确保已经安装了 `pandas`。如果没有,可以使用以下命令安装:
    ```bash
    pip install pandas
    ```
    接下来,您可以使用以下代码读取 Excel 表格并按照业务组叠加成本:
    ```python
    import pandas as pd

    # 假设您已经将 Excel 表格保存为 'input.xlsx'
    data = pd.read_excel('input.xlsx')

    # 创建一个包含业务组和真实消费金额的新数据框
    group_cost = data['业务组'] + '_' + data['真实消费金额']

    # 按照业务组对消费金额进行汇总
    group_cost_sum = data.groupby('业务组').agg({'真实消费金额': 'sum'})

    # 显示结果
    print(group_cost_sum)
    ```
    这段代码将读取 Excel 表格,并根据业务组对真实消费金额进行汇总。结果将在命令行或终端中显示。

    1. package main
    2. import (
    3. "fmt"
    4. "strconv"
    5. "github.com/360EntSecGroup-Skylar/excelize"
    6. )
    7. func main() {
    8. // 加载 Excel 文件
    9. f, err := excelize.OpenFile("F:\\soundai\\input1.xlsx")
    10. if err != nil {
    11. fmt.Println(err)
    12. return
    13. }
    14. // 读取业务组和真实金额两列数据
    15. rows := f.GetRows("Sheet1")
    16. // 创建一个映射来存储每个业务组的真实金额总和
    17. groupSum := make(map[string]float64)
    18. // 遍历每一行数据
    19. for rowIndex, row := range rows {
    20. // 跳过表头,表头从第2行开始
    21. if rowIndex < 1 {
    22. continue
    23. }
    24. // 提取业务组和真实金额
    25. group := row[0]
    26. amount := row[19]
    27. // 将真实金额从字符串转换为整数
    28. amountInt, err := strconv.ParseFloat(amount, 64)
    29. if err != nil {
    30. fmt.Println(err)
    31. return
    32. }
    33. // 计算每个业务组的真实金额总和
    34. groupSum[group] += amountInt
    35. }
    36. // 输出每个业务组的真实金额总和
    37. for group, sum := range groupSum {
    38. fmt.Printf("业务组: %s, 真实金额总和: %v\n", group, sum)
    39. }
    40. }

  • 相关阅读:
    ext文件系统
    Oracle/PLSQL: BFilename Function
    精选JAVA毕业设计56套——源码+论文完整资源
    Go基础18-理解方法的本质以选择正确的receiver类型
    《TCP/IP详解 卷一》第11章 DNS
    《棒球裁判》:走近棒球运动·球赛规则
    Spring注解@Qualifier的详细用法你知道几种「扩展点实战系列」- 第444篇
    msdn下载的系统怎么安装
    C++ 即将超越 Java,TIOBE 6 月编程语言排行榜发布!
    病例演讲比赛PPT模板
  • 原文地址:https://blog.csdn.net/niwoxiangyu/article/details/132943656