import os
import time
import datetime
import multiprocessing
def alarm_reminder(queue):
    """Play the alarm sound every time a ``"trigger"`` message arrives.

    Requires the external ``mpg123`` player (``sudo apt-get install mpg123``).

    The function blocks on ``queue.get()`` in a loop:

    * ``"trigger"`` plays ``wastebag.mp3`` through ``mpg123``;
    * ``"stop"`` prints a notice and ends the loop;
    * any other message is ignored.

    Args:
        queue: Object with a blocking ``get()`` method that yields the
            control messages (e.g. a ``multiprocessing.Queue``).
    """
    # Imported locally so this function does not depend on the file-level imports.
    import subprocess

    while True:
        # Fetch the next control message (blocks until one is available).
        message = queue.get()
        if message == "stop":
            print("stop...............")
            break
        if message != "trigger":
            continue

        print("Alarm reminder triggered")
        try:
            # Argument list instead of a shell string: no shell is involved.
            finished = subprocess.run(["mpg123", "wastebag.mp3"], check=False)
        except OSError as exc:
            # Typically: the player is not installed. Report it and keep
            # serving messages instead of failing silently.
            print("alarm playback failed:", exc)
        else:
            if finished.returncode != 0:
                print("mpg123 exited with status", finished.returncode)
if __name__ == '__main__':
    # Channel used to hand control messages to the worker process.
    msg_queue = multiprocessing.Queue()
    worker = multiprocessing.Process(target=alarm_reminder, args=(msg_queue,))
    worker.start()

    # Ask the worker to sound the alarm.
    msg_queue.put("trigger")
    print("end_time", datetime.datetime.now())

    # Wait five seconds, then tell the worker to shut down.
    time.sleep(5)
    msg_queue.put("stop")

    # Block until the worker process has exited.
    worker.join()
结果如下:
from multiprocessing import Process, Pipe
def f(conn):
    """Send a fixed sample payload over *conn*, then close that connection."""
    payload = [42, None, 'hello']
    conn.send(payload)
    conn.close()
if __name__ == '__main__':
    # One end of the pipe stays here; the other is handed to the child.
    receiver, sender = Pipe()
    child = Process(target=f, args=(sender,))
    child.start()
    # recv() blocks until the child has sent its payload.
    print(receiver.recv())
    child.join()
import os
import datetime
import asyncio
import threading
async def alarm_reminder():
    """Play the alarm sound once, printing start and end timestamps.

    Requires the external ``mpg123`` player (``sudo apt-get install mpg123``).

    The blocking ``os.system`` call is handed to the loop's default executor
    so the event loop is not stalled while the sound plays. Failures are
    reported on stdout instead of being discarded.
    """
    print("start_time", datetime.datetime.now())
    print("Alarm reminder triggered")
    try:
        # Inside a coroutine this returns the loop that is running it.
        loop = asyncio.get_event_loop()
        status = await loop.run_in_executor(None, os.system, "mpg123 wastebag.mp3")
    except Exception as exc:
        # Best effort: a failed alarm must not take the caller down, but it
        # should be visible.
        print("alarm playback failed:", exc)
    else:
        if status != 0:
            print("mpg123 exited with status", status)
        print("end_time", datetime.datetime.now())
async def send_order():
    """Send a work order (placeholder: announces itself, then waits 3 seconds)."""
    print("send_order................")
    pause_seconds = 3
    await asyncio.sleep(pause_seconds)
def send_thread():
    """Thread entry point: run ``send_order()`` to completion on a private event loop.

    A new loop is created and installed as this thread's current loop, the
    coroutine is run until it finishes, and the loop is then closed.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(send_order())
    finally:
        # Close the loop even when the coroutine raises, so its resources
        # are released on the error path as well.
        loop.close()
def alarm_thread():
    """Thread entry point: run ``alarm_reminder()`` to completion on a private event loop.

    A new loop is created and installed as this thread's current loop, the
    coroutine is run until it finishes, and the loop is then closed.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(alarm_reminder())
    finally:
        # Close the loop even when the coroutine raises, so its resources
        # are released on the error path as well.
        loop.close()
if __name__ == '__main__':
    # One thread per task; each thread drives its own event loop.
    workers = [
        threading.Thread(target=send_thread, args=()),
        threading.Thread(target=alarm_thread),
    ]
    # Start both before joining either, so the two tasks overlap.
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
import asyncio
import datetime
async def identify_networks():
    """Stand-in for the detection model: only announces that it ran."""
    banner = "detect model............."
    print(banner)
# Detection of small garbage bags.
async def identify_wastebag():
    """Print the waste-bag banner, then await the shared detection step."""
    separator = "#" * 100
    print(separator)
    print("wastebag...............")
    await identify_networks()
# Detection of overflowing garbage.
async def identify_overflowed():
    """Print the overflow banner, then await the shared detection step."""
    separator = "#" * 100
    print(separator)
    print("overflowed...............")
    await identify_networks()
async def start_run():
    """Run the waste-bag and overflow checks concurrently and print their results.

    ``asyncio.gather`` returns the results in argument order: waste-bag
    first, overflow second.
    """
    # No lock is taken here: a lock constructed inside this call would be
    # private to it, could never be contended, and so would protect nothing.
    results = await asyncio.gather(identify_wastebag(), identify_overflowed())
    print("Results:", results)
async def wastebag():
    """Announce the waste-bag branch, then run its two sub-checks via ``start_run()``."""
    print("wastebag.............")
    # No lock is taken here: a lock constructed inside this call would be
    # private to it, could never be contended, and so would protect nothing.
    await start_run()
# -------------------------------#
# Preliminary screening
# -------------------------------#
async def identify_networks_wet():
    """Stand-in for the wet-waste detection model: only announces that it ran."""
    banner = "detect wet model............"
    print(banner)
