
云原生 API 网关 APISIX 入门教程
近期在研究一些常见的大语言模型的API怎么调用,让他们批量回答问题,然后把结果导出,也看了一些相关的官方文档。在这个推送中分享最终跑通的调用方法。
需求:有一个问题列表<q.pickle>,需要逐个提问,然后把回答记录下来。
笔记中所有的API key已删除,保证Key的安全。
封面图片来自:开源大语言模型(LLM)汇总(持续更新中) (yii666.com)
以下是调用实例,官方文档还有通过web或者Linux调用的。
import _thread as thread
import base64
import datetime
import hashlib
import hmac
import json
import time
from urllib.parse import urlparse
import ssl
from datetime import datetime
from time import mktime
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time
import sys
import websocket
import pickle
# file = open('讯飞星火导出未分隔的结果.txt', 'w')
# def custom_print(text):
# print(text)
# file.write(text + '\n')
# file.flush() # 立即将内容写入文件
#
# # 重定向sys.stdout到自定义的输出流
# sys.stdout = custom_print
# # sys.stdout = custom_print
# 把输出的内容重新定向到一个文件
# sys.stdout = open('讯飞星火导出未分隔的结果.txt', 'w')
class Ws_Param(object):
# 初始化
def __init__(self, APPID, APIKey, APISecret, gpt_url):
self.APPID = APPID
self.APIKey = APIKey
self.APISecret = APISecret
self.host = urlparse(gpt_url).netloc
self.path = urlparse(gpt_url).path
self.gpt_url = gpt_url
# 生成url
def create_url(self):
# 生成RFC1123格式的时间戳
now = datetime.now()
date = format_date_time(mktime(now.timetuple()))
# 拼接字符串
signature_origin = "host: " + self.host + "\n"
signature_origin += "date: " + date + "\n"
signature_origin += "GET " + self.path + " HTTP/1.1"
# 进行hmac-sha256进行加密
signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
digestmod=hashlib.sha256).digest()
signature_sha_base64 = base64.b64encode(signature_sha).decode(encoding='utf-8')
authorization_origin = f'api_key="{self.APIKey}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha_base64}"'
authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
# 将请求的鉴权参数组合为字典
v = {
"authorization": authorization,
"date": date,
"host": self.host
}
# 拼接鉴权参数,生成url
url = self.gpt_url + '?' + urlencode(v)
# 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致
return url
# 收到websocket错误的处理
def on_error(ws, error):
print("### error:", error)
# 收到websocket关闭的处理
def on_close(ws):
print("### closed ###")
# 收到websocket连接建立的处理
def on_open(ws):
thread.start_new_thread(run, (ws,))
def run(ws, *args):
data = json.dumps(gen_params(appid=ws.appid, question=ws.question))
ws.send(data)
# 收到websocket消息的处理
def on_message(ws, message):
# print(message)
data = json.loads(message)
code = data['header']['code']
if code != 0:
print(f'请求错误: {code}, {data}')
ws.close()
else:
choices = data["payload"]["choices"]
status = choices["status"]
content = choices["text"][0]["content"]
print(content, end='')
if status == 2:
ws.close()
def gen_params(appid, question):
"""
通过appid和用户的提问来生成请参数
"""
data = {
"header": {
"app_id": appid,
"uid": "1234"
},
"parameter": {
"chat": {
"domain": "general",
"random_threshold": 0.5,
"max_tokens": 2048,
"auditing": "default"
}
},
"payload": {
"message": {
"text": [
{"role": "user", "content": question}
]
}
}
}
return data
def main(appid, api_key, api_secret, gpt_url, question):
wsParam = Ws_Param(appid, api_key, api_secret, gpt_url)
websocket.enableTrace(False)
wsUrl = wsParam.create_url()
ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close, on_open=on_open)
ws.appid = appid
ws.question = question
ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
return
def ask_question(q):
main(appid="",
api_secret="",
api_key="",
gpt_url="ws://spark-api.xf-yun.com/v1.1/chat",
question= q)
print("@")
with open('D:\Desktop\AI与夜曲编程\chatbot及其余内容\自己写的小Bot\q.pickle', 'rb') as f:
questions = pickle.load(f)
for question in questions:
ask_question(question)
time.sleep(5)
# sys.stdout.close()
print("done")
保存:
import csv
def export_text_to_csv(text, filename):
paragraphs = text.split('@')
with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.writer(csvfile)
for paragraph in paragraphs:
writer.writerow([paragraph.strip()]) # 每行一个文本,去除前后空格
print("成功导出为CSV文件:", filename)
export_text_to_csv(text, 'output.csv')
import openai
openai.api_key = ""
openai.api_base = ""
response = openai.ChatCompletion.create(
model='gpt-4',
messages=[
{'role': 'user', 'content': "树上八只鸟,打掉一只,还有几只鸟"},
],
stream=True
)
for chunk in response:
print(chunk.choices[0].delta.content, end="", flush=True)
GPT3.5调用
def chat_with_robot(text):
completion = openai.ChatCompletion.create(
model = "gpt-3.5-turbo-0613",
messages=[
{'role': 'user', 'content': text},
],
temperature=0)
return completion
def use_gpt_in_jupyter(question):
clipbord = []
output = chat_with_robot(question)
print('GPT3,', output['usage']['total_tokens'], "token used,",round(output['usage']['total_tokens']/1000*0.002, 5), "人民币 used >>>>>")
print(output["choices"][0]["message"]["content"])
pass
with open('questions.pickle', 'rb') as f:
questions = pickle.load(f)
questions[::10]
new_answers=[]
timer = 0
for question in questions:
answer = use_gpt(question)
new_answers.append(answer)
timer = timer+1
# time.sleep(30)
print(f"第 {timer} 个问题已回答,\n【问题】= {question} \n【答案】= {answer}\n")
time.sleep(25)
df = pd.DataFrame({'Answers': new_answers})
df.to_excel('new_answers.xlsx', index=False)
import time
import requests
import json
import pandas as pd
import pickle
url = 'https://lm_experience.sensetime.com/v1/nlp/chat/completions'
def use_shangtang(question):
data = {
"messages": [{"role": "user", "content": question}],
"temperature": 1,
"top_p": 0.7,
"max_new_tokens": 1024,
"repetition_penalty": 1.05,
"stream": False,
"user": "test"
} # 咱只能2048个token呢,商汤大模型不行啊
headers = {
'Content-Type': 'application/json',
'Authorization': api_secret_key
}
response = requests.post(url, headers=headers, json=data)
raw_response = json.loads(response.text)
return raw_response["data"]["choices"][0]["message"]
with open('自己写的小Bot\q.pickle', 'rb') as f:
questions = pickle.load(f)
questions[::100]
answers=[]
timer = 0
for question in questions:
answer = use_shangtang(question)
answers.append(answer)
timer = timer+1
# time.sleep(30)
print(f"第 {timer} 个问题已回答,\n【问题】= {question} \n【答案】= {answer}\n")
time.sleep(3)
df = pd.DataFrame({'Answers': answers})
df.to_excel('商汤answers.xlsx', index=False)
print("done")
本文章转载微信公众号@小陆的空间