Streaming
The OpenRoute AI API supports streaming responses from _any model_. This is useful for building chat interfaces or other applications where the UI should update as the model generates its response.

To enable streaming, set the `stream` parameter to `true` in your request. The model will then stream the response to the client in chunks rather than returning it all at once.

Here is an example of how to stream a response and process it:
```python
import requests
import json

question = "How would you build the tallest building ever?"

url = "https://www.openroute.cn/api/v1/chat/completions"
headers = {
    "Authorization": f"Bearer {API_KEY_REF}",
    "Content-Type": "application/json"
}
payload = {
    "model": "openai/gpt-4o",
    "messages": [{"role": "user", "content": question}],
    "stream": True
}

buffer = ""
with requests.post(url, headers=headers, json=payload, stream=True) as r:
    for chunk in r.iter_content(chunk_size=1024, decode_unicode=True):
        buffer += chunk
        while True:
            try:
                # Find the next complete SSE line
                line_end = buffer.find('\n')
                if line_end == -1:
                    break

                line = buffer[:line_end].strip()
                buffer = buffer[line_end + 1:]

                if line.startswith('data: '):
                    data = line[6:]
                    if data == '[DONE]':
                        break

                    try:
                        data_obj = json.loads(data)
                        content = data_obj["choices"][0]["delta"].get("content")
                        if content:
                            print(content, end="", flush=True)
                    except json.JSONDecodeError:
                        pass
            except Exception:
                break
```
```typescript
const question = 'How would you build the tallest building ever?';

const response = await fetch('https://www.openroute.cn/api/v1/chat/completions', {
  method: 'POST',
  headers: {
    Authorization: `Bearer ${API_KEY_REF}`,
    'Content-Type': 'application/json',
  },
  body: JSON.stringify({
    model: 'openai/gpt-4o',
    messages: [{ role: 'user', content: question }],
    stream: true,
  }),
});

const reader = response.body?.getReader();
if (!reader) {
  throw new Error('Response body is not readable');
}

const decoder = new TextDecoder();
let buffer = '';

try {
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // Append new chunk to buffer
    buffer += decoder.decode(value, { stream: true });

    // Process complete lines from buffer
    while (true) {
      const lineEnd = buffer.indexOf('\n');
      if (lineEnd === -1) break;

      const line = buffer.slice(0, lineEnd).trim();
      buffer = buffer.slice(lineEnd + 1);

      if (line.startsWith('data: ')) {
        const data = line.slice(6);
        if (data === '[DONE]') break;

        try {
          const parsed = JSON.parse(data);
          const content = parsed.choices[0].delta.content;
          if (content) {
            console.log(content);
          }
        } catch (e) {
          // Ignore invalid JSON
        }
      }
    }
  }
} finally {
  reader.cancel();
}
```
Additional Information

For SSE (Server-Sent Events) streams, OpenRoute AI occasionally sends comments to prevent connection timeouts. These comments look like:

```
: OPENROUTE AI PROCESSING
```

Per the SSE specification, comment payloads can safely be ignored. However, you can take advantage of them to improve the user experience as needed, for example by showing a dynamic loading indicator.
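The comment handling described above can be sketched as a small helper that decides whether an incoming SSE line is a comment, a data payload, or noise. This is a minimal sketch (the helper name `extract_sse_data` is our own, not part of any SDK):

```python
# Minimal sketch of SSE line handling, assuming the keep-alive comment
# format shown above. `extract_sse_data` is a hypothetical helper name.
def extract_sse_data(line: str):
    line = line.strip()
    # Per the SSE spec, lines starting with ':' are comments (keep-alives);
    # they can be ignored, or used to drive a loading indicator.
    if not line or line.startswith(':'):
        return None
    if line.startswith('data: '):
        return line[len('data: '):]
    return None
```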
Some SSE client implementations may not parse the payload according to the spec, which leads to an uncaught error when you `JSON.stringify` the non-JSON payloads. We recommend the following clients:
Stream Cancellation

Streaming requests can be cancelled by aborting the connection. For supported providers, this immediately stops model processing and billing.

To implement stream cancellation:
```python
import requests
from threading import Event, Thread

def stream_with_cancellation(prompt: str, cancel_event: Event):
    with requests.Session() as session:
        response = session.post(
            "https://www.openroute.cn/api/v1/chat/completions",
            headers={"Authorization": f"Bearer {API_KEY_REF}"},
            json={"model": "openai/gpt-4o", "messages": [{"role": "user", "content": prompt}], "stream": True},
            stream=True
        )

        try:
            for line in response.iter_lines():
                if cancel_event.is_set():
                    response.close()
                    return
                if line:
                    print(line.decode(), end="", flush=True)
        finally:
            response.close()

# Example usage:
cancel_event = Event()
stream_thread = Thread(target=lambda: stream_with_cancellation("Write a story", cancel_event))
stream_thread.start()

# To cancel the stream:
cancel_event.set()
```
```typescript
const controller = new AbortController();

try {
  const response = await fetch(
    'https://www.openroute.cn/api/v1/chat/completions',
    {
      method: 'POST',
      headers: {
        Authorization: `Bearer ${API_KEY_REF}`,
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({
        model: 'openai/gpt-4o',
        messages: [{ role: 'user', content: 'Write a story' }],
        stream: true,
      }),
      signal: controller.signal,
    },
  );

  // Process the stream...
} catch (error) {
  if (error.name === 'AbortError') {
    console.log('Stream cancelled');
  } else {
    throw error;
  }
}

// To cancel the stream:
controller.abort();
```
Cancellation only works for streaming requests with supported providers. For non-streaming requests or unsupported providers, the model will continue processing and you will be billed for the complete response.
Error Handling in Streams

OpenRoute AI handles errors differently depending on when they occur during streaming:

Errors Before Any Tokens Are Sent

If an error occurs before any tokens have been streamed to the client, OpenRoute AI returns a standard JSON error response with the appropriate HTTP status code, following the standard error format:
```json
{
  "error": {
    "code": 400,
    "message": "Invalid model specified"
  }
}
```
Common HTTP status codes include:

- 400: Bad Request (invalid parameters)
- 401: Unauthorized (invalid API key)
- 402: Payment Required (insufficient credits)
- 429: Too Many Requests (rate limited)
- 502: Bad Gateway (provider error)
- 503: Service Unavailable (no available providers)
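Of these, 429, 502, and 503 are typically transient, so a client may choose to retry them with backoff before surfacing an error. A minimal sketch (the retry policy and backoff values are our own, not part of the API):

```python
# Hedged sketch: classify the status codes listed above into retryable
# and non-retryable failures. Thresholds are illustrative.
RETRYABLE_STATUS = {429, 502, 503}

def should_retry(status_code: int, attempt: int, max_attempts: int = 3) -> bool:
    return status_code in RETRYABLE_STATUS and attempt < max_attempts

def backoff_seconds(attempt: int) -> float:
    # Exponential backoff capped at 30 seconds
    return min(2 ** attempt, 30)
```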
Errors After Tokens Have Been Sent (Mid-Stream)

If an error occurs after some tokens have already been streamed to the client, OpenRoute AI cannot change the HTTP status code (it is already 200 OK). Instead, the error is sent as a Server-Sent Event (SSE) with a unified structure:
```
data: {"id":"cmpl-abc123","object":"chat.completion.chunk","created":1234567890,"model":"gpt-3.5-turbo","provider":"openai","error":{"code":"server_error","message":"Provider disconnected unexpectedly"},"choices":[{"index":0,"delta":{"content":""},"finish_reason":"error"}]}
```
Key characteristics of mid-stream errors:

- The error appears at the top level, alongside the standard response fields (id, object, created, etc.)
- A `choices` array with `finish_reason: "error"` is included to properly terminate the stream
- The HTTP status remains 200 OK, because the headers have already been sent
- The stream terminates after this unified error event
Code Example

Here is how to correctly handle both types of errors in your streaming implementation:
```python
import requests
import json

def stream_with_error_handling(prompt):
    # requests is synchronous, so this is a plain function rather than a coroutine
    response = requests.post(
        'https://www.openroute.cn/api/v1/chat/completions',
        headers={'Authorization': f'Bearer {API_KEY_REF}'},
        json={
            'model': 'openai/gpt-4o',
            'messages': [{'role': 'user', 'content': prompt}],
            'stream': True
        },
        stream=True
    )

    # Check initial HTTP status for pre-stream errors
    if response.status_code != 200:
        error_data = response.json()
        print(f"Error: {error_data['error']['message']}")
        return

    # Process stream and handle mid-stream errors
    for line in response.iter_lines():
        if line:
            line_text = line.decode('utf-8')
            if line_text.startswith('data: '):
                data = line_text[6:]
                if data == '[DONE]':
                    break

                try:
                    parsed = json.loads(data)

                    # Check for mid-stream error
                    if 'error' in parsed:
                        print(f"Stream error: {parsed['error']['message']}")
                        # Check finish_reason if needed
                        if parsed.get('choices', [{}])[0].get('finish_reason') == 'error':
                            print("Stream terminated due to error")
                        break

                    # Process normal content
                    content = parsed['choices'][0]['delta'].get('content')
                    if content:
                        print(content, end='', flush=True)
                except json.JSONDecodeError:
                    pass
```
```typescript
async function streamWithErrorHandling(prompt: string) {
  const response = await fetch(
    'https://www.openroute.cn/api/v1/chat/completions',
    {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${API_KEY_REF}`,
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({
        model: 'openai/gpt-4o',
        messages: [{ role: 'user', content: prompt }],
        stream: true,
      }),
    }
  );

  // Check initial HTTP status for pre-stream errors
  if (!response.ok) {
    const error = await response.json();
    console.error(`Error: ${error.error.message}`);
    return;
  }

  const reader = response.body?.getReader();
  if (!reader) throw new Error('No response body');

  const decoder = new TextDecoder();
  let buffer = '';

  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      buffer += decoder.decode(value, { stream: true });

      while (true) {
        const lineEnd = buffer.indexOf('\n');
        if (lineEnd === -1) break;

        const line = buffer.slice(0, lineEnd).trim();
        buffer = buffer.slice(lineEnd + 1);

        if (line.startsWith('data: ')) {
          const data = line.slice(6);
          if (data === '[DONE]') return;

          try {
            const parsed = JSON.parse(data);

            // Check for mid-stream error
            if (parsed.error) {
              console.error(`Stream error: ${parsed.error.message}`);
              // Check finish_reason if needed
              if (parsed.choices?.[0]?.finish_reason === 'error') {
                console.log('Stream terminated due to error');
              }
              return;
            }

            // Process normal content
            const content = parsed.choices[0].delta.content;
            if (content) {
              console.log(content);
            }
          } catch (e) {
            // Ignore parsing errors
          }
        }
      }
    }
  } finally {
    reader.cancel();
  }
}
```
API-Specific Behavior

Different API endpoints may handle streaming errors in slightly different ways:

- OpenAI Chat Completions API: returns an `ErrorResponse` directly if no chunks have been processed, or includes the error information in the response if some chunks have been processed
- OpenAI Responses API: may convert certain error codes (such as `context_length_exceeded`) into a successful response with `finish_reason: "length"` instead of treating them as errors
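These cases can be folded into a single classification step when consuming parsed chunks. A hedged sketch (the helper name and return labels are our own; the field names follow the error event shown earlier):

```python
# Hypothetical helper distinguishing the cases described above for a
# parsed SSE chunk: mid-stream error, error termination, length
# truncation surfaced as success, or ordinary content.
def classify_chunk(parsed: dict) -> str:
    if "error" in parsed:
        return "mid_stream_error"
    finish = parsed.get("choices", [{}])[0].get("finish_reason")
    if finish == "error":
        return "terminated_with_error"
    if finish == "length":
        # e.g. context_length_exceeded converted to a "successful" response
        return "truncated"
    return "content"
```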