async def wet():
    """Announce the wet-waste branch, then await its detection step."""
    label = "wet........."
    print(label)
    await identify_networks_wet()
async def main():
    """Run the wet-waste and waste-bag branches concurrently and print their results.

    ``asyncio.gather`` returns the results in argument order: ``wet()``
    first, ``wastebag()`` second.
    """
    # No lock is taken here: a lock constructed inside this call would be
    # private to it, could never be contended, and so would protect nothing.
    results = await asyncio.gather(wet(), wastebag())
    print("Results:", results)
if __name__ == '__main__':
    # Create the loop explicitly: asking asyncio.get_event_loop() to create
    # one implicitly is deprecated in recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(main())
    finally:
        # Release the loop's resources even if main() raises.
        loop.close()
此代码的调用结构类似二叉树。但实际运行时需要使用 while True 进行无限循环,而两个 while True 循环无法同时驱动两个异步任务;因此只能通过一个 while True 进行异步循环,并在模型预测之后再按类别分开处理。
# Classify one camera image and hand the detections to the matching
# comparison coroutine.
#
# img_name is a "/"-separated image path; its second-to-last component is
# taken as the camera IP. When that IP equals WASTEBAG_IP[0], the image goes
# through yolo.detect_image and the detections with class 1 ("wastebag") and
# class 2 ("overflowed") are forwarded to compare_result. The "wet" path uses
# yolo3.detect_image and compare_result_wet instead, and appends the result
# to a per-day text file under SAVE_RESULT.
#
# NOTE(review): leading indentation is missing from this listing, so which
# `if` the first `else:` belongs to, and where each `if` body ends, cannot be
# determined from the text alone — confirm against the runnable source.
async def identify_networks(img_name):
# FIXME 2023-9-21: run the coarse screening first
try:
print("img_name", img_name)
img = Image.open(img_name)
# The parent directory name in the path is used as the camera IP.
ip = img_name.split("/")[-2]
print("ip", ip)
print("*" * 100)
if ip == WASTEBAG_IP[0]:
cameraID = IP_CAMERAID_WASTEBAG[img_name.split("/")[-2]]
# NOTE(review): eval() executes the configured string as Python code —
# confirm CAMERAID_DICT_WASTEBAG only ever holds trusted values.
box = tuple(eval(CAMERAID_DICT_WASTEBAG[cameraID]))
image, result_list = yolo.detect_image(img, box)
print("wastebag result list...........", result_list)
# Continue only when the detector reported at least one class entry.
if len(result_list[0]["result"]["cls"]) > 0:
exclusive_list, exclusive_box_list, image_one = identify_pic(img_name, img)
print("start wastebag.....................")
sep_cls1 = result_list[0]["result"]["cls"]
# Positions of the detections whose class id is 1.
indices_wastebag1 = [i for i, x in enumerate(sep_cls1) if x == 1]
print(indices_wastebag1)
wastebag_result_list1 = format_extract_wastebag(result_list, indices_wastebag1)
if len(wastebag_result_list1[0]["result"]["cls"]) > 0:
# Instantiate the class
# NOTE(review): this lock is constructed right here, so nothing else can
# hold it — confirm whether a shared lock was intended.
async with asyncio.Lock() as lock:
await compare_result(cameraID, wastebag_result_list1, exclusive_list, exclusive_box_list, flag=INDEX_WASTE_BAG, info="wastebag")
print("start overflowed.....................")
sep_cls2 = result_list[0]["result"]["cls"]
# Positions of the detections whose class id is 2.
indices_overflowed2 = [i for i, x in enumerate(sep_cls2) if x == 2]
print(indices_overflowed2)
overflowed_result_list2 = format_extract_wastebag(result_list, indices_overflowed2)
if len(overflowed_result_list2[0]["result"]["cls"]) > 0:
# Instantiate the class
# NOTE(review): freshly constructed lock, same remark as for the wastebag case.
async with asyncio.Lock() as lock:
await compare_result(cameraID, overflowed_result_list2, exclusive_list, exclusive_box_list, flag=INDEX_OVERFLOWED, info="overflowed")
else:
print("#" * 100)
print("start wet.....................")
try:
img.seek(0)
# FIXME 2023-5-15: make a deep copy so that the preliminary-screening result
# does not affect the fine screening
img2 = Image.open(img_name)
cameraID = IP_CAMERAID_WET[img_name.split("/")[-2]]
# NOTE(review): eval() on a configured string — confirm the value is trusted.
box = tuple(eval(CAMERAID_DICT_WET[cameraID]))
re_image, result_list = yolo3.detect_image(img2, box)
print("result_list------------------------", result_list)
current_data = time.strftime("%Y-%m-%d", time.localtime())
# Per-day result file, opened in append mode.
# NOTE(review): no close() for fresult is visible in this block.
fresult = open(os.path.join(SAVE_RESULT, current_data + "result" + ".txt"), "a")
if len(result_list[0]["result"]["x"]) > 0:
###########################################
# FIXME 2023-10-7: added repeated screening
###########################################
# Instantiate the class
# NOTE(review): freshly constructed lock, same remark as for the wastebag case.
async with asyncio.Lock() as lock:
await compare_result_wet(cameraID, result_list, flag=INDEX_WET)
fresult.write(json.dumps(result_list[0]) + "\n")
else:
# Save to file
save_res = result_save_dict(img_name, "No target...........")
noRes_logger.info("no_result_log{}".format(save_res))
print("No target...........")
# NOTE(review): a bare except reports every failure as "can not open",
# whatever actually went wrong.
except:
print(img_name, "can not open")
# NOTE(review): same bare except as above; this one covers the whole function body.
except:
print(img_name, "can not open")
参考: