From d66b060cba7583fe7899b6c741add41c75e4509c Mon Sep 17 00:00:00 2001
From: ZhuangYumin
Date: Thu, 26 Jun 2025 18:45:01 +0800
Subject: [PATCH] able to submit record

---
 openrouter_provider.py | 141 ++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 132 insertions(+), 9 deletions(-)

diff --git a/openrouter_provider.py b/openrouter_provider.py
index e4db927..a54368a 100644
--- a/openrouter_provider.py
+++ b/openrouter_provider.py
@@ -1,8 +1,8 @@
 """
 title: OpenRouter
-version: 0.1.2
+version: 0.2.0
 license: MIT
-description: Adds support for OpenRouter, including citations and reasoning tokens
+description: Adds support for OpenRouter, including citations, reasoning tokens, and API call reporting
 author: rburmorrison
 author_url: https://github.com/rburmorrison
 """
@@ -10,6 +10,7 @@ author_url: https://github.com/rburmorrison
 import re
 import requests
 import json
+import time
 
 from typing import List, Union, Generator, Iterator
 from pydantic import BaseModel, Field
@@ -60,6 +61,14 @@ class Pipe:
         MODEL_PREFIX: str = Field(
             default="", description="Optional prefix for model names in Open WebUI"
         )
+        REPORT_API_URL: str = Field(
+            default="",
+            description="URL to report API",
+        )
+        REPORT_API_KEY: str = Field(
+            default="",
+            description="API key to report API",
+        )
 
     def __init__(self):
         self.type = "manifold"  # Multiple models
@@ -107,7 +116,82 @@ class Pipe:
             print(f"Error fetching models: {e}")
             return [{"id": "error", "name": f"Error: {str(e)}"}]
 
-    def pipe(self, body: dict) -> Union[str, Generator, Iterator]:
+    def _fetch_generation_details(self, generation_id: str) -> dict:
+        """Fetch generation details from OpenRouter API"""
+        try:
+            cnt = 0
+            max_allowed = 10
+            while cnt < max_allowed:
+                cnt += 1
+                headers = {"Authorization": f"Bearer {self.valves.OPENROUTER_API_KEY}"}
+                response = requests.get(
+                    "https://openrouter.ai/api/v1/generation",
+                    headers=headers,
+                    params={"id": generation_id},
+                    timeout=30
+                )
+
+                if response.status_code == 200:
+                    return response.json()
+                elif response.status_code == 404:
+                    print(f"Generation not found, retrying... ({cnt}/{max_allowed})")
+                    time.sleep(1)
+                    continue
+                else:
+                    print(f"Error fetching generation details: {response.status_code}")
+                    return {}
+        except Exception as e:
+            print(f"Error fetching generation details: {e}")
+            return {}
+
+    def _report_api_call(self, generation_data: dict, user_email: str, model_id: str):
+        """Report API call to upstream reporting service"""
+        if not self.valves.REPORT_API_URL or not self.valves.REPORT_API_KEY:
+            return
+
+        try:
+            data = generation_data.get("data", {})
+
+            # Extract required fields for reporting
+            timestamp = int(time.time())
+            input_tokens = data.get("tokens_prompt", 0)
+            output_tokens = data.get("tokens_completion", 0)
+            cost_usd = data.get("total_cost", 0.0)
+
+            # Prepare API call record
+            api_call_record = {
+                "timestamp": timestamp,
+                "model_id": model_id,
+                "user_email": user_email,
+                "input_tokens": input_tokens,
+                "output_tokens": output_tokens,
+                "cost_usd": cost_usd
+            }
+
+            # Send to reporting API
+            headers = {
+                "Authorization": f"Bearer {self.valves.REPORT_API_KEY}",
+                "Content-Type": "application/json"
+            }
+
+            report_url = f"{self.valves.REPORT_API_URL.rstrip('/')}/api/record_api_call"
+
+            response = requests.post(
+                report_url,
+                headers=headers,
+                json=api_call_record,
+                timeout=30
+            )
+
+            if response.status_code == 200:
+                print(f"Successfully reported API call for user {user_email}")
+            else:
+                print(f"Failed to report API call: {response.status_code}")
+
+        except Exception as e:
+            print(f"Error reporting API call: {e}")
+
+    def pipe(self, body: dict, __user__: dict) -> Union[str, Generator, Iterator]:
         """Process the request and handle reasoning tokens if supported"""
         # Clone the body for OpenRouter
         payload = body.copy()
@@ -115,6 +199,10 @@ class Pipe:
         # Print incoming body for debugging
         print(f"Original request body: {json.dumps(body)[:500]}...")
 
