现在大模型领域非常魔幻一件事,是调用友商开源的大模型构建自己的数据集,大家相互调用, 数据同源导致同样的问题回答内容也差不多,也难怪大家会质疑某些大模型是套壳gpt了,看来只有能积累原始数据的公司才能最终活下来。
这里就演示下如何用多进程调用商用大模型构建sft数据集
# -*- coding: utf-8 -*-
# @Time : 2024/6/19 上午10:33
# @Author : yblir
# @File : xiaohuangya_deepseek_json.py
# explain :
# =======================================================
import json
import re
import sys
from openai import OpenAI
from loguru import logger
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
# from queue import Queue
# from multiprocessing import Queue, cpu_count, shared_memory, Process
work_nums = 100
# img_queue = Queue(1000)
client = OpenAI(api_key="这里填入自己购买的key", base_url="https://api.deepseek.com/")
def find_all_index(str_, pattern):
patt = re.compile(pattern, re.IGNORECASE)
pos = [item.start() for item in patt.finditer(str_)]
return pos
def run(question_, index):
new_line_ = {}
history_ = []
msg1 = '结合TABLE信息,分析TABLE表结构,question的查询目标,查询条件和查询逻辑这4项内容。'
messages = [
{"role": "system", "content": "你是个数据库专家,帮忙解答用户提出的问题"},
{"role": "user", "content": f"{question_}, {msg1}"},
]
response = client.chat.completions.create(
model="deepseek-coder",
messages=messages
)
result = response.choices[0].message.content
history_.append([f"{question_}, {msg1}", result])
# right_sql = deepseek_sql_create(question)
# print(result)
msg2 = "根据你的流程分析,针对给出的TABLE,写出能表达当前question意思的sql语句"
messages.append({'role': 'assistant', 'content': result})
messages.append(
{'role': 'user', 'content': msg2})
response = client.chat.completions.create(
model="deepseek-coder",
messages=messages
)
result = response.choices[0].message.content
history_.append([msg2, result])
# print('--------------------------------------------')
# print(result)
msg3 = '分析下你写的sql语句是否正确,如果你认为有不合理的地方,指出并改正。最后,给出你认为正确的sql语句'
messages.append({'role': 'assistant', 'content': result})
messages.append({'role' : 'user',
'content': msg3})
response = client.chat.completions.create(
model="deepseek-coder",
messages=messages
)
result = response.choices[0].message.content
# print('--------------------------------------------')
# print(result)
new_line_['instruction'] = msg3
new_line_['input'] = ''
new_line_['output'] = result
new_line_['history'] = history_
logger.info(f'已完成数量:{index}')
return new_line_
if __name__ == '__main__':
with open('tuning_sample.json', 'r', encoding='utf-8') as f:
data = json.load(f)
print(len(data))
j = 6
with ProcessPoolExecutor(max_workers=work_nums) as p:
futures = [p.submit(run, line['instruction'], i + 1) for i, line in enumerate(data) if i + 1 > 1000]
# 获取结果
new_data_list = []
k = 0
for future in as_completed(futures):
result = future.result()
new_data_list.append(result)
k += 1
# 每抽取200条数据保存一次
if k % 200 == 0:
json_data = json.dumps(new_data_list, indent=4, ensure_ascii=False)
with open(f'duck_sql_{j}.json', 'w', encoding='utf-8') as f:
f.write(json_data)
j += 1
new_data_list = []
json_data = json.dumps(new_data_list, indent=4, ensure_ascii=False)
with open(f'duck_sql_extra.json', 'w', encoding='utf-8') as f:
f.write(json_data)
logger.success('json 写入成功')