NOTE:代码仅用来参考,没时间解释啦!
在某台服务器上,从存放数据集的数据库中自动抽取已标注好的数据标签。这一步操作有什么用呢?当我们发现数据不均衡时(如上图右边部分所示),可以从数据库中抽取数量不足的标签数据进行补充。
- import os
- import shutil
- # from get_structs import print_file_structure
- import random
-
def print_file_structure(file_path, indent=''):
    """Print a one-level tree view of ``file_path``.

    For a regular file only its base name is printed.  For a directory the
    directory name is printed, followed by its immediate sub-directories and
    then its immediate files (each group sorted by name).  Nothing below the
    top level is listed.  Any other path prints an "invalid path" message.

    Args:
        file_path: File or directory to display.
        indent: Prefix prepended to every printed line.
    """
    if os.path.isfile(file_path):
        print(indent + '├── ' + os.path.basename(file_path))
        return
    if not os.path.isdir(file_path):
        print('无效的文件路径')
        return

    print(indent + '├── ' + os.path.basename(file_path))

    # A single listdir replaces the previous os.walk + break construct, which
    # walked the tree only to discard everything after the first level.
    dir_names, file_names = [], []
    for entry in sorted(os.listdir(file_path)):
        if os.path.isdir(os.path.join(file_path, entry)):
            dir_names.append(entry)
        else:
            file_names.append(entry)

    for name in dir_names:
        print(indent + '│ ├── ' + name)
    for name in file_names:
        print(indent + '│ └── ' + name)
-
def from_dataset_get_data_label(source_dataset_path, label):
    """Interactively sample annotated images of one label into the working directory.

    The label directory ``<source_dataset_path>/<label>`` is scanned one level
    deep: every ``.jpg`` inside a sub-directory that has a same-named ``.xml``
    next to it is a candidate.  The user is asked how many candidates to draw
    at random and whether to copy (``cp``) or move (``mv``) them.  Images go
    to ``./images`` and annotations to ``./Annotations`` under the current
    working directory; files sharing a base name overwrite each other there.

    Args:
        source_dataset_path: Directory whose sub-directories are label names.
        label: Name of the label sub-directory to sample from.

    Returns:
        None.  Returns early (after printing a message) if ``label`` is not a
        sub-directory of ``source_dataset_path``.
    """
    separator = '------------------------------------'

    if label not in os.listdir(source_dataset_path):
        print("您输入的标签名无效,不存在于test子目录中!")
        return
    target_path = os.path.join(source_dataset_path, label)
    label_lenght = count_jpg_files(target_path)
    print("<{}>标签的数量统计为:【{}】".format(label, label_lenght))
    print(separator)

    # Collect (image, xml) pairs so the two paths can never get out of step.
    pairs = []
    for file_name in os.listdir(target_path):
        subPath = os.path.join(target_path, file_name)
        if not os.path.isdir(subPath):
            continue
        for data_name in os.listdir(subPath):
            if not data_name.endswith('.jpg'):
                continue
            xml_path = os.path.join(subPath, os.path.splitext(data_name)[0] + '.xml')
            if os.path.exists(xml_path):
                pairs.append((os.path.join(subPath, data_name), xml_path))
    print("统计有xml的图片数量:", len(pairs))
    print(separator)

    # Re-prompt on non-numeric input instead of crashing with ValueError.
    prompt = "请输入您要随机抽取的数据数量:"
    while True:
        try:
            get_num = int(input(prompt))
            break
        except ValueError:
            prompt = "[ERROR]请输入您要随机抽取的数据数量:"
    print(separator)
    # Clamp to what is available.  The previous "len - 1" dropped one sample
    # and became -1 (crashing random.sample) when nothing was found.
    get_num = max(0, min(get_num, len(pairs)))
    chosen = random.sample(pairs, get_num)

    print("请注意!所有文件都会复制到工作目录,请慎重选择工作目录。")
    print(separator)
    opt = input("请选择您的移动方式:[cp/mv]")
    print(separator)
    while opt not in ['cp', 'mv']:
        opt = input("[ERROR]请选择您的移动方式:[cp/mv]")
        print(separator)

    # Resolve the destination once, before the loop, so the summary below can
    # always report it (it used to be unbound for 'mv' or an empty selection).
    wd = os.getcwd()
    image_dir = os.path.join(wd, 'images')
    anno_dir = os.path.join(wd, 'Annotations')
    os.makedirs(image_dir, exist_ok=True)
    os.makedirs(anno_dir, exist_ok=True)

    # 'mv' used to be a silent no-op that was still reported as done.
    transfer = shutil.copyfile if opt == 'cp' else shutil.move
    for img_path, xml_path in chosen:
        transfer(img_path, os.path.join(image_dir, os.path.basename(img_path)))
        transfer(xml_path, os.path.join(anno_dir, os.path.basename(xml_path)))

    print("在上列操作中您选择了{}标签,从中抽取了{}数据量,并且使用{}方式放到了{}工作目录下。".format(label, get_num, opt, wd))
    print(separator)
