• 混合编程之多线程


    前言: 小白一个, 没有系统性的学习过python网络通信,只是单纯的cv加修改代码,仅作留念以及参考用,感谢互联网博主们和bito插件,使得chatGPT得以免费使用.
    另外该多线程传输图片的速度比没有多线程执行还满,后续不对python服务端做优化,而改为C++服务端实现.写出来继续再分享把

    前篇博客地址

    python客户端

    采用生存者消费者模式模式2joinablequeue库.
    客户端实现还是比较简单的,麻烦在server端

    import pickle
    import time
    from multiprocessing import Process, JoinableQueue
    from queue import Queue
    
    from multiprocessing.connection import Client, Listener
    
    from client_apart import draw_box
    from image import plot_boxes, img_encode
    import os
    from natsort import ns, natsorted
    
    host = 'localhost'
    port = 9006
    total_time = 0
    
    
    def img_product(img_queue, path, path_mode='image'):
        if path_mode == 'image':
            img = img_encode(path)
            img_obj = {'frame_num': 1, 'image': img}  # need frame_num?
            img_queue.put(img_obj)
        elif path_mode == 'dir':
            dir_list = os.listdir(path)
            files = natsorted(dir_list, alg=ns.PATH)  # 顺序读取文件名
            i = 1
            for filename in files:
                img_path = path + '/' + filename
                img = img_encode(img_path)
                img_obj = {'frame_num': i, 'image': img}  # need frame_num?
                i += 1
                img_queue.put(img_obj)
            img_queue.put({'frame_num': 0, 'image': "end"})  # end signal 
        img_queue.join()
    
    
    def server_consumer(img_queue):
        # 1. send data
        while True:
            img_obj = img_queue.get()
            if img_obj is None:
                client.close()  # avoid connection-reset-by-peer
                break  # exit end
            data_bytes = pickle.dumps(img_obj)
            start = int(round(time.time() * 1000))
            start_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            client.send(data_bytes)  # 40ms/per send img
            # print('send cost time: ', (end - start))
            img_queue.task_done()
            try:
                det_result = client.recv()
                end = int(round(time.time() * 1000))
                end_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                print('recv cost time: ', (end-start))
            except EOFError:
                break
            det_result = pickle.loads(det_result)
            draw_box(det_result, img_obj)
    
    
    if __name__ == '__main__':
        client = Client((host, port))
        Listener()
        img_dir = './data'
        one_img = './data/Enterprise001.jpg'
        mode = 'dir'
    
        img_jq = JoinableQueue()
        producer = Process(target=img_product, args=(img_jq, img_dir, mode,))
        consumer = Process(target=server_consumer, args=(img_jq,))
        consumer.daemon = True  # set daemon but not set join()
    
        producer.start()
        consumer.start()
    
        producer.join()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77

    Python服务端

    此处接受图片和发送结果线程处在一个类内,使得可以共享一个Listener,来自chatGPT的idea.

    from ctypes import *
    import ctypes
    import time
    import pickle
    from PHeader import *
    
    import cv2
    import numpy as np
    from multiprocessing.connection import Listener
    from multiprocessing import JoinableQueue, Process, Queue, connection
    import threading
    
    
    class ListenerThread(threading.Thread):
        def __init__(self, address, data_handler, lib):
            super().__init__()
            self.daemon = True
            self.address = address
            self.data_handler = data_handler
            self.lib = lib
            self.listener = Listener(self.address, backlog=5)
            self.conn = None
            self.is_running = True
            
        def run(self):
            print('Listening on', self.listener.address)
            while self.is_running:
                try:
                    self.conn = self.listener.accept()  # ready to accept data continually
                    print('Connected by', self.listener.last_accepted)
                    t1 = threading.Thread(target=self.receive_data)
                    t2 = threading.Thread(target=self.send_result)
                    t1.start()
                    t2.start()
                    t1.join()
                    t2.join()
                except OSError as e:
                    if e.errno != 98:  # Address already in use
                        raise
                    print('Address already in use, retrying in 1 second...')
                    time.sleep(1)
            
    
        def destroy_Model(self):  # when qt send a signal
            self.lib.interface_destoryThread()
    
        def receive_data(self):
            time_cost1 = 0
            while True:
                try:
                    start = int(round(time.time() * 1000))
                    received_bytes = self.conn.recv()  # recv Client data
                    end = int(round(time.time() * 1000))
                    print('recv time cost: ', end-start)
                    time_cost1 += end-start
                    img_dict = pickle.loads(received_bytes)
                    if img_dict["frame_num"] == 0:
                        print("receive's thread already receive all data, close thread!!")
                        print("recv: ", time_cost1)
                        self.lib.interface_setEmptyFlag()  # make send thread break
                        break
                    img_dict['image'] = cv2.imdecode(img_dict['image'], cv2.IMREAD_COLOR)
                    self.data_handler.sendImgtoC(self.lib, img_dict, 0)  # prepare to send img
                except EOFError:
                    print('Connection closed')
                    self.conn.close()
                    break
    
        def send_result(self):
            time_cost1 = 0
            time_cost2 = 0
            while True:
                self.lib.interface_getClsQueue.restype = ObjClassifyOutput
                start = int(round(time.time() * 1000))
                output = self.lib.interface_getClsQueue()  # get result from model
                end = int(round(time.time() * 1000))
                time_cost1 += end-start
                print('get cls time cost: ', end-start)
                if output.object_list.object_num >= 0:
                    cls_result = self.data_handler.CtoP(output)
                    cls_result = pickle.dumps(cls_result)
                    start = int(round(time.time() * 1000))
                    self.conn.send(cls_result)
                    end = int(round(time.time() * 1000))
                    print('send time cost: ', end-start)
                    time_cost2 += end-start
                elif output.object_list.object_num == -1:   # queue is empty for now
                    time.sleep(0.04)
                    continue
                elif output.object_list.object_num == -2:   # all data is classify
                    print("send's thread alreay handle all data, close thread!!")
                    print("cls: ", time_cost1, ' send: ', time_cost2)
                    # self.close()
                    break
    
        def close(self):  # useless for now
            self.conn.close()
            # self.listener.close()
            self.run()
    
    
    class DataHandler:
        def __init__(self):
            self.data = None
    
        def CtoP(self, output):  # 将模型结果解析为python列表
            # [cv_object_list: [cv_object: [cv_box: [] ]]]
            cv_object_list = []
            cls_out = []
            obj_list = output.object_list
            if obj_list.object_num != 0:
                for i in range(obj_list.object_num):
                    cv_box = []
                    cv_object = []
                    obj = obj_list.object[i]
                    # bbox
                    cv_box.append(obj.bbox.left_top_x)
                    cv_box.append(obj.bbox.left_top_y)
                    cv_box.append(obj.bbox.w)
                    cv_box.append(obj.bbox.h)
                    cv_object.append(cv_box)
                    # classes/objectness/prob
                    cv_object.append(obj.classes)
                    cv_object.append(obj.objectness)
                    prob = POINTER(c_float)(obj.prob)
                    cv_object.append(prob.contents.value)
                    # cv_object
                    cv_object_list.append(cv_object)
                cv_object_list.append(obj_list.object_num)
    
                # cv_object_list
                cls_out.append(cv_object_list)
            return cls_out
    
        def sendImgtoC(self, lib, img_dict, i):
            lib.interface_receive.argtypes = [PythonMat]
            # 1. combine info to img
            pi = PythonMat()
            pi.frame_num[0] = img_dict["frame_num"]
            img = img_dict['image']
            # 1.1 set width/height
            PARAM = c_int * 32
            height = PARAM()
            height[0] = img.shape[0]
            pi.height = height
            width = PARAM()
            width[0] = img.shape[1]
            pi.width = width
            # 1.2 set Mat
            frame_data = np.asarray(img, dtype=np.uint8)
            frame_data = frame_data.ctypes.data_as(c_char_p)
            pi.frame[0] = frame_data
            # 2. send img to detect model
            lib.interface_receive(pi)
    
    
    if __name__ == '__main__':
        address = ('localhost', 9006)
        data_handler = DataHandler()
        ll = ctypes.cdll.LoadLibrary
        lib = ll("../../lib/libDetClsController.so")  # create a C lib
        listener_thread = ListenerThread(address, data_handler, lib)
        listener_thread.start()
        try:
            det_process = threading.Thread(target=lib.interface_initDetectImage)
            cls_process = threading.Thread(target=lib.interface_initClassifyImage)
            det_process.start()
            cls_process.start()
            det_process.join()  # need a break signal
            cls_process.join()
        except KeyboardInterrupt:
            # 程序被强制关闭
            print('Program terminated')
            # 关闭ListenerThread对象
            listener_thread.is_running = False
            listener_thread.join()
        finally:
            # 关闭ListenerThread对象
            listener_thread.is_running = False
            listener_thread.join()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181

    C++模型调用

    此处涉及其他人员写的代码,做修改处理

    typedef struct 
    {
    int width[CV_MAX_BATCH_SIZE];
    int height[CV_MAX_BATCH_SIZE];
    char* frame[CV_MAX_BATCH_SIZE]; 
    int frame_num[CV_MAX_BATCH_SIZE]; 
    }PythonMat;
    
    // 多线程控制相关
    mutex mtxQueueDet;  // mutex for detect queue
    mutex mtxQueueImg;  // mutex for image queue
    mutex mtxQueueCls;  // mutex for classify queue
    mutex mtxif;
    queue<> queueDetOut;// Det output queue
    queue<> queueClsOut;// Det classify queue
    queue<cv::Mat> queueMat;
    bool DetectFlag = true;
    bool ClassifyFlag = true;
    bool empty_flag = false;
    
    void receiveImg(PythonMat &img)
    {
        cv::Mat frame(img.height[0], img.width[0], CV_8UC3, img.frame[0]);
    
        mtxQueueImg.lock();
        queueMat.push(frame);
        cout << "frame num: "<<  img.frame_num[0] << endl;
        mtxQueueImg.unlock();
    }
    
    void DetectImage()
    {
        Detect detect_output;
        Detect detectmodel;
        detectmodel.Init(config_path, 1);
        cv::Mat frame;
        while(1)
            { 
                if (queueMat.empty()) 
                {
                    if(!DetectFlag)
                    {
                        break;
                    }
                    usleep(2000);
                    continue;
                }
                mtxQueueImg.lock();
                frame = queueMat.front();
                queueMat.pop();
                mtxQueueImg.unlock();
    
                detect_output = detectmodel.Run(detect_input);
                
                // lock
                mtxQueueDet.lock();
                queueDetOut.push(detect_output);
                cout << "detect run !!" << endl;
                mtxQueueDet.unlock();
            }
        return;
    }
    
    void ClassifyImage()
    {
        Classify objclassify;
        objclassify.Init(config_path, 1);
        ClassifyInput input;
        ClassifyOutput output;
        cv::Mat frame;
        while(1)
        {
            if (queueDetOut.empty()) 
            {
                if(!ClassifyFlag)
                {
                    break;
                }
    			usleep(2000);
                continue;
    		}
            
            mtxQueueDet.lock();
            detect_result = queueDetOut.front();
            queueDetOut.pop();
            mtxQueueDet.unlock();
            
            output = objclassify.Run(input);
            
            mtxQueueCls.lock();
            queueClsOut.push(output);
            mtxQueueCls.unlock();
        }
        return;
    }
    
    
    ClassifyOutput getClsQueue(){
        ObjClassifyOutput output;
        if (queueClsOut.empty()){
            usleep(2000);
            output.object_list.object_num = -1;  // -1 is now empty; -2 is all empty 
            if (empty_flag){
                output.object_list.object_num = -2;
                empty_flag = false;
            }
            return output;
        }
        mtxQueueCls.lock();
        output = queueClsOut.front();
        queueClsOut.pop();
        cout << "cls_out pop " << output.object_list.object_num << endl;
        mtxQueueCls.unlock();
        return output;
        
    }
    
    extern "C" {
    int i = 0;
    void interface_initDetectImage(){
        // if (flag ) exit thread detect/classify 
        DetectImage();
    }
    void interface_initClassifyImage(){
        ClassifyImage();
    }
    void interface_receive(PythonMat &img){
        printf("run %d times\n", i++);
        receiveImg(img); 
    }
    void interface_setEmptyFlag(){
        empty_flag = true;
    }
    void testThread(){
        printf("C++ method!!\n");
    }
    ClassifyOutput interface_getClsQueue(){
        ClassifyOutput output;
        output = getClsQueue();
        return output;
    }
    
    void interface_destoryThread(){
        DetectFlag = false;
        ClassifyFlag = false;
    }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147

    至此这个混合编程项目结束,但是server接收到client耗时居然300ms,需要优化…

    参考博客

    1. 共享内存
    2. Address already in use
    3. connection-reset-by-peer 我是把client.close()加上就不报错
    4. (最终版)linux下python和c++相互调用共享内存通信
    5. 常见python通信错误
      1. EOFError
      2. OSError: handle is closed
      3. Broken pipe
  • 相关阅读:
    【安信可ESP-12K模组】
    双网卡网络设置:有线网卡优先级高于无线网卡
    em与rem的区别
    QT_字符串相关操作_QString
    springboot启动打印controller映射url
    Centos系统常见配置(详细)总结
    使用接口根据关键词取视频列表详情
    Java泛型中的 “?super T“ 与 “?extends T“ 有何不同
    IP地址查询
    【Android】画面卡顿优化列表流畅度一
  • 原文地址:https://blog.csdn.net/Z_timer/article/details/132643886