• python+ipc+改造过的插线板写一个控制ipc疯狂上下电的脚本


    材料准备:

    测试用的摄像机固件及对应的摄像机,改造过的插线板(与ipc结合,通过控制gpio的值可以控制上下电)

    脚本需求&流程:

    1、将需要测试的ipc升级到指定固件,与继电器连接的控制ipc设置在同一局域网,通过两个指令控制上电和断电
    2、对被控制的ipc的udp广播进行捕捉和分析,捕获报文内容进行分析和判断
    3、出现问题有问题的时不再上下电

    1.telnet类,里面写有常用的方法

    class TelnetClient:
        def __init__(self):
            self.session = telnetlib.Telnet()
    
        # telnet连接
        # telnet连接
        def telnet_connect(self, client_addr, port_num):
            try:
                self.session = telnetlib.Telnet(client_addr, port=port_num)
                self.session.open(client_addr, port=port_num)  # telnet默认开启本行注释掉
            except:
                logging.warning('%s网络连接失败' % client_addr)
                return False
            # 延时两秒再收取返回结果,给服务端足够响应时间
            time.sleep(2)
            # 获取登录结果
            # read_very_eager()获取到的是上次获取之后本次获取之前的所有输出
            command_result = self.session.read_very_eager().decode('ascii')
            print('telnet result:', command_result)
            try:
                self.session.read_until(b'cdyctx login:', timeout=2)
                self.session.write(USER.encode('ascii') + b'\n')
                command_result = self.session.read_very_eager().decode('ascii')
                print(command_result)
                time.sleep(2)
                self.session.read_until(b'Password:', timeout=2)
                self.session.write(PSW_1.encode('ascii') + b'\n')
                time.sleep(5)
                command_result = self.session.read_very_eager().decode('ascii')
                print(command_result)
    
            except:
                logging.warning('telnet 登陆失败')
                return False
            return True
    
        # 读取当前的状态
        def readState(self):
            command_result = self.session.read_very_eager().decode('ascii')
            print(command_result)
       
        # 执行需要返回值的指令
        def execute_command(self, command, time_wait):
            # 执行命令并且等待
            self.session.write(command + b'\n')
            time.sleep(time_wait)
            # 获取命令结果
            command_result = self.session.read_very_eager().decode('ascii')
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    2.等待函数

    def wait_read(client, nwait):
        time.sleep(nwait)
        client.readState()
    
    • 1
    • 2
    • 3

    3.监听指定的udp端口的广播,捕获报文,并判断指定字段的值
    这里address[0]接收到的是ip,res = int(str(data, ‘utf-8’)[-2])拆分出来想要的某个字段

    def udp_check():
        import socket
        # 创建流式socket
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # 默认本机监听10122端口
        s.bind(('0.0.0.0', 10122))
    
        # print('wait recv...')
        # 接收消息
        while True:
            # print('===============LISTENING FROM UDP PORT==================')
            data, address = s.recvfrom(1024)
            # print(' [recv form %s:%d]:%s' % (address[0], address[1], data))
            addr = address[0]
            res = int(str(data, 'utf-8')[-2])
            rev_dict[addr] = res
            time.sleep(0.01)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    4.负责控制插线板开关的一个线程,要一直保活,不断接收udp的广播。当监听多个设备的时候,要对通过测试的设备数量进行判断,有某个机器出现异常就不再重启。记录重启数量,并且将记录写入文件。

    def telnet_thread():
        global reboot_num
        reboot_num = 0
        telnet_client = TelnetClient()
        print("telnet connetcing....")
        if telnet_client.telnet_connect(domip, 23):
            print("telnet connet success")
            # 连接成功,先关闭插板电源
            telnet_client.execute_command(bytes(str_off, "utf-8"), 2)
            wait_read(telnet_client, 0)
            print("===============================SWITHC OFF===============================")
            file_handle.write("===============================SWITHC OFF===============================\n")
            file_handle.flush()
    
            time.sleep(10)
    
            telnet_client.execute_command(bytes(str_on, "utf-8"), 2)
            wait_read(telnet_client, 0)
            print("===============================SWITHC  ON===============================")
            file_handle.write("===============================SWITHC  ON===============================\n")
            file_handle.flush()
    
    
        else:
            print('telnet connect failed\n')
    
        while True:
            check_num = 0
            un_num = 0
            for key in rev_dict:
                if rev_dict[key] == 1:
                    check_num += 1
                elif rev_dict[key] == 2:
                    un_num += 1
    
            if check_num == ipc_count:
                reboot_num += 1
                print("==============================REBOOT NUM:" + str(reboot_num) + "==============================")
                file_handle.write(
                    "==============================REBOOT NUM:" + str(reboot_num) + "==============================\n")
                file_handle.flush()
    
                # 连接成功,先关闭插板电源
                telnet_client.execute_command(bytes(str_off, "utf-8"), 2)
                time.sleep(10)
    
                telnet_client.execute_command(bytes(str_on, "utf-8"), 2)
                # 15秒后在重新开启
                for key in rev_dict:
                    rev_dict[key] = 0
            else:
                if un_num > 0:
                    print("==============================NO NEED REBOOT=============================")
                    file_handle.write("==============================NO NEED REBOOT=============================\n")
                    file_handle.flush()
    
                else:
                    time.sleep(20)
    
            time.sleep(0.05)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    **5.读入输入的设备的ip,**split后记录数量,将ip作为key的dict全部置零

    def get_input():
        global ipc_count
        ipc_num = input("请输入监听ip,多个请以';'间隔\n")
        if '.' not in ipc_num:
            get_input()
        else:
            ipc_list = ipc_num.split(";")
            if len(ipc_list) > 0:
                ipc_count = len(ipc_list)
                for item in ipc_list:
                    rev_dict[item] = 0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    6.main函数执行,telnet_thread_handle线程始终保活,保持telnet的连接,并且处理udp的报文和相关的判断。udp_thread_handle只负责将报文分析后写入dict

    if __name__ == '__main__':
        print("=======================TESTING STARTING==================")
        global ipc_count
        global file_handle
        file = os.getcwd()
        file_handle = open(file + '/record.txt', 'w')
    
        get_input()
        print("LISTEN IPC个数:" + str(ipc_count))
        file_handle.write("LISTEN IPC个数:" + str(ipc_count) + "\n")
        file_handle.flush()
    
        telnet_thread_handle = threading.Thread(target=telnet_thread)
        udp_thread_handle = threading.Thread(target=udp_check)
        telnet_thread_handle.start()
        udp_thread_handle.start()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    一些迷思:
    1原本设计在telnet类里面写入开关的控制方法类,但是没法控制,所有写在了线程里
    2原本习惯的写文件的方式也不好用了,换用了flush,原因也未知…

  • 相关阅读:
    java Lock接口
    智领云CEO彭锋:DataOps,大数据的新战线
    NPM 包开发与优化全面指南
    Kotlin语法学习(二)
    【WPF应用30】WPF中的ListBox控件详解
    学会使用这五大电阻!
    深入浅出理解 AI 生图模型
    高并发 MySQL 性能优化指南,自取
    PostgreSQL Array 数组类型与 FreeSql 打出一套【组合拳】
    黑马-设计模式-笔记(未完)
  • 原文地址:https://blog.csdn.net/qq_42102190/article/details/127844282