每次我忘记戴口罩去食堂吃饭的时候,门口都会有志愿者学生提醒你:“你好,麻烦戴下口罩。” 进门后里面那块大屏幕还会发出声音:“请佩戴口罩”。
上次博客仿照宿舍楼下那块大屏幕写了个人脸考勤,所以这次我打算弄一个口罩识别。
实现口罩识别的方式有很多种,也并非是一件难事。重要的是要学会把学到的知识运用到生活中去。为此,我特地翻出了吃灰的“树莓派”主板,打算放到宿舍门口。无他,卷一卷督促室友学习。
相信大多数初学人工智能的小伙伴,最早接触的一定是CNN处理图像分类的问题,本次项目的核心也就是CNN分类了。所以,这只是前面所学知识的结合,不做过多赘述。
权重文件和整体代码在文章最后
无论是机器学习还是深度学习,最重要的便是数据集,之后才是相关的模型和算法。
在这里,我们首先处理图片,把每一张佩戴口罩的照片裁剪出人脸部分,之后为了便于计算和训练我们将图片进行压缩。相关方法之前博客已经介绍过这里不做赘述。
大家可以自己到网上下载数据集,也可以使用我参考的这份,数据量比较小,用于演示:链接
- face_detector = cv2.dnn.readNetFromCaffe('./weights/deploy.prototxt.txt','weights/res10_300x300_ssd_iter_140000.caffemodel')
- # 人脸检测函数
- def face_detect(img):
- #转为Blob
- img_blob = cv2.dnn.blobFromImage(img,1,(300,300),(104,177,123),swapRB=True)
- # 输入
- face_detector.setInput(img_blob)
- # 推理
- detections = face_detector.forward()
- # 获取原图尺寸
- img_h,img_w = img.shape[:2]
-
- # 人脸框数量
- person_count = detections.shape[2]
-
- for face_index in range(person_count):
- # 通过置信度选择
- confidence = detections[0,0,face_index,2]
- if confidence > 0.5:
- locations = detections[0,0,face_index,3:7] * np.array([img_w,img_h,img_w,img_h])
- # 获得坐标 记得取整
- l,t,r,b = locations.astype('int')
- return img[t:b,l:r]
- return None
效果:
- # 转为Blob格式函数
- def imgBlob(img):
- # 转为Blob
- img_blob = cv2.dnn.blobFromImage(img,1,(100,100),(104,177,123),swapRB=True)
- # 维度压缩
- img_squeeze = np.squeeze(img_blob).T
- # 旋转
- img_rotate = cv2.rotate(img_squeeze,cv2.ROTATE_90_CLOCKWISE)
- # 镜像
- img_flip = cv2.flip(img_rotate,1)
- # 去除负数,并归一化
- img_blob = np.maximum(img_flip,0) / img_flip.max()
- return img_blob
效果:
有了这两个函数,我们就可以进行数据集的处理了:
- import tqdm
- import os,glob
-
- labels = os.listdir('images/')
-
- img_list = []
- label_list = []
- for label in labels:
- # 获取每类文件列表
- file_list =glob.glob('images/%s/*.jpg' % (label))
-
- for img_file in tqdm.tqdm( file_list ,desc = "处理文件夹 %s " % (label)):
- # 读取文件
- img = cv2.imread(img_file)
- # 裁剪人脸
- img_crop = face_detect(img)
- # 转为Blob
- if img_crop is not None:
- img_blob = imgBlob(img_crop)
- img_list.append(img_blob)
- label_list.append(label)
最后,我们将其转换为npz格式文件:
- X = np.asarray(img_list)
- Y = np.asarray(label_list)
-
- np.savez('./data/imageData.npz',X,Y)
首先我们读取之前保存的npz文件:
- import numpy as np
- arr = np.load('./data/imageData.npz')
- img_list = arr['arr_0']
- label_list =arr['arr_1']
- print(img_list.shape,label_list.shape)
((5328, 100, 100, 3), (5328,))
设置为onehot独热编码:
- from sklearn.preprocessing import OneHotEncoder
-
- onehot = OneHotEncoder()
- # 编码
- y_onehot =onehot.fit_transform(label_list.reshape(-1,1))
- y_onehot_arr = y_onehot.toarray()
划分数据集:
- from sklearn.model_selection import train_test_split
-
- x_train,x_test,y_train,y_test=train_test_split(img_list,y_onehot_arr,test_size=0.2,random_state=123)
-
- x_train.shape,x_test.shape,y_train.shape,y_test.shape
((4262, 100, 100, 3), (1066, 100, 100, 3), (4262, 3), (1066, 3))
构建并编译模型:
- from tensorflow import keras
- from tensorflow.keras import layers,models
- import tensorflow as tf
-
- gpus = tf.config.list_physical_devices("GPU")
-
- if gpus:
- gpu0 = gpus[0] #如果有多个GPU,仅使用第0个GPU
- tf.config.experimental.set_memory_growth(gpu0, True) #设置GPU显存用量按需使用
- tf.config.set_visible_devices([gpu0],"GPU")
-
-
- model = models.Sequential([
- layers.Conv2D(16,3,padding='same',input_shape=(100,100,3),activation='relu'),
- layers.MaxPool2D(),
- layers.Conv2D(32,3,padding='same',activation='relu'),
- layers.MaxPool2D(),
- layers.Conv2D(64,3,padding='same',activation='relu'),
- layers.MaxPool2D(),
- layers.Flatten(),
- layers.Dense(166,activation='relu'),
- layers.Dense(22,activation='relu'),
- layers.Dense(3,activation='sigmoid')
- ])
-
- # 编译模型
- model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
- loss=tf.keras.losses.categorical_crossentropy,
- metrics=['accuracy'])
model.summary()
Model: "sequential" _________________________________________________________________ Layer (type) Output Shape Param # ================================================================= conv2d (Conv2D) (None, 100, 100, 16) 448 _________________________________________________________________ max_pooling2d (MaxPooling2D) (None, 50, 50, 16) 0 _________________________________________________________________ conv2d_1 (Conv2D) (None, 50, 50, 32) 4640 _________________________________________________________________ max_pooling2d_1 (MaxPooling2 (None, 25, 25, 32) 0 _________________________________________________________________ conv2d_2 (Conv2D) (None, 25, 25, 64) 18496 _________________________________________________________________ max_pooling2d_2 (MaxPooling2 (None, 12, 12, 64) 0 _________________________________________________________________ flatten (Flatten) (None, 9216) 0 _________________________________________________________________ dense (Dense) (None, 166) 1530022 _________________________________________________________________ dense_1 (Dense) (None, 22) 3674 _________________________________________________________________ dense_2 (Dense) (None, 3) 69 ================================================================= Total params: 1,557,349 Trainable params: 1,557,349 Non-trainable params: 0 _________________________________________________________________
训练模型:
- history = model.fit(x=x_train,
- y=y_train,
- validation_data=(x_test,y_test),
- batch_size=30,
- epochs=15)
模型评估:
- acc = history.history['accuracy']
- val_acc = history.history['val_accuracy']
-
- loss = history.history['loss']
- val_loss = history.history['val_loss']
-
- epochs_range = range(len(loss))
-
- plt.figure(figsize=(12, 4))
- plt.subplot(1, 2, 1)
- plt.plot(epochs_range, acc, label='Training Accuracy')
- plt.plot(epochs_range, val_acc, label='Validation Accuracy')
- plt.legend(loc='lower right')
- plt.title('Training and Validation Accuracy')
-
- plt.subplot(1, 2, 2)
- plt.plot(epochs_range, loss, label='Training Loss')
- plt.plot(epochs_range, val_loss, label='Validation Loss')
- plt.legend(loc='upper right')
- plt.title('Training and Validation Loss')
- plt.show()
保存模型:
model.save('./data/face_mask_model')
我们上面可以看到,简单的数据集和模型已经可以使准确率达到98%了,我们接下来就可以打开摄像头,然后获取自己的图片放入模型进行预测了!
在这之前,可以简单测试一下模型:
- # 加载模型
- model = tf.keras.models.load_model('./data/face_mask_model/')
-
- # 挑选测试图片
- img = cv2.imread('./images/2.no/0_0_caizhuoyan_0009.jpg')
-
- plt.imshow(cv2.cvtColor(img,cv2.COLOR_BGR2RGB))
- plt.axis("off")
由于我们训练的时候,数据集是什么样的,做过什么处理,我们输入就要对其做同样的处理,才能保证预测的准确率:
- # 裁剪人脸
- img_crop = face_detect(img)
-
- # 转为Blob
- img_blob = imgBlob(img_crop)
-
- # reshape
- img_input = img_blob.reshape(1,100,100,3)
-
- # 预测
- result = model.predict(img_input)
预测结果:
- labels = os.listdir('./images/')
- labels[result.argmax()]
就像上面说的,模型的输入要与训练时一致,所以我们同样要对其进行裁剪、格式转换、压缩、归一化的操作。
下面直接附上完整代码。
权重文件和数据集:链接
大家可以根据自己需求更改代码。
- import cv2
- import time
- import numpy as np
- import tensorflow as tf
-
-
- class MaskDetection:
-
- def __init__(self,mode='rasp'):
- """
- 加载人脸检测模型 和 口罩模型
- """
- gpus = tf.config.list_physical_devices("GPU")
- if gpus:
- gpu0 = gpus[0] #如果有多个GPU,仅使用第0个GPU
- tf.config.experimental.set_memory_growth(gpu0, True) #设置GPU显存用量按需使用
- tf.config.set_visible_devices([gpu0],"GPU")
-
- self.mask_model = tf.keras.models.load_model('./data/face_mask_model.h5')
- # 类别标签
- self.labels = ['正常','未佩戴','不规范']
- # 标签对应颜色,BGR顺序,绿色、红色、黄色
- self.colors = [(0,255,0),(0,0,255),(0,255,255)]
-
- # 获取label显示的图像
- self.zh_label_img_list = self.getLabelPngList()
-
-
- def getLabelPngList(self):
- """
- 获取本地label显示的图像的列表
- """
- overlay_list = []
- for i in range(3):
- fileName = './label_img/%s.png' % (i)
- overlay = cv2.imread(fileName,cv2.COLOR_RGB2BGR)
- overlay = cv2.resize(overlay,(0,0), fx=0.3, fy=0.3)
- overlay_list.append(overlay)
-
- return overlay_list
-
-
-
- def imageBlob(self,face_region):
- """
- 将图像转为blob
- """
-
- if face_region is not None:
- blob = cv2.dnn.blobFromImage(face_region,1,(100,100),(104,117,123),swapRB=True)
- blob_squeeze = np.squeeze(blob).T
- blob_rotate = cv2.rotate(blob_squeeze,cv2.ROTATE_90_CLOCKWISE)
- blob_flip = cv2.flip(blob_rotate,1)
- # 对于图像一般不用附属,所以将它移除
- # 归一化处理
- blob_norm = np.maximum(blob_flip,0) / blob_flip.max()
- return blob_norm
- else:
- return None
-
- def detect(self):
- """
- 识别
- """
-
- face_detector = cv2.dnn.readNetFromCaffe('./weights/deploy.prototxt.txt','./weights/res10_300x300_ssd_iter_140000.caffemodel')
-
- cap = cv2.VideoCapture(0)
-
- frame_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
- frame_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
-
- frameTime = time.time()
-
- videoWriter = cv2.VideoWriter('./record_video/out'+str(time.time())+'.mp4', cv2.VideoWriter_fourcc(*'H264'), 10, (960,720))
-
-
- while True:
- ret,frame = cap.read()
- frame = cv2.flip(frame,1)
- frame_resize = cv2.resize(frame,(300,300))
-
- img_blob = cv2.dnn.blobFromImage(frame_resize,1.0,(300,300),(104.0, 177.0, 123.0),swapRB=True)
-
- face_detector.setInput(img_blob)
- detections = face_detector.forward()
- num_of_detections = detections.shape[2]
-
- # 记录人数(框)
- person_count = 0
-
- # 遍历多个
- for index in range(num_of_detections):
- # 置信度
- detection_confidence = detections[0,0,index,2]
- # 挑选置信度
- if detection_confidence>0.5:
-
- person_count+=1
-
- # 位置坐标 记得放大
- locations = detections[0,0,index,3:7] * np.array([frame_w,frame_h,frame_w,frame_h])
- l,t,r,b = locations.astype('int')
- # 裁剪人脸区域
- face_region = frame[t:b,l:r]
- # 转为blob格式
- blob_norm = self.imageBlob(face_region)
-
- if blob_norm is not None:
- # 模型预测
- img_input = blob_norm.reshape(1,100,100,3)
- result = self.mask_model.predict(img_input)
-
- # softmax分类器处理
- result = tf.nn.softmax(result[0]).numpy()
-
- # 最大值索引
- max_index = result.argmax()
- # 最大值
- max_value = result[max_index]
- # 标签
- label = self.labels[max_index]
-
- # 对应中文标签
- overlay = self.zh_label_img_list[max_index]
- overlay_h,overlay_w = overlay.shape[:2]
-
- # 覆盖范围
- overlay_l,overlay_t = l,(t - overlay_h-20)
- overlay_r,overlay_b = (l + overlay_w),(overlay_t+overlay_h)
-
- # 判断边界
- if overlay_t > 0 and overlay_r < frame_w:
-
- overlay_copy=cv2.addWeighted(frame[overlay_t:overlay_b, overlay_l:overlay_r ],1,overlay,20,0)
- frame[overlay_t:overlay_b, overlay_l:overlay_r ] = overlay_copy
-
- cv2.putText(frame, str(round(max_value*100,2))+"%", (overlay_r+20, overlay_t+40), cv2.FONT_ITALIC, 0.8, self.colors[max_index], 2)
-
- # 人脸框
- cv2.rectangle(frame,(l,t),(r,b),self.colors[max_index],5)
-
-
- now = time.time()
- fpsText = 1 / (now - frameTime)
- frameTime = now
-
- cv2.putText(frame, "FPS: " + str(round(fpsText,2)), (20, 40), cv2.FONT_ITALIC, 0.8, (0, 255, 0), 2)
- cv2.putText(frame, "Person: " + str(person_count), (20, 60), cv2.FONT_ITALIC, 0.8, (0, 255, 0), 2)
-
- videoWriter.write(frame)
-
- cv2.imshow('demo',frame)
-
- if cv2.waitKey(10) & 0xFF == ord('q'):
- break
-
- videoWriter.release()
- cap.release()
- cv2.destroyAllWindows()
-
-
- mask_detection = MaskDetection()
- mask_detection.detect()
效果如下: