如题,代码如下,可设置多个图片水印及它们的移动位置
功能为:可以添加多个动态移动的水印,还可以设置水印的大小以及移动速度,也可以增加文字水印,重点是这个是批量执行的,可以对目录下的所有视频文件批量添加水印
- import cv2
- import os
- import numpy as np
- from moviepy.editor import VideoFileClip
-
- def add_dynamic_watermarks_with_audio(video_path, output_path, watermark_image_paths, watermark_sizes, watermark_speeds, add_text_watermark, text_watermark_text, text_watermark_color):
- # 读取视频
- video_clip = VideoFileClip(video_path)
- audio_clip = video_clip.audio
-
- # 获取视频的宽度和高度
- width, height = video_clip.size
-
- # 准备文本水印
- if add_text_watermark:
- # 创建用于绘制文本水印的画布
- text_watermark_canvas = np.zeros((height, width, 3), dtype=np.uint8)
-
- # 设置文本水印属性
- font_face = cv2.FONT_HERSHEY_SIMPLEX
- font_scale = 1.5
- font_thickness = 2
-
- # 获取文本水印的大小
- (text_width, text_height), _ = cv2.getTextSize(text_watermark_text, font_face, font_scale, font_thickness)
-
- # 计算文本水印的位置
- text_x = int((width - text_width) / 2)
- text_y = int(height - text_height - 10)
-
- # 在画布上绘制文本水印
- cv2.putText(text_watermark_canvas, text_watermark_text, (text_x, text_y), font_face, font_scale, text_watermark_color, font_thickness, cv2.LINE_AA)
-
- # 在每一帧上添加水印
- watermark_positions = [(0, 0)] * len(watermark_image_paths)
-
- def process_frame(t, x):
- nonlocal watermark_positions
-
- frame = video_clip.get_frame(t)
-
- # 在帧上添加图片水印
- for i in range(len(watermark_image_paths)):
- watermark_image_path = watermark_image_paths[i]
- watermark_size = watermark_sizes[i]
- watermark_speed = watermark_speeds[i]
-
- # 读取水印图片
- watermark_image = cv2.imread(watermark_image_path)
-
- # 调整水印图片大小
- watermark_height, watermark_width, _ = watermark_image.shape
- if watermark_width > width or watermark_height > height:
- scale_factor = min(width / watermark_width, height / watermark_height)
- watermark_image = cv2.resize(
- watermark_image,
- (int(watermark_width * scale_factor), int(watermark_height * scale_factor)),
- interpolation=cv2.INTER_LINEAR,
- )
-
- # 更新水印位置
- if t % watermark_speed == 0:
- while True:
- x = np.random.randint(0, width - watermark_image.shape[1])
- y = np.random.randint(0, height - watermark_image.shape[0])
- too_close = False
-
- # 检查新位置与已选择水印位置之间的距离
- for j in range(i):
- dist = np.sqrt((x - watermark_positions[j][0]) ** 2 + (y - watermark_positions[j][1]) ** 2)
- if dist < watermark_image.shape[1] or dist < watermark_image.shape[0]:
- too_close = True
- break
-
- if not too_close:
- watermark_positions[i] = (x, y)
- break
- else:
- x, y = watermark_positions[i]
-
- # 在帧上添加水印
- watermark_resized = cv2.resize(
- watermark_image, (int(watermark_size * watermark_width), int(watermark_size * watermark_height))
- )
- alpha = watermark_resized[:, :, 0] / 255.0
- for c in range(3):
- frame_copy = frame.copy()
- frame_copy[y : y + watermark_resized.shape[0], x : x + watermark_resized.shape[1], c] = (
- frame[y : y + watermark_resized.shape[0], x : x + watermark_resized.shape[1], c] * (1 - alpha)
- + watermark_resized[:, :, c] * alpha
- )
-
- # 在帧上添加文本水印
- if add_text_watermark:
- frame_with_text = cv2.addWeighted(frame, 1, text_watermark_canvas, 0.7, 0)
- frame = frame_with_text
-
- cv2.imshow("Watermarked Video", frame)
- cv2.waitKey(1)
-
- return frame
-
- # 处理视频每一帧
- processed_clip = video_clip.fl(lambda gf, t: process_frame(t, gf))
-
- # 输出处理后的视频
- final_clip = processed_clip.set_audio(audio_clip)
- final_clip.write_videofile(output_path, codec='libx264', audio_codec="aac")
-
- return "水印添加完成!"
-
- def batch_add_watermarks_in_directory_with_audio(directory, output_directory, watermark_image_paths, watermark_sizes, watermark_speeds, add_text_watermark=False, text_watermark_text=None):
- # 获取目录中的所有文件
- file_list = os.listdir(directory)
-
- # 遍历文件列表
- for file_name in file_list:
- # 检查文件是否为视频文件
- if file_name.lower().endswith(('.avi', '.mp4', '.mov', '.mkv')):
- file_path = os.path.join(directory, file_name)
-
- # 设置输出文件路径
- output_file_path = os.path.join(output_directory, file_name)
-
- # 添加水印
- add_dynamic_watermarks_with_audio(file_path, output_file_path, watermark_image_paths, watermark_sizes, watermark_speeds, add_text_watermark, text_watermark_text if add_text_watermark else None, (255, 255, 255))
-
- return "批量处理完成!"
-
- # 使用示例
- if __name__ == '__main__':
- directory = input("请输入视频文件所在目录路径:") # 视频文件所在目录
- output_directory = input("请输入输出视频文件目录路径:") # 输出视频文件目录
-
- # 检查并创建输出目录
- os.makedirs(output_directory, exist_ok=True)
-
- watermark_image_paths = []
- watermark_sizes = []
- watermark_speeds = []
-
- num_watermarks = int(input("请输入要添加的水印数量:"))
- for i in range(num_watermarks):
- watermark_image_path = input(f"请输入第{i+1}个水印图片的路径:")
- watermark_size = float(input(f"请输入第{i+1}个水印图片的大小(0~1之间):"))
- watermark_speed = int(input(f"请输入第{i+1}个水印图片的速度:"))
-
- watermark_image_paths.append(watermark_image_path)
- watermark_sizes.append(watermark_size)
- watermark_speeds.append(watermark_speed)
-
- add_text_watermark_input = input("是否添加文本水印?(y/n) ")
- if add_text_watermark_input.lower() == 'y':
- add_text_watermark = True
- text_watermark_text = input("请输入文本水印内容:")
- else:
- add_text_watermark = False
- text_watermark_text = None
-
- batch_add_watermarks_in_directory_with_audio(directory, output_directory, watermark_image_paths, watermark_sizes, watermark_speeds, add_text_watermark, text_watermark_text)