def count_jpg_files(path):
    """Count annotated ``.jpg`` files anywhere below ``path``.

    A ``.jpg`` file is counted only when a file with the same stem and an
    ``.xml`` extension exists in the same directory.

    Args:
        path: Root directory to search recursively.

    Returns:
        Number of ``.jpg`` files that have a sibling ``.xml`` file.
    """
    return sum(
        1
        for folder, _subdirs, names in os.walk(path)
        for fname in names
        if fname.endswith('.jpg')
        and os.path.exists(os.path.join(folder, os.path.splitext(fname)[0] + '.xml'))
    )
-
if __name__ == "__main__":
    divider = '------------------------------------'

    # Root of the labelled dataset; each sub-directory is one label.
    source_dataset_path = '/data/personal/chz/find_allimgs_label/test'
    # Known label names, kept for reference (not used below).
    use_labels = ["zsd_m","zsd_l","fhz_h","fhz_f","kk_f","kk_h","fhz_bs", "fhz_ycn","fhz_wcn","fhz_red_h", "fhz_green_f", "fhz_m", "bs_ur", "bs_ul", "bs_up", "bs_down", "fhz_ztyc", "bs_right", "bs_left", "bs_dl", "bs_dr", "kgg_ybh", "kgg_ybf", "yljdq_flow", "yljdq_stop"]

    # Show the available labels, then let the user choose one to sample from.
    print_file_structure(source_dataset_path, "")
    print(divider)
    label = input("请您根据上列中的test菜单,选取您想要的标签:")
    print(divider)
    from_dataset_get_data_label(source_dataset_path, label)
- import minio
- import pymysql
- import openpyxl
- import os
-
def get_data_from_mysql():
    """Fetch the image URL column of every row in the ``CorrectPoint`` table.

    Returns:
        A list holding the value at column index 15 of each row, in the order
        the database returned the rows.
    """
    # NOTE(review): user/password are blank placeholders and the port was
    # missing entirely in the original (a syntax error).  3306 is the MySQL
    # default -- confirm the real values before running.
    conn = pymysql.connect(host="10.168.1.94", user="", passwd="", db="RemotePatrolDB", port=3306, charset="utf8")
    try:
        cur = conn.cursor()
        try:
            cur.execute("SELECT * FROM CorrectPoint;")
            rows = cur.fetchall()
        finally:
            cur.close()
    finally:
        # Close the connection even when the query fails.
        conn.close()

    # NOTE(review): index 15 is presumably the ImagePath column; selecting the
    # column by name would survive schema changes -- verify against the table.
    return [row[15] for row in rows]
-
def save_for_excel(df, file_name="文件名.xlsx"):
    """Write rows to the active sheet of a new Excel workbook and save it.

    Args:
        df: Iterable of rows; each row is an iterable of cell values and is
            appended to the sheet in order.
        file_name: Destination ``.xlsx`` path.  Defaults to the placeholder
            name that was previously hard-coded.
    """
    wb = openpyxl.Workbook()
    ws = wb.active
    for row in df:
        ws.append(row)
    wb.save(file_name)
