The use cases for coroutines overlap somewhat with those of multithreading. In a sense, anything coroutines can do, threads can do as well, while the reverse is not true: some problems that multithreading solves cannot be handled by coroutines.
A coroutine program is still fundamentally a single-threaded model; it simply switches to other tasks while an IO operation is blocked and waiting, which creates the illusion of multithreading.
In other words, coroutines are best suited to IO-bound workloads, such as network IO and file IO.
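To make the IO-overlap point concrete, here is a minimal, self-contained sketch (separate from the script below), using asyncio.sleep() as a stand-in for network or file IO:

```python
import asyncio
import time

# Three simulated IO waits run concurrently on a single thread.
# While one task is blocked in await, the event loop switches to the others.
async def fake_io(name, delay):
    await asyncio.sleep(delay)
    return name

async def main():
    start = time.monotonic()
    results = await asyncio.gather(
        fake_io('a', 0.1), fake_io('b', 0.1), fake_io('c', 0.1))
    elapsed = time.monotonic() - start
    # The total wall time is roughly one delay, not three, because the waits overlap.
    return results, elapsed

results, elapsed = asyncio.run(main())
print(results)
```

This is exactly the property the script below relies on: while one scp transfer waits on the network, the event loop drives the others forward.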
When managing server resources, I often need to collect specific files from different servers. The most direct approach is to scp the files by hand, but once the count grows this becomes tedious and annoying, especially when the files live inside a container, which requires an extra SSH hop.
As the number of target hosts grows, the usual answer is multithreading, but that drives resource usage up. This is exactly the niche that async coroutines were created to fill: the appearance of concurrency while conserving resources.
The asyncio module introduced in Python 3 makes it very convenient to bring coroutines into a script, and Python 3's coroutine support keeps getting better. That is not to say other strategies cannot solve this problem; a multithreaded solution would always work.
Here I just want to try a different approach and experiment with Python 3 coroutines.
import asyncio
import asyncssh
import configparser
import datetime
import json
import time

config = configparser.ConfigParser()
config.read('/home/chuanqin/script/conf.ini')
# secret info
SourceRemoteHost = config['SOURCE']['SourceRemoteHost']
SourceRemotePassWord = config['SOURCE']['SourceRemotePassWord']
SctLogPath = config['SOURCE']['SctLogPath'] # log path
DefaultUserName = config['USER']['DefaultUserName'] # remote host user name
ContainerUserName = config['USER']['ContainerUserName'] # LXC user name which running on remote host
ContainerPassWord = config['USER']['ContainerPassWord'] # LXC password
# command info
'''
The commands below retrieve the hostname, retrieve the LXC ip, and start the LXC container.
'''
hostNameCmd = config['COMMAND']['hostNameCmd']
LxcInfo = config['COMMAND']['lxcInfoCmd']
LxcStart = config['COMMAND']['lxcStart']
'''
The asyncssh module provides an async SSH wrapper. Its scp interface accepts a
progress handler that reports how many bytes have been transferred.
The function below converts a byte count to a human-readable format.
'''
def convertByteToHumanReadable(size):
    # guard the zero case, which would otherwise return an empty string
    if size == 0:
        return '0B'
    data = []
    unit = ['B', 'KB,', 'MB,', 'GB,']
    while size:
        data.append(size % 1024)
        size = size // 1024
    result = ''
    for index, item in enumerate(data):
        result = str(item) + unit[index] + result
    return result
# progress handler passed to the scp interface of asyncssh
def progress_handler(src, dst, offset, size):
    if offset == size:
        print('copying %s to %s offset %s, size %s' % (src, dst, offset, convertByteToHumanReadable(size)))
# get the ip of the LXC container hosted on the remote host
def reterive_ip(response):
    for line in response.splitlines():
        line = line.strip()
        if line.startswith('IP'):
            default_ip = line.split(':')[-1].strip()
            print('LXC container ip: ' + default_ip)
            return default_ip
# get the running state of the LXC container
def reterive_state(response):
    for line in response.splitlines():
        line = line.strip()
        if line.startswith('State'):
            default_state = line.split(':')[1].strip()
            return default_state == 'RUNNING'
    return False
# run a local command via asyncio on the machine executing this script
async def local_run(cmd):
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    stdout, _ = await proc.communicate()
    return stdout.decode().strip()
async def get_timestamp():
    date = await local_run('date')
    print('date: %s' % date)

async def get_local_hostname():
    local = await local_run(hostNameCmd)
    print('current hostname: %s' % local)
'''
To save transfer time, compress the sct (sub-component test) logs first.
'''
async def compress_sct_logs(RemoteHost, userName, password):
    # ssh to the remote host
    async with asyncssh.connect(RemoteHost, username=userName, password=password, known_hosts=None) as conn:
        hostname = await conn.run('hostname', check=True)
        print('remote hostname: %s' % hostname.stdout)
        result = await conn.run(LxcInfo, check=True)
        containerIp = reterive_ip(result.stdout)
        if not reterive_state(result.stdout):
            print('current container not running, starting it right now')
            await conn.run(LxcStart, check=True)
        # ssh to the LXC container through the established connection to the remote host
        async with conn.connect_ssh(host=containerIp, username=ContainerUserName, password=ContainerPassWord, known_hosts=None) as container:
            # compress the logs; rm -f so a missing old archive is not an error
            await container.run('cd %s && rm -f logs.tar.gz' % SctLogPath)
            print('compressing the sct logs')
            await container.run('cd %s && tar -czvf logs.tar.gz logs' % SctLogPath, check=True)
            print('finished compressing the sct logs')
        return containerIp
# prepare the dst hosts (where the sct logs will be put); because we are in async
# mode, we can scp the logs to many dst hosts in parallel
async def prepareTheDsts():
    dsts = []
    for item in json.loads(config.get("DST", "dsts")):
        dsts.append((item['Host'], item['User'], item['Password'], item['Path']))
    return dsts
async def execute_in_k3(RemoteHost, userName, password, dst, containerIp):
    async with asyncssh.connect(RemoteHost, username=userName, password=password, known_hosts=None) as conn:
        async with conn.connect_ssh(host=containerIp, username=ContainerUserName, password=ContainerPassWord, known_hosts=None) as container:
            await scpLogs(container, *dst)

# implementation of the scp step
async def scpLogs(container, dstHost, dstUserName, dstPassword, dstPath):
    async with asyncssh.connect(dstHost, username=dstUserName, password=dstPassword, known_hosts=None) as dst:
        await dst.run('rm -rf %s*' % dstPath)
        await asyncssh.scp((container, '%slogs.tar.gz' % SctLogPath), (dst, dstPath), progress_handler=progress_handler)
        await dst.run('cd %s && tar -zxvf logs.tar.gz' % dstPath, check=True)
# run the async steps in order
async def sequence():
    containerIp = await compress_sct_logs(SourceRemoteHost, DefaultUserName, SourceRemotePassWord)
    dsts = await prepareTheDsts()
    tasks = (execute_in_k3(SourceRemoteHost, DefaultUserName, SourceRemotePassWord, dst, containerIp) for dst in dsts)
    await asyncio.gather(get_local_hostname(), *tasks)
    await get_timestamp()

def main():
    asyncio.run(sequence())

if __name__ == "__main__":
    start = time.time()
    main()
    end = time.time()
    print('the whole process cost: %s' % str(datetime.timedelta(seconds=end - start)))
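The script reads its settings from a conf.ini with SOURCE, USER, COMMAND, and DST sections. The exact values are deployment-specific; the snippet below is a hypothetical example (all hosts, users, passwords, and container names are placeholders) showing the shape that configparser and json.loads expect:

```python
import configparser
import json

# A sample conf.ini matching the sections and keys the script reads.
# Every value here is a placeholder, not a real host or credential.
SAMPLE = """\
[SOURCE]
SourceRemoteHost = 192.0.2.10
SourceRemotePassWord = secret
SctLogPath = /var/log/sct/

[USER]
DefaultUserName = admin
ContainerUserName = root
ContainerPassWord = secret

[COMMAND]
hostNameCmd = hostname
lxcInfoCmd = lxc-info -n mycontainer
lxcStart = lxc-start -n mycontainer

[DST]
dsts = [{"Host": "192.0.2.20", "User": "backup", "Password": "secret", "Path": "/data/logs/"}]
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE)

# The DST section stores the destination list as a JSON array,
# which prepareTheDsts() decodes with json.loads().
dsts = json.loads(config.get("DST", "dsts"))
print(dsts[0]["Host"])
```

Note that DST.dsts is a JSON list, so adding another destination host is just a matter of appending one more object to that array.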