From 9e84b71f597532c2bb1cfb3fd2c9abc625385360 Mon Sep 17 00:00:00 2001 From: Yixing Lao Date: Wed, 29 Apr 2026 23:36:59 +0800 Subject: [PATCH] refactor(server): consolidate request logging into stages (#29) --- src/deepseek_cursor_proxy/server.py | 287 ++++++++++++++++------------ tests/test_proxy_end_to_end.py | 31 ++- tests/test_server.py | 6 +- 3 files changed, 189 insertions(+), 135 deletions(-) diff --git a/src/deepseek_cursor_proxy/server.py b/src/deepseek_cursor_proxy/server.py index 39be7ee..3120ba2 100644 --- a/src/deepseek_cursor_proxy/server.py +++ b/src/deepseek_cursor_proxy/server.py @@ -1,7 +1,7 @@ from __future__ import annotations import argparse -from dataclasses import replace +from dataclasses import dataclass, replace import gzip from http.client import HTTPException import json @@ -26,6 +26,7 @@ from .streaming import CursorReasoningDisplayAdapter, StreamAccumulator from .trace import TraceRequest, TraceWriter from .tunnel import NgrokTunnel, local_tunnel_target from .transform import ( + PreparedRequest, RECOVERY_NOTICE_CONTENT, prepare_upstream_request, rewrite_response_body, @@ -39,6 +40,12 @@ class RequestBodyTooLarge(ValueError): pass +@dataclass +class ProxyResponseResult: + sent: bool + usage: dict[str, Any] | None = None + + class DeepSeekProxyServer(ThreadingHTTPServer): config: ProxyConfig reasoning_store: ReasoningStore @@ -61,7 +68,7 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler): return getattr(self.server, "trace_writer", None) def log_message(self, fmt: str, *args: Any) -> None: - LOG.info("%s - %s", self.address_string(), fmt % args) + return def do_OPTIONS(self) -> None: request_path = urlparse(self.path).path @@ -143,7 +150,7 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler): if self.config.verbose: log_json("cursor request body", payload) - LOG.info("cursor request: %s", summarize_chat_payload(payload)) + log_cursor_request(payload, self.config) prepared = prepare_upstream_request( payload, @@ -153,22 +160,7 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler): ) if trace is not None: trace.record_transform(prepared) - if prepared.patched_reasoning_messages: - LOG.info( - "restored reasoning_content on %s assistant message(s)", - prepared.patched_reasoning_messages, - ) - if prepared.recovered_reasoning_messages: - if prepared.recovery_notice: - LOG.warning("refreshed reasoning_content history") - else: - LOG.info( - ( - "continued recovered request; omitted %s old message(s) " - "before the prior recovery boundary" - ), - prepared.recovery_dropped_messages, - ) + log_context_summary(prepared) if prepared.missing_reasoning_messages: LOG.warning( ( @@ -203,13 +195,6 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler): self._finish_trace(trace, "rejected", http_status=409) return - LOG.info( - "deepseek send: %s patched=%s recovered=%s", - compact_request_stats(prepared.payload), - prepared.patched_reasoning_messages, - prepared.recovered_reasoning_messages, - ) - if self.config.verbose: LOG.info( ( @@ -247,6 +232,8 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler): headers=upstream_headers, ) + log_send_summary(prepared) + try: if self.config.verbose: LOG.info("forwarding to %s", upstream_url) @@ -313,7 +300,7 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler): record_response_messages=prepared.record_response_messages, record_response_contexts=prepared.record_response_contexts, ) - if not sent_response: + if not sent_response.sent: self._finish_trace( trace, "client_disconnected", @@ -321,18 +308,7 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler): stream=bool(prepared.payload.get("stream")), ) return - LOG.info( - ( - "request complete status=%s stream=%s elapsed_ms=%s " - "patched_reasoning=%s missing_reasoning=%s recovered_reasoning=%s" - ), - upstream_status, - bool(prepared.payload.get("stream")), - elapsed_ms(started), - prepared.patched_reasoning_messages, - prepared.missing_reasoning_messages, - prepared.recovered_reasoning_messages, - ) + log_stats_summary(sent_response.usage) self._finish_trace( trace, "completed", @@ -549,9 +525,10 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler): record_response_scope: str | None = None, record_response_messages: list[dict[str, Any]] | None = None, record_response_contexts: list[tuple[str, list[dict[str, Any]]]] | None = None, - ) -> bool: + ) -> ProxyResponseResult: body = read_response_body(response) upstream_body = body + usage = usage_from_body(upstream_body) try: body = rewrite_response_body( body, @@ -566,7 +543,6 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler): ) except (json.JSONDecodeError, UnicodeDecodeError) as exc: LOG.warning("failed to rewrite upstream JSON response: %s", exc) - log_usage_from_body(body) if self.config.verbose: log_bytes("cursor response body", body) @@ -603,8 +579,9 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler): "sending upstream response headers", ) if not sent_headers: - return False - return self._write_to_client(body, "sending upstream response body") + return ProxyResponseResult(False, usage) + sent = self._write_to_client(body, "sending upstream response body") + return ProxyResponseResult(sent, usage) def _proxy_streaming_response( self, @@ -617,7 +594,7 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler): record_response_scope: str | None = None, record_response_messages: list[dict[str, Any]] | None = None, record_response_contexts: list[tuple[str, list[dict[str, Any]]]] | None = None, - ) -> bool: + ) -> ProxyResponseResult: if trace is not None: trace.record_upstream_response( status=getattr(response, "status", 200), @@ -642,10 +619,11 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler): "sending streaming response headers", ) if not sent_headers: - return False + return ProxyResponseResult(False) self.close_connection = True accumulator = StreamAccumulator() + usage: dict[str, Any] | None = None display_adapter = ( CursorReasoningDisplayAdapter() if self.config.cursor_display_reasoning @@ -673,10 +651,15 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler): line = response.readline() except (HTTPException, OSError) as exc: LOG.warning("upstream streaming response read failed: %s", exc) - return False + return ProxyResponseResult(False, usage) if not line: break - rewritten, finalized, pending_recovery_notice = self._rewrite_sse_line( + ( + rewritten, + finalized, + pending_recovery_notice, + chunk_usage, + ) = self._rewrite_sse_line( line, original_model, accumulator, @@ -686,12 +669,14 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler): pending_recovery_notice, trace, ) + if chunk_usage is not None: + usage = chunk_usage if trace is not None: trace.record_stream_chunk(line, rewritten) if not self._write_to_client( rewritten, "sending streaming response chunk", flush=True ): - return False + return ProxyResponseResult(False, usage) if finalized: break @@ -707,9 +692,9 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler): ) for scope, prior_messages in response_contexts ) - if stored: + if self.config.verbose and stored: LOG.info("stored %s streaming reasoning cache key(s)", stored) - return True + return ProxyResponseResult(True, usage) def _rewrite_sse_line( self, @@ -721,10 +706,10 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler): display_adapter: CursorReasoningDisplayAdapter | None, recovery_notice: str | None = None, trace: TraceRequest | None = None, - ) -> tuple[bytes, bool, str | None]: + ) -> tuple[bytes, bool, str | None, dict[str, Any] | None]: stripped = line.strip() if not stripped.startswith(b"data:"): - return line, False, recovery_notice + return line, False, recovery_notice, None data = stripped[len(b"data:") :].strip() if data == b"[DONE]": @@ -739,7 +724,7 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler): ) for scope, prior_messages in response_contexts ) - if stored: + if self.config.verbose and stored: LOG.info("stored %s streaming reasoning cache key(s)", stored) prefix = b"" if display_adapter is None: @@ -747,7 +732,7 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler): prefix += sse_data( recovery_notice_chunk(original_model, recovery_notice) ) - return prefix + b"data: [DONE]\n\n", True, None + return prefix + b"data: [DONE]\n\n", True, None, None closing_chunk = display_adapter.flush_chunk(original_model) if closing_chunk is not None: prefix += sse_data(closing_chunk) @@ -755,12 +740,12 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler): prefix += sse_data( recovery_notice_chunk(original_model, recovery_notice) ) - return prefix + b"data: [DONE]\n\n", True, None + return prefix + b"data: [DONE]\n\n", True, None, None try: chunk = json.loads(data.decode("utf-8")) except (json.JSONDecodeError, UnicodeDecodeError): - return line, False, recovery_notice + return line, False, recovery_notice, None if isinstance(chunk, dict): if recovery_notice and inject_recovery_notice(chunk, recovery_notice): @@ -775,11 +760,11 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler): ) for scope, prior_messages in response_contexts ) - if stored: + if self.config.verbose and stored: LOG.info("stored %s streaming reasoning cache key(s)", stored) + chunk_usage = chunk.get("usage") if trace is not None: - trace.record_usage(chunk.get("usage")) - log_usage(chunk.get("usage")) + trace.record_usage(chunk_usage) if display_adapter is not None: display_adapter.rewrite_chunk(chunk) if "model" in chunk: @@ -795,8 +780,9 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler): ), False, recovery_notice, + chunk_usage if isinstance(chunk_usage, dict) else None, ) - return line, False, recovery_notice + return line, False, recovery_notice, None def build_arg_parser() -> argparse.ArgumentParser: @@ -852,7 +838,7 @@ def build_arg_parser() -> argparse.ArgumentParser: "--verbose", action=argparse.BooleanOptionalAction, default=None, - help="Log detailed request lifecycle metadata and full payloads", + help="Log detailed request metadata and full payloads", ) parser.add_argument( "--trace-dir", @@ -928,92 +914,141 @@ def log_bytes(label: str, body: bytes) -> None: log_json(label, payload) -def log_usage_from_body(body: bytes) -> None: +def usage_from_body(body: bytes) -> dict[str, Any] | None: try: payload = json.loads(body.decode("utf-8")) except (json.JSONDecodeError, UnicodeDecodeError): - return + return None if isinstance(payload, dict): - log_usage(payload.get("usage")) + usage = payload.get("usage") + if isinstance(usage, dict): + return usage + return None -def log_usage(usage: Any) -> None: - if not isinstance(usage, dict): - return - summary = compact_usage_stats(usage) - if summary is None: - return - LOG.info("deepseek usage: %s", summary) +def log_cursor_request( + payload: dict[str, Any], + config: ProxyConfig, +) -> None: + model = str(payload.get("model") or config.upstream_model) + LOG.info( + "┌ cursor model=%s messages=%s tools=%s", + model, + format_count(message_count(payload)), + format_count(tool_count(payload)), + ) -def compact_request_stats(payload: dict[str, Any]) -> str: +def log_context_summary(prepared: PreparedRequest) -> None: + LOG.info( + "├ context filled=%s missing=%s recovered=%s dropped=%s status=%s", + format_count(prepared.patched_reasoning_messages), + format_count(prepared.missing_reasoning_messages), + format_count(prepared.recovered_reasoning_messages), + format_count(prepared.recovery_dropped_messages), + context_status(prepared), + ) + + +def log_send_summary(prepared: PreparedRequest) -> None: + LOG.info( + "├ send user_msgs=%s messages=%s tools=%s reasoning_content=%s", + format_count(user_message_count(prepared.payload)), + format_count(message_count(prepared.payload)), + format_count(tool_count(prepared.payload)), + format_count(reasoning_content_count(prepared.payload)), + ) + + +def log_stats_summary(usage: dict[str, Any] | None) -> None: + LOG.info( + "└ stats prompt=%s output=%s reasoning=%s cache_hit=%s", + format_usage_count(usage, "prompt_tokens"), + format_usage_count(usage, "completion_tokens"), + format_count(reasoning_token_count(usage)), + cache_hit_rate(usage), + ) + + +def context_status(prepared: PreparedRequest) -> str: + if prepared.recovered_reasoning_messages: + return "recovered" + if prepared.missing_reasoning_messages: + return "missing" + return "ok" + + +def message_count(payload: dict[str, Any]) -> int: + messages = payload.get("messages") + return len(messages) if isinstance(messages, list) else 0 + + +def tool_count(payload: dict[str, Any]) -> int: + tools = payload.get("tools") + return len(tools) if isinstance(tools, list) else 0 + + +def user_message_count(payload: dict[str, Any]) -> int: messages = payload.get("messages") if not isinstance(messages, list): - messages = [] - tools = payload.get("tools") - reasoning_count = 0 - reasoning_chars = 0 - for message in messages: - if not isinstance(message, dict) or message.get("role") != "assistant": - continue - reasoning = message.get("reasoning_content") - if isinstance(reasoning, str): - reasoning_count += 1 - reasoning_chars += len(reasoning) - rounds = sum( + return 0 + return sum( 1 for message in messages if isinstance(message, dict) and message.get("role") == "user" ) - return ( - f"model={payload.get('model')} stream={int(bool(payload.get('stream')))} " - f"rounds={rounds} msgs={len(messages)} " - f"tools={len(tools) if isinstance(tools, list) else 0} " - f"reasoning={reasoning_count}/{reasoning_chars}ch" + + +def reasoning_content_count(payload: dict[str, Any]) -> int: + messages = payload.get("messages") + if not isinstance(messages, list): + return 0 + return sum( + 1 + for message in messages + if isinstance(message, dict) + and message.get("role") == "assistant" + and isinstance(message.get("reasoning_content"), str) ) -def compact_usage_stats(usage: dict[str, Any]) -> str | None: - prompt_tokens = usage.get("prompt_tokens") - completion_tokens = usage.get("completion_tokens") - total_tokens = usage.get("total_tokens") +def format_usage_count(usage: dict[str, Any] | None, key: str) -> str: + if not isinstance(usage, dict): + return "?" + return format_count(usage.get(key)) + + +def reasoning_token_count(usage: dict[str, Any] | None) -> Any: + if not isinstance(usage, dict): + return None + details = usage.get("completion_tokens_details") + if not isinstance(details, dict): + return None + return details.get("reasoning_tokens") + + +def cache_hit_rate(usage: dict[str, Any] | None) -> str: + if not isinstance(usage, dict): + return "?" hit_tokens = usage.get("prompt_cache_hit_tokens") miss_tokens = usage.get("prompt_cache_miss_tokens") - details = usage.get("completion_tokens_details") - reasoning_tokens = None - if isinstance(details, dict): - reasoning_tokens = details.get("reasoning_tokens") + if hit_tokens is None and miss_tokens is None: + return "?" + hit = int_or_zero(hit_tokens) + miss = int_or_zero(miss_tokens) + total = hit + miss + if not total: + return "?" + return f"{hit / total:.1%}" - if all( - value is None - for value in ( - prompt_tokens, - completion_tokens, - total_tokens, - hit_tokens, - miss_tokens, - reasoning_tokens, - ) - ): - return None - cache_summary = "cache=?" - if hit_tokens is not None or miss_tokens is not None: - hit = int_or_zero(hit_tokens) - miss = int_or_zero(miss_tokens) - cache_total = hit + miss - if cache_total: - cache_summary = f"cache={hit}/{miss} hit={hit / cache_total:.1%}" - else: - cache_summary = f"cache={hit}/{miss}" - - return ( - f"prompt={prompt_tokens if prompt_tokens is not None else '?'} " - f"completion={completion_tokens if completion_tokens is not None else '?'} " - f"total={total_tokens if total_tokens is not None else '?'} " - f"{cache_summary} " - f"reasoning={reasoning_tokens if reasoning_tokens is not None else '?'}" - ) +def format_count(value: Any) -> str: + if value is None: + return "?" + try: + return f"{int(value):,}" + except (TypeError, ValueError): + return str(value) def int_or_zero(value: Any) -> int: diff --git a/tests/test_proxy_end_to_end.py b/tests/test_proxy_end_to_end.py index b7e2da4..9e5c61c 100644 --- a/tests/test_proxy_end_to_end.py +++ b/tests/test_proxy_end_to_end.py @@ -585,17 +585,33 @@ class ProxyEndToEndTests(unittest.TestCase): ) output = "\n".join(captured.output) + stage_records = [ + record + for record in captured.output + if any( + marker in record + for marker in ("┌ cursor", "├ context", "├ send", "└ stats") + ) + ] self.assertEqual(status, 200) - self.assertIn("cursor request: model='deepseek-v4-pro'", output) + self.assertEqual(len(stage_records), 4) + self.assertTrue(all("\n" not in record for record in stage_records)) self.assertIn( - "deepseek send: model=deepseek-v4-pro stream=0 rounds=1 msgs=1 tools=1 reasoning=0/0ch", + "┌ cursor model=deepseek-v4-pro messages=1 tools=1", output, ) self.assertIn( - "deepseek usage: prompt=20 completion=5 total=25 cache=12/8 hit=60.0% reasoning=3", + "├ context filled=0 missing=0 recovered=0 dropped=0 status=ok", + output, + ) + self.assertIn( + "├ send user_msgs=1 messages=1 tools=1 reasoning_content=0", + output, + ) + self.assertIn( + "└ stats prompt=20 output=5 reasoning=3 cache_hit=60.0%", output, ) - self.assertIn("request complete status=200", output) self.assertNotIn("What is tomorrow's date?", output) self.assertNotIn("sk-from-cursor", output) @@ -710,7 +726,7 @@ class ProxyEndToEndTests(unittest.TestCase): self.assertEqual(FakeDeepSeekHandler.requests, []) def test_proxy_recovers_uncached_cursor_tool_history(self) -> None: - with self.assertLogs("deepseek_cursor_proxy", level="WARNING") as captured: + with self.assertLogs("deepseek_cursor_proxy", level="INFO") as captured: status, payload = post_json( f"{self.proxy.url}/v1/chat/completions", third_cursor_request_missing_all_reasoning(), @@ -738,9 +754,12 @@ class ProxyEndToEndTests(unittest.TestCase): {"role": "user", "content": "Thanks, now continue."}, ) self.assertIn( - "refreshed reasoning_content history", + "status=recovered", "\n".join(captured.output), ) + self.assertFalse( + any(record.startswith("WARNING:") for record in captured.output) + ) def test_trace_captures_recovery_diagnostics(self) -> None: with TemporaryDirectory() as temp_dir: diff --git a/tests/test_server.py b/tests/test_server.py index 8bcc6a3..577ca41 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -149,7 +149,7 @@ class ServerTests(unittest.TestCase): finally: handler.server.reasoning_store.close() - self.assertFalse(sent) + self.assertFalse(sent.sent) self.assertIn("sending upstream response body", "\n".join(captured.output)) def test_streaming_response_stops_on_client_disconnect(self) -> None: @@ -182,7 +182,7 @@ class ServerTests(unittest.TestCase): finally: handler.server.reasoning_store.close() - self.assertFalse(sent) + self.assertFalse(sent.sent) self.assertEqual(response.readline_calls, 1) self.assertIn("sending streaming response chunk", "\n".join(captured.output)) @@ -200,7 +200,7 @@ class ServerTests(unittest.TestCase): finally: handler.server.reasoning_store.close() - self.assertFalse(sent) + self.assertFalse(sent.sent) self.assertIn( "upstream streaming response read failed", "\n".join(captured.output),