+        # Extract user email and model ID for reporting
+        user_email = __user__.get("email", "") if __user__ else ""
+        model_id = "test_model"
+
         # Make sure the model ID is properly extracted from the pipe format
         if "model" in payload and payload["model"] and "." in payload["model"]:
             # Extract the model ID from the format like "openrouter.model-id"
@@ -148,9 +236,9 @@ class Pipe:
 
         try:
             if body.get("stream", False):
-                return self.stream_response(url, headers, payload)
+                return self.stream_response(url, headers, payload, user_email, model_id)
             else:
-                return self.non_stream_response(url, headers, payload)
+                return self.non_stream_response(url, headers, payload, user_email, model_id)
         except requests.exceptions.RequestException as e:
             print(f"Request failed: {e}")
             return f"Error: Request failed: {e}"
@@ -158,7 +246,7 @@ class Pipe:
             print(f"Error in pipe method: {e}")
             return f"Error: {e}"
 
-    def non_stream_response(self, url, headers, payload):
+    def non_stream_response(self, url, headers, payload, user_email, model_id):
         """Handle non-streaming responses and wrap reasoning in tags if present"""
         try:
             print(
@@ -190,6 +278,20 @@ class Pipe:
             res = response.json()
             print(f"OpenRouter response keys: {list(res.keys())}")
 
+            # Extract generation ID for reporting
+            generation_id = res.get("id", "")
+            if generation_id and user_email and model_id:
+                try:
+                    generation_data = self._fetch_generation_details(generation_id)
+                    if generation_data:
+                        self._report_api_call(generation_data, user_email, model_id)
+                except Exception as e:
+                    print(f"Error reporting API call: {e}")
+                    return f"Error: {e}"
+            else:
+                print(f"No generation ID found for reporting")
+                return f"Error: No generation ID found for reporting"
+
             # Check if we have choices in the response
             if not res.get("choices") or len(res["choices"]) == 0:
                 return ""
@@ -204,8 +306,8 @@ class Pipe:
             content = message.get("content", "")
             reasoning = message.get("reasoning", "")
 
-            print(f"Found reasoning: {bool(reasoning)} ({len(reasoning)} chars)")
-            print(f"Found content: {bool(content)} ({len(content)} chars)")
+            print(f"Found reasoning: {bool(reasoning)} ({len(reasoning) if reasoning is not None else 0} chars)")
+            print(f"Found content: {bool(content)} ({len(content) if content is not None else 0} chars)")
 
             # If we have both reasoning and content
             if reasoning and content:
@@ -219,7 +321,7 @@ class Pipe:
             print(f"Error in non_stream_response: {e}")
             return f"Error: {e}"
 
-    def stream_response(self, url, headers, payload):
+    def stream_response(self, url, headers, payload, user_email, model_id):
         """Stream reasoning tokens in real-time with proper tag management"""
         try:
             response = requests.post(
@@ -240,6 +342,7 @@ class Pipe:
             # State tracking
             in_reasoning_state = False  # True if we've output the opening tag
             latest_citations = []  # The latest citations list
+            generation_id = ""  # Track generation ID for reporting
 
             # Process the response stream
             for line in response.iter_lines():
@@ -250,15 +353,35 @@ class Pipe:
                 if not line_text.startswith("data: "):
                     continue
                 elif line_text == "data: [DONE]":
+                    # Handle citations at the end
                     if latest_citations:
                         citation_list = [f"1. {l}" for l in latest_citations]
                         citation_list_str = "\n".join(citation_list)
                         yield f"\n\n---\nCitations:\n{citation_list_str}"
+
+                    # Handle reporting at the end
+                    if generation_id and user_email and model_id:
+                        try:
+                            generation_data = self._fetch_generation_details(generation_id)
+                            if generation_data:
+                                self._report_api_call(generation_data, user_email, model_id)
+                        except Exception as e:
+                            print(f"Error reporting API call: {e}")
+                            return f"Error: {e}"
+                        yield ""  ## trick
+                    else:
+                        print(f"No generation ID found for reporting")
+                        return f"Error: No generation ID found for reporting"
                     continue
 
                 try:
                     chunk = json.loads(line_text[6:])
 
+                    # Extract generation ID from any chunk that has it
+                    if not generation_id and "id" in chunk:
+                        generation_id = chunk["id"]
+                        print(f"Extracted generation ID for reporting: {generation_id}")
+
                     if "choices" in chunk and chunk["choices"]:
                         choice = chunk["choices"][0]
                         citations = chunk.get("citations") or []