-
-
- # 从minio上面拉取图片
# Download images from MinIO
def load_data_minio(bucket: str, imageUrls):
    """Download objects from a MinIO bucket into ``./imageUrlFromminIO``.

    The object name of each URL is its last ``/``-separated component, and the
    object is saved locally under that same name.

    Args:
        bucket: Name of the bucket to read from.
        imageUrls: Iterable of image URLs/paths.  Empty or ``None`` entries
            are skipped.

    Returns:
        ``None`` if the bucket does not exist; otherwise the bytes of the last
        object downloaded (``b""`` when nothing was downloaded).
    """
    # NOTE(review): access/secret keys are blank placeholders -- fill in.
    minio_conf = {
        'endpoint': '10.168.1.96:9000',
        'access_key': '',
        'secret_key': '',
        'secure': False
    }
    client = minio.Minio(**minio_conf)
    if not client.bucket_exists(bucket):
        return None

    root_path = os.path.join("imageUrlFromminIO")
    # The target directory was never created, so the first open() failed.
    os.makedirs(root_path, exist_ok=True)

    last_payload = b""
    for imageUrl in imageUrls:
        if not imageUrl:
            # Rows without an image path cannot be fetched.
            continue
        object_name = imageUrl.split('/')[-1]
        response = client.get_object(bucket, object_name)
        try:
            last_payload = response.read()
        finally:
            # Hand the HTTP connection back to the pool, as the MinIO SDK
            # requires after get_object.
            response.close()
            response.release_conn()
        with open(os.path.join(root_path, object_name), 'wb') as file_data:
            file_data.write(last_payload)
    return last_payload
-
- # 上传图片到minio
# Upload images to MinIO
def up_data_minio(bucket: str, image_Urls_path='imageUrlFromminIO'):
    """Upload every regular file in a local directory to a MinIO bucket.

    Each file is stored under its own file name as the object name.

    Args:
        bucket: Name of the destination bucket.
        image_Urls_path: Local directory whose files are uploaded.
    """
    import mimetypes

    # TODO: minio_conf is the only thing that needs editing per deployment.
    minio_conf = {
        'endpoint': '192.168.120.188',
        'access_key': '',
        'secret_key': '',
        'secure': False
    }
    # One client serves all uploads; it used to be rebuilt for every file.
    client = minio.Minio(**minio_conf)

    for im_name in os.listdir(image_Urls_path):
        file_path = os.path.join(image_Urls_path, im_name)
        if not os.path.isfile(file_path):
            # fput_object needs a regular file; skip sub-directories.
            continue
        # Guess the type from the extension so PNGs are not labelled as JPEG;
        # fall back to the previously hard-coded value when unknown.
        content_type = mimetypes.guess_type(im_name)[0] or 'image/jpeg'
        client.fput_object(bucket_name=bucket, object_name=im_name,
                           file_path=file_path,
                           content_type=content_type)
-
def download():
    """Pull image URLs from MySQL, download them from MinIO, print the result type."""
    # Step 1: read the image URLs stored in the database.
    image_urls = get_data_from_mysql()
    # Step 2: download those images from the "test" bucket of the .96 MinIO server.
    fetched = load_data_minio("test", image_urls)
    print(type(fetched))
-
def upload():
    """Push the previously downloaded images to the target server's MinIO."""
    # Step 3: upload everything in the local download directory to bucket "test".
    local_dir = 'imageUrlFromminIO'
    up_data_minio("test", image_Urls_path=local_dir)
-
if __name__ == "__main__":
    # Pull step: database -> local directory.
    download()
    # Push step: local directory -> target MinIO (enable when needed).
    # upload()

