提示:博主取舍了很多大佬的博文并亲测有效,分享笔记邀大家共同学习讨论
之前的内容已经尽可能简单、详细的介绍CPU【Pytorch2ONNX】和GPU【Pytorch2ONNX】俩种模式下Pytorch模型转ONNX格式的流程,本博文根据自己的学习和需求进一步讲解ONNX模型的部署。onnx模型博主将使用PyInstaller进行打包部署,PyInstaller是一个用于将Python脚本打包成独立可执行文件的工具,【入门篇】中已经进行了最基本的使用讲解。之前博主在【快速部署ONNX模型】中分别介绍了CPU模式和GPU模式下onnx模型打包成可执行文件的教程,本博文将进一步介绍在CPU模式下使用多线程对ONNX模型进行快速部署。
系列学习目录:
【CPU】Pytorch模型转ONNX模型流程详解
【GPU】Pytorch模型转ONNX格式流程详解
【ONNX模型】快速部署
【ONNX模型】多线程快速部署
【ONNX模型】opencv_cpu调用onnx
【ONNX模型】opencv_gpu调用onnx
创建一个纯净的、没有多余的第三方库和模块的小型Python环境,抛开任何pytorch相关的依赖,只使用onnx模型完成测试。
# name 环境名、3.x Python的版本
conda create -n deploy python==3.10
# 激活环境
activate deploy
# 安装onnx
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple onnx
# 安装GPU版
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple onnxruntime-gpu==1.15.0
# 下载安装Pyinstaller模块
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple Pyinstaller
# 根据个人情况安装包,博主这里需要安装piilow
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple Pillow
多线程是一种并发编程的技术,通过同时执行多个线程来提高程序的性能和效率。python的内置模块提供了两个内置模块:thread和threading,thread是源生模块,是比较底层的模块,threading是扩展模块,是对thread做了一些封装,可以更加方便的被使用,所以只需要使用threading这个模块就能完成并发的测试。
python3.x中通过threading模块有两种方法创建新的线程:通过threading.Thread(Target=executable Method)传递给Thread对象一个可执行方法(或对象);通过继承threading.Thread定义子类并重写run()方法。下面给出了俩种创建新线程方法的例子,读者可以运行一下加深理解。
import threading
import time
def myTestFunc():
# 子线程开始
print("the current threading %s is runing" % (threading.current_thread().name))
time.sleep(1) # 休眠线程
# 子线程结束
print("the current threading %s is ended" % (threading.current_thread().name))
# 主线程
print("the current threading %s is runing" % (threading.current_thread().name))
# 子线程t1创建
t1 = threading.Thread(target=myTestFunc)
# 子线程t2创建
t2 = threading.Thread(target=myTestFunc)
t1.start() # 启动线程
t2.start()
t1.join() # join是阻塞当前线程(此处的当前线程时主线程) 主线程直到子线程t1结束之后才结束
t2.join()
# 主线程结束
print("the current threading %s is ended" % (threading.current_thread().name))
import threading
import time
class myTestThread(threading.Thread): # 继承父类threading.Thread
def __init__(self, threadID, name, counter):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
# 把要执行的代码写到run函数里面,线程在创建后会直接运行run函数
def run(self):
print("the current threading %s is runing" % (self.name))
print_time(self.name,5*self.threadID)
print("the current threading %s is ended" % (self.name))
def print_time(threadName, delay):
time.sleep(delay)
print("%s process at: %s" % (threadName, time.ctime(time.time())))
# 主线程
print("the current threading %s is runing" % (threading.current_thread().name))
# 创建新线程
t1 = myTestThread(1, "Thread-1", 1)
t2 = myTestThread(2, "Thread-2", 2)
# 开启线程
t1.start()
t2.start()
# 等待线程结束
t1.join()
t2.join()
print("the current threading %s is ended" % (threading.current_thread().name))
博主采用的是基础教程中普通创建方式创建新线程:将推理流程单独指定成目标函数,而后创建线程对象并指定目标函数,同一个推理session被分配给多个线程,多个线程会共享同一个onnx模型,这是因为深度学习模型的参数通常存储在模型对象中的共享内存中,并且模型的参数在运行时是可读写的,每个线程可以独立地使用模型对象执行任务,并且线程之间可以共享模型的状态和参数。
import onnxruntime as ort
import numpy as np
from PIL import Image
import time
import datetime
import sys
import os
import threading
def composed_transforms(image):
mean = np.array([0.485, 0.456, 0.406]) # 均值
std = np.array([0.229, 0.224, 0.225]) # 标准差
# transforms.Resize是双线性插值
resized_image = image.resize((args['scale'], args['scale']), resample=Image.BILINEAR)
# onnx模型的输入必须是np,并且数据类型与onnx模型要求的数据类型保持一致
resized_image = np.array(resized_image)
normalized_image = (resized_image/255.0 - mean) / std
return np.round(normalized_image.astype(np.float32), 4)
def check_mkdir(dir_name):
if not os.path.exists(dir_name):
os.makedirs(dir_name)
args = {
'scale': 416,
'save_results': True
}
def process_img(img_list,
ort_session,
image_path,
mask_path,
input_name,
output_names):
for idx, img_name in enumerate(img_list):
img = Image.open(os.path.join(image_path, img_name + '.jpg')).convert('RGB')
w, h = img.size
# 对原始图像resize和归一化
img_var = composed_transforms(img)
# np的shape从[w,h,c]=>[c,w,h]
img_var = np.transpose(img_var, (2, 0, 1))
# 增加数据的维度[c,w,h]=>[bathsize,c,w,h]
img_var = np.expand_dims(img_var, axis=0)
start_each = time.time()
prediction = ort_session.run(output_names, {input_name: img_var})
time_each = time.time() - start_each
# 除去多余的bathsize维度,NumPy变会PIL同样需要变换数据类型
# *255替换pytorch的to_pil
prediction = (np.squeeze(prediction[3]) * 255).astype(np.uint8)
if args['save_results']:
Image.fromarray(prediction).resize((w, h)).save(os.path.join(mask_path, img_name + '.jpg'))
def main():
# 线程个数
num_cores = 10
# 保存检测结果的地址
input = sys.argv[1]
# providers = ["CUDAExecutionProvider"]
providers = ["CPUExecutionProvider"]
model_path = "PFNet.onnx"
ort_session = ort.InferenceSession(model_path, providers=providers) # 创建一个推理session
input_name = ort_session.get_inputs()[0].name
# 输出有四个
output_names = [output.name for output in ort_session.get_outputs()]
print('Load {} succeed!'.format('PFNet.onnx'))
start = time.time()
image_path = os.path.join(input, 'image')
mask_path = os.path.join(input, 'mask')
if args['save_results']:
check_mkdir(mask_path)
# 所有图片数量
img_list = [os.path.splitext(f)[0] for f in os.listdir(image_path) if f.endswith('jpg')]
# 每个线程被均匀分配的图片数量
total_images = len(img_list)
start_index = 0
images_per_list = total_images // num_cores
# 理解成线程池
Thread_list = []
for i in range(num_cores):
end_index = start_index + images_per_list
img_l = img_list[start_index:end_index]
start_index = end_index
# 分配线程
t = threading.Thread(target=process_img, args=(img_l,ort_session, image_path, mask_path,input_name,output_names))
# 假如线程池
Thread_list.append(t)
# 线程执行
t.start()
# 这里是为了阻塞主线程
for t in Thread_list:
t.join()
end = time.time()
print("Total Testing Time: {}".format(str(datetime.timedelta(seconds=int(end - start)))))
if __name__ == '__main__':
main()
线程的数量根据需求而定,不是越多越好。
pyinstaller -F run_t.py
pyinstaller -F run_t.py --add-binary "D:/ProgramData/Anaconda3_data/envs/deploy/Lib/site-packages/onnxruntime/capi/onnxruntime_providers_cuda.dll;./onnxruntime/capi" --add-binary "D:/ProgramData/Anaconda3_data/envs/deploy/Lib/site-packages/onnxruntime/capi/onnxruntime_providers_shared.dll;./onnxruntime/capi"
尽可能简单、详细的介绍ONNX模型多线程快速部署过程。