refactor(server): consolidate request logging into stages (#29)

main
Yixing Lao 2026-04-29 23:36:59 +08:00 committed by GitHub
parent 5f14da32c6
commit 9e84b71f59
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 189 additions and 135 deletions

View File

@ -1,7 +1,7 @@
from __future__ import annotations
import argparse
from dataclasses import replace
from dataclasses import dataclass, replace
import gzip
from http.client import HTTPException
import json
@ -26,6 +26,7 @@ from .streaming import CursorReasoningDisplayAdapter, StreamAccumulator
from .trace import TraceRequest, TraceWriter
from .tunnel import NgrokTunnel, local_tunnel_target
from .transform import (
PreparedRequest,
RECOVERY_NOTICE_CONTENT,
prepare_upstream_request,
rewrite_response_body,
@ -39,6 +40,12 @@ class RequestBodyTooLarge(ValueError):
pass
@dataclass
class ProxyResponseResult:
sent: bool
usage: dict[str, Any] | None = None
class DeepSeekProxyServer(ThreadingHTTPServer):
config: ProxyConfig
reasoning_store: ReasoningStore
@ -61,7 +68,7 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler):
return getattr(self.server, "trace_writer", None)
def log_message(self, fmt: str, *args: Any) -> None:
LOG.info("%s - %s", self.address_string(), fmt % args)
return
def do_OPTIONS(self) -> None:
request_path = urlparse(self.path).path
@ -143,7 +150,7 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler):
if self.config.verbose:
log_json("cursor request body", payload)
LOG.info("cursor request: %s", summarize_chat_payload(payload))
log_cursor_request(payload, self.config)
prepared = prepare_upstream_request(
payload,
@ -153,22 +160,7 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler):
)
if trace is not None:
trace.record_transform(prepared)
if prepared.patched_reasoning_messages:
LOG.info(
"restored reasoning_content on %s assistant message(s)",
prepared.patched_reasoning_messages,
)
if prepared.recovered_reasoning_messages:
if prepared.recovery_notice:
LOG.warning("refreshed reasoning_content history")
else:
LOG.info(
(
"continued recovered request; omitted %s old message(s) "
"before the prior recovery boundary"
),
prepared.recovery_dropped_messages,
)
log_context_summary(prepared)
if prepared.missing_reasoning_messages:
LOG.warning(
(
@ -203,13 +195,6 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler):
self._finish_trace(trace, "rejected", http_status=409)
return
LOG.info(
"deepseek send: %s patched=%s recovered=%s",
compact_request_stats(prepared.payload),
prepared.patched_reasoning_messages,
prepared.recovered_reasoning_messages,
)
if self.config.verbose:
LOG.info(
(
@ -247,6 +232,8 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler):
headers=upstream_headers,
)
log_send_summary(prepared)
try:
if self.config.verbose:
LOG.info("forwarding to %s", upstream_url)
@ -313,7 +300,7 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler):
record_response_messages=prepared.record_response_messages,
record_response_contexts=prepared.record_response_contexts,
)
if not sent_response:
if not sent_response.sent:
self._finish_trace(
trace,
"client_disconnected",
@ -321,18 +308,7 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler):
stream=bool(prepared.payload.get("stream")),
)
return
LOG.info(
(
"request complete status=%s stream=%s elapsed_ms=%s "
"patched_reasoning=%s missing_reasoning=%s recovered_reasoning=%s"
),
upstream_status,
bool(prepared.payload.get("stream")),
elapsed_ms(started),
prepared.patched_reasoning_messages,
prepared.missing_reasoning_messages,
prepared.recovered_reasoning_messages,
)
log_stats_summary(sent_response.usage)
self._finish_trace(
trace,
"completed",
@ -549,9 +525,10 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler):
record_response_scope: str | None = None,
record_response_messages: list[dict[str, Any]] | None = None,
record_response_contexts: list[tuple[str, list[dict[str, Any]]]] | None = None,
) -> bool:
) -> ProxyResponseResult:
body = read_response_body(response)
upstream_body = body
usage = usage_from_body(upstream_body)
try:
body = rewrite_response_body(
body,
@ -566,7 +543,6 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler):
)
except (json.JSONDecodeError, UnicodeDecodeError) as exc:
LOG.warning("failed to rewrite upstream JSON response: %s", exc)
log_usage_from_body(body)
if self.config.verbose:
log_bytes("cursor response body", body)
@ -603,8 +579,9 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler):
"sending upstream response headers",
)
if not sent_headers:
return False
return self._write_to_client(body, "sending upstream response body")
return ProxyResponseResult(False, usage)
sent = self._write_to_client(body, "sending upstream response body")
return ProxyResponseResult(sent, usage)
def _proxy_streaming_response(
self,
@ -617,7 +594,7 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler):
record_response_scope: str | None = None,
record_response_messages: list[dict[str, Any]] | None = None,
record_response_contexts: list[tuple[str, list[dict[str, Any]]]] | None = None,
) -> bool:
) -> ProxyResponseResult:
if trace is not None:
trace.record_upstream_response(
status=getattr(response, "status", 200),
@ -642,10 +619,11 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler):
"sending streaming response headers",
)
if not sent_headers:
return False
return ProxyResponseResult(False)
self.close_connection = True
accumulator = StreamAccumulator()
usage: dict[str, Any] | None = None
display_adapter = (
CursorReasoningDisplayAdapter()
if self.config.cursor_display_reasoning
@ -673,10 +651,15 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler):
line = response.readline()
except (HTTPException, OSError) as exc:
LOG.warning("upstream streaming response read failed: %s", exc)
return False
return ProxyResponseResult(False, usage)
if not line:
break
rewritten, finalized, pending_recovery_notice = self._rewrite_sse_line(
(
rewritten,
finalized,
pending_recovery_notice,
chunk_usage,
) = self._rewrite_sse_line(
line,
original_model,
accumulator,
@ -686,12 +669,14 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler):
pending_recovery_notice,
trace,
)
if chunk_usage is not None:
usage = chunk_usage
if trace is not None:
trace.record_stream_chunk(line, rewritten)
if not self._write_to_client(
rewritten, "sending streaming response chunk", flush=True
):
return False
return ProxyResponseResult(False, usage)
if finalized:
break
@ -707,9 +692,9 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler):
)
for scope, prior_messages in response_contexts
)
if stored:
if self.config.verbose and stored:
LOG.info("stored %s streaming reasoning cache key(s)", stored)
return True
return ProxyResponseResult(True, usage)
def _rewrite_sse_line(
self,
@ -721,10 +706,10 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler):
display_adapter: CursorReasoningDisplayAdapter | None,
recovery_notice: str | None = None,
trace: TraceRequest | None = None,
) -> tuple[bytes, bool, str | None]:
) -> tuple[bytes, bool, str | None, dict[str, Any] | None]:
stripped = line.strip()
if not stripped.startswith(b"data:"):
return line, False, recovery_notice
return line, False, recovery_notice, None
data = stripped[len(b"data:") :].strip()
if data == b"[DONE]":
@ -739,7 +724,7 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler):
)
for scope, prior_messages in response_contexts
)
if stored:
if self.config.verbose and stored:
LOG.info("stored %s streaming reasoning cache key(s)", stored)
prefix = b""
if display_adapter is None:
@ -747,7 +732,7 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler):
prefix += sse_data(
recovery_notice_chunk(original_model, recovery_notice)
)
return prefix + b"data: [DONE]\n\n", True, None
return prefix + b"data: [DONE]\n\n", True, None, None
closing_chunk = display_adapter.flush_chunk(original_model)
if closing_chunk is not None:
prefix += sse_data(closing_chunk)
@ -755,12 +740,12 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler):
prefix += sse_data(
recovery_notice_chunk(original_model, recovery_notice)
)
return prefix + b"data: [DONE]\n\n", True, None
return prefix + b"data: [DONE]\n\n", True, None, None
try:
chunk = json.loads(data.decode("utf-8"))
except (json.JSONDecodeError, UnicodeDecodeError):
return line, False, recovery_notice
return line, False, recovery_notice, None
if isinstance(chunk, dict):
if recovery_notice and inject_recovery_notice(chunk, recovery_notice):
@ -775,11 +760,11 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler):
)
for scope, prior_messages in response_contexts
)
if stored:
if self.config.verbose and stored:
LOG.info("stored %s streaming reasoning cache key(s)", stored)
chunk_usage = chunk.get("usage")
if trace is not None:
trace.record_usage(chunk.get("usage"))
log_usage(chunk.get("usage"))
trace.record_usage(chunk_usage)
if display_adapter is not None:
display_adapter.rewrite_chunk(chunk)
if "model" in chunk:
@ -795,8 +780,9 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler):
),
False,
recovery_notice,
chunk_usage if isinstance(chunk_usage, dict) else None,
)
return line, False, recovery_notice
return line, False, recovery_notice, None
def build_arg_parser() -> argparse.ArgumentParser:
@ -852,7 +838,7 @@ def build_arg_parser() -> argparse.ArgumentParser:
"--verbose",
action=argparse.BooleanOptionalAction,
default=None,
help="Log detailed request lifecycle metadata and full payloads",
help="Log detailed request metadata and full payloads",
)
parser.add_argument(
"--trace-dir",
@ -928,92 +914,141 @@ def log_bytes(label: str, body: bytes) -> None:
log_json(label, payload)
def log_usage_from_body(body: bytes) -> None:
def usage_from_body(body: bytes) -> dict[str, Any] | None:
try:
payload = json.loads(body.decode("utf-8"))
except (json.JSONDecodeError, UnicodeDecodeError):
return
return None
if isinstance(payload, dict):
log_usage(payload.get("usage"))
usage = payload.get("usage")
if isinstance(usage, dict):
return usage
return None
def log_usage(usage: Any) -> None:
if not isinstance(usage, dict):
return
summary = compact_usage_stats(usage)
if summary is None:
return
LOG.info("deepseek usage: %s", summary)
def log_cursor_request(
payload: dict[str, Any],
config: ProxyConfig,
) -> None:
model = str(payload.get("model") or config.upstream_model)
LOG.info(
"┌ cursor model=%s messages=%s tools=%s",
model,
format_count(message_count(payload)),
format_count(tool_count(payload)),
)
def compact_request_stats(payload: dict[str, Any]) -> str:
def log_context_summary(prepared: PreparedRequest) -> None:
LOG.info(
"├ context filled=%s missing=%s recovered=%s dropped=%s status=%s",
format_count(prepared.patched_reasoning_messages),
format_count(prepared.missing_reasoning_messages),
format_count(prepared.recovered_reasoning_messages),
format_count(prepared.recovery_dropped_messages),
context_status(prepared),
)
def log_send_summary(prepared: PreparedRequest) -> None:
LOG.info(
"├ send user_msgs=%s messages=%s tools=%s reasoning_content=%s",
format_count(user_message_count(prepared.payload)),
format_count(message_count(prepared.payload)),
format_count(tool_count(prepared.payload)),
format_count(reasoning_content_count(prepared.payload)),
)
def log_stats_summary(usage: dict[str, Any] | None) -> None:
LOG.info(
"└ stats prompt=%s output=%s reasoning=%s cache_hit=%s",
format_usage_count(usage, "prompt_tokens"),
format_usage_count(usage, "completion_tokens"),
format_count(reasoning_token_count(usage)),
cache_hit_rate(usage),
)
def context_status(prepared: PreparedRequest) -> str:
if prepared.recovered_reasoning_messages:
return "recovered"
if prepared.missing_reasoning_messages:
return "missing"
return "ok"
def message_count(payload: dict[str, Any]) -> int:
messages = payload.get("messages")
return len(messages) if isinstance(messages, list) else 0
def tool_count(payload: dict[str, Any]) -> int:
tools = payload.get("tools")
return len(tools) if isinstance(tools, list) else 0
def user_message_count(payload: dict[str, Any]) -> int:
messages = payload.get("messages")
if not isinstance(messages, list):
messages = []
tools = payload.get("tools")
reasoning_count = 0
reasoning_chars = 0
for message in messages:
if not isinstance(message, dict) or message.get("role") != "assistant":
continue
reasoning = message.get("reasoning_content")
if isinstance(reasoning, str):
reasoning_count += 1
reasoning_chars += len(reasoning)
rounds = sum(
return 0
return sum(
1
for message in messages
if isinstance(message, dict) and message.get("role") == "user"
)
return (
f"model={payload.get('model')} stream={int(bool(payload.get('stream')))} "
f"rounds={rounds} msgs={len(messages)} "
f"tools={len(tools) if isinstance(tools, list) else 0} "
f"reasoning={reasoning_count}/{reasoning_chars}ch"
def reasoning_content_count(payload: dict[str, Any]) -> int:
messages = payload.get("messages")
if not isinstance(messages, list):
return 0
return sum(
1
for message in messages
if isinstance(message, dict)
and message.get("role") == "assistant"
and isinstance(message.get("reasoning_content"), str)
)
def compact_usage_stats(usage: dict[str, Any]) -> str | None:
prompt_tokens = usage.get("prompt_tokens")
completion_tokens = usage.get("completion_tokens")
total_tokens = usage.get("total_tokens")
def format_usage_count(usage: dict[str, Any] | None, key: str) -> str:
if not isinstance(usage, dict):
return "?"
return format_count(usage.get(key))
def reasoning_token_count(usage: dict[str, Any] | None) -> Any:
if not isinstance(usage, dict):
return None
details = usage.get("completion_tokens_details")
if not isinstance(details, dict):
return None
return details.get("reasoning_tokens")
def cache_hit_rate(usage: dict[str, Any] | None) -> str:
if not isinstance(usage, dict):
return "?"
hit_tokens = usage.get("prompt_cache_hit_tokens")
miss_tokens = usage.get("prompt_cache_miss_tokens")
details = usage.get("completion_tokens_details")
reasoning_tokens = None
if isinstance(details, dict):
reasoning_tokens = details.get("reasoning_tokens")
if all(
value is None
for value in (
prompt_tokens,
completion_tokens,
total_tokens,
hit_tokens,
miss_tokens,
reasoning_tokens,
)
):
return None
cache_summary = "cache=?"
if hit_tokens is not None or miss_tokens is not None:
if hit_tokens is None and miss_tokens is None:
return "?"
hit = int_or_zero(hit_tokens)
miss = int_or_zero(miss_tokens)
cache_total = hit + miss
if cache_total:
cache_summary = f"cache={hit}/{miss} hit={hit / cache_total:.1%}"
else:
cache_summary = f"cache={hit}/{miss}"
total = hit + miss
if not total:
return "?"
return f"{hit / total:.1%}"
return (
f"prompt={prompt_tokens if prompt_tokens is not None else '?'} "
f"completion={completion_tokens if completion_tokens is not None else '?'} "
f"total={total_tokens if total_tokens is not None else '?'} "
f"{cache_summary} "
f"reasoning={reasoning_tokens if reasoning_tokens is not None else '?'}"
)
def format_count(value: Any) -> str:
if value is None:
return "?"
try:
return f"{int(value):,}"
except (TypeError, ValueError):
return str(value)
def int_or_zero(value: Any) -> int:

View File

@ -585,17 +585,33 @@ class ProxyEndToEndTests(unittest.TestCase):
)
output = "\n".join(captured.output)
stage_records = [
record
for record in captured.output
if any(
marker in record
for marker in ("┌ cursor", "├ context", "├ send", "└ stats")
)
]
self.assertEqual(status, 200)
self.assertIn("cursor request: model='deepseek-v4-pro'", output)
self.assertEqual(len(stage_records), 4)
self.assertTrue(all("\n" not in record for record in stage_records))
self.assertIn(
"deepseek send: model=deepseek-v4-pro stream=0 rounds=1 msgs=1 tools=1 reasoning=0/0ch",
"┌ cursor model=deepseek-v4-pro messages=1 tools=1",
output,
)
self.assertIn(
"deepseek usage: prompt=20 completion=5 total=25 cache=12/8 hit=60.0% reasoning=3",
"├ context filled=0 missing=0 recovered=0 dropped=0 status=ok",
output,
)
self.assertIn(
"├ send user_msgs=1 messages=1 tools=1 reasoning_content=0",
output,
)
self.assertIn(
"└ stats prompt=20 output=5 reasoning=3 cache_hit=60.0%",
output,
)
self.assertIn("request complete status=200", output)
self.assertNotIn("What is tomorrow's date?", output)
self.assertNotIn("sk-from-cursor", output)
@ -710,7 +726,7 @@ class ProxyEndToEndTests(unittest.TestCase):
self.assertEqual(FakeDeepSeekHandler.requests, [])
def test_proxy_recovers_uncached_cursor_tool_history(self) -> None:
with self.assertLogs("deepseek_cursor_proxy", level="WARNING") as captured:
with self.assertLogs("deepseek_cursor_proxy", level="INFO") as captured:
status, payload = post_json(
f"{self.proxy.url}/v1/chat/completions",
third_cursor_request_missing_all_reasoning(),
@ -738,9 +754,12 @@ class ProxyEndToEndTests(unittest.TestCase):
{"role": "user", "content": "Thanks, now continue."},
)
self.assertIn(
"refreshed reasoning_content history",
"status=recovered",
"\n".join(captured.output),
)
self.assertFalse(
any(record.startswith("WARNING:") for record in captured.output)
)
def test_trace_captures_recovery_diagnostics(self) -> None:
with TemporaryDirectory() as temp_dir:

View File

@ -149,7 +149,7 @@ class ServerTests(unittest.TestCase):
finally:
handler.server.reasoning_store.close()
self.assertFalse(sent)
self.assertFalse(sent.sent)
self.assertIn("sending upstream response body", "\n".join(captured.output))
def test_streaming_response_stops_on_client_disconnect(self) -> None:
@ -182,7 +182,7 @@ class ServerTests(unittest.TestCase):
finally:
handler.server.reasoning_store.close()
self.assertFalse(sent)
self.assertFalse(sent.sent)
self.assertEqual(response.readline_calls, 1)
self.assertIn("sending streaming response chunk", "\n".join(captured.output))
@ -200,7 +200,7 @@ class ServerTests(unittest.TestCase):
finally:
handler.server.reasoning_store.close()
self.assertFalse(sent)
self.assertFalse(sent.sent)
self.assertIn(
"upstream streaming response read failed",
"\n".join(captured.output),