# To batch-rewrite the ImagePath column so it points at your own server IP:
#   UPDATE CorrectPoint SET ImagePath=REPLACE(ImagePath, '10.168.1.96', '192.168.120.188');
-
-
-
需要放一个文件到本地目录:
链接:https://pan.baidu.com/s/1iEJKpqt-z_5yBJdenUABbA
提取码:uoox
--来自百度网盘超级会员V3的分享
def cv2AddChineseText(self, img_ori, text, p1, box_color, textColor=(255, 255, 255), textSize=17):
    """Draw ``text`` (may contain Chinese) on a filled label box near ``p1``.

    OpenCV cannot render CJK glyphs, so the label background is drawn with
    OpenCV and the text itself with Pillow using the ``simsun.ttc`` font.
    The label sits above-right of ``p1`` when it fits inside the image;
    otherwise it is shifted left and/or placed below ``p1``.

    Args:
        img_ori: BGR ``numpy`` image (drawn on in place), or a PIL RGB image.
        text: Label text.
        p1: ``(x, y)`` anchor, e.g. the top-left corner of a detection box.
        box_color: BGR fill colour of the label background.
        textColor: Colour passed to Pillow for the text.
        textSize: Font size in points.

    Returns:
        A new BGR ``numpy`` image with the label drawn.
    """
    if not isinstance(img_ori, np.ndarray):
        # Non-OpenCV input used to fall through to an unbound variable;
        # convert a PIL-style RGB image to a BGR array instead.
        img_ori = cv2.cvtColor(np.asarray(img_ori), cv2.COLOR_RGB2BGR)
    img_height, img_width = img_ori.shape[:2]

    fontStyle = ImageFont.truetype("simsun.ttc", textSize, encoding="utf-8")

    # Measure the text.  ImageDraw.textsize was removed in Pillow 10, so use
    # textbbox when available and keep textsize only for old versions.
    probe = ImageDraw.Draw(Image.new("RGB", (1, 1)))
    if hasattr(probe, "textbbox"):
        # right/bottom measured from the origin also cover the glyph offset,
        # so the background box encloses the rendered text.
        _, _, right, bottom = probe.textbbox((0, 0), text, font=fontStyle)
        text_width, text_height = int(right), int(bottom)
    else:
        text_width, text_height = probe.textsize(text, font=fontStyle)

    # Plain ints: OpenCV rejects some numpy scalar types as point coordinates.
    x0, y0 = int(p1[0]), int(p1[1])
    fits_right = x0 + text_width + 3 < img_width
    fits_above = y0 - text_height - 3 >= 0
    # Clamp at 0 so a label wider than the image does not start off-canvas.
    left = x0 + 3 if fits_right else max(img_width - text_width, 0)
    top = y0 - text_height - 3 if fits_above else y0 + 3
    p2 = (left + text_width, top + text_height)

    image = cv2.rectangle(img_ori, (left, top), p2, box_color, -1, cv2.LINE_AA)  # filled
    img = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    ImageDraw.Draw(img).text((left, top), text, textColor, font=fontStyle)
    # Convert back to OpenCV's BGR layout.
    return cv2.cvtColor(np.asarray(img), cv2.COLOR_RGB2BGR)
-
def draw_boxs(self, boxes, image):
    """Draw every detection box and its captioned label onto ``image``.

    Args:
        boxes: Iterable of detections laid out as
            ``(x, y, width, height, label_index, confidence)``.
        image: BGR ``numpy`` image to draw on.

    Returns:
        The annotated image; ``image`` itself when ``boxes`` is empty.
    """
    line_width = 3
    txt_color = (255, 255, 255)
    box_color = (58, 56, 255)

    for res in boxes:
        label = self.labels[res[4]]
        conf = round(res[5], 4)

        # xywh -> xyxy as plain Python ints; OpenCV can reject numpy scalars
        # as point coordinates.
        p1 = (int(res[0]), int(res[1]))
        p2 = (int(res[2] + res[0]), int(res[3] + res[1]))
        image = cv2.rectangle(image, p1, p2, box_color, line_width)

        box_label = '%s: %.2f' % (self.get_desc(label), conf)
        image = self.cv2AddChineseText(image, box_label, p1, box_color, txt_color)
    return image
