• A log transfer tool based on Python 3 coroutines


    Python 3 coroutines: an overview

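    The use cases of coroutines overlap with those of multithreading. In a sense, anything that can be done with coroutines can also be done with threads, but not the other way round. For example, a blocking call with no async counterpart stalls every coroutine on the event loop, whereas inside a thread it would block only that thread.
    Coroutines are still a single-threaded model: while one task is blocked waiting on I/O, the event loop runs other tasks, which creates the illusion of multithreading.
    In other words, coroutines are mainly suited to I/O-bound workloads, such as network I/O and file I/O.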
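    Here is a small, self-contained sketch of the point above. Three coroutines each wait 0.2 s on a simulated I/O call. asyncio.sleep stands in for a network or file wait, and the task names and delays are made up for illustration. All three run on the same thread, yet the total time should be close to one wait rather than three, because the event loop switches tasks while each one is suspended.

```python
import asyncio
import threading
import time


async def fake_io(name: str, delay: float):
    # asyncio.sleep stands in for a blocking network/file wait;
    # while this coroutine is suspended, the event loop runs the others
    await asyncio.sleep(delay)
    # record which OS thread executed this coroutine
    return name, threading.get_ident()


async def main():
    start = time.perf_counter()
    results = await asyncio.gather(*(fake_io(f"task{i}", 0.2) for i in range(3)))
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(main())
thread_ids = {tid for _, tid in results}
print(f"threads used: {len(thread_ids)}, elapsed: {elapsed:.2f}s")
```

    Running it should report a single thread and an elapsed time of roughly 0.2 s. A sequential version would take about 0.6 s.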


    The problem

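    When managing server resources, I need to collect specific files from several servers. The most direct way is to scp the files by hand. Once you have done that a few times, though, it becomes tedious and annoying. This is especially true when an extra ssh hop is needed because the files live inside a container.
    As the number of target hosts grows, the usual approach is multithreading, but resource usage grows along with it. Avoiding that cost is exactly what asynchronous coroutines are for: they provide concurrency while saving resources.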

    Conventional solutions

    • Shell scripts
    • A Python script with multithreading
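    For comparison, the "Python script plus multithreading" approach usually looks something like the sketch below: one blocking scp call per host, dispatched to a thread pool. The host names, paths and the `runner` parameter are hypothetical. `runner` defaults to subprocess.run and exists only so the dispatch logic can be exercised without real servers. Each host occupies a worker thread for the whole transfer, which is the resource cost mentioned above.

```python
import subprocess
from concurrent.futures import ThreadPoolExecutor


def scp_from(host, remote_path, local_dir, runner=subprocess.run):
    # one blocking scp call; it ties up a worker thread until it finishes
    cmd = ["scp", f"{host}:{remote_path}", local_dir]
    return runner(cmd, check=True)


def collect_all(hosts, remote_path, local_dir, max_workers=8, runner=subprocess.run):
    # one task per host; the pool runs up to max_workers transfers at a time
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(scp_from, h, remote_path, local_dir, runner) for h in hosts]
        # result() re-raises any exception raised by scp_from in a worker thread
        return [f.result() for f in futures]


if __name__ == "__main__":
    # dry run: a fake runner that only returns the command it would execute
    fake = lambda cmd, check: cmd
    for cmd in collect_all(["user@host1", "user@host2"], "/tmp/app.log", "./logs", runner=fake):
        print(" ".join(cmd))
```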

    Trying a new approach

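    • The problem I face
      The build server's host is referred to here as the Remote Host. Once the code has been compiled, it runs the test commands inside an LXC container. After the tests finish, the logs in a specific directory have to be copied to a destination server (DST Host) so that the log files can be read directly. The DST Host might be, for example, a directory mounted on a PC, or a Windows office machine running an SSH server.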

    Design

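    Use the asyncio module that Python 3 provides, which makes it easy to bring coroutines into a Python 3 script.
    Python 3's support for coroutines keeps improving. That is not to say other strategies can't solve the problem; a multithreaded solution always works.
    I simply wanted to try a different approach and experiment with Python 3 coroutines.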

    Here is the code (sensitive information is kept out of the script in a config file, conf.ini)
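    The script reads its settings from conf.ini. The post does not show that file's contents, so the layout below is a hypothetical example. The section and key names are the ones the script looks up. Every value is a placeholder, including the hosts, passwords, paths, the lxc-info / lxc-start commands and the container name `sct`. The DST section holds a JSON list, so any number of destination hosts can be listed. Parsing it with configparser and json, the same way the script does, is a quick way to check the file before running a transfer.

```python
import configparser
import json

# Hypothetical conf.ini content: key names match the script, values are placeholders.
# With the default ConfigParser, a literal '%' in a value must be written as '%%'.
SAMPLE_CONF = """
[SOURCE]
SourceRemoteHost = build-server.example.com
SourceRemotePassWord = change-me
SctLogPath = /home/tester/sct/

[USER]
DefaultUserName = builder
ContainerUserName = tester
ContainerPassWord = change-me

[COMMAND]
hostNameCmd = hostname
lxcInfoCmd = lxc-info -n sct
lxcStart = lxc-start -n sct -d

[DST]
dsts = [{"Host": "192.0.2.10", "User": "me", "Password": "change-me", "Path": "/home/me/sct_logs/"}]
"""

# every (section, key) pair the script reads
REQUIRED = {
    'SOURCE': ['SourceRemoteHost', 'SourceRemotePassWord', 'SctLogPath'],
    'USER': ['DefaultUserName', 'ContainerUserName', 'ContainerPassWord'],
    'COMMAND': ['hostNameCmd', 'lxcInfoCmd', 'lxcStart'],
    'DST': ['dsts'],
}


def load_conf(text):
    config = configparser.ConfigParser()
    config.read_string(text)
    missing = [(s, k) for s, keys in REQUIRED.items() for k in keys
               if not config.has_option(s, k)]
    if missing:
        raise KeyError('missing keys in conf.ini: %s' % missing)
    # the script builds paths like '%slogs.tar.gz' % SctLogPath, so a trailing '/' is needed
    if not config['SOURCE']['SctLogPath'].endswith('/'):
        raise ValueError('SctLogPath must end with /')
    # same parsing as prepareTheDsts(): a JSON list of destination host descriptions
    dsts = [(d['Host'], d['User'], d['Password'], d['Path'])
            for d in json.loads(config.get('DST', 'dsts'))]
    return config, dsts


config, dsts = load_conf(SAMPLE_CONF)
print(dsts)
```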

    • Language: Python 3
    • Modules used: asyncio, asyncssh, etc.
    import asyncio
    import asyncssh
    import configparser
    import datetime
    import json
    import time
    
    config = configparser.ConfigParser()
    config.read('/home/chuanqin/script/conf.ini')
    
    # secret info
    SourceRemoteHost = config['SOURCE']['SourceRemoteHost']
    SourceRemotePassWord = config['SOURCE']['SourceRemotePassWord']
    SctLogPath = config['SOURCE']['SctLogPath']  # log path; must end with '/' because file names are appended to it directly
    
    DefaultUserName = config['USER']['DefaultUserName']  # remote host user name
    ContainerUserName = config['USER']['ContainerUserName']   # user name inside the LXC container running on the remote host
    ContainerPassWord = config['USER']['ContainerPassWord']   # LXC password
    
    # command info
    # commands used to retrieve the hostname and the LXC container info (state, IP), and to start the LXC container
    hostNameCmd = config['COMMAND']['hostNameCmd']
    LxcInfo = config['COMMAND']['lxcInfoCmd']
    LxcStart = config['COMMAND']['lxcStart']
    
    # asyncssh provides an async SSH wrapper; its scp() accepts a progress_handler
    # callback that reports how many bytes have been transferred.
    # The function below converts a byte count into a human-readable string,
    # e.g. 1049601 -> '1MB,1KB,1B'.
    def convertByteToHumanReadable(size):
        if size == 0:
            return '0B'
        unit = ['B', 'KB,', 'MB,', 'GB,', 'TB,']
        result = ''
        for name in unit[:-1]:
            if not size:
                break
            result = str(size % 1024) + name + result
            size //= 1024
        if size:
            # anything left over is expressed in the largest unit instead of raising IndexError
            result = str(size) + unit[-1] + result
        return result
    
    # progress handler passed to asyncssh's scp(); prints once the file has been fully copied
    def progress_handler(src, dst, offset, size):
        if offset == size:
            print('copying %s to %s offset %s, size %s'%(src, dst, offset, convertByteToHumanReadable(size)))
    
    # get the IP of the LXC container hosted on the remote host from the 'IP: ...' line of the container info output
    def reterive_ip(response):
        for line in response.splitlines():
            line = line.strip()
            if line.startswith('IP'):
                default_ip = line.split(':')[-1].strip()
                print('LXC container ip:' + default_ip)
                return default_ip
    
    # check whether the LXC container is running (the 'State: RUNNING' line of the container info output)
    def reterive_state(response):
        for line in response.splitlines():
            line = line.strip()
            if line.startswith('State'):
                default_state = line.split(':')[1].strip()
                if 'RUNNING' == default_state:
                    return True
                else :
                    return False
    
    # run a local command with asyncio on the machine running this script
    async def local_run(cmd):
        proc = await asyncio.create_subprocess_shell(
            cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE)
        stdout, _ = await proc.communicate()
        return stdout.decode().strip()
    
    async def get_timestamp():
        date = await local_run('date')
        print('date: %s' % date)
    
    async def get_local_hostname():
        local = await local_run(hostNameCmd)
        print('current hostname: %s'%(local))
    
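    # to save transfer time, first compress the sct (sub-component test) logs
    async def compress_sct_logs(RemoteHost, userName, password):
        # ssh to the remote host
        # note: known_hosts=None disables host key checking; only acceptable on a trusted network
        async with asyncssh.connect(RemoteHost, username=userName, password=password, known_hosts=None) as conn:
            hostname = await conn.run('hostname', check=True)
            print('remote hostname: %s' % hostname.stdout.strip())
            result = await conn.run(LxcInfo, check=True)
            containerIp = reterive_ip(result.stdout)
            if not reterive_state(result.stdout):
                print('current container not running, start it right now')
                await conn.run(LxcStart, check=True)
                # a freshly started container may not have an IP yet, so poll the info command
                # (10 tries, 1 s apart, is an arbitrary choice)
                for _ in range(10):
                    await asyncio.sleep(1)
                    result = await conn.run(LxcInfo, check=True)
                    containerIp = reterive_ip(result.stdout)
                    if containerIp:
                        break
            if not containerIp:
                raise RuntimeError('could not retrieve the LXC container IP')
            # ssh into the LXC container through the established connection to the remote host
            async with conn.connect_ssh(host=containerIp, username=ContainerUserName, password=ContainerPassWord, known_hosts=None) as container:
                # remove the old archive (-f: no error if it does not exist), then compress the logs
                await container.run('cd %s && rm -f logs.tar.gz' % SctLogPath)
                print('compressing the sct logs')
                await container.run('cd %s && tar -czvf logs.tar.gz logs' % SctLogPath, check=True)
                print('finished compressing the sct logs')
        return containerIp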
    
    # prepare the DST hosts (where the sct logs will be put); since everything runs
    # asynchronously, the logs can be copied to many DST hosts in parallel
    async def prepareTheDsts():
        dsts = []
        for item in json.loads(config.get("DST", "dsts")):
            dsts.append((item['Host'], item['User'], item['Password'], item['Path']))
        return dsts
    
    # ssh to the remote host, then into its LXC container, and copy the logs to one DST host
    async def execute_in_k3(RemoteHost, userName, password, dst, containerIp):
        async with asyncssh.connect(RemoteHost, username=userName, password=password, known_hosts=None) as conn:
            async with conn.connect_ssh(host=containerIp, username=ContainerUserName, password=ContainerPassWord, known_hosts=None) as container:
                await scpLogs(container, *dst)
    
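    # scp step: copy the archive from the container to one DST host, then unpack it there
    async def scpLogs(container, dstHost, dstUserName, dstPassword, dstPath):
        # guard: with an empty dstPath, 'rm -rf *' would wipe the DST user's home directory
        if not dstPath:
            raise ValueError('empty DST path for %s' % dstHost)
        async with asyncssh.connect(dstHost, username=dstUserName, password=dstPassword, known_hosts=None) as dst:
            await dst.run('rm -rf %s*' % dstPath)
            # remote-to-remote copy: both source and destination are (connection, path) tuples
            await asyncssh.scp((container, '%slogs.tar.gz' % SctLogPath), (dst, dstPath), progress_handler=progress_handler)
            await dst.run('cd %s && tar -zxvf logs.tar.gz' % dstPath, check=True)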
    
    # compress first, then handle all DST hosts concurrently
    async def sequence():
        containerIp = await compress_sct_logs(SourceRemoteHost, DefaultUserName, SourceRemotePassWord)
        dsts = await prepareTheDsts()
        tasks = (execute_in_k3(SourceRemoteHost, DefaultUserName, SourceRemotePassWord, dst, containerIp) for dst in dsts)
        await asyncio.gather(get_local_hostname(), *tasks)
        await get_timestamp()
    
    def main():
        asyncio.run(sequence())
    
    if __name__ == "__main__":
        start = time.time()
        main()
        end = time.time()
        print('the whole process cost:%s'%(str(datetime.timedelta(seconds = end - start))))
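    The control flow of sequence() has two phases. First comes one awaited step that must finish before anything else, compressing the logs inside the container. Then comes a fan-out in which asyncio.gather handles every destination concurrently. The sketch below reproduces only that shape, with stub coroutines in place of the SSH work; compress_stub, copy_stub, the placeholder IP and the delays are hypothetical. It records the order of events, which shows that the copies start only after compression ends and that they overlap with each other.

```python
import asyncio

events = []


async def compress_stub():
    # stands in for compress_sct_logs(): must finish before any copy starts
    events.append('compress start')
    await asyncio.sleep(0.05)
    events.append('compress end')
    return '10.0.3.5'  # placeholder container IP


async def copy_stub(dst, container_ip):
    # stands in for execute_in_k3(): one coroutine per destination host
    events.append('copy start %s' % dst)
    await asyncio.sleep(0.05)
    events.append('copy end %s' % dst)
    return dst, container_ip


async def sequence_sketch(dsts):
    container_ip = await compress_stub()  # sequential step
    # concurrent step: all copies are scheduled together; results keep the input order
    return await asyncio.gather(*(copy_stub(d, container_ip) for d in dsts))


results = asyncio.run(sequence_sketch(['pc', 'laptop']))
print(events)
```

    One consequence of this design follows from the default behaviour of asyncio.gather. An error from any one destination propagates out of sequence(), and asyncio.run() then cancels the copies still in flight. Passing return_exceptions=True to gather would instead collect the error for each host alongside the successful results.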
    
    
    
  • Original article: https://blog.csdn.net/u011233383/article/details/126813436