feat: add full async for openrouter

This commit is contained in:
2025-07-23 19:42:32 +08:00
parent f22f1f717e
commit 1c05f934fe
3 changed files with 682 additions and 150 deletions

View File

@@ -8,7 +8,7 @@ author_url: https://zhuangyumin.dev
"""
import re
import requests
import aiohttp
import json
import time
from typing import List, Union, Generator, Iterator, Optional, Callable, Any, Awaitable, AsyncGenerator
@@ -75,26 +75,29 @@ class Pipe:
self.type = "manifold" # Multiple models
self.valves = self.Valves()
def pipes(self) -> List[dict]:
"""Fetch available models from OpenRouter API"""
async def pipes(self) -> List[dict]:
"""Fetch available models from OpenRouter API asynchronously"""
if not self.valves.OPENROUTER_API_KEY:
return [{"id": "error", "name": "API Key not provided"}]
try:
headers = {"Authorization": f"Bearer {self.valves.OPENROUTER_API_KEY}"}
response = requests.get(
"https://openrouter.ai/api/v1/models", headers=headers
)
async with aiohttp.ClientSession() as session:
async with session.get(
"https://openrouter.ai/api/v1/models",
headers=headers,
timeout=aiohttp.ClientTimeout(total=30)
) as response:
if response.status != 200:
return [
{
"id": "error",
"name": f"Error fetching models: {response.status}",
}
]
if response.status_code != 200:
return [
{
"id": "error",
"name": f"Error fetching models: {response.status_code}",
}
]
models_data = response.json()
models_data = await response.json()
# Extract model information
models = []
@@ -118,7 +121,7 @@ class Pipe:
return [{"id": "error", "name": f"Error: {str(e)}"}]
async def _report_api_call_direct(self, usage_info: dict, user_email: str, model_id: str, __event_emitter__: Callable[[Any], Awaitable[None]]):
"""Report API call to upstream reporting service using direct usage information"""
"""Report API call to upstream reporting service using direct usage information asynchronously"""
if not self.valves.REPORT_API_URL or not self.valves.REPORT_API_KEY:
return
@@ -139,7 +142,7 @@ class Pipe:
"cost_usd": cost_usd
}
# Send to reporting API
# Send to reporting API asynchronously
headers = {
"Authorization": f"Bearer {self.valves.REPORT_API_KEY}",
"Content-Type": "application/json"
@@ -147,17 +150,17 @@ class Pipe:
report_url = f"{self.valves.REPORT_API_URL.rstrip('/')}/api/record_api_call"
response = requests.post(
report_url,
headers=headers,
json=api_call_record,
timeout=30
)
if response.status_code == 200:
print(f"Successfully reported API call for user {user_email}")
else:
print(f"Failed to report API call: {response.status_code}")
async with aiohttp.ClientSession() as session:
async with session.post(
report_url,
headers=headers,
json=api_call_record,
timeout=aiohttp.ClientTimeout(total=30)
) as response:
if response.status == 200:
print(f"Successfully reported API call for user {user_email}")
else:
print(f"Failed to report API call: {response.status}")
info = f"input: {input_tokens} | output: {output_tokens} | cost: {cost_usd:.6f}"
await __event_emitter__(
@@ -223,7 +226,7 @@ class Pipe:
return self.stream_response(url, headers, payload, user_email, model_id, __event_emitter__)
else:
return await self.non_stream_response(url, headers, payload, user_email, model_id, __event_emitter__)
except requests.exceptions.RequestException as e:
except aiohttp.ClientError as e:
print(f"Request failed: {e}")
return f"Error: Request failed: {e}"
except Exception as e:
@@ -236,31 +239,38 @@ class Pipe:
print(
f"Sending non-streaming request to OpenRouter: {json.dumps(payload)[:200]}..."
)
response = requests.post(url, headers=headers, json=payload, timeout=90)
async with aiohttp.ClientSession() as session:
async with session.post(
url,
headers=headers,
json=payload,
timeout=aiohttp.ClientTimeout(total=90)
) as response:
if response.status != 200:
error_message = f"HTTP Error {response.status}"
try:
error_data = await response.json()
print(f"Error response: {json.dumps(error_data)}")
if "error" in error_data:
if (
isinstance(error_data["error"], dict)
and "message" in error_data["error"]
):
error_message += f": {error_data['error']['message']}"
else:
error_message += f": {error_data['error']}"
except Exception as e:
print(f"Failed to parse error response: {e}")
error_text = await response.text()
error_message += f": {error_text[:500]}"
if response.status_code != 200:
error_message = f"HTTP Error {response.status_code}"
try:
error_data = response.json()
print(f"Error response: {json.dumps(error_data)}")
if "error" in error_data:
if (
isinstance(error_data["error"], dict)
and "message" in error_data["error"]
):
error_message += f": {error_data['error']['message']}"
else:
error_message += f": {error_data['error']}"
except Exception as e:
print(f"Failed to parse error response: {e}")
error_message += f": {response.text[:500]}"
# Log request payload for debugging
print(f"Request that caused error: {json.dumps(payload)}")
raise Exception(error_message)
# Log request payload for debugging
print(f"Request that caused error: {json.dumps(payload)}")
raise Exception(error_message)
res = response.json()
print(f"OpenRouter response keys: {list(res.keys())}")
res = await response.json()
print(f"OpenRouter response keys: {list(res.keys())}")
# Extract usage information directly from response for reporting
if user_email and model_id:
@@ -307,111 +317,120 @@ class Pipe:
async def stream_response(self, url, headers, payload, user_email, model_id, __event_emitter__: Callable[[Any], Awaitable[None]]):
"""Stream reasoning tokens in real-time with proper tag management"""
try:
response = requests.post(
url, headers=headers, json=payload, stream=True, timeout=90
)
async with aiohttp.ClientSession() as session:
async with session.post(
url,
headers=headers,
json=payload,
timeout=aiohttp.ClientTimeout(total=90)
) as response:
if response.status != 200:
error_message = f"HTTP Error {response.status}"
try:
error_data = await response.json()
error_message += (
f": {error_data.get('error', {}).get('message', '')}"
)
except:
pass
raise Exception(error_message)
if response.status_code != 200:
error_message = f"HTTP Error {response.status_code}"
try:
error_data = response.json()
error_message += (
f": {error_data.get('error', {}).get('message', '')}"
)
except:
pass
raise Exception(error_message)
# State tracking
in_reasoning_state = False # True if we've output the opening <think> tag
latest_citations = [] # The latest citations list
done_received = False # Track if we've received [DONE]
usage_info = None # Store usage information for reporting
# State tracking
in_reasoning_state = False # True if we've output the opening <think> tag
latest_citations = [] # The latest citations list
done_received = False # Track if we've received [DONE]
usage_info = None # Store usage information for reporting
# Process the response stream asynchronously
async for line_bytes in response.content:
if not line_bytes:
continue
# Process the response stream
for line in response.iter_lines():
if not line:
continue
line_text = line.decode("utf-8")
if not line_text.startswith("data: "):
continue
elif line_text == "data: [DONE]":
done_received = True
# Handle citations at the end
if latest_citations:
citation_list = [f"1. {l}" for l in latest_citations]
citation_list_str = "\n".join(citation_list)
yield f"\n\n---\nCitations:\n{citation_list_str}"
continue
try:
chunk = json.loads(line_text[6:])
# Check for usage information (appears after [DONE])
if "usage" in chunk:
usage_info = chunk["usage"]
print(f"Extracted usage info: {usage_info}")
line_text = line_bytes.decode("utf-8").strip()
# Handle reporting with usage information
if user_email and model_id and usage_info:
# Handle multiple lines in a single chunk
for line in line_text.split('\n'):
if not line.strip():
continue
if not line.startswith("data: "):
continue
elif line == "data: [DONE]":
done_received = True
# Handle citations at the end
if latest_citations:
citation_list = [f"1. {l}" for l in latest_citations]
citation_list_str = "\n".join(citation_list)
yield f"\n\n---\nCitations:\n{citation_list_str}"
continue
try:
await self._report_api_call_direct(usage_info, user_email, model_id, __event_emitter__)
chunk = json.loads(line[6:])
# Check for usage information (appears after [DONE])
if "usage" in chunk:
usage_info = chunk["usage"]
print(f"Extracted usage info: {usage_info}")
# Handle reporting with usage information
if user_email and model_id and usage_info:
try:
await self._report_api_call_direct(usage_info, user_email, model_id, __event_emitter__)
except Exception as e:
print(f"Error reporting API call: {e}")
yield f"Error: {e}"
yield "" # trick to ensure proper completion
continue
if "choices" in chunk and chunk["choices"]:
choice = chunk["choices"][0]
citations = chunk.get("citations") or []
# Update the citation list
if citations:
latest_citations = citations
# Check for reasoning tokens
reasoning_text = None
if "delta" in choice and "reasoning" in choice["delta"]:
reasoning_text = choice["delta"]["reasoning"]
elif "message" in choice and "reasoning" in choice["message"]:
reasoning_text = choice["message"]["reasoning"]
# Check for content tokens
content_text = None
if "delta" in choice and "content" in choice["delta"]:
content_text = choice["delta"]["content"]
elif "message" in choice and "content" in choice["message"]:
content_text = choice["message"]["content"]
# Handle reasoning tokens
if reasoning_text:
# If first reasoning token, output opening tag
if not in_reasoning_state:
yield "<think>\n"
in_reasoning_state = True
# Output the reasoning token
yield _insert_citations(reasoning_text, citations)
# Handle content tokens
if content_text:
# If transitioning from reasoning to content, close the thinking tag
if in_reasoning_state:
yield "\n</think>\n\n"
in_reasoning_state = False
# Output the content
if content_text:
yield _insert_citations(content_text, citations)
except Exception as e:
print(f"Error reporting API call: {e}")
yield f"Error: {e}"
yield "" # trick to ensure proper completion
continue
print(f"Error processing chunk: {e}")
if "choices" in chunk and chunk["choices"]:
choice = chunk["choices"][0]
citations = chunk.get("citations") or []
# Update the citation list
if citations:
latest_citations = citations
# Check for reasoning tokens
reasoning_text = None
if "delta" in choice and "reasoning" in choice["delta"]:
reasoning_text = choice["delta"]["reasoning"]
elif "message" in choice and "reasoning" in choice["message"]:
reasoning_text = choice["message"]["reasoning"]
# Check for content tokens
content_text = None
if "delta" in choice and "content" in choice["delta"]:
content_text = choice["delta"]["content"]
elif "message" in choice and "content" in choice["message"]:
content_text = choice["message"]["content"]
# Handle reasoning tokens
if reasoning_text:
# If first reasoning token, output opening tag
if not in_reasoning_state:
yield "<think>\n"
in_reasoning_state = True
# Output the reasoning token
yield _insert_citations(reasoning_text, citations)
# Handle content tokens
if content_text:
# If transitioning from reasoning to content, close the thinking tag
if in_reasoning_state:
yield "\n</think>\n\n"
in_reasoning_state = False
# Output the content
if content_text:
yield _insert_citations(content_text, citations)
except Exception as e:
print(f"Error processing chunk: {e}")
# If we're still in reasoning state at the end, close the tag
if in_reasoning_state:
yield "\n</think>\n\n"
# If we're still in reasoning state at the end, close the tag
if in_reasoning_state:
yield "\n</think>\n\n"
except Exception as e:
print(f"Error in stream_response: {e}")