使用 labelImg 标注得到的文件是 xml 格式,无法直接用于 YOLO 训练,所以需要使用自动转换工具把 xml 都转换为 txt。
请确保目录结构如下:
- import os
- import xml.etree.ElementTree as ET
- import cv2
- import random
- from tqdm import tqdm
- from multiprocessing import Pool
- import numpy as np
- import shutil
- '''
- 优化之前:
- 1.把函数路径改为新的数据集,先运行一次,生成txt;
- 2.把新的数据集Images Annotations labels都手动放入 原生数据集;
- 3.再把路径改回来原生数据集,再运行一次,生成txt;
- 问题:
- (1)txt不是追加模式,虽然会在第三步被覆盖掉,但重复执行没必要。
- (2)有很多地方类似(1)其实是运行了两次的。
- 优化之后:
- 1.把函数路径改为新的数据集,运行一次,完成!
- '''
- random.seed(0)
class Tools_xml2yolo(object):
    """Convert Pascal-VOC style XML annotations into YOLO ``.txt`` labels.

    For every annotation file the matching image is located, its size is
    read, and each ``<object>`` whose class name is listed in ``classes`` is
    written as ``"<class_id> <x_center> <y_center> <width> <height>"`` with
    coordinates normalised by the image size.  The image paths are recorded
    in ``train.txt`` / ``test.txt`` inside ``themeFIle``.

    Args:
        img_path: Directory containing the images.
        anno_path: Directory containing the XML files.  Pass ``''`` for an
            images-only data set; empty label files are then generated.
        label_path: Directory the ``.txt`` labels are written to (created if
            missing).
        themeFIle: Base data set directory holding ``train.txt``/``test.txt``.
        classes: Ordered class names; the index is the YOLO class id.
        the_data_is_new: When true the list files are opened in append mode
            and, after conversion, everything in the three directories is
            moved into ``images``/``Annotations``/``labels`` under
            ``themeFIle``.
        train_percent: Fraction (0..1) of annotation files put in the train
            list; the rest go to the test list.
    """

    # Extensions tried, in order, when looking for the image of an annotation.
    IMAGE_EXTENSIONS = ('.jpg', '.JPG', '.PNG', '.png', '.jpeg')

    def __init__(self,
                 img_path=r"ft_220/images",
                 anno_path=r"ft_220/annotations_xml",
                 label_path=r"ft_220/labels",
                 themeFIle='ft_220',
                 classes=None,
                 the_data_is_new=False,
                 train_percent=1,
                 ) -> None:
        self.img_path = img_path
        self.anno_path = anno_path
        self.label_path = label_path
        self.the_data_is_new = the_data_is_new
        # A list literal as default would be shared between instances.
        self.classes = [""] if classes is None else classes
        self.txt_path = themeFIle

        # os.listdir('') raises, which made the images-only mode unreachable.
        self.files = os.listdir(self.anno_path) if self.anno_path else []
        num = len(self.files)
        tr = int(num * train_percent)
        self.train_list = random.sample(range(num), tr)
        print('len train', len(self.train_list))

        if not os.path.exists(self.label_path):
            os.makedirs(self.label_path)

        # New data is appended to the existing lists; otherwise start fresh.
        mode = 'a' if the_data_is_new else 'w'
        self.ftest = open(os.path.join(self.txt_path, 'test.txt'), mode)
        self.ftrain = open(os.path.join(self.txt_path, 'train.txt'), mode)

    def close(self):
        """Close the train/test list files (safe to call more than once)."""
        for handle in (self.ftrain, self.ftest):
            if not handle.closed:
                handle.close()

    def resi(self, num):
        """Format ``abs(num)`` as a fixed-point string with 6 decimals.

        Fixed-point formatting avoids scientific notation: the previous
        ``str()`` + zero padding turned ``1e-06`` into ``'1e-06000'``.
        """
        return "%.6f" % abs(round(num, 6))

    def convert(self, size, box):
        """Convert a pixel box to normalised YOLO ``(x, y, w, h)`` strings.

        Args:
            size: ``(image_width, image_height)`` in pixels.
            box: ``(xmin, xmax, ymin, ymax)`` in pixels.

        Returns:
            Tuple of four strings: centre x, centre y, width, height, each
            divided by the matching image dimension.
        """
        dw = 1. / size[0]
        dh = 1. / size[1]
        x = (box[0] + box[1]) / 2.0  # centre on the x axis
        y = (box[2] + box[3]) / 2.0  # centre on the y axis
        w = box[1] - box[0]
        h = box[3] - box[2]
        return (self.resi(x * dw), self.resi(y * dh), self.resi(w * dw), self.resi(h * dh))

    def _find_image(self, name):
        """Return the path of the image matching ``name``'s stem, or ``None``."""
        stem = os.path.splitext(name)[0]
        for ext in self.IMAGE_EXTENSIONS:
            candidate = os.path.join(self.img_path, stem + ext)
            if os.path.exists(candidate):
                return candidate
        return None

    def _listed_image_path(self, image_path):
        """Return the path to record in the train/test list for an image."""
        if self.the_data_is_new:
            # moveNewData() relocates the image after conversion; record
            # where it will live rather than a path that stops existing.
            return os.path.join(self.txt_path, "images", os.path.basename(image_path))
        return image_path

    def _read_image_size(self, image_path):
        """Return ``(width, height)`` of an image, or ``(None, None)`` on failure.

        A JPEG that does not end with the end-of-image marker is re-encoded
        in place first.  Files are read through ``np.fromfile`` +
        ``cv2.imdecode`` rather than ``cv2.imread``.
        """
        try:
            repaired = False
            # Only JPEGs end with FF D9.  Running this check on PNGs used to
            # overwrite every PNG with JPEG-encoded bytes.
            if image_path.lower().endswith(('.jpg', '.jpeg')):
                with open(image_path, 'rb') as f:
                    f.seek(0, os.SEEK_END)
                    f.seek(max(f.tell() - 2, 0))
                    tail = f.read()
                if tail != b'\xff\xd9':
                    print('JPEG File collapse:', image_path)
                    decoded = cv2.imdecode(np.fromfile(image_path, dtype=np.uint8), -1)
                    cv2.imencode(".jpg", decoded)[1].tofile(image_path)
                    repaired = True
            h, w = cv2.imdecode(np.fromfile(image_path, dtype=np.uint8), -1).shape[:2]
            if repaired:
                print('----------Rewrite & Read image successfully----------')
            return w, h
        except (OSError, AttributeError, cv2.error):
            # Unreadable/undecodable image: report it and let the caller skip.
            print(image_path)
            return None, None

    def process(self, name):
        """Convert one annotation file into a YOLO label file.

        Args:
            name: File name whose stem identifies the ``.xml`` annotation,
                the image and the ``.txt`` label to write.

        No label file is written when the image cannot be read, when the
        annotation has no ``<object>``, or when a normalised value exceeds 1.
        Objects whose class is not in ``self.classes`` are printed and
        skipped.
        """
        stem = os.path.splitext(name)[0]
        iter_anno_path = os.path.join(self.anno_path, stem + ".xml")
        iter_txt_path = os.path.join(self.label_path, stem + ".txt")
        root = ET.parse(iter_anno_path).getroot()

        iter_image_path = self._find_image(name)
        if iter_image_path is None:
            print(os.path.join(self.img_path, stem))
            print('wh is none')
            return
        w, h = self._read_image_size(iter_image_path)
        if w is None or h is None:
            print('wh is none')
            return

        objects = root.findall('object')
        if not objects:
            print('count=0')
            print(os.path.basename(iter_image_path))
            return

        lines = []
        for obj in objects:
            cls = obj.find('name').text
            if cls not in self.classes:
                print(cls)
                continue
            cls_id = self.classes.index(cls)
            xmlbox = obj.find('bndbox')
            b = (float(xmlbox.find('xmin').text), float(xmlbox.find('xmax').text),
                 float(xmlbox.find('ymin').text), float(xmlbox.find('ymax').text))
            bb = self.convert((w, h), b)
            if any(float(a) > 1.0 for a in bb):
                # The box extends beyond the image: the annotation is
                # inconsistent with the image, so the whole file is skipped.
                print(iter_anno_path + "wrong xywh", bb)
                return
            lines.append(str(cls_id) + " " + " ".join(bb) + '\n')

        with open(iter_txt_path, "w") as out_file:
            out_file.writelines(lines)

    def moveNewData(self, ):
        """Move all images, annotations and labels into the base data set.

        Files go to ``images``, ``Annotations`` and ``labels`` under
        ``self.txt_path`` (created when missing).  Each directory is moved on
        its own; the previous version paired three unordered listings by
        index, which breaks as soon as their lengths or orders differ.
        """
        plan = (
            (self.img_path, "images"),
            (self.anno_path, "Annotations"),
            (self.label_path, "labels"),
        )
        for src_dir, sub_dir in plan:
            if not src_dir:
                continue
            dst_dir = os.path.join(self.txt_path, sub_dir)
            if os.path.abspath(src_dir) == os.path.abspath(dst_dir):
                # Already in place.
                continue
            os.makedirs(dst_dir, exist_ok=True)
            for file_name in os.listdir(src_dir):
                shutil.move(os.path.join(src_dir, file_name), os.path.join(dst_dir, file_name))

    def run(self,):
        """Convert all annotations, write the train/test lists, then clean up.

        The list files are closed when this method returns, so an instance
        is meant for a single ``run()``.
        """
        try:
            train_ids = set(self.train_list)  # O(1) membership tests
            # Conversion runs sequentially.  The former
            # Pool.apply_async(self.process, args=(name)) passed a bare
            # string as the argument tuple, so every worker call failed
            # silently while the real work was already done in this loop.
            with tqdm(total=len(self.files)) as pbar:
                for i, name in enumerate(self.files):
                    self.process(name)
                    pbar.update()

                    iter_image_path = self._find_image(name)
                    if iter_image_path is None:
                        # Do not list an image that does not exist.
                        print('image not found for', name)
                        continue
                    target = self.ftrain if i in train_ids else self.ftest
                    target.write(self._listed_image_path(iter_image_path) + "\n")

            # Images-only data set (no XML): every image gets an empty label
            # file and is listed for training.
            # NOTE(review): the lost indentation leaves it unclear whether
            # images that already had a label file were listed -- confirm.
            if not self.anno_path:
                for img_name in os.listdir(self.img_path):
                    txt_name = os.path.splitext(img_name)[0] + '.txt'
                    txt_file = os.path.join(self.label_path, txt_name)
                    if not os.path.exists(txt_file):
                        open(txt_file, 'w').close()
                    self.ftrain.write(
                        self._listed_image_path(os.path.join(self.img_path, img_name)) + "\n")

            if self.the_data_is_new:
                self.moveNewData()
        finally:
            self.close()
-
if __name__ == '__main__':
    # themeFIle is the base data set; the first three arguments point at the
    # (new) subset to convert.  With the_data_is_new=True the subset's
    # images/Annotations/labels are moved into the base data set afterwards.
    # tool = Tools_xml2yolo()
    dataset_root = 'datasets/jzl_zhoushan_train/'
    class_names = ["zsd_m","zsd_l","fhz_h","fhz_f","kk_f","kk_h","fhz_bs", "fhz_ycn","fhz_wcn","fhz_red_h", "fhz_green_f", "fhz_m", "bs_ur", "bs_ul", "bs_up", "bs_down", "fhz_ztyc", "bs_right", "bs_left", "bs_dl", "bs_dr", "kgg_ybh", "kgg_ybf", "yljdq_flow", "yljdq_stop"]
    tool = Tools_xml2yolo(
        img_path='datasets/jzl_zhoushan_train/images/',
        anno_path='datasets/jzl_zhoushan_train/Annotations/',
        label_path='datasets/jzl_zhoushan_train/labels/',
        themeFIle=dataset_root,
        classes=class_names,
        the_data_is_new=False,
    )

    # Convert every XML annotation into the txt format YOLO training expects.
    tool.run()
- import os
- import torch
- import xml.etree.ElementTree as ET
- from PIL import Image
-
- # 分类类别名称字典
- class_dict = {
- 'zsd_m': '指示灯灭',
- 'zsd_l': '指示灯亮',
- 'fhz_h': '分合闸-合',
- 'fhz_f': '分合闸-分',
- 'fhz_ztyc': '分合闸-状态异常',
- 'fhz_bs': '旋转把手',
- 'kk_f': '空气开关-分',
- 'kk_h': '空气开关-合',
- 'fhz_ycn': '分合闸-已储能',
- 'fhz_wcn': '分合闸未储能',
- 'fhz_red_h': '分合闸-红-合',
- 'fhz_green_f': '分合闸-绿-分',
- 'fhz_m': '分合闸-灭',
- 'bs_ur': '把手-右上',
- 'bs_ul': '把手-左上',
- 'bs_up': '把手-上',
- 'bs_down': '把手-下',
- 'bs_right': '把手-右',
- 'bs_left': '把手-左',
- 'bs_dl': '把手-左下',
- 'bs_dr': '把手-右下',
- "kgg_ybf": "开关柜-压板分",
- "kgg_ybh": "开关柜-压板合",
- "ddzsd_green":"带电指示灯-绿色",
- "ddzsd_red":"带电指示灯-红色"
- }
-
def detect_and_save(model_path, folder_path, iter_start_index):
    """Run a detector over a folder and save each result as a VOC-style XML file.

    For every ``.jpg``/``.png`` in ``folder_path`` (visited in sorted name
    order) the model is run and an ``<annotation>`` XML with one ``<object>``
    per recognised detection is written next to the image.

    Args:
        model_path: Path of the serialized model passed to ``torch.load``.
        folder_path: Directory containing the images; XML files are written
            into the same directory.
        iter_start_index: Entries whose position in the sorted listing is
            ``<= iter_start_index`` are skipped, to resume an earlier run.
    """
    # NOTE(review): torch.load unpickles arbitrary objects -- only load
    # checkpoints from a trusted source.  The code below assumes the loaded
    # object is callable on a PIL image and returns results exposing
    # ``.xyxy`` (as YOLOv5 hub models do); confirm for your checkpoint.
    model = torch.load(model_path, map_location=torch.device('cpu'))

    # Switch the model to evaluation mode.
    model.eval()

    folder_name = os.path.basename(folder_path)
    # Sorted so the resume index refers to the same files on every run.
    for ind, file_name in enumerate(sorted(os.listdir(folder_path))):
        if ind <= iter_start_index:
            continue
        if not file_name.endswith(('.jpg', '.png')):
            continue

        img_path = os.path.join(folder_path, file_name)
        with Image.open(img_path) as img:
            results = model(img)
            img_width, img_height = img.width, img.height

        # Detections carry a numeric class index while class_dict is keyed by
        # class name, so the original membership test never matched.  Map the
        # index to a name first.
        names = getattr(results, 'names', None)
        if names is None:
            names = getattr(model, 'names', None)

        # Build the XML tree.
        root = ET.Element('annotation')
        ET.SubElement(root, 'folder').text = folder_name
        ET.SubElement(root, 'filename').text = file_name
        size = ET.SubElement(root, 'size')
        ET.SubElement(size, 'width').text = str(img_width)
        ET.SubElement(size, 'height').text = str(img_height)
        ET.SubElement(size, 'depth').text = str(3)

        for result in results.xyxy[0]:
            class_key = int(result[-1])
            if names is not None:
                class_key = names[class_key]
            if class_key not in class_dict:
                continue
            obj = ET.SubElement(root, 'object')
            # NOTE(review): this writes the human-readable description, as
            # the original did; an XML-to-YOLO converter that matches on
            # class codes would need the code itself -- confirm intent.
            ET.SubElement(obj, 'name').text = class_dict[class_key]
            bndbox = ET.SubElement(obj, 'bndbox')
            for tag, value in zip(('xmin', 'ymin', 'xmax', 'ymax'), result[:4]):
                ET.SubElement(bndbox, tag).text = str(int(value))

        # Save the XML next to the image; UTF-8 keeps non-ASCII names readable.
        xml_path = os.path.join(folder_path, os.path.splitext(file_name)[0] + '.xml')
        ET.ElementTree(root).write(xml_path, encoding='utf-8')
-
if __name__ == "__main__":
    # Auto-annotate the folder, resuming after listing position 180.
    detect_and_save(
        './best.pt',
        './rmwrite_zhoushan/rmwrite_zhoushan',
        iter_start_index=180,
    )