diff --git a/newapi_provider.py b/newapi_provider.py index a1df9a7..0262fab 100644 --- a/newapi_provider.py +++ b/newapi_provider.py @@ -8,7 +8,7 @@ author_url: https://zhuangyumin.dev """ import re -import requests +import aiohttp import json import time import tiktoken @@ -150,26 +150,29 @@ class Pipe: "cost": 0.0 } - def pipes(self) -> List[dict]: - """Fetch available models from OpenRouter API""" + async def pipes(self) -> List[dict]: + """Fetch available models from NewAPI asynchronously""" if not self.valves.NEWAPI_API_KEY: return [{"id": "error", "name": "API Key not provided"}] try: headers = {"Authorization": f"Bearer {self.valves.NEWAPI_API_KEY}"} - response = requests.get( - f"{self.valves.NEWAPI_BASE_URL}/models", headers=headers - ) + + async with aiohttp.ClientSession() as session: + async with session.get( + f"{self.valves.NEWAPI_BASE_URL}/models", + headers=headers, + timeout=aiohttp.ClientTimeout(total=30) + ) as response: + if response.status != 200: + return [ + { + "id": "error", + "name": f"Error fetching models: {response.status}", + } + ] - if response.status_code != 200: - return [ - { - "id": "error", - "name": f"Error fetching models: {response.status_code}", - } - ] - - models_data = response.json() + models_data = await response.json() # Extract model information models = [] @@ -193,7 +196,7 @@ class Pipe: return [{"id": "error", "name": f"Error: {str(e)}"}] async def _report_api_call_direct(self, usage_info: dict, user_email: str, model_id: str, __event_emitter__: Callable[[Any], Awaitable[None]]): - """Report API call to upstream reporting service using direct usage information""" + """Report API call to upstream reporting service using direct usage information asynchronously""" if not self.valves.REPORT_API_URL or not self.valves.REPORT_API_KEY: return @@ -214,7 +217,7 @@ class Pipe: "cost_usd": cost_usd } - # Send to reporting API + # Send to reporting API asynchronously headers = { "Authorization": f"Bearer {self.valves.REPORT_API_KEY}", "Content-Type": "application/json" @@ -222,17 +225,17 @@ class Pipe: report_url = f"{self.valves.REPORT_API_URL.rstrip('/')}/api/record_api_call" - response = requests.post( - report_url, - headers=headers, - json=api_call_record, - timeout=30 - ) - - if response.status_code == 200: - print(f"Successfully reported API call for user {user_email}") - else: - print(f"Failed to report API call: {response.status_code}") + async with aiohttp.ClientSession() as session: + async with session.post( + report_url, + headers=headers, + json=api_call_record, + timeout=aiohttp.ClientTimeout(total=30) + ) as response: + if response.status == 200: + print(f"Successfully reported API call for user {user_email}") + else: + print(f"Failed to report API call: {response.status}") info = f"input: {input_tokens} | output: {output_tokens} | cost: {cost_usd:.6f}" await __event_emitter__( @@ -300,7 +303,7 @@ class Pipe: return self.stream_response(url, headers, payload, user_email, model_id, __event_emitter__, model_name) else: return await self.non_stream_response(url, headers, payload, user_email, model_id, __event_emitter__, model_name) - except requests.exceptions.RequestException as e: + except aiohttp.ClientError as e: print(f"Request failed: {e}") return f"Error: Request failed: {e}" except Exception as e: @@ -311,33 +314,40 @@ class Pipe: """Handle non-streaming responses and wrap reasoning in tags if present""" try: print( - f"Sending non-streaming request to OpenRouter: {json.dumps(payload)[:200]}..." + f"Sending non-streaming request to NewAPI: {json.dumps(payload)[:200]}..." ) - response = requests.post(url, headers=headers, json=payload, timeout=90) + + async with aiohttp.ClientSession() as session: + async with session.post( + url, + headers=headers, + json=payload, + timeout=aiohttp.ClientTimeout(total=90) + ) as response: + if response.status != 200: + error_message = f"HTTP Error {response.status}" + try: + error_data = await response.json() + print(f"Error response: {json.dumps(error_data)}") + if "error" in error_data: + if ( + isinstance(error_data["error"], dict) + and "message" in error_data["error"] + ): + error_message += f": {error_data['error']['message']}" + else: + error_message += f": {error_data['error']}" + except Exception as e: + print(f"Failed to parse error response: {e}") + error_text = await response.text() + error_message += f": {error_text[:500]}" - if response.status_code != 200: - error_message = f"HTTP Error {response.status_code}" - try: - error_data = response.json() - print(f"Error response: {json.dumps(error_data)}") - if "error" in error_data: - if ( - isinstance(error_data["error"], dict) - and "message" in error_data["error"] - ): - error_message += f": {error_data['error']['message']}" - else: - error_message += f": {error_data['error']}" - except Exception as e: - print(f"Failed to parse error response: {e}") - error_message += f": {response.text[:500]}" + # Log request payload for debugging + print(f"Request that caused error: {json.dumps(payload)}") + raise Exception(error_message) - # Log request payload for debugging - print(f"Request that caused error: {json.dumps(payload)}") - raise Exception(error_message) - - res = response.json() - print(f"OpenRouter response keys: {list(res.keys())}") + res = await response.json() + print(f"NewAPI response keys: {list(res.keys())}") # Check if we have choices in the response if not res.get("choices") or len(res["choices"]) == 0: @@ -384,122 +394,131 @@ class Pipe: async def stream_response(self, url, headers, payload, user_email, model_id, __event_emitter__: Callable[[Any], Awaitable[None]], model_name: str): """Stream reasoning tokens in real-time with proper tag management""" try: - response = requests.post( - url, headers=headers, json=payload, stream=True, timeout=90 - ) - - if response.status_code != 200: - error_message = f"HTTP Error {response.status_code}" - try: - error_data = response.json() - error_message += ( - f": {error_data.get('error', {}).get('message', '')}" - ) - except: - pass - raise Exception(error_message) - - # State tracking - in_reasoning_state = False # True if we've output the opening tag - latest_citations = [] # The latest citations list - accumulated_content = "" # Accumulate all content for token calculation - accumulated_reasoning = "" # Accumulate all reasoning for token calculation - - # Process the response stream - for line in response.iter_lines(): - if not line: - continue - - line_text = line.decode("utf-8") - if not line_text.startswith("data: "): - continue - elif line_text == "data: [DONE]": - # Handle citations at the end - if latest_citations: - citation_list = [f"1. {l}" for l in latest_citations] - citation_list_str = "\n".join(citation_list) - yield f"\n\n---\nCitations:\n{citation_list_str}" - - # Calculate usage information using tiktoken and report - if user_email and model_id: - messages = payload.get("messages", []) - final_response = "" - if accumulated_reasoning and accumulated_content: - final_response = f"\n{accumulated_reasoning}\n\n\n{accumulated_content}" - elif accumulated_reasoning: - final_response = f"\n{accumulated_reasoning}\n\n\n" - elif accumulated_content: - final_response = accumulated_content - - usage_info = self._calculate_tokens_and_cost(messages, final_response, model_name) - + async with aiohttp.ClientSession() as session: + async with session.post( + url, + headers=headers, + json=payload, + timeout=aiohttp.ClientTimeout(total=90) + ) as response: + if response.status != 200: + error_message = f"HTTP Error {response.status}" try: - await self._report_api_call_direct(usage_info, user_email, model_id, __event_emitter__) - except Exception as e: - print(f"Error reporting API call: {e}") - yield f"Error: {e}" - - # Stop processing after [DONE] - break + error_data = await response.json() + error_message += ( + f": {error_data.get('error', {}).get('message', '')}" + ) + except: + pass + raise Exception(error_message) - try: - chunk = json.loads(line_text[6:]) + # State tracking + in_reasoning_state = False # True if we've output the opening tag + latest_citations = [] # The latest citations list + accumulated_content = "" # Accumulate all content for token calculation + accumulated_reasoning = "" # Accumulate all reasoning for token calculation - if "choices" in chunk and chunk["choices"]: - choice = chunk["choices"][0] - citations = chunk.get("citations") or [] + # Process the response stream asynchronously + async for line_bytes in response.content: + if not line_bytes: + continue - # Update the citation list - if citations: - latest_citations = citations + line_text = line_bytes.decode("utf-8").strip() + + # Handle multiple lines in a single chunk + for line in line_text.split('\n'): + if not line.strip(): + continue + + if not line.startswith("data: "): + continue + elif line == "data: [DONE]": + # Handle citations at the end + if latest_citations: + citation_list = [f"1. {l}" for l in latest_citations] + citation_list_str = "\n".join(citation_list) + yield f"\n\n---\nCitations:\n{citation_list_str}" + + # Calculate usage information using tiktoken and report + if user_email and model_id: + messages = payload.get("messages", []) + final_response = "" + if accumulated_reasoning and accumulated_content: + final_response = f"\n{accumulated_reasoning}\n\n\n{accumulated_content}" + elif accumulated_reasoning: + final_response = f"\n{accumulated_reasoning}\n\n\n" + elif accumulated_content: + final_response = accumulated_content + + usage_info = self._calculate_tokens_and_cost(messages, final_response, model_name) + + try: + await self._report_api_call_direct(usage_info, user_email, model_id, __event_emitter__) + except Exception as e: + print(f"Error reporting API call: {e}") + yield f"Error: {e}" + + # Stop processing after [DONE] + break - # Check for reasoning tokens - reasoning_text = None - if "delta" in choice and "reasoning" in choice["delta"]: - reasoning_text = choice["delta"]["reasoning"] - elif "message" in choice and "reasoning" in choice["message"]: - reasoning_text = choice["message"]["reasoning"] + try: + chunk = json.loads(line[6:]) - # Check for content tokens - content_text = None - if "delta" in choice and "content" in choice["delta"]: - content_text = choice["delta"]["content"] - elif "message" in choice and "content" in choice["message"]: - content_text = choice["message"]["content"] + if "choices" in chunk and chunk["choices"]: + choice = chunk["choices"][0] + citations = chunk.get("citations") or [] - # Handle reasoning tokens - if reasoning_text: - # Accumulate reasoning for token calculation - accumulated_reasoning += reasoning_text - - # If first reasoning token, output opening tag - if not in_reasoning_state: - yield "\n" - in_reasoning_state = True + # Update the citation list + if citations: + latest_citations = citations - # Output the reasoning token - yield _insert_citations(reasoning_text, citations) + # Check for reasoning tokens + reasoning_text = None + if "delta" in choice and "reasoning" in choice["delta"]: + reasoning_text = choice["delta"]["reasoning"] + elif "message" in choice and "reasoning" in choice["message"]: + reasoning_text = choice["message"]["reasoning"] - # Handle content tokens - if content_text: - # Accumulate content for token calculation - accumulated_content += content_text - - # If transitioning from reasoning to content, close the thinking tag - if in_reasoning_state: - yield "\n\n\n" - in_reasoning_state = False + # Check for content tokens + content_text = None + if "delta" in choice and "content" in choice["delta"]: + content_text = choice["delta"]["content"] + elif "message" in choice and "content" in choice["message"]: + content_text = choice["message"]["content"] - # Output the content - if content_text: - yield _insert_citations(content_text, citations) + # Handle reasoning tokens + if reasoning_text: + # Accumulate reasoning for token calculation + accumulated_reasoning += reasoning_text + + # If first reasoning token, output opening tag + if not in_reasoning_state: + yield "\n" + in_reasoning_state = True - except Exception as e: - print(f"Error processing chunk: {e}") + # Output the reasoning token + yield _insert_citations(reasoning_text, citations) - # If we're still in reasoning state at the end, close the tag - if in_reasoning_state: - yield "\n\n\n" + # Handle content tokens + if content_text: + # Accumulate content for token calculation + accumulated_content += content_text + + # If transitioning from reasoning to content, close the thinking tag + if in_reasoning_state: + yield "\n\n\n" + in_reasoning_state = False + + # Output the content + if content_text: + yield _insert_citations(content_text, citations) + + except Exception as e: + print(f"Error processing chunk: {e}") + + # If we're still in reasoning state at the end, close the tag + if in_reasoning_state: + yield "\n\n\n" except Exception as e: print(f"Error in stream_response: {e}")