• 基于python用telnet连接设备+查询指定文件内容的脚本


    啊我又在写脚本…
    这次是为了在不同网段下的设备查询帧率和GPIO是否挂起
    记录一些tips吧
    1.要在不同的网段下面找到所有在线的摄像机设备
    这个问题我想了几种解决方式,比如ping所有的ip然后抓取所有可以ping通的,然后检查是否能打开telnet协议,如果不行就认为它不是摄像机设备。但是在实践中这个方法的问题在于并不是每个设备的telnet协议都是可以通过外部的脚本打开的。所有误判率非常高。还有一种方式就是用内网udp进行广播并且发送报文里加上开启telnet的命令,得到回应的报文里获取ip。这个方法就比较靠谱,奈何我计网天天逃课…这个方式实现了以后再写一篇博文记录吧…
    最后我用了公司内部的测试软件拉取所有在线摄像机的所有信息内容存成txt,然后用split把ip裁剪出来的本方法。非常快捷但是如果出现了新的ip或者ip变动可能无法及时发现…暂且这样
    2.telnetlib库里面的命令执行返回值
    如果要读取指令执行之后的返回值,最先想到的是self.session.read_very_eager().decode(‘ascii’)这个函数,不过为了抓取返回值,可以将执行和返回结果直接封装成一个方法。然后得到返回值。如果执行一次程序而read_very_eager()了两次,第二次就只能抓到None。

     def execute_command(self, command, time_wait):
            # 执行命令并且等待
            self.session.write(command + b'\n')
            time.sleep(time_wait)
            # 获取命令结果
            command_result = self.session.read_very_eager().decode('ascii')
            return command_result
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    3.grep和awk
    对命令返回值处理和检索的时候要用的,两个可以连起来用,grep定位行,awk定位列,可以精确到某一个字符的抓取。
    并且在多个索引字检索的时候grep可以连用。
    在vpss这个文件里面先寻找行里面又RGB_888_PLANNAR和# 6的,再在检索到底的这一行里面把第11列的值定位出来,就是我需要的帧率的值。检查GPIO的状态也同理。

    "grep 'RGB_888_PLANAR ' vpss | grep '# 6' | awk '{print $11}'"
    
    • 1
    "grep 'gpio-433' gpio|awk '{print $6}'"
    
    • 1

    4.console里的执行结果写到文件里面
    这个就很简单了就直接在print里面加一个file=f,f为打开的规定了名字的文件。不过这样的话是直接写到文件里面的,不在console里面显示了,为了更方便看到结果我就写了两边。这样不是很优雅。

        with open(nowTime + ' logger.txt', 'w') as f:
     		print("********************** PROGRAM CONTINUE ***********************", file=f)
    
    • 1
    • 2

    代码记录

    # 一些全局要用的变量
    port = 23
    USER = 'root'
    PSW_1 = "*****"
    PSW_2 = "168****"
    nowTime = time.strftime('%Y_%m_%d_%H_%M_%S', time.localtime(time.time()))
    ip_list = []
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    def pingHost(host):
        # for i in range(1, 256):
        status1 = 'PING SUCCESS'
        status2 = 'PING FAILED'
        nowTime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
        p = os.popen("ping " + host + " -n 2")
        line = p.read()
        if "无法访问目标主机" in line:
            print("***********" + nowTime, host, status2 + "**********", file=f)
            return False
        else:
            print("***********" + nowTime, host, status1 + "**********", file=f)
            return True
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    Telnet类,里面放最常用的几个方法

    class TelnetClient:
        def __init__(self):
            self.session = telnetlib.Telnet()
    
        # telnet连接
        def telnet_connect(self, client_addr, port_num):
            try:
                self.session = telnetlib.Telnet(client_addr, port=port_num)
                time.sleep(1)
                self.session.open(client_addr, port=port_num,timeout=3)  # telnet默认开启本行注释掉
                time.sleep(1)
            except:
                print("======================" + client_addr + " OPEN TELNET FAILED======================", file=f)
                print("======================" + client_addr + " OPEN TELNET FAILED======================")
                print("*************** 请在'摄像机测试程序'中手动开启telnet后重试 ******************************")
                # print("**********************THIS DEVICE IS NOT A IPC ***********************")
                # print("**********************THIS DEVICE IS NOT A IPC ***********************", file=f)
                print("********************** PROGRAM CONTINUE ***********************")
                print("********************** PROGRAM CONTINUE ***********************", file=f)
                return False
    
    
            try:
                self.session.read_until(b'cdyctx login:', timeout=2)
                time.sleep(2)
                self.session.write(USER.encode('ascii') + b'\n')
                time.sleep(2)
                self.session.read_until(b'Password:', timeout=2)
                time.sleep(2)
                self.session.write(PSW_1.encode('ascii') + b'\n')
                time.sleep(2)
                # command_result = self.session.read_some().decode('ascii')
                return True
            except:
                logging.warning('telnet 登陆失败')
                return False
    
    
        # telnet 退出登录
        def logout_telnet(self):
            self.session.close()
    
        def readState(self):
            command_result = self.session.read_very_eager().decode('ascii')
            print(command_result)
    
        # 执行需要返回值的指令
    
        def execute_command(self, command, time_wait):
            # 执行命令并且等待
            self.session.write(command + b'\n')
            time.sleep(time_wait)
            # 获取命令结果
            command_result = self.session.read_very_eager().decode('ascii')
            return command_result
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    执行核心任务的代码,在登陆的时候修改了一下如果登陆失败就尝试另一种密码的情况。
    判断了帧率如果大于15就算合格,gpio拉起的时候对应的字符为hi。
    对返回的结果的处理用到了map方法和切分,只要我们需要的一部分数据,注意数据类型的转换。

    def conn_work(devip):
        telnet_client = TelnetClient()
    
        # 1、 telnet连接
        if telnet_client.telnet_connect(devip,23):
    
            time.sleep(2)
            print("======================" + devip + " TELNET WORKING*=================")
            time.sleep(2)
    
            # 2、grep查找帧率
            login_res = telnet_client.session.read_very_eager().decode('ascii')
            if 'Login incorrect' in login_res:
                telnet_client.session.read_until(b'cdyctx login:', timeout=2)
                time.sleep(2)
                telnet_client.session.write(USER.encode('ascii') + b'\n')
                time.sleep(2)
                telnet_client.session.read_until(b'Password:', timeout=2)
                time.sleep(2)
                telnet_client.session.write(PSW_2.encode('ascii') + b'\n')
                time.sleep(2)
            else:
                pass
            telnet_client.execute_command(b'cd /proc/cvitek', 2)
            time.sleep(2)
    
            fr_str = "grep 'RGB_888_PLANAR ' vpss | grep '# 6' | awk '{print $11}'"
            fr_res = telnet_client.execute_command(bytes(fr_str, "utf-8"), 2)
           
            if ' No such file or directory' in fr_res:
                print("********************* NO CVIKET **************************************")
            elif "can\'t cd to /proc/cviket/'" in fr_res:
                print("********************* NO CVIKET **************************************")
    
            else:
                time.sleep(1)
                a, b, c = map(str,fr_res.split('\n'))
                b = b.strip()
    
                fr = int(b)
                if fr >= 15:
                    print("*****************" + devip + " FRAME RATE IS " + b + " *******************", file=f)
                    print("*****************" + devip + " FRAME RATE IS " + b + " *******************")
                    print("***********************FRAME RATE IS PASS!*******************", file=f)
                    print("***********************FRAME RATE IS PASS!*******************")
    
                elif fr < 15:
                    print("**********************" + devip + " FRAME RATE IS " + b + " ******************", file=f)
                    print("**********************" + devip + " FRAME RATE IS " + b + " ******************")
    
                    print("**********************FRAME RATE NOT PASS!*******************", file=f)
                    print("**********************FRAME RATE NOT PASS!*******************")
                else:
                    print("********************* NO FR FROM CVIKET **************************************")
    
                    time.sleep(2)
    
            # 3、检查gpio是否拉起
            gpio_str1 = "cd /sys/kernel/debug"
            telnet_client.execute_command(bytes(gpio_str1, "utf-8"), 1)
            time.sleep(1)
            gpio_str2 = "grep 'gpio-433' gpio|awk '{print $6}'"
            command_result2 = telnet_client.execute_command(bytes(gpio_str2, "utf-8"), 1)
            #
            # command_result2 = telnet_client.session.read_very_eager().decode('ascii')
            time.sleep(1)
            #print(command_result2+"!!!")
            A, B, C = map(str,command_result2.split('\n'))
    
    
            if 'hi' in B:
                print('*********************** GPIO-344 IS HIGH *****************************', file=f)
                print('*********************** GPIO-344 IS HIGH *****************************')
    
            elif 'lo' in B:
               # print(command_result2)
                print('*********************** GPIO-344 IS LOW *********************************', file=f)
                print('*********************** GPIO-344 IS LOW *********************************')
            else:
               # print(command_result2)
                print('********************** NO GPIO *********************************', file=f)
                print('********************* NO GPIO *********************************')
        else:
            # telnet_client.logout_telnet()
            pass
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86

    在开始之前ping一下检验是否在线

    def start_work(devIp):
        ping_result = pingHost(devIp)
        if ping_result:
            print("***********START TELNET CONNECT " + devIp + "***********", file=f)
            conn_work(devIp)
    
        else:
            input("*********************PRESS ENTER TO TRY AGAIN**********************", file=f)
            conn_work(devIp)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    if __name__ == '__main__':
        with open(nowTime + ' logger.txt', 'w') as f:
            print("*******************AUTO CHECK PROGRAM IS LOADING*******************")
            print("************BUILD DATE 2022-6-27 MADE BY MTY FROM YCTX************")
            print("******************************************************************")
    
         
            print("************************ IP LIST IS SEARCHING ********************************")
            print("************************ IP LIST IS SEARCHING ********************************", file=f)
            txt_name = input("请输入导出表的名称(例如:net2.txt):")
            with open(txt_name) as ipc:
                for line in ipc.readlines():  ##readlines(),函数把所有的行都读取进来;
                    ipc_line = line.strip()  ##删除行后的换行符,img_file 就是每行的内容啦
                    ipc_id = ipc_line.split(',')
                    id = ipc_id[2]
                    print(id)
                    print(id,file=f)
                    ip_list.append(id)
            for i in ip_list:
                start_work(i)
                print('\n', file=f)
                print()
        print("************************CHECK FINSHIED****************************")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
  • 相关阅读:
    了解4-20mA信号
    Vue--Router--解决多路由复用同一组件页面不刷新问题
    Git——IDEA集成GitHub详细操作
    [重庆思庄每日技术分享]-从dg备库expdp导出数据
    深度学习之图像识别核心技术与案例实战
    SpringBoot配置文件 yaml的使用
    npm ERR! code ERESOLVE
    字节数组转成BLOB对象(不连接数据库)
    Ansible的基本配置
    (AS笔记)Android的原生网络请求工具类——亲测可用
  • 原文地址:https://blog.csdn.net/qq_42102190/article/details/125527250