星火认知大模型是科大讯飞开发的,直接使用可以点击星火认知大模型,要调用API的话在讯飞开发平台注册,之后点击进入控制台。
注册后完成实名认证,以及各种新人礼包,白嫖一些tokens! 之后就去找官方API调用文档,星火大模型的API是通过websocket建立连接然后实现调用的,和通义千问通过HTTP协议调用、智谱AI直接用封装好的python包调用不同。
单论会话就是用户提一个问题,大模型回答,这一轮就结束了。下一轮会话用户向模型发送的信息中不包括上一轮的信息。
官方给的python调用实例,修改一下on_close()函数(需要三个参数),以及将服务器端和用户端代码分离开:SparkAPI.py—服务器端口的代码👇
# coding: utf-8
import _thread as thread
import os
import time
import base64
import base64
import datetime
import hashlib
import hmac
import json
from urllib.parse import urlparse
import ssl
from datetime import datetime
from time import mktime
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time
import websocket
import openpyxl
from concurrent.futures import ThreadPoolExecutor, as_completed
import os
class Ws_Param(object):
# 初始化
def __init__(self, APPID, APIKey, APISecret, gpt_url):
self.APPID = APPID
self.APIKey = APIKey
self.APISecret = APISecret
self.host = urlparse(gpt_url).netloc
self.path = urlparse(gpt_url).path
self.gpt_url = gpt_url
# 生成url
def create_url(self):
# 生成RFC1123格式的时间戳
now = datetime.now()
date = format_date_time(mktime(now.timetuple()))
# 拼接字符串
signature_origin = "host: " + self.host + "\n"
signature_origin += "date: " + date + "\n"
signature_origin += "GET " + self.path + " HTTP/1.1"
# 进行hmac-sha256进行加密
signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
digestmod=hashlib.sha256).digest()
signature_sha_base64 = base64.b64encode(signature_sha).decode(encoding='utf-8')
authorization_origin = f'api_key="{self.APIKey}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha_base64}"'
authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
# 将请求的鉴权参数组合为字典
v = {
"authorization": authorization,
"date": date,
"host": self.host
}
# 拼接鉴权参数,生成url
url = self.gpt_url + '?' + urlencode(v)
# 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致
return url
# 收到websocket错误的处理
def on_error(ws, error):
print("### error:", error)
# 收到websocket关闭的处理
def on_close(ws, close_status_code, close_msg):
print("### closed ###")
# 收到websocket连接建立的处理
def on_open(ws):
thread.start_new_thread(run, (ws,))
def run(ws, *args):
data = json.dumps(gen_params(appid=ws.appid, messages=ws.messages, domain=ws.domain))
ws.send(data)
# 收到websocket消息的处理
def on_message(ws, message):
# print(message)
data = json.loads(message)
code = data['header']['code']
if code != 0:
print(f'请求错误: {code}, {data}')
ws.close()
else:
choices = data["payload"]["choices"]
status = choices["status"]
content = choices["text"][0]["content"]
# 保存一轮对话的回答
global answer
answer += content
print(content,end='')
if status == 2:
print("\n#### 关闭会话")
ws.close()
def gen_params(appid, messages, domain):
"""
通过appid和用户的提问来生成请参数
"""
data = {
"header": {
"app_id": appid,
"uid": "1234",
# "patch_id": [] #接入微调模型,对应服务发布后的resourceid
},
"parameter": {
"chat": {
"domain": domain,
"temperature": 0.5,
"max_tokens": 4096,
"auditing": "default",
}
},
"payload": {
"message": {
"text": messages
}
}
}
return data
def main(appid, api_secret, api_key, gpt_url, domain, messages):
wsParam = Ws_Param(appid, api_key, api_secret, gpt_url)
websocket.enableTrace(False)
wsUrl = wsParam.create_url()
ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close, on_open=on_open)
ws.appid = appid
ws.messages = messages
ws.domain = domain
ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
客户端去调用API的代码:spark_wenda.py👇
import sparkAPI as LLM_API
if __name__ == "__main__":
# 准备用户API信息 以及访问的模型url
appid="xxxx"
api_secret="xxxx"
api_key="xxxxx"
gpt_url="wss://spark-api.xf-yun.com/v3.5/chat"
# Spark_url = "ws://spark-api.xf-yun.com/v3.1/chat" # v3.0环境的地址
# Spark_url = "ws://spark-api.xf-yun.com/v2.1/chat" # v2.0环境的地址
# Spark_url = "ws://spark-api.xf-yun.com/v1.1/chat" # v1.5环境的地址
domain="generalv3.5"
# domain = "generalv3" # v3.0版本
# domain = "generalv2" # v2.0版本
# domain = "general" # v2.0版本
# 用户的问题
query = input("\nUser:")
question =[{'role':'user', 'content':query}]
LLM_API.answer =""
print("\nAssistant:",end = "")
# 调用api得到输出
LLM_API.main(appid, api_secret, api_key, gpt_url, domain, question)
运行结果如下,获取到了大模型的回答,并只输出了其content:
多轮会话需要注意前面几轮对话用户user的问题和大模型assistant的回答都要记录下来,通过websocket访问大模型时,将之前的问答信息一同发送,因此需要记录下每轮大模型的回答,即global answer,并将其附加在messages中。具体的Spark_wenda.py内容如下👇
import sparkAPI as LLM_API
messages =[]
# length = 0
# 并入一条新的message,可以是用户的问题,也可以是大模型的回答
def getText(role,content):
jsoncon = {}
jsoncon["role"] = role
jsoncon["content"] = content
messages.append(jsoncon)
return messages
# 获取messages的长度,向大模型发送的问题token数量是有限制的
def getlength(messages):
length = 0
for content in messages:
temp = content["content"]
leng = len(temp)
length += leng
return length
# 检查token数量
def checklen(messages):
while (getlength(messages) > 8000):
del messages[0]
return messages
if __name__ == "__main__":
# 准备用户API信息 以及访问的模型url
appid="xxxxxx"
api_secret="xxxxxxx"
api_key="xxxxxxxx"
gpt_url="wss://spark-api.xf-yun.com/v3.5/chat"
# Spark_url = "ws://spark-api.xf-yun.com/v3.1/chat" # v3.0环境的地址
# Spark_url = "ws://spark-api.xf-yun.com/v2.1/chat" # v2.0环境的地址
# Spark_url = "ws://spark-api.xf-yun.com/v1.1/chat" # v1.5环境的地址
domain="generalv3.5"
# domain = "generalv3" # v3.0版本
# domain = "generalv2" # v2.0版本
# domain = "general" # v2.0版本
messages.clear
while True: # 循环进行会话
query = input("\nUser:")
# 将用户新的问题加入历史问答messages中
question = checklen(getText("user",query))
LLM_API.answer =""
print("\nAssistant:",end = "")
LLM_API.main(appid, api_secret, api_key, gpt_url, domain, messages)
# 将星火大模型的输出附加到历史问答messages中
getText("assistant",LLM_API.answer)
运行观察大模型确实能够对每次的问题进行回答,且能够有效利用之前问答的信息。
messages记录的是多轮问答的内容以及最新一次会话用户的问题,messages确实是等于question的。在vscode中调试代码,观察messages和question中的信息,确实是有前面两轮问答的问题+答案,以及最新一轮用户的问题:
首先注册登录百度千帆大模型平台;然后在控制台添加应用,后续会用到API_KEY,SECRET_KEY;然后在文档中找到官方API调用文档,其中包括了不同模型的调用代码。
这里需要注意,后续要调用API大模型,需要在在线服务中开通服务,否则会报错{"error_code":17,"error_msg":"Open api daily request limit reached"}
。
千帆大模型平台中的大模型api是通过http的方式去请求-响应的,步骤包括1、通过api-key和secret-key获取access-token;2、利用access-token向大模型发起请求;3、获取大模型的响应。
import requests
import json
# 填入自己申请的api应用的信息
API_KEY = "xxxxxxx"
SECRET_KEY = "xxxxxxx"
# 参考网址👉 https://cloud.baidu.com/doc/WENXINWORKSHOP/s/Ilkkrb0i5
# access_token
def get_access_token():
"""
使用 AK,SK 生成鉴权签名(Access Token)
:return: access_token,或是None(如果错误)
"""
url = "https://aip.baidubce.com/oauth/2.0/token"
params = {"grant_type": "client_credentials", "client_id": API_KEY, "client_secret": SECRET_KEY}
return str(requests.post(url, params=params).json().get("access_token"))
# 单轮会话
def Single_Round_Session():
# 访问的模型的url
url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/completions?access_token=" + get_access_token()
# 用户输入问题
query = input("\nUser:")
payload = json.dumps({
"disable_search": False,
"enable_citation": False,
# 添加问题 发送请求
"messages": [
{
"role": "user",
"content": query,
}
]
})
headers = {
'Content-Type': 'application/json'
}
# 获取大模型的响应
response = requests.request("POST", url, headers=headers, data=payload)
# 获取响应的内容
print(response.text)
if __name__ == '__main__':
Single_Round_Session()
例子:
多轮会话需要注意保存之前对话的问答,代码如下:
# 多轮会话
def Multi_Round_Session():
# 访问的模型的url
url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/completions?access_token=" + get_access_token()
# 用户的问题
messages = []
while True :
# 构造访问的问题
question = {"role": "user",
"content":input("\nUser:")}
messages.append(question)
payload = json.dumps({
"disable_search": False,
"enable_citation": False,
# 添加问题 发送请求
"messages": messages
})
headers = {
'Content-Type': 'application/json'
}
# 获取大模型的响应
response = requests.request("POST", url, headers=headers, data=payload)
# 获取响应的内容
print("\n",response.json()["result"] )
# 将响应内容添加到会话历史中
answer = {"role": "assistant",
"content":response.json()["result"]}
messages.append(answer)
if __name__ == '__main__':
Multi_Round_Session()
结果: