• python篇----进程+线程


    1、进程队列

    import os
    import time
    import datetime
    import multiprocessing
    
    
    def alarm_reminder(queue):
        """sudo apt-get install mpg123"""
        while True:
            # 从队列中获取消息
            message = queue.get()
            if message == "trigger":
                try:
                    # 处理数据的代码
                    # 执行闹钟提醒的代码
                    print("Alarm reminder triggered")
                    os.system("mpg123 wastebag.mp3")
                except Exception as e:
                    # 处理异常的代码
                    pass
            elif message == "stop":
                print("stop...............")
                break
    
    
    if __name__ == '__main__':
        queue = multiprocessing.Queue()
        process = multiprocessing.Process(target=alarm_reminder, args=(queue,))
        process.start()
    
        # 将消息发送到队列中以触发报警提醒
        queue.put("trigger")
        print("end_time", datetime.datetime.now())
        # 等待一段时间后向队列中发送停止信号
        time.sleep(5)
        queue.put("stop")
        # 等待进程结束
        # 在另一个 async 函数中等待子进程结束
        process.join()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    结果如下:
    在这里插入图片描述

    2、进程管道

    from multiprocessing import Process, Pipe
    
    def f(conn):
        conn.send([42, None, 'hello'])
        conn.close()
    
    if __name__ == '__main__':
        parent_conn, child_conn = Pipe()
        p = Process(target=f, args=(child_conn,))
        p.start()
        print(parent_conn.recv())   
        p.join()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    3、线程+异步

    import os
    import datetime
    import asyncio
    import threading
    
    
    async def alarm_reminder():
        """sudo apt-get install mpg123"""
        try:
            # 处理数据的代码
            # 执行闹钟提醒的代码
            print("start_time", datetime.datetime.now())
            print("Alarm reminder triggered")
            os.system("mpg123 wastebag.mp3")
            print("end_time", datetime.datetime.now())
        except Exception as e:
            # 处理异常的代码
            pass
    
    
    async def send_order():
        print("send_order................")
        # 发送工单的函数
        await asyncio.sleep(3)
    
    
    def send_thread():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(send_order())
        loop.close()
    
    
    def alarm_thread():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(alarm_reminder())
        loop.close()
    
    
    if __name__ == '__main__':
        thread1 = threading.Thread(target=send_thread, args=())
        thread2 = threading.Thread(target=alarm_thread)
    
        thread1.start()
        thread2.start()
    
        thread1.join()
        thread2.join()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    4、异步

    import asyncio
    import datetime
    
    
    async def identify_networks():
        print("detect model.............")
    
    
    # 小包垃圾的识别
    async def identify_wastebag():
        print("#" * 100)
        print("wastebag...............")
        await identify_networks()
    
    
    # 垃圾满溢的识别
    async def identify_overflowed():
        print("#" * 100)
        print("overflowed...............")
        await identify_networks()
    
    
    async def start_run():
        async with asyncio.Lock():
            tasks = [identify_wastebag(), identify_overflowed()]
            results = await asyncio.gather(*tasks)
            print("Results:", results)
    
    
    async def wastebag():
        print("wastebag.............")
        async with asyncio.Lock() as lock:
            await start_run()
    
    
    # -------------------------------#
    # 初筛
    # -------------------------------#
    async def identify_networks_wet():
        print("detect wet model............")
    
    
    async def wet():
        print("wet.........")
        await identify_networks_wet()
    
    
    async def main():
        async with asyncio.Lock():
            tasks = [wet(), wastebag()]
            results = await asyncio.gather(*tasks)
            print("Results:", results)
    
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57

    此代码运行类似二叉树的形式,但是实际运行的时候需要使用while True进行无限循环,不能用两个while True同步进行两个异步。只能通过一个while True进行异步循环,在模型预测后进行分开。

    • 部分样例代码
    async def identify_networks(img_name):
        # fixme:2023-9-21 先进行粗筛
        try:
            print("img_name", img_name)
            img = Image.open(img_name)
            
            ip = img_name.split("/")[-2]
            print("ip", ip)
            print("*" * 100)
            if ip == WASTEBAG_IP[0]:
                cameraID = IP_CAMERAID_WASTEBAG[img_name.split("/")[-2]]
                box = tuple(eval(CAMERAID_DICT_WASTEBAG[cameraID]))
                image, result_list = yolo.detect_image(img, box)
                print("wastebag result list...........", result_list)
    
                if len(result_list[0]["result"]["cls"]) > 0:
                    exclusive_list, exclusive_box_list, image_one = identify_pic(img_name, img)
    
                    print("start wastebag.....................")
                    sep_cls1 = result_list[0]["result"]["cls"]
                    indices_wastebag1 = [i for i, x in enumerate(sep_cls1) if x == 1]
                    print(indices_wastebag1)
                    wastebag_result_list1 = format_extract_wastebag(result_list, indices_wastebag1)
    
                    if len(wastebag_result_list1[0]["result"]["cls"]) > 0:
                        # 实例化类
                        async with asyncio.Lock() as lock:
                            await compare_result(cameraID, wastebag_result_list1, exclusive_list, exclusive_box_list, flag=INDEX_WASTE_BAG, info="wastebag")
    
                    print("start overflowed.....................")
                    sep_cls2 = result_list[0]["result"]["cls"]
                    indices_overflowed2 = [i for i, x in enumerate(sep_cls2) if x == 2]
                    print(indices_overflowed2)
                    overflowed_result_list2 = format_extract_wastebag(result_list, indices_overflowed2)
    
                    if len(overflowed_result_list2[0]["result"]["cls"]) > 0:
                        # 实例化类
                        async with asyncio.Lock() as lock:
                            await compare_result(cameraID, overflowed_result_list2, exclusive_list, exclusive_box_list, flag=INDEX_OVERFLOWED, info="overflowed")
    
            else:
                print("#" * 100)
                print("start wet.....................")
                try:
                    img.seek(0)
                    # fixme: 2023-5-15 进行深度拷贝,防止初筛的结果影响精筛
                    img2 = Image.open(img_name)
                    cameraID = IP_CAMERAID_WET[img_name.split("/")[-2]]
                    box = tuple(eval(CAMERAID_DICT_WET[cameraID]))
                    re_image, result_list = yolo3.detect_image(img2, box)
                    print("result_list------------------------", result_list)
    
                    current_data = time.strftime("%Y-%m-%d", time.localtime())
                    fresult = open(os.path.join(SAVE_RESULT, current_data + "result" + ".txt"), "a")
    
                    if len(result_list[0]["result"]["x"]) > 0:
                        ###########################################
                        # fixme: 2023-10-7 新增重复过筛
                        ###########################################
                        # 实例化类
                        async with asyncio.Lock() as lock:
                            await compare_result_wet(cameraID, result_list, flag=INDEX_WET)
    
                        fresult.write(json.dumps(result_list[0]) + "\n")
                    else:
                        # 文件保存
                        save_res = result_save_dict(img_name, "No target...........")
                        noRes_logger.info("no_result_log{}".format(save_res))
                        print("No target...........")
                except:
                    print(img_name, "can not open")
        except:
            print(img_name, "can not open")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73

    参考:

    • https://pythonjishu.com/svaytwzqkujhfzy/
  • 相关阅读:
    汽车数据安全事件频发,用户如何保护隐私信息?
    雨云游戏云面板服使用教程&我的世界Forge服务端开服教程(翼龙面板)
    听GPT 讲Rust源代码--library/std(7)
    计算机毕业设计django基于python的汽车租赁系统(源码+系统+mysql数据库+Lw文档)
    安安静静地一天!
    Java关于接口
    简述@RequestParam与@RequestBody参数注解
    虾皮shopee官方平台开放api接口采集商品数据信息获取产品详情数据根据关键词推荐获取商品列表、优惠价、销量调试示例
    瑞芯微RV1126——ffmpeg环境搭建
    接雨水问题
  • 原文地址:https://blog.csdn.net/m0_46825740/article/details/133377178