目录
python opencv 读取mp4, 有上一帧,下一帧的功能
pip install pynput
这个每次读取self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_index),后退读取稍微有点慢,
- import cv2
- from pynput import keyboard
-
- class VideoPlayer:
- def __init__(self, video_path):
- self.cap = cv2.VideoCapture(video_path)
- self.frame_index = -1
- self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
- self.new_frame_flag = False
- self.last_frame_i=0
-
- def show_frame(self, frame_index):
- if 0 <= frame_index < self.total_frames:
- if frame_index
- self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_index)
- self.last_frame_i = frame_index
- ret, frame = self.cap.read()
- if ret:
- cv2.imshow('Video', frame)
- self.frame_index = frame_index
- self.new_frame_flag = False
-
- def next_frame(self):
- if self.frame_index + 1 < self.total_frames:
- self.frame_index += 1
- self.new_frame_flag = True
-
- def prev_frame(self):
- if self.frame_index - 1 >= 0:
- self.frame_index -= 1
- self.new_frame_flag = True
-
- def on_press(self, key):
- if key == keyboard.Key.right: # Right arrow key
- self.next_frame()
- elif key == keyboard.Key.left: # Left arrow key
- self.prev_frame()
- elif str(key) == "'q'": # Press 'q' to exit
- return False
-
- def play(self):
- listener = keyboard.Listener(on_press=self.on_press)
- listener.start()
-
- self.show_frame(0) # Show the first frame
-
- while True:
- if self.new_frame_flag:
- self.show_frame(self.frame_index)
- if cv2.waitKey(1) & 0xFF == ord('q'): # Press 'q' to exit
- break
-
- cv2.destroyAllWindows()
-
- if __name__ == "__main__":
- player = VideoPlayer(rf'F:\data\9.mp4')
- player.play()
队列 缓存最新版,有帧号
- import cv2
- from collections import deque
- from pynput import keyboard
-
-
- def on_press(key):
- global queue_index, load_new_frame
- if key == keyboard.Key.right or key == keyboard.Key.space: # Right arrow key
- if queue_index < len(frame_buffer) - 1:
- queue_index += 1
- else:
- load_new_frame = True
- elif key == keyboard.Key.left: # Left arrow key
- if queue_index > 0:
- queue_index -= 1
- load_new_frame = False
- elif str(key) == "'q'": # Press 'q' to exit
- return False
-
-
- listener = keyboard.Listener(on_press=on_press)
- listener.start()
-
- if __name__ == '__main__':
- cap = cv2.VideoCapture(r'C:\Users\Administrator\Documents\youjie\zhuanchang\11.mp4')
-
- deque_len = 300
- frame_buffer = deque(maxlen=deque_len)
- queue_index = -1
- load_new_frame = True
- frame_index = -1
- while cap.isOpened():
- if load_new_frame:
- ret, frame = cap.read()
- if not ret:
- print('video end')
- continue
-
- frame_index += 1
- cv2.putText(frame, str(frame_index), (20, 70), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
- frame_buffer.append(frame)
- queue_index = len(frame_buffer) - 1
- # print('load frame_i', frame_index, "queue_i", queue_index, 'queue len', len(frame_buffer))
- load_new_frame = False
-
- # Show the current frame
- if len(frame_buffer) > 0:
- view_index = frame_index - len(frame_buffer) + queue_index + 1
- imshow = frame_buffer[queue_index]
- # print('show frame_i', frame_index, "queue_i", queue_index, 'view_i', view_index)
- if max(imshow.shape[:2])>1280:
- imshow = cv2.resize(imshow, (1280, 720))
- cv2.imshow('Video', imshow)
-
- if cv2.waitKey(1) & 0xFF == ord('q'): # Press 'q' to exit
- break
-
- # Release the video and close windows
- cap.release()
- cv2.destroyAllWindows()
队列封装成类版本:
- import glob
- import os
- import re
-
- import cv2
- from collections import deque
-
- import yaml
- from pynput import keyboard
-
- from a2_jupter_check.ip_result.track_tool import get_box_dict, get_color
-
- def show_box(track_id, box, black_image):
- color = get_color(box['sku'])
- cv2.rectangle(black_image, (box['box'][0], box['box'][1]), (box['box'][2], box['box'][3]), color, 1)
- cv2.putText(black_image, f"p:{(box['prob']):.3f}", (box['box'][0], box['box'][1] + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
- cv2.putText(black_image, "t:" + str(track_id), (box['box'][0], box['box'][1] + 40), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
- cv2.putText(black_image, "s:" + str(box['sku']), (box['box'][0], box['box'][1] + 60), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
-
-
- class VideoPlayer:
- def __init__(self, video_path,video_path2,track_txt):
-
- self.cap = cv2.VideoCapture(video_path)
- if door_type==1:
- self.cap2 = cv2.VideoCapture(video_path2)
- self.frame_index = -1
- self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
- if door_type == 1:
- self.total_frames2 = int(self.cap2.get(cv2.CAP_PROP_FRAME_COUNT))
- self.total_frames=min(self.total_frames,self.total_frames2)
-
- self.top_dict, self.bot_dict, self.end_index = get_box_dict(track_txt)
- self.queue_index = -1
- def save_cls_err(self,cam_id):
- if cam_id==0:
- box_id=0
- for box in self.top_boxes:
- img_crop=self.top_image_o[box['box'][1]:box['box'][3],box['box'][0]:box['box'][2]]
- save_path=f"{cls_dir}/{box['sku']}_{(box['prob']):.2f}_{self.frame_index}_0_{box_id}.jpg"
- cv2.imwrite(save_path,img_crop)
- print('cls top save img',save_path)
- box_id+=1
-
- elif cam_id==1:
- box_id = 0
- for box in self.bot_boxes:
- img_crop = self.bot_image_o[box['box'][1]:box['box'][3], box['box'][0]:box['box'][2]]
- save_path=f"{cls_dir}/{box['sku']}_{(box['prob']):.2f}_{self.frame_index}_1_{box_id}.jpg"
- cv2.imwrite(save_path, img_crop)
- print('cls bot save img',save_path)
- box_id += 1
- def on_press(self, key):
- global box_offset
- if key == keyboard.Key.right or key == keyboard.Key.space: # Right arrow key
- if self.queue_index < len(frame_buffer) - 1:
- self.queue_index += 1
- else:
- self.load_new_frame = True
- elif key == keyboard.Key.left: # Left arrow key
- if self.queue_index > 0:
- self.queue_index -= 1
- self.load_new_frame = False
- elif key == keyboard.KeyCode.from_char('+'): # Numpad +
- box_offset+=1
- print("box_offset +1", box_offset)
- self.new_frame_flag = True
- elif key == keyboard.KeyCode.from_char('-'): # Numpad -
- box_offset-=1
- print("box_offset -1", box_offset)
- self.new_frame_flag = True
-
- elif hasattr(key, 'char') and key.char =='1':
- self.save_cls_err(0)
- elif hasattr(key, 'char') and key.char =='2':
- self.save_cls_err(1)
- elif hasattr(key, 'char') and key.char =='3':
- save_path = f"{det_dir}/{sw_name}_{self.frame_index}_0.jpg"
- cv2.imwrite(save_path, self.top_img_big)
- print('save det err cam 0', save_path)
- elif hasattr(key, 'char') and key.char =='4':
- save_path = f"{det_dir}/{sw_name}_{self.frame_index}_1.jpg"
- cv2.imwrite(save_path, self.bot_img_big)
- print('save det err cam 1',save_path)
- elif str(key) == "'c'": # 's' key
- print('c')
- elif str(key) == "'d'": # 's' key
- print('d')
- elif str(key) == "'q'": # Press 'q' to exit
- return False
- else:
- try:
- if key.char == '/': # 检查按键字符
- print('Numpad / key pressed')
- elif key.char=='*':
- print('Numpad * key pressed')
- except AttributeError:
- pass
- def show_frame(self):
- listener = keyboard.Listener(on_press=self.on_press)
- listener.start()
- self.load_new_frame = True
- while self.cap.isOpened():
- if self.load_new_frame:
- ret, frame = self.cap.read()
- if not ret:
- print('video end')
- continue
-
- self.frame_index += 1
- cv2.putText(frame, str(self.frame_index), (20, 70), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
- frame_buffer.append(frame)
- self.queue_index = len(frame_buffer) - 1
- # print('load frame_i', self.frame_index, "queue_i", queue_index, 'queue len', len(frame_buffer))
- self.load_new_frame = False
-
- # Show the current frame
- if len(frame_buffer) > 0:
- view_index = self.frame_index - len(frame_buffer) + self.queue_index + 1
- imshow = frame_buffer[self.queue_index]
- # print('show frame_i', self.frame_index, "queue_i", queue_index, 'view_i', view_index)
- cv2.imshow('Video', imshow)
-
- if cv2.waitKey(1) & 0xFF == ord('q'): # Press 'q' to exit
- break
-
- # Release the video and close windows
- self.cap.release()
- cv2.destroyAllWindows()
- if __name__ == '__main__':
-
- sbbh='2212311106000040'
-
- dir_a= r'F:\data\jupiter\err_nos\0912\569'
- dir_a= r'D:\data\err_orders\jupter_pair_09\SW0031423091206302824557'
- dir_a= r'D:\data\err_orders\jupter_pair_09\SW0031423091206323752502'
- cooler_path = rf'F:\data\jupiter\configs/{sbbh}/config/config_cooler.yaml'
-
- with open(cooler_path, 'r') as file:
- yaml_dict = yaml.safe_load(file)
- img_x=yaml_dict['img_x']
- # img_x = 1920 - 1240 - img_x
- img_y=yaml_dict['img_y']
- top_line=yaml_dict['top_line']
- bot_line=yaml_dict['bot_line']
- img_height, img_width = 744, 1240
- door_type = 1
-
- cls_dir=dir_a+'/cls'
- det_dir=dir_a+'/det'
- os.makedirs(cls_dir,exist_ok=True)
- os.makedirs(det_dir,exist_ok=True)
- mp4_files = glob.glob(dir_a + '/high_*.mp4')
- # img_x=300+75
- # img_y= 202
- box_offset= 0
- video_path=video_path2=None
- for file in mp4_files:
- if "high_" in file and "_0_" in file:
- video_path=file
- elif "high_" in file and "_1_" in file:
- video_path2=file
- elif "high_" in file and "_2_" in file:
- video_path=file
- door_type = 2
- print("door_type=2")
-
- if door_type==1 and (video_path is None or video_path2 is None):
- print("cap1 or cap2 is None")
- exit(0)
- txt_path = video_path.replace(".mp4", "_tracks.txt")
-
- pattern = r'SW\d+'
- match = re.search(pattern, txt_path)
- sw_name = ""
- if match:
- sw_name = match.group()
- if door_type==2:
- img_x = 1920 - 1240 - img_x
-
- deque_len = 30
- frame_buffer = deque(maxlen=deque_len)
-
-
- save_width = 1000
- save_height = save_width *(img_height * 2) // img_width
- mp4_path=os.path.dirname(txt_path)+'/'+sw_name+'_tracks.mp4'
- fourcc = cv2.VideoWriter_fourcc(*'mp4v') # 或者使用 'X264'
- out_writer = cv2.VideoWriter(mp4_path, fourcc, 25.0, (save_width, save_height))
- print('save track mp4 ',mp4_path,'w',save_width,'h',save_height)
- player = VideoPlayer(video_path,video_path2, txt_path)
- player.show_frame()