1.Python执行脚本从本地文件中读取docker镜像列表时,执行拉取docker镜像
要在Python中执行拉取Docker镜像的操作,可以使用subprocess模块来执行命令行命令。
以下是一个示例代码,它从本地文件中读取Docker仓库镜像列表,并使用docker pull命令拉取每个镜像:
- import subprocess
-
- def pull_docker_images(filename):
- with open(filename, 'r') as file:
- for line in file:
- image = line.strip()
- command = ['docker', 'pull', image]
- try:
- subprocess.run(command, check=True)
- print(f'Successfully pulled image: {image}')
- except subprocess.CalledProcessError as e:
- print(f'Error pulling image: {image}')
- print(e)
-
- # 从文件中读取镜像列表并拉取
- pull_docker_images('image_list.txt')
上述代码中,pull_docker_images函数接受一个文件名作为参数,并使用with open语句打开文件。然后,对于文件中的每一行,它使用strip方法去除行尾的空白字符,并将镜像名称存储在image变量中。
接下来,它构建一个命令列表,其中第一个元素是docker命令,第二个元素是pull命令,第三个元素是镜像名称。
然后,使用subprocess.run函数执行命令,并使用check=True参数来检查命令执行是否成功。如果命令执行成功,将打印成功消息;如果命令执行失败,将打印错误消息和异常。
最后,通过调用pull_docker_images函数,并传递要读取的镜像列表文件的名称,可以拉取所有镜像。请确保在运行此代码之前已经安装了Docker并配置了正确的环境。
- docker images | grep registry | sed 's/registry.baidu-int.com/registry.can6.com:5000\/baidu/g' | awk '{print "docker tag"" " $3" "$1":"$2}'|sh
-
- #push镜像
- for i in `docker images|grep registry.can6.com:5000/baidu |awk '{print $1":"$2}'`
- do
- docker push $i
- done
docker images|grep regis|awk '{print $1":"$2}'|while read image; do new_images=$(echo $image|sed s#registry.baidu-int.com#test.harbor.icbccs.com.cn:10443/baidu#g); docker tag $image $new_images; done
2.编写python脚本读取文本中的镜像列表
以下是一个简单的Python脚本,用于读取文本文件中的镜像列表:
- with open('image_list.txt', 'r') as f:
- for line in f:
- if line.startswith('image:'):
- image = line.strip().split(': ')[1]
- print(image)
假设镜像列表文件名为image_list.txt,该脚本将打开文件并逐行读取其内容。对于每一行,它会检查是否以image:开头,如果是,则提取出镜像名称并打印出来。请注意,此脚本假设每个镜像的名称都在以image:开头的行中,且名称是该行的第二个字段。如果实际情况不同,需要相应地修改代码。
3.Python脚本执行拉取docker镜像
可以使用Python的subprocess模块来执行命令行操作,比如拉取docker镜像。具体操作如下:
- import subprocess
-
- # 定义要拉取的镜像名称和标签
- image_name = "nginx"
- image_tag = "latest"
-
- # 拉取镜像
- process = subprocess.Popen(f"docker pull {image_name}:{image_tag}", shell=True, stdout=subprocess.PIPE)
- output, error = process.communicate()
-
- # 打印输出结果
- if error:
- print(f"拉取镜像失败:{error}")
- else:
- print(f"拉取镜像成功:{output.decode('utf-8')}")
在上述代码中,使用subprocess.Popen()方法执行命令行操作,其中shell参数需要设置为True,表示执行的是命令行脚本。stdout参数设置为subprocess.PIPE,表示将命令行输出结果保存到output变量中。最后使用communicate()方法获取命令行的输出结果,并将结果打印到控制台。
4.Python执行脚本从本地文件中读取docker仓库镜像列表时,执行拉取 docker 镜像
要在Python中执行拉取Docker镜像的操作,可以使用subprocess模块来执行命令行命令。
以下是一个示例代码,它从本地文件中读取Docker仓库镜像列表,并使用docker pull命令拉取每个镜像:
- import subprocess
-
- def pull_docker_images(filename):
- with open(filename, 'r') as file:
- for line in file:
- image = line.strip()
- command = ['docker', 'pull', image]
- try:
- subprocess.run(command, check=True)
- print(f'Successfully pulled image: {image}')
- except subprocess.CalledProcessError as e:
- print(f'Error pulling image: {image}')
- print(e)
-
- # 从文件中读取镜像列表并拉取
- pull_docker_images('image_list.txt')
上述代码中,pull_docker_images函数接受一个文件名作为参数,并使用with open语句打开文件。然后,对于文件中的每一行,它使用strip方法去除行尾的空白字符,并将镜像名称存储在image变量中。
接下来,它构建一个命令列表,其中第一个元素是docker命令,第二个元素是pull命令,第三个元素是镜像名称。
然后,使用subprocess.run函数执行命令,并使用check=True参数来检查命令执行是否成功。如果命令执行成功,将打印成功消息;如果命令执行失败,将打印错误消息和异常。
最后,通过调用pull_docker_images函数,并传递要读取的镜像列表文件的名称,可以拉取所有镜像。请确保在运行此代码之前已经安装了Docker并配置了正确的环境。
5.Python执行脚本从本地文件中读取docker镜像列表时,调用search接口查询结果,curl -X 'GET' \ 'https://harbor.com.cn/api/v2.0/search?q=python' \ -H 'accept: application/json'。将结果打印输出
你可以使用Python的requests库来发送HTTP请求并获取查询结果。以下是一个示例代码:
- import requests
-
- url = 'https://harbor.com.cn/api/v2.0/search?q=python'
- headers = {'accept': 'application/json'}
-
- response = requests.get(url, headers=headers)
- result = response.json()
-
- print(result)
这段代码发送一个GET请求到指定的URL,并使用指定的请求头。然后,将返回的响应解析为JSON格式,并将结果打印输出。请确保你已经安装了requests库,如果没有安装可以使用以下命令进行安装:
pip3 install requests
注意,你需要替换URL中的buddy为你要查询的镜像名称。
6.Python执行脚本从本地文件中读取docker镜像列表image_list时,调用search接口查询image,curl -X 'GET' \ 'https://harbor.com.cn/api/v2.0/search?q=buddy' \ -H 'accept: application/json'。将结果打印输出
你可以使用Python的requests库来发送HTTP请求并获取查询结果。以下是一个示例代码:
- import requests
-
- # 从文件中读取镜像列表
- with open('image_list.txt', 'r') as file:
- image_list = file.readlines()
-
- # 去除每个镜像名称中的换行符
- image_list = [image.strip() for image in image_list]
-
- # 遍历镜像列表,查询每个镜像的信息
- for image in image_list:
- url = f'https://harbor.com.cn/api/v2.0/search?q={image}'
- headers = {'accept': 'application/json'}
-
- response = requests.get(url, headers=headers)
- result = response.json()
-
- print(f'Result for image {image}:')
- print(result)
- print()
这段代码首先从文件中读取镜像列表,并将每个镜像的名称存储在一个列表中。然后,遍历该列表,为每个镜像构建请求URL,并发送GET请求。最后,将返回的响应解析为JSON格式,并将结果打印输出。
请确保你已经安装了requests库,如果没有安装可以使用以下命令进行安装:
pip3 install requests
同时,请将代码中的image_list.txt替换为你存储镜像列表的文件路径。
7.python3 docker镜像地址 截取仓库和镜像名
请使用以下代码截取仓库和镜像名:
- docker_image = "docker镜像地址"
- repository, image_name = docker_image.split("/")
这将把仓库和镜像名分别存储在repository和image_name变量中。
8.python3 docker镜像地址截去镜像地址,只留仓库镜像名
如果你想要截取仓库和镜像名,可以使用以下代码:
- docker_image = "docker镜像地址"
- repository, image_name = docker_image.split("/", 1)[-1].split(":")[0]
这将把仓库和镜像名分别存储在repository和image_name变量中。这里使用了split("/", 1)来限制分割次数为1,以确保只截取第一个斜杠之后的内容。然后使用split(":")[0]来截取冒号之前的内容,以去除标签信息。
9.Python执行脚本从本地文件中读取docker镜像列表image_list时,截去镜像地址,只留镜像名,调用search接口查询镜像image,curl -X 'GET' \ 'https://harbor.com.cn/api/v2.0/search?q=buddy' \ -H 'accept: application/json'。将结果打印输出
你可以使用以下代码从本地文件中读取docker镜像列表,并调用search接口查询镜像信息,并将结果打印输出:
- import requests
-
- # 读取docker镜像列表
- with open("image_list.txt", "r") as file:
- image_list = file.readlines()
-
- # 去除镜像地址,只保留镜像名
- image_names = [image.strip().split("/")[-1] for image in image_list]
-
- # 查询镜像信息并打印输出
- for image_name in image_names:
- url = f"https://harbor.com.cn/api/v2.0/search?q={image_name}"
- headers = {
- "accept": "application/json"
- }
- response = requests.get(url, headers=headers)
- result = response.json()
- print(result)
这段代码假设你将docker镜像列表保存在名为image_list.txt的文件中,每行一个镜像地址。首先,它读取文件内容并将每个镜像地址截取为镜像名。然后,对于每个镜像名,它构建查询URL并发送GET请求到Harbor的search接口。最后,它将返回的结果打印输出。
请注意,你需要安装requests库,可以通过pip3 install requests进行安装。另外,根据你的实际情况,你可能需要根据需要修改代码中的URL和headers。
9.使用python获取Apollo所有配置
- # -*- coding: utf-8 -*-
- import json
- import logging
- import sys
- import threading
- import time
-
- import requests
-
-
- # 获取Apollo配置
-
- class ApolloClient(object):
- def __init__(self, app_id, cluster='default', config_server_url='http://localhost:8080', timeout=35, ip=None):
- self.config_server_url = config_server_url
- self.appId = app_id
- self.cluster = cluster
- self.timeout = timeout
- self.stopped = False
- self.init_ip(ip)
-
- self._stopping = False
- self._cache = {}
- self._notification_map = {'application': -1}
-
- def init_ip(self, ip):
- if ip:
- self.ip = ip
- else:
- import socket
- try:
- s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- s.connect(('8.8.8.8', 53))
- ip = s.getsockname()[0]
- finally:
- s.close()
- self.ip = ip
-
- # Main method
- def get_value(self, key, default_val=None, namespace='application', auto_fetch_on_cache_miss=False):
- if namespace not in self._notification_map:
- self._notification_map[namespace] = -1
- logging.getLogger(__name__).info("Add namespace '%s' to local notification map", namespace)
-
- if namespace not in self._cache:
- self._cache[namespace] = {}
- logging.getLogger(__name__).info("Add namespace '%s' to local cache", namespace)
- # This is a new namespace, need to do a blocking fetch to populate the local cache
- self._long_poll()
-
- if key in self._cache[namespace]:
- return self._cache[namespace][key]
- else:
- if auto_fetch_on_cache_miss:
- return self._cached_http_get(key, default_val, namespace)
- else:
- return default_val
-
- # Start the long polling loop. Two modes are provided:
- # 1: thread mode (default), create a worker thread to do the loop. Call self.stop() to quit the loop
- # 2: eventlet mode (recommended), no need to call the .stop() since it is async
- def start(self, use_eventlet=False, eventlet_monkey_patch=False, catch_signals=True):
- # First do a blocking long poll to populate the local cache, otherwise we may get racing problems
- if len(self._cache) == 0:
- self._long_poll()
- if use_eventlet:
- import eventlet
- if eventlet_monkey_patch:
- eventlet.monkey_patch()
- eventlet.spawn(self._listener)
- else:
- if catch_signals:
- import signal
- signal.signal(signal.SIGINT, self._signal_handler)
- signal.signal(signal.SIGTERM, self._signal_handler)
- signal.signal(signal.SIGABRT, self._signal_handler)
- t = threading.Thread(target=self._listener)
- t.start()
-
- def stop(self):
- self._stopping = True
- logging.getLogger(__name__).info("Stopping listener...")
-
- def _cached_http_get(self, key, default_val, namespace='application'):
- url = '{}/configfiles/json/{}/{}/{}?ip={}'.format(self.config_server_url, self.appId, self.cluster, namespace,
- self.ip)
- r = requests.get(url)
- if r.ok:
- data = r.json()
- self._cache[namespace] = data
- logging.getLogger(__name__).info('Updated local cache for namespace %s', namespace)
- else:
- data = self._cache[namespace]
-
- if key in data:
- return data[key]
- else:
- return default_val
-
- def _uncached_http_get(self, namespace='application'):
- url = '{}/configs/{}/{}/{}?ip={}'.format(self.config_server_url, self.appId, self.cluster, namespace, self.ip)
- r = requests.get(url)
- if r.status_code == 200:
- data = r.json()
-
- self._cache[namespace] = data['configurations']
- logging.getLogger(__name__).info('Updated local cache for namespace %s release key %s: %s',
- namespace, data['releaseKey'],
- repr(self._cache[namespace]))
- return data['configurations']
-
- def _signal_handler(self, signal, frame):
- logging.getLogger(__name__).info('You pressed Ctrl+C!')
- self._stopping = True
-
- def _long_poll(self):
- url = '{}/notifications/v2'.format(self.config_server_url)
- notifications = []
- for key in self._notification_map:
- notification_id = self._notification_map[key]
- notifications.append({
- 'namespaceName': key,
- 'notificationId': notification_id
- })
-
- r = requests.get(url=url, params={
- 'appId': self.appId,
- 'cluster': self.cluster,
- 'notifications': json.dumps(notifications, ensure_ascii=False)
- }, timeout=self.timeout)
-
- logging.getLogger(__name__).debug('Long polling returns %d: url=%s', r.status_code, r.request.url)
-
- if r.status_code == 304:
- # no change, loop
- logging.getLogger(__name__).debug('No change, loop...')
- return
-
- if r.status_code == 200:
- data = r.json()
- for entry in data:
- ns = entry['namespaceName']
- nid = entry['notificationId']
- logging.getLogger(__name__).info("%s has changes: notificationId=%d", ns, nid)
-
- self._uncached_http_get(ns)
- self._notification_map[ns] = nid
- else:
- logging.getLogger(__name__).warn('Sleep...')
- time.sleep(self.timeout)
-
- def _listener(self):
- logging.getLogger(__name__).info('Entering listener loop...')
- while not self._stopping:
- self._long_poll()
-
- logging.getLogger(__name__).info("Listener stopped!")
- self.stopped = True
-
-
- def getConfData(config_server_url, appId, cluster, namespace):
- url = '{}/configs/{}/{}/{}'.format(config_server_url, appId, cluster, namespace)
- r = requests.get(url)
- if r.status_code == 200:
- data = r.json()
-
- return data['configurations']
-
-
- if __name__ == '__main__':
-
- appids = []
- with open("./appid.txt", 'r') as f:
- appids = f.readlines()
- appids = [appid.strip() for appid in appids]
-
- config_server_url = "http://1.1.1.1:8080"
- cluster = "DEV"
- namespace = "application"
- ip = "none"
-
- for appid in appids:
- if appid.strip() != "":
- # print(appid)
- data=getConfData(config_server_url,appid,cluster,namespace)
-
- # data=json.loads(str(data))
- # print(type(data))
- # properties_data = "\n".join([f"{key}={value}" for key, value in data.items()])
- #
- with open("D:/pythonProject/"+appid+".txt","w+") as f:
- f.write(str(data))
-
-
10.在Apollo中批量给新创建的用户授可编辑权限
- # !/usr/bin/env python
- # -*-coding:utf-8 -*-
-
- """
- # File : apollo.py
- # Time :2023/09/19/001 16:55
- # Author :theo.wu
- # version :python 3.8
- # Description:模拟Apollo登陆,获取所有envs/appid添加wutao账号编辑权限
- """
-
- import requests
- import json
-
- class apollo:
- def __init__(self):
- self.username = "apollo"
- self.password = " "
- self.apollourl = "https://apollo-azero.cn"
- self.addauth = " "
- self.session = requests.session()
- self.headers = self.setheaders()
-
- def login(self):
- url = '{}/signin'.format(self.apollourl)
- payload = {
- "username": self.username,
- "password": self.password,
- "login-submit": "登录"
- }
- self.session.post(url=url,data=payload)
- res = self.session.cookies
- return (res.get_dict()['JSESSIONID'])
-
- def setheaders(self):
- session = self.login()
- myheaders = {
- "Accept": "application/json, text/plain, */*",
- "Cookie": " ",
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/95.0.4638.69 Safari/537.36"
- }
- return (myheaders)
-
- def getenvs(self):
- url = 'http://{}/envs'.format(self.apollourl)
- head = self.headers
- res = requests.get(url=url,headers=head)
- return (res.text)
-
- def getappid(self):
- url = 'http://{}/apps'.format(self.apollourl)
- params = {'page': 0}
- res = requests.get(url, params=params,headers=self.headers)
- date = json.loads(res.text)
- allapp = [d['appId'] for d in date]
- return (allapp)
-
- def addauth(self,appid):
- uri = 'http://{0}/apps/{1}/namespaces/application/roles/ModifyNamespace'.format(self.apollourl,appid)
- playload = self.addauth
- try:
- response = requests.post(url=uri,data=playload,headers=self.headers)
- response.raise_for_status()
- except requests.exceptions.HTTPError as error:
- json_data = error.response.json()
- error_message = json_data['message']
- print(appid,error_message)
-
- if __name__ == '__main__':
- apollo_obj = apollo()
- for i in apollo_obj.getappid():
- apollo_obj.addauth(i)
11.在Apollo中批量给新创建的用户授于发布权限
12.需求
批量解析域名的dns,获取负载均衡器地址
13.
| 业务组 | 真实金额消费 | ||||||||||||||||||
| 413 | 420 | ||||||||||||||||||
| 002 | 15 | ||||||||||||||||||
| 408 | 101.5 | ||||||||||||||||||
| 413 | 15 |
大概就是读取业务组列表和金额两列,然后按照业务维度来叠加成本。
输出一个5行20列的excel表格,其中第一列是业务组,第20列是真实金额消费。
以下是一个包含 5 行和 20 列的 Excel 表格,用于展示不同业务组的真实金额消费。您可以根据需要进行调整。
```lua
| 业务组 | 消费项1 | 消费项2 | 消费项3 | 消费项4 | 消费项5 | 消费项6 | 消费项7 | 消费项8 | 消费项9 | 消费项10 | 消费项11 | 消费项12 | 消费项13 | 消费项14 | 消费项15 | 消费项16 | 消费项17 | 消费项18 | 消费项19 | 真实金额 |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| A | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 10000 |
| B | 0 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 5000 |
| A | 0 | 0 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 20000 |
| B | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 3000 |
| C | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 15000 |
```
使用python读取上面excel表格的业务组和真实消费金额两列,然后按照业务组来叠加成本。
这个表格中的每个业务组都有一个唯一的颜色,让它们更容易被识别。请根据需要对这个示例表格进行调整。
要使用 Python 读取 Excel 表格的业务组和真实消费金额两列,并按照业务组叠加成本,您可以使用 `pandas` 库。首先,确保已经安装了 `pandas`。如果没有,可以使用以下命令安装:
```bash
pip install pandas
```
接下来,您可以使用以下代码读取 Excel 表格并按照业务组叠加成本:
```python
import pandas as pd
# 假设您已经将 Excel 表格保存为 'input.xlsx'
data = pd.read_excel('input.xlsx')
# 创建一个包含业务组和真实消费金额的新数据框
group_cost = data['业务组'] + '_' + data['真实消费金额']
# 按照业务组对消费金额进行汇总
group_cost_sum = data.groupby('业务组').agg({'真实消费金额': 'sum'})
# 显示结果
print(group_cost_sum)
```
这段代码将读取 Excel 表格,并根据业务组对真实消费金额进行汇总。结果将在命令行或终端中显示。
- package main
-
- import (
- "fmt"
- "strconv"
-
- "github.com/360EntSecGroup-Skylar/excelize"
- )
-
- func main() {
- // 加载 Excel 文件
- f, err := excelize.OpenFile("F:\\soundai\\input1.xlsx")
- if err != nil {
- fmt.Println(err)
- return
- }
-
- // 读取业务组和真实金额两列数据
- rows := f.GetRows("Sheet1")
-
- // 创建一个映射来存储每个业务组的真实金额总和
- groupSum := make(map[string]float64)
-
- // 遍历每一行数据
- for rowIndex, row := range rows {
- // 跳过表头,表头从第2行开始
- if rowIndex < 1 {
- continue
- }
- // 提取业务组和真实金额
- group := row[0]
- amount := row[19]
-
- // 将真实金额从字符串转换为整数
- amountInt, err := strconv.ParseFloat(amount, 64)
- if err != nil {
- fmt.Println(err)
- return
- }
-
- // 计算每个业务组的真实金额总和
- groupSum[group] += amountInt
- }
-
- // 输出每个业务组的真实金额总和
- for group, sum := range groupSum {
- fmt.Printf("业务组: %s, 真实金额总和: %v\n", group, sum)
- }
- }