• 基于python-socket构建任务服务器(基于socket发送指令创建、停止任务)


    在实现ia业务服务器时需要构建一个python-socket客户端,1、要求能与服务器保持心跳连接,每10秒钟发送一次心跳信号;2、要求能根据socket服务器发送的指令创建或终止一个定时任务。
    为此以3个类实现该功能,分别为socket通信类(用于实现通信连接与任务创建)、任务池类(用于管理任务)、任务类(用于实现具体任务)。

    1、socket通信客户端

    这里定义的MySocket类主体结构如下图所示,共包含4个函数,2个线程(其本身继承Thread类实现主任务流程——run函数、接收服务器信息并创建任务添加到任务池 或者接收服务器返回的心跳数据;同时又在__init__函数中将self.thread_msg类封装为一个线程,每隔10秒钟向socket服务器发送一次心跳包)。check_connection函数用于检测socket是否与服务器断开连接,在send_msg函数中调用,当发现客户端掉线后则立刻进行重连。send_msg函数用于发送信息给服务器,因为run函数与thread_msg函数2个线程都需要调用连接与服务器发送数据,为避免冲突故而定义为函数在内部进行加锁。这里的关键点在于,有多个线程可以发送数据(thread_msg与run线程),但是只有一个线程可以接收数据(run函数),单一线程接收数据可以避免服务器发送的数据存在冲突(两个线程同时接收数据就会存在死锁)
    在这里插入图片描述

    #socket客户端
    class MySocket(Thread):
        def __init__(self,config):
            super().__init__()
            # 1.创建套接字
            self.tcp_socket = socket(AF_INET,SOCK_STREAM)
            self.tcp_socket.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1) #在客户端开启心跳维护
            # 2.准备连接服务器,建立连接
            self.serve_ip = config["serve_ip"]#当前"118.24.111,149"
            self.serve_port = config["serve_port"]  #端口当前7900
            self.sleep_time = config["sleep_time"]
            print("connect to : ",self.serve_ip,self.serve_port)
            self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
            self.lock = threading.RLock()
            
            self.taskpool=TaskPool()
    
            task_msg=threading.Thread(target=self.thread_msg)
            task_msg.daemon = True
            task_msg.start()
                #定时发送信息
        def run(self):
            while True:
                a=self.tcp_socket.recv(1024)#接受服务端的信息,最大数据为1k
                a=a.decode('utf-8')
                if len(a)<66:
                	#此时的数据为服务器返回的心跳数据
                	continue
                print("------主线程-----",a)
                jdata=json.loads(a)
                #jdata={"streamAddr":"rtmp://adasdasdxcvsdfj.sdfdsfsd","state":1,"count":5,"taskname":"aaa","jsonname":"a.json"}
                task=OCRTask(jdata)
                self.taskpool.append(task)
                
                json_data={  
                    "type":"OCR_STATE_ACK",
                    "timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致
                    "streamAddr": jdata["streamAddr"]
                }
                #print( json_data)
                message = json.dumps(json_data)
                data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()
                data=hex_to_bytes(data)
                self.send_msg(data)
    
        def check_connection(self):
            try:
                self.tcp_socket.getpeername()
                return True
            except socket.error:
                return False
        
        #定时发送心跳信息
        def thread_msg(self):
            while True:
                #message=input('You can say:')
                #json标注的模板
                json_data={  
                    "timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致
                    "type":"HEARBEAT"
                }
                #print( json_data)
                message = json.dumps(json_data)
                data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()
                data=hex_to_bytes(data)
    
                #进行定时发送
                self.send_msg(data)
    
        def send_msg(self,msg):
            if self.check_connection() is False:
                print('服务器掉线!!!!!')
                self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
            try:
                #进行定时发送
                self.lock.acquire()
                self.tcp_socket.send(msg)
                self.lock.release()
            except ConnectionRefusedError:
                print('服务器拒绝本次连接!!!!!')
                self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
            except TimeoutError:
                print('连接超时!!!!!')
                self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
            except OSError:
                self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
                print('智能终端无网络连接!!!!!')
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88

    2、任务池实现

    任务池的实现代码如下所示,主要包含3个函数(其中将remove_task封装为一个子线程,用于实时移除已经完成计算任务的线程),append函数用于将新创建的任务添加大任务池pool中,stop函数用于停止并移除正在运行中的任务。
    在这里插入图片描述
    其具体实现代码如下所示,其作为MySocket类中的一个成员属性,每当MySocket接收到服务器信息创建任务ocrtask后都调用TaskPool.append(ocrtask)将任务添加到任务池中。由任务池管理任务的声明周期,具体可见其append函数可以启动task或终止task。remove_task线程会自动将已经完成的任务移除。

    #ocr任务线程池
    class TaskPool:
        def __init__(self,sleep_time=0.5):
            self.pool=[]
            self.sleep_time=sleep_time
            task_msg=threading.Thread(target=self.remove_task)
            task_msg.daemon = True
            task_msg.start()
    
        #删除已经结束的任务
        def remove_task(self):
            while True:
                names=[]
                for task in self.pool:
                    if task.get_count()==0: #生存时间为0,认为该任务已经完成需要被删除
                        task.stop()
                        self.pool.remove(task)
                    else:
                        names.append(task.taskname)
                if len(names)>0:
                    print(names)
                time.sleep(self.sleep_time)
                
        def append(self,ocrtask):
            if ocrtask.state==0:
                #终止任务
                self.stop(ocrtask)
            else:
                #启动任务
                ocrtask.start()
                self.pool.append(ocrtask)
    
        #终止任务
        def stop(self,ocrtask):
            for task in self.pool:
                if task.taskname==ocrtask.taskname:
                    task.stop()
                    self.pool.remove(task)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    3、具体任务线程

    任务的实现代码如下所示,其支持3中任务模式,使用state区分任务,state为0-停止识别,1-连续识别count张,2-持续识别(故而在state为2时将count设置的特别大)。这里以count控制任务的运行,任务每运行一次count减少1。当count小于等于0,则表示任务运行完成。在TaskPool的remove_task中检测到count为0时则会自动删除任务。

    #ocr任务
    class OCRTask(Thread):
        def __init__(self,json):
            super().__init__()
            self.streamAddr=json["streamAddr"]
            self.state=json["state"] # 0-停止识别,1-连续识别count张,2-持续识别
            if json["state"]==2:
                self.count=9999999999999999999999999
            else:
                self.count=json["count"]
            if "taskname" in json.keys():
                self.taskname=json["taskname"]
            else:
                self.taskname=json["streamAddr"]
    
            self.jsonname=json["jsonname"]
            self.lock = threading.RLock()
    
        def run(self):
            while self.get_count()>0:
                print('run %s'%self.taskname,end='*')
                time.sleep(2)
                self.lock.acquire()
                self.count-=1
                self.lock.release()
            print('%s finish!! '%self.taskname)
    
        #获取任务的生存时间
        def get_count(self):
            self.lock.acquire()
            now_count=self.count
            self.lock.release()
            #削减count
            return now_count
    
        #停止任务
        def stop(self):
            self.lock.acquire()
            self.count=-1
            self.lock.release()
            #停止任务
            pass
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    4、完整代码与使用效果

    完整代码如下所示

    from socket import *
    import time,json
    import yaml
    import threading,struct
    from threading import Thread
     
    def hex_to_bytes(hex_str):
        """
        :param hex_str: 16进制字符串
        :return: byte_data 字节流数据
        """
        bytes_data = bytes()
        while hex_str :
            """16进制字符串转换为字节流"""
            temp = hex_str[0:2]
            s = int(temp, 16)
            bytes_data += struct.pack('B', s)
            hex_str = hex_str[2:]
        return bytes_data
    
    # 读取Yaml文件方法
    def read_yaml(yaml_path):
        with open(yaml_path, encoding="utf-8", mode="r") as f:
            result = yaml.load(stream=f,Loader=yaml.FullLoader)
            return result
    
    #ocr任务
    class OCRTask(Thread):
        def __init__(self,json):
            super().__init__()
            self.streamAddr=json["streamAddr"]
            self.state=json["state"] # 0-停止识别,1-连续识别count张,2-持续识别
            if json["state"]==2:
                self.count=9999999999999999999999999
            else:
                self.count=json["count"]
            if "taskname" in json.keys():
                self.taskname=json["taskname"]
            else:
                self.taskname=json["streamAddr"]
    
            self.jsonname=json["jsonname"]
            self.lock = threading.RLock()
    
        def run(self):
            while self.get_count()>0:
                print('run %s'%self.taskname,end='*')
                time.sleep(2)
                self.lock.acquire()
                self.count-=1
                self.lock.release()
            print('%s finish!! '%self.taskname)
    
        #获取任务的生存时间
        def get_count(self):
            self.lock.acquire()
            now_count=self.count
            self.lock.release()
            #削减count
            return now_count
    
        #停止任务
        def stop(self):
            self.lock.acquire()
            self.count=-1
            self.lock.release()
            #停止任务
            pass
    
    #ocr任务线程池
    class TaskPool:
        def __init__(self,sleep_time=0.5):
            self.pool=[]
            self.sleep_time=sleep_time
            task_msg=threading.Thread(target=self.remove_task)
            task_msg.daemon = True
            task_msg.start()
    
        #删除已经结束的任务
        def remove_task(self):
            while True:
                names=[]
                for task in self.pool:
                    if task.get_count()==0:
                        task.stop()
                        self.pool.remove(task)
                    else:
                        names.append(task.taskname)
                if len(names)>0:
                    print(names)
                time.sleep(self.sleep_time)
                
        def append(self,ocrtask):
            if ocrtask.state==0:
                #终止任务
                self.stop(ocrtask)
            else:
                #启动任务
                ocrtask.start()
                self.pool.append(ocrtask)
    
        #终止任务
        def stop(self,ocrtask):
            for task in self.pool:
                if task.taskname==ocrtask.taskname:
                    task.stop()
                    self.pool.remove(task)
    
    #socket客户端
    class MySocket(Thread):
        def __init__(self,config):
            super().__init__()
            # 1.创建套接字
            self.tcp_socket = socket(AF_INET,SOCK_STREAM)
            self.tcp_socket.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1) #在客户端开启心跳维护
            # 2.准备连接服务器,建立连接
            self.serve_ip = config["serve_ip"]#当前"118.24.111,149"
            self.serve_port = config["serve_port"]  #端口当前7900
            self.sleep_time = config["sleep_time"]
            print("connect to : ",self.serve_ip,self.serve_port)
            self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
            self.lock = threading.RLock()
            
            self.taskpool=TaskPool()
    
            task_msg=threading.Thread(target=self.thread_msg)
            task_msg.daemon = True
            task_msg.start()
                #定时发送信息
        
        #通信线程-用于接收服务器的指令
        def run(self):
            while True:
                a=self.tcp_socket.recv(1024)#接受服务端的信息,最大数据为1k
                a=a.decode('utf-8')
                if len(a)<66:
                	#服务器返回的心跳包,不予处理
                	continue
                print("------主线程-----",a)
                jdata=json.loads(a)
                #jdata={"streamAddr":"rtmp://adasdasdxcvsdfj.sdfdsfsd","state":1,"count":5,"taskname":"aaa","jsonname":"a.json"}
                task=OCRTask(jdata)
                self.taskpool.append(task)
                
                json_data={  
                    "type":"OCR_STATE_ACK",
                    "timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致
                    "streamAddr": jdata["streamAddr"]
                }
                #print( json_data)
                message = json.dumps(json_data)
                data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()
                data=hex_to_bytes(data)
                self.send_msg(data)
    
        #检测socket连接是否断开
        def check_connection(self):
            try:
                self.tcp_socket.getpeername()
                return True
            except socket.error:
                return False
        
        #定时发送心跳信息--子线程
        def thread_msg(self):
            while True:
                #message=input('You can say:')
                #json标注的模板
                json_data={  
                    "timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致
                    "type":"HEARBEAT"
                }
                #print( json_data)
                message = json.dumps(json_data)
                data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()
                data=hex_to_bytes(data)
    
                #进行定时发送
                self.send_msg(data)
        #发送信息
        def send_msg(self,msg):
            if self.check_connection() is False:
                print('服务器掉线!!!!!')
                self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
            try:
                #进行定时发送
                self.lock.acquire()
                self.tcp_socket.send(msg)
                self.lock.release()
            except ConnectionRefusedError:
                print('服务器拒绝本次连接!!!!!')
                self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
            except TimeoutError:
                print('连接超时!!!!!')
                self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
            except OSError:
                self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式
                print('智能终端无网络连接!!!!!')
    
    if "__main__"==__name__:
        #进行定时通信测试
        config=read_yaml("config.yaml")
        socket_client=MySocket(config)
        socket_client.start()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205

    使用效果如下所示,这里基于socket调试工具作为客户端

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

  • 相关阅读:
    《算法通关村第二关——指定区间反转问题解析》
    Linux Gnome桌面无法打开终端Terminal
    06_SpingBoot 下的 Spring MVC
    Java中的JVM是什么?如何调优JVM的性能?
    野火开源资料
    Softmax 回归(PyTorch)
    数据库自增策略
    Burpsuite安装教程,附Link
    (23)STM32——硬件随机数发生器
    7X24即时新闻监测
  • 原文地址:https://blog.csdn.net/m0_74259636/article/details/136218649