• 基于python用telnet连接设备+查询指定文件内容的脚本


    啊我又在写脚本…
    这次是为了在不同网段下的设备查询帧率和GPIO是否挂起
    记录一些tips吧
    1.要在不同的网段下面找到所有在线的摄像机设备
    这个问题我想了几种解决方式,比如ping所有的ip然后抓取所有可以ping通的,然后检查是否能打开telnet协议,如果不行就认为它不是摄像机设备。但是在实践中这个方法的问题在于并不是每个设备的telnet协议都是可以通过外部的脚本打开的。所有误判率非常高。还有一种方式就是用内网udp进行广播并且发送报文里加上开启telnet的命令,得到回应的报文里获取ip。这个方法就比较靠谱,奈何我计网天天逃课…这个方式实现了以后再写一篇博文记录吧…
    最后我用了公司内部的测试软件拉取所有在线摄像机的所有信息内容存成txt,然后用split把ip裁剪出来的本方法。非常快捷但是如果出现了新的ip或者ip变动可能无法及时发现…暂且这样
    2.telnetlib库里面的命令执行返回值
    如果要读取指令执行之后的返回值,最先想到的是self.session.read_very_eager().decode(‘ascii’)这个函数,不过为了抓取返回值,可以将执行和返回结果直接封装成一个方法。然后得到返回值。如果执行一次程序而read_very_eager()了两次,第二次就只能抓到None。

     def execute_command(self, command, time_wait):
            # 执行命令并且等待
            self.session.write(command + b'\n')
            time.sleep(time_wait)
            # 获取命令结果
            command_result = self.session.read_very_eager().decode('ascii')
            return command_result
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    3.grep和awk
    对命令返回值处理和检索的时候要用的,两个可以连起来用,grep定位行,awk定位列,可以精确到某一个字符的抓取。
    并且在多个索引字检索的时候grep可以连用。
    在vpss这个文件里面先寻找行里面又RGB_888_PLANNAR和# 6的,再在检索到底的这一行里面把第11列的值定位出来,就是我需要的帧率的值。检查GPIO的状态也同理。

    "grep 'RGB_888_PLANAR ' vpss | grep '# 6' | awk '{print $11}'"
    
    • 1
    "grep 'gpio-433' gpio|awk '{print $6}'"
    
    • 1

    4.console里的执行结果写到文件里面
    这个就很简单了就直接在print里面加一个file=f,f为打开的规定了名字的文件。不过这样的话是直接写到文件里面的,不在console里面显示了,为了更方便看到结果我就写了两边。这样不是很优雅。

        with open(nowTime + ' logger.txt', 'w') as f:
     		print("********************** PROGRAM CONTINUE ***********************", file=f)
    
    • 1
    • 2

    代码记录

    # 一些全局要用的变量
    port = 23
    USER = 'root'
    PSW_1 = "*****"
    PSW_2 = "168****"
    nowTime = time.strftime('%Y_%m_%d_%H_%M_%S', time.localtime(time.time()))
    ip_list = []
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    def pingHost(host):
        # for i in range(1, 256):
        status1 = 'PING SUCCESS'
        status2 = 'PING FAILED'
        nowTime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
        p = os.popen("ping " + host + " -n 2")
        line = p.read()
        if "无法访问目标主机" in line:
            print("***********" + nowTime, host, status2 + "**********", file=f)
            return False
        else:
            print("***********" + nowTime, host, status1 + "**********", file=f)
            return True
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    Telnet类,里面放最常用的几个方法

    class TelnetClient:
        def __init__(self):
            self.session = telnetlib.Telnet()
    
        # telnet连接
        def telnet_connect(self, client_addr, port_num):
            try:
                self.session = telnetlib.Telnet(client_addr, port=port_num)
                time.sleep(1)
                self.session.open(client_addr, port=port_num,timeout=3)  # telnet默认开启本行注释掉
                time.sleep(1)
            except:
                print("======================" + client_addr + " OPEN TELNET FAILED======================", file=f)
                print("======================" + client_addr + " OPEN TELNET FAILED======================")
                print("*************** 请在'摄像机测试程序'中手动开启telnet后重试 ******************************")
                # print("**********************THIS DEVICE IS NOT A IPC ***********************")
                # print("**********************THIS DEVICE IS NOT A IPC ***********************", file=f)
                print("********************** PROGRAM CONTINUE ***********************")
                print("********************** PROGRAM CONTINUE ***********************", file=f)
                return False
    
    
            try:
                self.session.read_until(b'cdyctx login:', timeout=2)
                time.sleep(2)
                self.session.write(USER.encode('ascii') + b'\n')
                time.sleep(2)
                self.session.read_until(b'Password:', timeout=2)
                time.sleep(2)
                self.session.write(PSW_1.encode('ascii') + b'\n')
                time.sleep(2)
                # command_result = self.session.read_some().decode('ascii')
                return True
            except:
                logging.warning('telnet 登陆失败')
                return False
    
    
        # telnet 退出登录
        def logout_telnet(self):
            self.session.close()
    
        def readState(self):
            command_result = self.session.read_very_eager().decode('ascii')
            print(command_result)
    
        # 执行需要返回值的指令
    
        def execute_command(self, command, time_wait):
            # 执行命令并且等待
            self.session.write(command + b'\n')
            time.sleep(time_wait)
            # 获取命令结果
            command_result = self.session.read_very_eager().decode('ascii')
            return command_result
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    执行核心任务的代码,在登陆的时候修改了一下如果登陆失败就尝试另一种密码的情况。
    判断了帧率如果大于15就算合格,gpio拉起的时候对应的字符为hi。
    对返回的结果的处理用到了map方法和切分,只要我们需要的一部分数据,注意数据类型的转换。

    def conn_work(devip):
        telnet_client = TelnetClient()
    
        # 1、 telnet连接
        if telnet_client.telnet_connect(devip,23):
    
            time.sleep(2)
            print("======================" + devip + " TELNET WORKING*=================")
            time.sleep(2)
    
            # 2、grep查找帧率
            login_res = telnet_client.session.read_very_eager().decode('ascii')
            if 'Login incorrect' in login_res:
                telnet_client.session.read_until(b'cdyctx login:', timeout=2)
                time.sleep(2)
                telnet_client.session.write(USER.encode('ascii') + b'\n')
                time.sleep(2)
                telnet_client.session.read_until(b'Password:', timeout=2)
                time.sleep(2)
                telnet_client.session.write(PSW_2.encode('ascii') + b'\n')
                time.sleep(2)
            else:
                pass
            telnet_client.execute_command(b'cd /proc/cvitek', 2)
            time.sleep(2)
    
            fr_str = "grep 'RGB_888_PLANAR ' vpss | grep '# 6' | awk '{print $11}'"
            fr_res = telnet_client.execute_command(bytes(fr_str, "utf-8"), 2)
           
            if ' No such file or directory' in fr_res:
                print("********************* NO CVIKET **************************************")
            elif "can\'t cd to /proc/cviket/'" in fr_res:
                print("********************* NO CVIKET **************************************")
    
            else:
                time.sleep(1)
                a, b, c = map(str,fr_res.split('\n'))
                b = b.strip()
    
                fr = int(b)
                if fr >= 15:
                    print("*****************" + devip + " FRAME RATE IS " + b + " *******************", file=f)
                    print("*****************" + devip + " FRAME RATE IS " + b + " *******************")
                    print("***********************FRAME RATE IS PASS!*******************", file=f)
                    print("***********************FRAME RATE IS PASS!*******************")
    
                elif fr < 15:
                    print("**********************" + devip + " FRAME RATE IS " + b + " ******************", file=f)
                    print("**********************" + devip + " FRAME RATE IS " + b + " ******************")
    
                    print("**********************FRAME RATE NOT PASS!*******************", file=f)
                    print("**********************FRAME RATE NOT PASS!*******************")
                else:
                    print("********************* NO FR FROM CVIKET **************************************")
    
                    time.sleep(2)
    
            # 3、检查gpio是否拉起
            gpio_str1 = "cd /sys/kernel/debug"
            telnet_client.execute_command(bytes(gpio_str1, "utf-8"), 1)
            time.sleep(1)
            gpio_str2 = "grep 'gpio-433' gpio|awk '{print $6}'"
            command_result2 = telnet_client.execute_command(bytes(gpio_str2, "utf-8"), 1)
            #
            # command_result2 = telnet_client.session.read_very_eager().decode('ascii')
            time.sleep(1)
            #print(command_result2+"!!!")
            A, B, C = map(str,command_result2.split('\n'))
    
    
            if 'hi' in B:
                print('*********************** GPIO-344 IS HIGH *****************************', file=f)
                print('*********************** GPIO-344 IS HIGH *****************************')
    
            elif 'lo' in B:
               # print(command_result2)
                print('*********************** GPIO-344 IS LOW *********************************', file=f)
                print('*********************** GPIO-344 IS LOW *********************************')
            else:
               # print(command_result2)
                print('********************** NO GPIO *********************************', file=f)
                print('********************* NO GPIO *********************************')
        else:
            # telnet_client.logout_telnet()
            pass
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86

    在开始之前ping一下检验是否在线

    def start_work(devIp):
        ping_result = pingHost(devIp)
        if ping_result:
            print("***********START TELNET CONNECT " + devIp + "***********", file=f)
            conn_work(devIp)
    
        else:
            input("*********************PRESS ENTER TO TRY AGAIN**********************", file=f)
            conn_work(devIp)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    if __name__ == '__main__':
        with open(nowTime + ' logger.txt', 'w') as f:
            print("*******************AUTO CHECK PROGRAM IS LOADING*******************")
            print("************BUILD DATE 2022-6-27 MADE BY MTY FROM YCTX************")
            print("******************************************************************")
    
         
            print("************************ IP LIST IS SEARCHING ********************************")
            print("************************ IP LIST IS SEARCHING ********************************", file=f)
            txt_name = input("请输入导出表的名称(例如:net2.txt):")
            with open(txt_name) as ipc:
                for line in ipc.readlines():  ##readlines(),函数把所有的行都读取进来;
                    ipc_line = line.strip()  ##删除行后的换行符,img_file 就是每行的内容啦
                    ipc_id = ipc_line.split(',')
                    id = ipc_id[2]
                    print(id)
                    print(id,file=f)
                    ip_list.append(id)
            for i in ip_list:
                start_work(i)
                print('\n', file=f)
                print()
        print("************************CHECK FINSHIED****************************")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
  • 相关阅读:
    生成式 AI 如何释放开发者的生产力?
    ag-Grid Enterprise v28.2.1 企业版注册版
    Spring(七)- Spring Bean的生命周期
    心情不好的时候,用 Python 画棵樱花树送给自己吧
    「记录」MMDetection入门篇
    日常学习-20240710
    基于XML管理(bean Spring的入门案例、IOC容器创建对象的方式、获取Bean的三种方式)
    uni-app引入海康威视h5player实现视频监控的播放
    32位x86处理器
    2225. 找出输掉零场或一场比赛的玩家
  • 原文地址:https://blog.csdn.net/qq_42102190/article/details/125527250