diff --git a/.gitignore b/.gitignore index 8966628..d3ef3b3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +# AIs +.claude/ + # Byte-compiled / optimized / DLL files __pycache__/ *.py[codz] diff --git a/README.md b/README.md index 5750d35..6fd581e 100644 --- a/README.md +++ b/README.md @@ -134,7 +134,7 @@ Select `deepseek-v4-pro` in Cursor and use chat or agent mode as usual. ## How It Works -- **Core fix:** DeepSeek's [thinking mode](https://api-docs.deepseek.com/guides/thinking_mode#tool-calls) requires `reasoning_content` from assistant tool-call messages to be passed back in subsequent requests, but Cursor omits this field, causing a 400 error. The proxy (`Cursor → ngrok → proxy → DeepSeek API`) stores `reasoning_content` from every DeepSeek response in a local SQLite cache, keyed by message signature, tool-call ID, and tool-call function signature, and patches outgoing requests with missing `reasoning_content` before they reach DeepSeek. On a cold cache (proxy restart, model switch), it logs and drops unrecoverable history, continues from the latest user request, and prefixes the next Cursor response with a notice. +- **Core fix:** DeepSeek [thinking-mode tool calls](https://api-docs.deepseek.com/guides/thinking_mode#tool-calls) require the complete **multi-round** `reasoning_content` chain to be sent back in later requests. Cursor omits that field, causing a 400 error. The proxy (`Cursor -> ngrok -> proxy -> DeepSeek API`) stores DeepSeek's original `reasoning_content` and patches missing blocks back into outgoing tool-call history. - **Multi-conversation isolation:** To avoid collisions across concurrent conversations, the proxy scopes cache keys by a SHA-256 hash of the canonical conversation prefix (roles, content, and tool calls, excluding `reasoning_content`) plus the upstream model, configuration, and an API-key hash. Different threads get different scopes, so reused tool-call IDs do not collide. Byte-identical cloned histories produce identical scopes. - **Context caching compatibility:** The proxy preserves compatibility by never injecting synthetic thread IDs, timestamps, or cache-control messages. It restores `reasoning_content` as the exact original string, so repeated prefixes remain intact for [DeepSeek context cache](https://api-docs.deepseek.com/guides/kv_cache). Cache hit rates are logged in the terminal output. - **Additional compatibility fixes:** Beyond reasoning repair, the proxy converts legacy `functions`/`function_call` fields to `tools`/`tool_choice`, preserves required and named tool-choice semantics, normalizes `reasoning_effort` aliases, strips mirrored thinking display blocks from assistant content, flattens multi-part content arrays to plain text, and mirrors `reasoning_content` into Cursor-visible Markdown details blocks. diff --git a/src/deepseek_cursor_proxy/config.py b/src/deepseek_cursor_proxy/config.py index 427887b..8c10e22 100644 --- a/src/deepseek_cursor_proxy/config.py +++ b/src/deepseek_cursor_proxy/config.py @@ -172,8 +172,6 @@ def settings_from_config( def normalize_thinking(value: Any) -> str: thinking = as_str(value, DEFAULT_THINKING).strip().lower() - if thinking in {"passthrough", "pass-through", "pass_through"}: - return "pass-through" if thinking in {"enabled", "disabled"}: return thinking return DEFAULT_THINKING diff --git a/src/deepseek_cursor_proxy/server.py b/src/deepseek_cursor_proxy/server.py index 7356e4f..6479d31 100644 --- a/src/deepseek_cursor_proxy/server.py +++ b/src/deepseek_cursor_proxy/server.py @@ -540,6 +540,8 @@ class DeepSeekProxyHandler(BaseHTTPRequestHandler): scope=record_response_scope, prior_messages=record_response_messages, recording_contexts=record_response_contexts, + display_reasoning=self.config.display_reasoning, + collapsible_reasoning=self.config.collapsible_reasoning, ) except (json.JSONDecodeError, UnicodeDecodeError) as exc: LOG.warning("failed to rewrite upstream JSON response: %s", exc) @@ -812,7 +814,7 @@ def build_arg_parser() -> argparse.ArgumentParser: ) parser.add_argument( "--thinking", - choices=["enabled", "disabled", "pass-through"], + choices=["enabled", "disabled"], help="DeepSeek thinking mode, default from config or enabled", ) parser.add_argument( diff --git a/src/deepseek_cursor_proxy/streaming.py b/src/deepseek_cursor_proxy/streaming.py index 9157d1c..8d6ee95 100644 --- a/src/deepseek_cursor_proxy/streaming.py +++ b/src/deepseek_cursor_proxy/streaming.py @@ -292,3 +292,34 @@ class CursorReasoningDisplayAdapter: } if metadata: self._last_chunk_metadata.update(metadata) + + +def fold_reasoning_into_content( + response_payload: dict[str, Any], + collapsible: bool, +) -> None: + """Mirror `reasoning_content` into the visible `content` field for + non-streaming responses, matching the streaming `
` layout.""" + block_start = ( + COLLAPSIBLE_THINKING_BLOCK_START if collapsible else THINKING_BLOCK_START + ) + block_end = COLLAPSIBLE_THINKING_BLOCK_END if collapsible else THINKING_BLOCK_END + choices = response_payload.get("choices") + if not isinstance(choices, list): + return + for choice in choices: + if not isinstance(choice, dict): + continue + message = choice.get("message") + if not isinstance(message, dict): + continue + reasoning = message.get("reasoning_content") + if not isinstance(reasoning, str) or not reasoning: + continue + content = message.get("content") + message["content"] = ( + block_start + + reasoning + + block_end + + (content if isinstance(content, str) else "") + ) diff --git a/src/deepseek_cursor_proxy/transform.py b/src/deepseek_cursor_proxy/transform.py index 1ea53ee..53f22ee 100644 --- a/src/deepseek_cursor_proxy/transform.py +++ b/src/deepseek_cursor_proxy/transform.py @@ -3,6 +3,7 @@ from __future__ import annotations from dataclasses import dataclass, field import hashlib import json +import logging import re from typing import Any @@ -15,6 +16,10 @@ from .reasoning_store import ( tool_call_signature, turn_context_signature, ) +from .streaming import fold_reasoning_into_content + + +LOG = logging.getLogger("deepseek_cursor_proxy") SUPPORTED_REQUEST_FIELDS = { @@ -35,6 +40,13 @@ SUPPORTED_REQUEST_FIELDS = { "frequency_penalty", "logprobs", "top_logprobs", + # Standard OpenAI Chat Completions fields that DeepSeek either honors or + # safely ignores. Cursor and most OpenAI SDKs send these unconditionally, + # so forwarding keeps clients happy and avoids log spam. + "user", + "seed", + "n", + "logit_bias", } MESSAGE_FIELDS = { @@ -83,10 +95,6 @@ CURSOR_THINKING_BLOCK_RE = re.compile( ) RECOVERY_NOTICE_TEXT = "[deepseek-cursor-proxy] Refreshed reasoning_content history." -LEGACY_RECOVERY_NOTICE_TEXT = ( - "Note: recovered this DeepSeek chat because older tool-call reasoning " - "was unavailable; continuing with recent context only." -) RECOVERY_NOTICE_CONTENT = f"{RECOVERY_NOTICE_TEXT}\n\n" RECOVERY_SYSTEM_CONTENT = ( "deepseek-cursor-proxy recovered this request because older DeepSeek " @@ -460,10 +468,33 @@ def has_recovery_notice(message: dict[str, Any]) -> bool: return ( message.get("role") == "assistant" and isinstance(content, str) - and content.startswith((RECOVERY_NOTICE_TEXT, LEGACY_RECOVERY_NOTICE_TEXT)) + and content.startswith(RECOVERY_NOTICE_TEXT) ) +def strip_recovery_notice_for_upstream( + messages: list[dict[str, Any]], +) -> list[dict[str, Any]]: + """Cursor echoes the proxy's recovery notice back to us in later turns. + The notice serves as a boundary marker for the proxy, but DeepSeek must + not see proxy-generated prose. Return a copy with assistant prefixes + stripped; leave the input untouched so cache scopes/recording contexts + keep matching the with-prefix history that Cursor will send next time.""" + stripped: list[dict[str, Any]] = [] + for message in messages: + if message.get("role") != "assistant": + stripped.append(message) + continue + content = message.get("content") + if not isinstance(content, str) or not content.startswith(RECOVERY_NOTICE_TEXT): + stripped.append(message) + continue + cleaned = dict(message) + cleaned["content"] = content[len(RECOVERY_NOTICE_TEXT) :].lstrip("\r\n") + stripped.append(cleaned) + return stripped + + def leading_system_messages(messages: list[dict[str, Any]]) -> list[dict[str, Any]]: leading_messages: list[dict[str, Any]] = [] for message in messages: @@ -628,6 +659,11 @@ def assistant_needs_reasoning_for_tool_context( def upstream_model_for(original_model: str, config: ProxyConfig) -> str: if original_model.startswith("deepseek-"): return original_model + LOG.warning( + "rewriting non-DeepSeek model %r to configured fallback %r", + original_model, + config.upstream_model, + ) return config.upstream_model @@ -688,6 +724,16 @@ def prepare_upstream_request( prepared = { key: value for key, value in payload.items() if key in SUPPORTED_REQUEST_FIELDS } + dropped_fields = sorted( + key + for key in payload.keys() + if key not in SUPPORTED_REQUEST_FIELDS + and key not in {"max_completion_tokens", "functions", "function_call"} + ) + if dropped_fields: + LOG.warning( + "dropping unsupported request field(s): %s", ", ".join(dropped_fields) + ) if "max_tokens" not in prepared and "max_completion_tokens" in payload: prepared["max_tokens"] = payload["max_completion_tokens"] @@ -719,14 +765,9 @@ def prepare_upstream_request( if tool_choice is not None: prepared["tool_choice"] = tool_choice - if config.thinking != "pass-through": - prepared["thinking"] = {"type": config.thinking} - - thinking = prepared.get("thinking") - thinking_enabled = isinstance(thinking, dict) and thinking.get("type") == "enabled" - thinking_disabled = ( - isinstance(thinking, dict) and thinking.get("type") == "disabled" - ) + prepared["thinking"] = {"type": config.thinking} + thinking_enabled = config.thinking == "enabled" + thinking_disabled = config.thinking == "disabled" if thinking_enabled: prepared["reasoning_effort"] = normalize_reasoning_effort( prepared.get("reasoning_effort") or config.reasoning_effort @@ -797,12 +838,12 @@ def prepare_upstream_request( keep_reasoning=not thinking_disabled, ) reasoning_diagnostics.extend(latest_diagnostics) - prepared["messages"] = messages active_record_response_scope = conversation_scope(messages, cache_namespace) record_response_contexts = response_recording_contexts( (record_response_scope, record_response_messages), (active_record_response_scope, messages), ) + prepared["messages"] = strip_recovery_notice_for_upstream(messages) return PreparedRequest( payload=prepared, @@ -874,6 +915,8 @@ def rewrite_response_body( scope: str | None = None, prior_messages: list[dict[str, Any]] | None = None, recording_contexts: list[tuple[str, list[dict[str, Any]]]] | None = None, + display_reasoning: bool = False, + collapsible_reasoning: bool = True, ) -> bytes: response_payload = json.loads(body.decode("utf-8")) if isinstance(response_payload, dict): @@ -888,6 +931,8 @@ def rewrite_response_body( prior_messages=prior_messages, recording_contexts=recording_contexts, ) + if display_reasoning: + fold_reasoning_into_content(response_payload, collapsible_reasoning) if "model" in response_payload: response_payload["model"] = original_model return json.dumps( diff --git a/tests/test_config.py b/tests/test_config.py index 68651e8..118ee67 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -121,7 +121,7 @@ class ConfigTests(unittest.TestCase): [ "base_url: https://example.com/v1/", "model: deepseek-v4-flash", - "thinking: pass_through", + "thinking: disabled", "reasoning_effort: max", "port: 9100", "host: 0.0.0.0", @@ -145,7 +145,7 @@ class ConfigTests(unittest.TestCase): self.assertEqual(config.upstream_base_url, "https://example.com/v1") self.assertEqual(config.upstream_model, "deepseek-v4-flash") - self.assertEqual(config.thinking, "pass-through") + self.assertEqual(config.thinking, "disabled") self.assertEqual(config.reasoning_effort, "max") self.assertEqual(config.host, "0.0.0.0") self.assertEqual(config.port, 9100) diff --git a/tests/test_live_deepseek_cursor_proxy.py b/tests/test_live.py similarity index 100% rename from tests/test_live_deepseek_cursor_proxy.py rename to tests/test_live.py diff --git a/tests/test_protocol.py b/tests/test_protocol.py new file mode 100644 index 0000000..6f565ae --- /dev/null +++ b/tests/test_protocol.py @@ -0,0 +1,1327 @@ +"""End-to-end protocol tests against an in-process fake DeepSeek. + +Each test boots the proxy in-process against a small fake upstream and walks +a real HTTP request scenario. The strict variant rejects with HTTP 400 — the +same error real DeepSeek emits — whenever an assistant message that +participated in a tool-calling turn lacks `reasoning_content`. So if the +proxy is protocol-compliant, every turn succeeds; if not, the upstream +short-circuits and the test fails fast. + +This file is the ground truth for "does the proxy speak DeepSeek correctly?" +""" + +from __future__ import annotations + +from copy import deepcopy +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +import json +import threading +import time +import unittest +from typing import Any +from urllib.error import HTTPError +from urllib.request import Request, urlopen + +from deepseek_cursor_proxy.config import ProxyConfig +from deepseek_cursor_proxy.reasoning_store import ReasoningStore +from deepseek_cursor_proxy.server import DeepSeekProxyHandler, DeepSeekProxyServer + + +# Canonical fake-DeepSeek reasoning/answer text reused across tests. +THINKING_1_1 = "Thinking 1.1 - need to look up the date." +THINKING_1_2 = "Thinking 1.2 - I have the date, now I need the weather." +THINKING_1_3 = "Thinking 1.3 - tool results suffice for the answer." +THINKING_2_1 = "Thinking 2.1 - a brand new user turn." +ANSWER_1 = "Answer 1: Tomorrow is sunny on 2026-04-24." +ANSWER_2 = "Answer 2: Acknowledged follow-up." +CALL_ID_1 = "call_get_date" +CALL_ID_2 = "call_get_weather" + +TOOLS = [ + { + "type": "function", + "function": { + "name": "get_date", + "parameters": {"type": "object", "properties": {}}, + }, + }, + { + "type": "function", + "function": { + "name": "get_weather", + "parameters": { + "type": "object", + "properties": {"date": {"type": "string"}}, + "required": ["date"], + }, + }, + }, +] + + +# --------------------------------------------------------------------------- +# Fake upstreams +# --------------------------------------------------------------------------- + + +def _completion( + *, + chat_id: str, + finish_reason: str, + content: str = "", + reasoning: str | None = None, + tool_calls: list[dict[str, Any]] | None = None, +) -> dict[str, Any]: + message: dict[str, Any] = {"role": "assistant", "content": content} + if reasoning is not None: + message["reasoning_content"] = reasoning + if tool_calls is not None: + message["tool_calls"] = tool_calls + return { + "id": chat_id, + "object": "chat.completion", + "created": 1, + "model": "deepseek-v4-pro", + "choices": [{"index": 0, "finish_reason": finish_reason, "message": message}], + } + + +class StrictFakeDeepSeek(BaseHTTPRequestHandler): + """DeepSeek protocol contract: tool-turn assistants must carry + `reasoning_content` (string). Returns canned canonical responses keyed + on the request shape; rejects with 400 otherwise.""" + + requests: list[dict[str, Any]] = [] + auth_headers: list[str] = [] + + def log_message(self, fmt: str, *args: Any) -> None: + return + + def do_POST(self) -> None: + length = int(self.headers.get("Content-Length") or 0) + payload = json.loads(self.rfile.read(length).decode("utf-8")) + self.__class__.requests.append(payload) + self.__class__.auth_headers.append(self.headers.get("Authorization", "")) + + messages = payload.get("messages") or [] + for index, message in enumerate(messages): + if not isinstance(message, dict) or message.get("role") != "assistant": + continue + if _is_tool_turn_assistant(messages, index) and not isinstance( + message.get("reasoning_content"), str + ): + return self._send( + 400, + { + "error": { + "message": ( + "The reasoning_content in the thinking mode " + "must be passed back to the API." + ), + "type": "invalid_request_error", + "code": "invalid_request_error", + "missing_index": index, + } + }, + ) + + last_user = _last_index(messages, "user") + last_tool = _last_index(messages, "tool") + last_assistant = _last_index(messages, "assistant") + if last_user == 0 and last_assistant == -1 and last_tool == -1: + return self._send( + 200, + _completion( + chat_id="chatcmpl-1-1", + finish_reason="tool_calls", + reasoning=THINKING_1_1, + tool_calls=[ + { + "id": CALL_ID_1, + "type": "function", + "function": {"name": "get_date", "arguments": "{}"}, + } + ], + ), + ) + if last_user > 0 and last_user > last_tool: + return self._send( + 200, + _completion( + chat_id="chatcmpl-2-1", + finish_reason="stop", + content=ANSWER_2, + reasoning=THINKING_2_1, + ), + ) + if ( + last_tool != -1 + and messages[last_tool].get("tool_call_id") == CALL_ID_1 + and last_assistant < last_tool + ): + return self._send( + 200, + _completion( + chat_id="chatcmpl-1-2", + finish_reason="tool_calls", + reasoning=THINKING_1_2, + tool_calls=[ + { + "id": CALL_ID_2, + "type": "function", + "function": { + "name": "get_weather", + "arguments": '{"date":"2026-04-24"}', + }, + } + ], + ), + ) + if last_tool != -1 and messages[last_tool].get("tool_call_id") == CALL_ID_2: + return self._send( + 200, + _completion( + chat_id="chatcmpl-1-3", + finish_reason="stop", + content=ANSWER_1, + reasoning=THINKING_1_3, + ), + ) + return self._send( + 400, + { + "error": { + "message": ( + "test fake: unexpected shape: " + f"roles={[m.get('role') for m in messages]}" + ) + } + }, + ) + + def _send(self, status: int, body: dict[str, Any]) -> None: + encoded = json.dumps(body).encode("utf-8") + self.send_response(status) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(encoded))) + self.end_headers() + self.wfile.write(encoded) + + +def _is_tool_turn_assistant(messages: list[dict[str, Any]], index: int) -> bool: + message = messages[index] + if message.get("tool_calls"): + return True + for prior in reversed(messages[:index]): + role = prior.get("role") + if role == "tool": + return True + if role in {"user", "system"}: + return False + return False + + +def _last_index(messages: list[dict[str, Any]], role: str) -> int: + for i in range(len(messages) - 1, -1, -1): + if messages[i].get("role") == role: + return i + return -1 + + +# --------------------------------------------------------------------------- +# HTTP fixtures +# --------------------------------------------------------------------------- + + +class _Fixture: + def __init__(self, server: ThreadingHTTPServer) -> None: + self.server = server + self.thread = threading.Thread(target=server.serve_forever, daemon=True) + self.thread.start() + + @property + def url(self) -> str: + host, port = self.server.server_address + return f"http://{host}:{port}" + + def close(self) -> None: + self.server.shutdown() + self.server.server_close() + self.thread.join(timeout=5) + + +def _start_strict_upstream() -> _Fixture: + StrictFakeDeepSeek.requests = [] + StrictFakeDeepSeek.auth_headers = [] + return _Fixture(ThreadingHTTPServer(("127.0.0.1", 0), StrictFakeDeepSeek)) + + +def _start_proxy( + upstream_url: str, + store: ReasoningStore, + **config_overrides: Any, +) -> _Fixture: + proxy = DeepSeekProxyServer(("127.0.0.1", 0), DeepSeekProxyHandler) + proxy.config = ProxyConfig( + upstream_base_url=upstream_url, + upstream_model="deepseek-v4-pro", + ngrok=False, + verbose=False, + cors=False, + **config_overrides, + ) + proxy.reasoning_store = store + return _Fixture(proxy) + + +def _post( + url: str, + payload: dict[str, Any], + authorization: str = "Bearer sk-test", +) -> tuple[int, dict[str, Any]]: + request = Request( + url, + data=json.dumps(payload).encode("utf-8"), + method="POST", + headers={"Authorization": authorization, "Content-Type": "application/json"}, + ) + try: + with urlopen(request, timeout=10) as response: + return response.status, json.loads(response.read().decode("utf-8")) + except HTTPError as exc: + return exc.code, json.loads(exc.read().decode("utf-8")) + + +def _drop_reasoning(message: dict[str, Any]) -> dict[str, Any]: + cleaned = deepcopy(message) + cleaned.pop("reasoning_content", None) + return cleaned + + +# --------------------------------------------------------------------------- +# Test suites +# --------------------------------------------------------------------------- + + +class _StrictUpstreamCase(unittest.TestCase): + """Common setup: strict fake DeepSeek + proxy.""" + + config_overrides: dict[str, Any] = {} + + def setUp(self) -> None: + self.upstream = _start_strict_upstream() + self.store = ReasoningStore(":memory:") + self.proxy = _start_proxy( + self.upstream.url, self.store, **self.config_overrides + ) + + def tearDown(self) -> None: + self.proxy.close() + self.upstream.close() + self.store.close() + + +class CanonicalLoopTests(_StrictUpstreamCase): + def test_canonical_four_turn_tool_call_loop(self) -> None: + """Cursor strips reasoning_content from history; the proxy must + patch every prior assistant message that participated in the tool + chain so the strict upstream accepts each turn.""" + # Turn 1.1: bare user message. + status, response_1_1 = _post( + f"{self.proxy.url}/v1/chat/completions", + { + "model": "deepseek-v4-pro", + "messages": [ + {"role": "user", "content": "What's the weather tomorrow?"} + ], + "tools": TOOLS, + }, + ) + self.assertEqual(status, 200, response_1_1) + first = response_1_1["choices"][0]["message"] + self.assertEqual(first["reasoning_content"], THINKING_1_1) + self.assertEqual(first["tool_calls"][0]["id"], CALL_ID_1) + + # Turn 1.2: append tool result; Cursor drops reasoning. + status, response_1_2 = _post( + f"{self.proxy.url}/v1/chat/completions", + { + "model": "deepseek-v4-pro", + "messages": [ + {"role": "user", "content": "What's the weather tomorrow?"}, + _drop_reasoning(first), + { + "role": "tool", + "tool_call_id": CALL_ID_1, + "content": "2026-04-24", + }, + ], + "tools": TOOLS, + }, + ) + self.assertEqual(status, 200, response_1_2) + second = response_1_2["choices"][0]["message"] + self.assertEqual(second["tool_calls"][0]["id"], CALL_ID_2) + upstream_1_2 = StrictFakeDeepSeek.requests[1]["messages"] + self.assertEqual(upstream_1_2[1]["reasoning_content"], THINKING_1_1) + + # Turn 1.3: both prior assistants need patching. + status, response_1_3 = _post( + f"{self.proxy.url}/v1/chat/completions", + { + "model": "deepseek-v4-pro", + "messages": [ + {"role": "user", "content": "What's the weather tomorrow?"}, + _drop_reasoning(first), + { + "role": "tool", + "tool_call_id": CALL_ID_1, + "content": "2026-04-24", + }, + _drop_reasoning(second), + {"role": "tool", "tool_call_id": CALL_ID_2, "content": "sunny"}, + ], + "tools": TOOLS, + }, + ) + self.assertEqual(status, 200, response_1_3) + third = response_1_3["choices"][0]["message"] + self.assertIn(ANSWER_1, third["content"]) + upstream_1_3 = StrictFakeDeepSeek.requests[2]["messages"] + self.assertEqual(upstream_1_3[1]["reasoning_content"], THINKING_1_1) + self.assertEqual(upstream_1_3[3]["reasoning_content"], THINKING_1_2) + + # Turn 2.1: brand new user turn; the prior final assistant also + # needs patching since DeepSeek treats it as part of the tool turn. + status, response_2_1 = _post( + f"{self.proxy.url}/v1/chat/completions", + { + "model": "deepseek-v4-pro", + "messages": [ + {"role": "user", "content": "What's the weather tomorrow?"}, + _drop_reasoning(first), + { + "role": "tool", + "tool_call_id": CALL_ID_1, + "content": "2026-04-24", + }, + _drop_reasoning(second), + {"role": "tool", "tool_call_id": CALL_ID_2, "content": "sunny"}, + _drop_reasoning(third), + {"role": "user", "content": "Thanks. What about Saturday?"}, + ], + "tools": TOOLS, + }, + ) + self.assertEqual(status, 200, response_2_1) + self.assertIn(ANSWER_2, response_2_1["choices"][0]["message"]["content"]) + upstream_2_1 = StrictFakeDeepSeek.requests[3]["messages"] + self.assertEqual(upstream_2_1[1]["reasoning_content"], THINKING_1_1) + self.assertEqual(upstream_2_1[3]["reasoning_content"], THINKING_1_2) + self.assertEqual(upstream_2_1[5]["reasoning_content"], THINKING_1_3) + + def test_authorization_namespace_isolation(self) -> None: + """A second user with the same conversation prefix must NOT see + cached reasoning from the first user.""" + # Prime cache as user A. + _post( + f"{self.proxy.url}/v1/chat/completions", + { + "model": "deepseek-v4-pro", + "messages": [ + {"role": "user", "content": "What's the weather tomorrow?"} + ], + "tools": TOOLS, + }, + authorization="Bearer sk-USER-A", + ) + + # User B replays a tool history with the exact same shape. + status, _ = _post( + f"{self.proxy.url}/v1/chat/completions", + { + "model": "deepseek-v4-pro", + "messages": [ + {"role": "user", "content": "What's the weather tomorrow?"}, + { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": CALL_ID_1, + "type": "function", + "function": {"name": "get_date", "arguments": "{}"}, + } + ], + }, + { + "role": "tool", + "tool_call_id": CALL_ID_1, + "content": "2026-04-24", + }, + ], + "tools": TOOLS, + }, + authorization="Bearer sk-USER-B", + ) + self.assertEqual(status, 200) + sent = StrictFakeDeepSeek.requests[-1] + leaked = any( + m.get("role") == "assistant" and m.get("reasoning_content") == THINKING_1_1 + for m in sent["messages"] + ) + self.assertFalse(leaked) + + +class StrictRejectModeTests(_StrictUpstreamCase): + config_overrides = {"missing_reasoning_strategy": "reject"} + + def test_returns_409_without_calling_upstream(self) -> None: + status, payload = _post( + f"{self.proxy.url}/v1/chat/completions", + { + "model": "deepseek-v4-pro", + "messages": [ + {"role": "user", "content": "go"}, + { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": CALL_ID_1, + "type": "function", + "function": {"name": "get_date", "arguments": "{}"}, + } + ], + }, + { + "role": "tool", + "tool_call_id": CALL_ID_1, + "content": "2026-04-24", + }, + ], + }, + ) + self.assertEqual(status, 409, payload) + self.assertEqual(payload["error"]["missing_reasoning_messages"], 1) + self.assertEqual(StrictFakeDeepSeek.requests, []) + + +class ThinkingDisabledTests(_StrictUpstreamCase): + config_overrides = {"thinking": "disabled"} + + def test_disabled_does_not_inject_reasoning(self) -> None: + _post( + f"{self.proxy.url}/v1/chat/completions", + { + "model": "deepseek-v4-pro", + "messages": [ + {"role": "user", "content": "ping"}, + { + "role": "assistant", + "content": "", + "reasoning_content": "Should be discarded by proxy.", + "tool_calls": [ + { + "id": CALL_ID_1, + "type": "function", + "function": {"name": "get_date", "arguments": "{}"}, + } + ], + }, + { + "role": "tool", + "tool_call_id": CALL_ID_1, + "content": "2026-04-24", + }, + ], + }, + ) + sent = StrictFakeDeepSeek.requests[-1] + self.assertEqual(sent["thinking"], {"type": "disabled"}) + self.assertNotIn("reasoning_content", sent["messages"][1]) + + +class RecoveryTests(_StrictUpstreamCase): + def test_cold_cache_recovers_to_latest_user_with_notice(self) -> None: + """Stale tool history with no cached reasoning: proxy keeps only + the latest user message + recovery system message and prefixes a + user-facing notice into the response.""" + status, response = _post( + f"{self.proxy.url}/v1/chat/completions", + { + "model": "deepseek-v4-pro", + "messages": [ + {"role": "system", "content": "Be brief."}, + {"role": "user", "content": "old work"}, + { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": CALL_ID_1, + "type": "function", + "function": {"name": "get_date", "arguments": "{}"}, + } + ], + }, + { + "role": "tool", + "tool_call_id": CALL_ID_1, + "content": "2026-04-24", + }, + {"role": "user", "content": "Thanks. What about Saturday?"}, + ], + }, + ) + self.assertEqual(status, 200, response) + sent = StrictFakeDeepSeek.requests[-1] + self.assertEqual( + [m["role"] for m in sent["messages"]], ["system", "system", "user"] + ) + self.assertEqual( + sent["messages"][-1]["content"], "Thanks. What about Saturday?" + ) + self.assertIn( + "[deepseek-cursor-proxy] Refreshed reasoning", + response["choices"][0]["message"]["content"], + ) + + def test_recovery_notice_is_stripped_before_upstream_replay(self) -> None: + """When Cursor echoes a previous response (which carried the proxy's + recovery notice) back as assistant content, the notice serves as a + boundary marker for the proxy but must not be replayed upstream as + if DeepSeek had written it.""" + # Trigger initial recovery so the response carries the notice. + status, first = _post( + f"{self.proxy.url}/v1/chat/completions", + { + "model": "deepseek-v4-pro", + "messages": [ + {"role": "user", "content": "old work"}, + { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": CALL_ID_1, + "type": "function", + "function": {"name": "get_date", "arguments": "{}"}, + } + ], + }, + { + "role": "tool", + "tool_call_id": CALL_ID_1, + "content": "2026-04-24", + }, + {"role": "user", "content": "Thanks. What about Saturday?"}, + ], + }, + ) + self.assertEqual(status, 200) + + # Cursor faithfully echoes the response (including the notice prefix). + echoed = _drop_reasoning(first["choices"][0]["message"]) + status, _ = _post( + f"{self.proxy.url}/v1/chat/completions", + { + "model": "deepseek-v4-pro", + "messages": [ + {"role": "user", "content": "old work"}, + { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": CALL_ID_1, + "type": "function", + "function": {"name": "get_date", "arguments": "{}"}, + } + ], + }, + { + "role": "tool", + "tool_call_id": CALL_ID_1, + "content": "2026-04-24", + }, + {"role": "user", "content": "Thanks. What about Saturday?"}, + echoed, + {"role": "user", "content": "And Sunday?"}, + ], + }, + ) + self.assertEqual(status, 200) + sent = StrictFakeDeepSeek.requests[-1] + for message in sent["messages"]: + if message.get("role") != "assistant": + continue + self.assertNotIn("deepseek-cursor-proxy", message.get("content", "")) + + +# --------------------------------------------------------------------------- +# Streaming behaviour +# --------------------------------------------------------------------------- + + +def _sse_chunks(*chunks: dict[str, Any]) -> bytes: + out = b"" + for chunk in chunks: + out += f"data: {json.dumps(chunk)}\n\n".encode("utf-8") + out += b"data: [DONE]\n\n" + return out + + +class _StreamingThenJsonHandler(BaseHTTPRequestHandler): + """First request streams a tool call (with reasoning); subsequent + non-streaming requests echo a final answer if the assistant message + carries the previously-streamed reasoning_content.""" + + requests: list[dict[str, Any]] = [] + + def log_message(self, fmt: str, *args: Any) -> None: + return + + def do_POST(self) -> None: + length = int(self.headers.get("Content-Length") or 0) + payload = json.loads(self.rfile.read(length).decode("utf-8")) + self.__class__.requests.append(payload) + + if payload.get("stream"): + self.send_response(200) + self.send_header("Content-Type", "text/event-stream") + self.end_headers() + self.wfile.write( + _sse_chunks( + { + "id": "stream-1", + "object": "chat.completion.chunk", + "created": 1, + "model": "deepseek-v4-pro", + "choices": [ + { + "index": 0, + "delta": { + "role": "assistant", + "reasoning_content": THINKING_1_1, + "tool_calls": [ + { + "index": 0, + "id": CALL_ID_1, + "type": "function", + "function": { + "name": "get_date", + "arguments": "{}", + }, + } + ], + }, + "finish_reason": None, + } + ], + }, + { + "id": "stream-1", + "object": "chat.completion.chunk", + "created": 1, + "model": "deepseek-v4-pro", + "choices": [ + {"index": 0, "delta": {}, "finish_reason": "tool_calls"} + ], + }, + ) + ) + self.wfile.flush() + return + + # Non-streaming follow-up: enforce that proxy patched the prior + # streamed reasoning_content into history. + assistant = payload["messages"][1] + if assistant.get("reasoning_content") != THINKING_1_1: + self._send(400, {"error": {"message": "missing streamed reasoning"}}) + return + self._send( + 200, + _completion( + chat_id="follow-up", + finish_reason="stop", + content="follow-up accepted", + reasoning="post-tool reasoning", + ), + ) + + def _send(self, status: int, body: dict[str, Any]) -> None: + encoded = json.dumps(body).encode("utf-8") + self.send_response(status) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(encoded))) + self.end_headers() + self.wfile.write(encoded) + + +class StreamingThenNonStreamingTests(unittest.TestCase): + def setUp(self) -> None: + _StreamingThenJsonHandler.requests = [] + self.upstream = _Fixture( + ThreadingHTTPServer(("127.0.0.1", 0), _StreamingThenJsonHandler) + ) + self.store = ReasoningStore(":memory:") + self.proxy = _start_proxy(self.upstream.url, self.store) + + def tearDown(self) -> None: + self.proxy.close() + self.upstream.close() + self.store.close() + + def test_streamed_reasoning_is_replayed_in_next_non_streaming_request( + self, + ) -> None: + """Cursor often streams Turn 1.1 then issues Turn 1.2 as a non-stream + POST. The proxy must repair reasoning_content from the streaming + cache before forwarding the follow-up.""" + # Stream a tool call. + request = Request( + f"{self.proxy.url}/v1/chat/completions", + data=json.dumps( + { + "model": "deepseek-v4-pro", + "stream": True, + "messages": [{"role": "user", "content": "go"}], + "tools": [TOOLS[0]], + } + ).encode("utf-8"), + method="POST", + headers={ + "Authorization": "Bearer sk-test", + "Content-Type": "application/json", + }, + ) + with urlopen(request, timeout=5) as response: + self.assertIn("data: [DONE]", response.read().decode("utf-8")) + + # Non-streaming follow-up (Cursor strips reasoning_content). + status, payload = _post( + f"{self.proxy.url}/v1/chat/completions", + { + "model": "deepseek-v4-pro", + "messages": [ + {"role": "user", "content": "go"}, + { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": CALL_ID_1, + "type": "function", + "function": { + "name": "get_date", + "arguments": "{}", + }, + } + ], + }, + { + "role": "tool", + "tool_call_id": CALL_ID_1, + "content": "2026-04-24", + }, + ], + "tools": [TOOLS[0]], + }, + ) + self.assertEqual(status, 200, payload) + sent = _StreamingThenJsonHandler.requests[-1] + self.assertEqual(sent["messages"][1]["reasoning_content"], THINKING_1_1) + + +class _ReasoningStreamHandler(BaseHTTPRequestHandler): + """Streams reasoning_content tokens followed by a content answer.""" + + def log_message(self, fmt: str, *args: Any) -> None: + return + + def do_POST(self) -> None: + length = int(self.headers.get("Content-Length") or 0) + self.rfile.read(length) + self.send_response(200) + self.send_header("Content-Type", "text/event-stream") + self.end_headers() + self.wfile.write( + _sse_chunks( + { + "id": "stream-r", + "object": "chat.completion.chunk", + "created": 1, + "model": "deepseek-v4-pro", + "choices": [ + { + "index": 0, + "delta": { + "role": "assistant", + "reasoning_content": "Need ", + }, + "finish_reason": None, + } + ], + }, + { + "id": "stream-r", + "object": "chat.completion.chunk", + "created": 1, + "model": "deepseek-v4-pro", + "choices": [ + { + "index": 0, + "delta": {"reasoning_content": "context."}, + "finish_reason": None, + } + ], + }, + { + "id": "stream-r", + "object": "chat.completion.chunk", + "created": 1, + "model": "deepseek-v4-pro", + "choices": [ + { + "index": 0, + "delta": {"content": "Final."}, + "finish_reason": None, + } + ], + }, + { + "id": "stream-r", + "object": "chat.completion.chunk", + "created": 1, + "model": "deepseek-v4-pro", + "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}], + }, + ) + ) + self.wfile.flush() + + +class StreamingDisplayTests(unittest.TestCase): + def setUp(self) -> None: + self.upstream = _Fixture( + ThreadingHTTPServer(("127.0.0.1", 0), _ReasoningStreamHandler) + ) + self.store = ReasoningStore(":memory:") + self.proxy = _start_proxy(self.upstream.url, self.store) + + def tearDown(self) -> None: + self.proxy.close() + self.upstream.close() + self.store.close() + + def test_streaming_response_mirrors_reasoning_into_details_block(self) -> None: + request = Request( + f"{self.proxy.url}/v1/chat/completions", + data=json.dumps( + { + "model": "deepseek-v4-pro", + "stream": True, + "messages": [{"role": "user", "content": "stream"}], + } + ).encode("utf-8"), + method="POST", + headers={ + "Authorization": "Bearer sk-test", + "Content-Type": "application/json", + }, + ) + with urlopen(request, timeout=2) as response: + body = response.read().decode("utf-8") + + chunks = [ + json.loads(line.removeprefix("data: ")) + for line in body.splitlines() + if line.startswith("data: {") + ] + self.assertEqual( + chunks[0]["choices"][0]["delta"]["content"], + "
\nThinking\n\nNeed ", + ) + self.assertEqual( + chunks[2]["choices"][0]["delta"]["content"], + "\n
\n\nFinal.", + ) + + +class NonStreamingDisplayTests(_StrictUpstreamCase): + def test_non_streaming_response_mirrors_reasoning_into_details_block( + self, + ) -> None: + """The README claims thinking tokens are displayed in Cursor; this + must hold for non-streaming responses too, not only streaming ones.""" + status, response = _post( + f"{self.proxy.url}/v1/chat/completions", + { + "model": "deepseek-v4-pro", + "messages": [ + {"role": "user", "content": "What's the weather tomorrow?"} + ], + "tools": TOOLS, + }, + ) + self.assertEqual(status, 200, response) + # Turn 1.1 has empty content + reasoning + tool calls. + content = response["choices"][0]["message"]["content"] + self.assertEqual( + content, + f"
\nThinking\n\n{THINKING_1_1}\n
\n\n", + ) + + +# --------------------------------------------------------------------------- +# Concurrent threads (independent fake to keep the canonical strict fake +# stateless across the suite). +# --------------------------------------------------------------------------- + + +class _PerThreadFakeDeepSeek(BaseHTTPRequestHandler): + """Strict fake keyed on a thread-name embedded in the user message; each + thread expects its own reasoning text. Catches cross-thread cache leaks.""" + + requests: list[dict[str, Any]] = [] + + def log_message(self, fmt: str, *args: Any) -> None: + return + + def do_POST(self) -> None: + length = int(self.headers.get("Content-Length") or 0) + payload = json.loads(self.rfile.read(length).decode("utf-8")) + self.__class__.requests.append(payload) + + thread = self._thread_name(payload.get("messages") or []) + expected_tool = f"tool reasoning {thread}" + expected_final = f"final reasoning {thread}" + final_content = f"answer {thread}" + + for index, message in enumerate(payload.get("messages") or []): + if message.get("role") != "assistant": + continue + if ( + message.get("tool_calls") + and message.get("reasoning_content") != expected_tool + ): + return self._send(400, {"error": {"missing_index": index}}) + if ( + message.get("content") == final_content + and message.get("reasoning_content") != expected_final + ): + return self._send(400, {"error": {"missing_index": index}}) + + messages = payload.get("messages") or [] + if len(messages) == 1: + return self._send( + 200, + _completion( + chat_id=f"tool-{thread}", + finish_reason="tool_calls", + reasoning=expected_tool, + tool_calls=[ + { + "id": "call_reused", + "type": "function", + "function": {"name": "lookup", "arguments": "{}"}, + } + ], + ), + ) + if len(messages) == 3: + return self._send( + 200, + _completion( + chat_id=f"final-{thread}", + finish_reason="stop", + content=final_content, + reasoning=expected_final, + ), + ) + return self._send( + 200, + _completion( + chat_id=f"followup-{thread}", + finish_reason="stop", + content=f"follow-up {thread}", + reasoning="follow-up reasoning", + ), + ) + + @staticmethod + def _thread_name(messages: list[dict[str, Any]]) -> str: + if not messages: + return "?" + text = str(messages[0].get("content") or "") + if "thread A" in text: + return "A" + if "thread B" in text: + return "B" + return "?" + + def _send(self, status: int, body: dict[str, Any]) -> None: + encoded = json.dumps(body).encode("utf-8") + self.send_response(status) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(encoded))) + self.end_headers() + self.wfile.write(encoded) + + +class ConcurrentThreadTests(unittest.TestCase): + def setUp(self) -> None: + _PerThreadFakeDeepSeek.requests = [] + self.upstream = _Fixture( + ThreadingHTTPServer(("127.0.0.1", 0), _PerThreadFakeDeepSeek) + ) + self.store = ReasoningStore(":memory:") + self.proxy = _start_proxy(self.upstream.url, self.store) + + def tearDown(self) -> None: + self.proxy.close() + self.upstream.close() + self.store.close() + + def test_two_interleaved_threads_do_not_leak_reasoning(self) -> None: + """Tool-call IDs are reused across both threads. The proxy must + scope cache by conversation, not by tool_call_id, so each thread + sees only its own reasoning.""" + tools = [ + { + "type": "function", + "function": { + "name": "lookup", + "parameters": {"type": "object", "properties": {}}, + }, + } + ] + + def first(thread: str) -> dict[str, Any]: + return { + "model": "deepseek-v4-pro", + "messages": [{"role": "user", "content": f"Start thread {thread}."}], + "tools": tools, + } + + def second(thread: str, assistant: dict[str, Any]) -> dict[str, Any]: + return { + "model": "deepseek-v4-pro", + "messages": [ + {"role": "user", "content": f"Start thread {thread}."}, + _drop_reasoning(assistant), + { + "role": "tool", + "tool_call_id": "call_reused", + "content": f"tool result {thread}", + }, + ], + "tools": tools, + } + + status, first_a = _post(f"{self.proxy.url}/v1/chat/completions", first("A")) + self.assertEqual(status, 200) + status, first_b = _post(f"{self.proxy.url}/v1/chat/completions", first("B")) + self.assertEqual(status, 200) + status, _ = _post( + f"{self.proxy.url}/v1/chat/completions", + second("B", first_b["choices"][0]["message"]), + ) + self.assertEqual(status, 200) + status, _ = _post( + f"{self.proxy.url}/v1/chat/completions", + second("A", first_a["choices"][0]["message"]), + ) + self.assertEqual(status, 200) + + # If the cache leaked, the upstream's strict reasoning check would + # have rejected one of these turns with 400. + upstream_b = _PerThreadFakeDeepSeek.requests[2]["messages"] + upstream_a = _PerThreadFakeDeepSeek.requests[3]["messages"] + self.assertEqual(upstream_b[1]["reasoning_content"], "tool reasoning B") + self.assertEqual(upstream_a[1]["reasoning_content"], "tool reasoning A") + + +# --------------------------------------------------------------------------- +# Streaming-cache timing: tool reasoning must be available before [DONE]. +# --------------------------------------------------------------------------- + + +class _SlowToolStreamHandler(BaseHTTPRequestHandler): + """Streams reasoning + tool_calls, then waits before sending [DONE]. + Lets us verify the proxy stores reasoning at finish_reason=tool_calls, + not at [DONE].""" + + def log_message(self, fmt: str, *args: Any) -> None: + return + + def do_POST(self) -> None: + length = int(self.headers.get("Content-Length") or 0) + payload = json.loads(self.rfile.read(length).decode("utf-8")) + + if payload.get("stream"): + self.send_response(200) + self.send_header("Content-Type", "text/event-stream") + self.end_headers() + chunks = [ + { + "id": "stream-tool", + "object": "chat.completion.chunk", + "created": 1, + "model": "deepseek-v4-pro", + "choices": [ + { + "index": 0, + "delta": { + "role": "assistant", + "reasoning_content": "Streamed tool reasoning.", + "tool_calls": [ + { + "index": 0, + "id": "call_stream_tool", + "type": "function", + "function": { + "name": "lookup", + "arguments": "{}", + }, + } + ], + }, + "finish_reason": None, + } + ], + }, + { + "id": "stream-tool", + "object": "chat.completion.chunk", + "created": 1, + "model": "deepseek-v4-pro", + "choices": [ + {"index": 0, "delta": {}, "finish_reason": "tool_calls"} + ], + }, + ] + for chunk in chunks: + self.wfile.write(f"data: {json.dumps(chunk)}\n\n".encode("utf-8")) + self.wfile.flush() + if chunk["choices"][0]["finish_reason"] is None: + time.sleep(0.2) + time.sleep(1) # delay [DONE] so the follow-up beats it. + self.wfile.write(b"data: [DONE]\n\n") + self.wfile.flush() + return + + # Non-streaming follow-up. + messages = payload.get("messages") or [] + if ( + len(messages) >= 2 + and messages[1].get("reasoning_content") == "Streamed tool reasoning." + ): + self._send( + 200, + _completion( + chat_id="follow", + finish_reason="stop", + content="follow-up accepted", + reasoning="post-tool", + ), + ) + return + self._send(400, {"error": {"message": "missing streamed reasoning"}}) + + def _send(self, status: int, body: dict[str, Any]) -> None: + encoded = json.dumps(body).encode("utf-8") + self.send_response(status) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(encoded))) + self.end_headers() + self.wfile.write(encoded) + + +class StreamingCacheTimingTests(unittest.TestCase): + def setUp(self) -> None: + self.upstream = _Fixture( + ThreadingHTTPServer(("127.0.0.1", 0), _SlowToolStreamHandler) + ) + self.store = ReasoningStore(":memory:") + self.proxy = _start_proxy(self.upstream.url, self.store) + + def tearDown(self) -> None: + self.proxy.close() + self.upstream.close() + self.store.close() + + def test_tool_reasoning_is_cached_before_done(self) -> None: + """A follow-up POST issued before [DONE] must still find the + streamed reasoning in cache.""" + tools = [ + { + "type": "function", + "function": { + "name": "lookup", + "parameters": {"type": "object", "properties": {}}, + }, + } + ] + request = Request( + f"{self.proxy.url}/v1/chat/completions", + data=json.dumps( + { + "model": "deepseek-v4-pro", + "stream": True, + "messages": [{"role": "user", "content": "stream tool"}], + "tools": tools, + } + ).encode("utf-8"), + method="POST", + headers={ + "Authorization": "Bearer sk-test", + "Content-Type": "application/json", + }, + ) + with urlopen(request, timeout=3) as response: + # Read until we see the tool_calls chunk; the upstream then + # delays [DONE] for ~1s, giving us a window to send a follow-up. + while True: + line = response.readline().decode("utf-8") + self.assertNotEqual(line, "") + if '"finish_reason":"tool_calls"' in line: + break + + status, payload = _post( + f"{self.proxy.url}/v1/chat/completions", + { + "model": "deepseek-v4-pro", + "messages": [ + {"role": "user", "content": "stream tool"}, + { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": "call_stream_tool", + "type": "function", + "function": {"name": "lookup", "arguments": "{}"}, + } + ], + }, + { + "role": "tool", + "tool_call_id": "call_stream_tool", + "content": "tool result", + }, + ], + "tools": tools, + }, + ) + response.read() + self.assertEqual(status, 200, payload) + self.assertEqual( + payload["choices"][0]["message"]["content"].split("
")[-1], + "\n\nfollow-up accepted", + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_proxy_end_to_end.py b/tests/test_proxy_end_to_end.py deleted file mode 100644 index 18d5cf8..0000000 --- a/tests/test_proxy_end_to_end.py +++ /dev/null @@ -1,1414 +0,0 @@ -from __future__ import annotations - -from dataclasses import replace -from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer -import json -from pathlib import Path -from tempfile import TemporaryDirectory -import threading -import time -import unittest -from urllib.error import HTTPError -from urllib.request import Request, urlopen - -from deepseek_cursor_proxy.config import ProxyConfig -from deepseek_cursor_proxy.reasoning_store import ( - ReasoningStore, - conversation_scope, - message_signature, -) -from deepseek_cursor_proxy.server import DeepSeekProxyHandler, DeepSeekProxyServer -from deepseek_cursor_proxy.trace import TraceWriter -from deepseek_cursor_proxy.transform import ( - RECOVERY_NOTICE_CONTENT, - reasoning_cache_namespace, -) - - -TOOL_REASONING = "I need the current date before answering." -FINAL_REASONING = "The tool result gives the date, so I can answer." -FINAL_CONTENT = "Final answer after using the tool." - - -def post_json( - url: str, payload: dict, api_key: str = "sk-cursor-test" -) -> tuple[int, dict]: - body = json.dumps(payload).encode("utf-8") - request = Request( - url, - data=body, - method="POST", - headers={ - "Authorization": f"Bearer {api_key}", - "Content-Type": "application/json", - }, - ) - try: - response = urlopen(request, timeout=5) - with response: - return response.status, json.loads(response.read().decode("utf-8")) - except HTTPError as exc: - return exc.code, json.loads(exc.read().decode("utf-8")) - - -class FakeDeepSeekHandler(BaseHTTPRequestHandler): - requests: list[dict] = [] - auth_headers: list[str] = [] - - def log_message(self, fmt: str, *args: object) -> None: - return - - def do_POST(self) -> None: - length = int(self.headers.get("Content-Length") or 0) - payload = json.loads(self.rfile.read(length).decode("utf-8")) - self.__class__.requests.append(payload) - self.__class__.auth_headers.append(self.headers.get("Authorization", "")) - - for index, message in enumerate(payload.get("messages", [])): - if not isinstance(message, dict) or message.get("role") != "assistant": - continue - requires_reasoning = ( - bool(message.get("tool_calls")) - or message.get("content") == FINAL_CONTENT - ) - if requires_reasoning and not message.get("reasoning_content"): - self._send_json( - 400, - { - "error": { - "message": "The reasoning_content in the thinking mode must be passed back to the API.", - "type": "invalid_request_error", - "param": None, - "code": "invalid_request_error", - "missing_index": index, - } - }, - ) - return - - call_number = len(self.__class__.requests) - if call_number == 1: - self._send_json(200, tool_call_response()) - elif call_number == 2: - self._send_json(200, final_response()) - else: - self._send_json(200, plain_response("Follow-up accepted.")) - - def _send_json(self, status: int, payload: dict) -> None: - body = json.dumps(payload).encode("utf-8") - self.send_response(status) - self.send_header("Content-Type", "application/json") - self.send_header("Content-Length", str(len(body))) - self.end_headers() - self.wfile.write(body) - - -class InterleavedFakeDeepSeekHandler(BaseHTTPRequestHandler): - requests: list[dict] = [] - - def log_message(self, fmt: str, *args: object) -> None: - return - - def do_POST(self) -> None: - length = int(self.headers.get("Content-Length") or 0) - payload = json.loads(self.rfile.read(length).decode("utf-8")) - self.__class__.requests.append(payload) - - messages = payload.get("messages", []) - thread_name = thread_name_from_messages(messages) - if thread_name not in {"A", "B"}: - self._send_json(400, {"error": {"message": "unknown test thread"}}) - return - - expected_tool_reasoning = f"tool reasoning for thread {thread_name}" - expected_final_reasoning = f"final reasoning for thread {thread_name}" - final_content = f"final answer for thread {thread_name}" - - for index, message in enumerate(messages): - if not isinstance(message, dict) or message.get("role") != "assistant": - continue - if ( - message.get("tool_calls") - and message.get("reasoning_content") != expected_tool_reasoning - ): - self._send_missing_reasoning(index) - return - if ( - message.get("content") == final_content - and message.get("reasoning_content") != expected_final_reasoning - ): - self._send_missing_reasoning(index) - return - - if len(messages) == 1: - self._send_json(200, interleaved_tool_call_response(thread_name)) - elif len(messages) == 3: - self._send_json(200, interleaved_final_response(thread_name)) - else: - self._send_json( - 200, plain_response(f"follow-up accepted for thread {thread_name}") - ) - - def _send_missing_reasoning(self, index: int) -> None: - self._send_json( - 400, - { - "error": { - "message": "The reasoning_content in the thinking mode must be passed back to the API.", - "type": "invalid_request_error", - "param": None, - "code": "invalid_request_error", - "missing_index": index, - } - }, - ) - - def _send_json(self, status: int, payload: dict) -> None: - body = json.dumps(payload).encode("utf-8") - self.send_response(status) - self.send_header("Content-Type", "application/json") - self.send_header("Content-Length", str(len(body))) - self.end_headers() - self.wfile.write(body) - - -class SlowAfterDoneStreamingDeepSeekHandler(BaseHTTPRequestHandler): - def log_message(self, fmt: str, *args: object) -> None: - return - - def do_POST(self) -> None: - self.send_response(200) - self.send_header("Content-Type", "text/event-stream") - self.end_headers() - chunk = { - "id": "chatcmpl-stream", - "object": "chat.completion.chunk", - "model": "deepseek-v4-pro", - "choices": [ - { - "index": 0, - "delta": {"role": "assistant", "content": "streamed"}, - "finish_reason": None, - } - ], - } - self.wfile.write(f"data: {json.dumps(chunk)}\n\n".encode("utf-8")) - self.wfile.write(b"data: [DONE]\n\n") - self.wfile.flush() - time.sleep(2) - - -class ReasoningStreamingDeepSeekHandler(BaseHTTPRequestHandler): - def log_message(self, fmt: str, *args: object) -> None: - return - - def do_POST(self) -> None: - self.send_response(200) - self.send_header("Content-Type", "text/event-stream") - self.end_headers() - chunks = [ - { - "id": "chatcmpl-stream", - "object": "chat.completion.chunk", - "created": 1, - "model": "deepseek-v4-pro", - "choices": [ - { - "index": 0, - "delta": {"role": "assistant", "reasoning_content": "Need "}, - "finish_reason": None, - } - ], - }, - { - "id": "chatcmpl-stream", - "object": "chat.completion.chunk", - "created": 1, - "model": "deepseek-v4-pro", - "choices": [ - { - "index": 0, - "delta": {"reasoning_content": "context."}, - "finish_reason": None, - } - ], - }, - { - "id": "chatcmpl-stream", - "object": "chat.completion.chunk", - "created": 1, - "model": "deepseek-v4-pro", - "choices": [ - { - "index": 0, - "delta": {"content": FINAL_CONTENT}, - "finish_reason": None, - } - ], - }, - { - "id": "chatcmpl-stream", - "object": "chat.completion.chunk", - "created": 1, - "model": "deepseek-v4-pro", - "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}], - "usage": { - "prompt_tokens": 10, - "completion_tokens": 5, - "total_tokens": 15, - "completion_tokens_details": {"reasoning_tokens": 3}, - }, - }, - ] - for chunk in chunks: - self.wfile.write(f"data: {json.dumps(chunk)}\n\n".encode("utf-8")) - self.wfile.write(b"data: [DONE]\n\n") - self.wfile.flush() - - -class ToolCallStreamingBeforeDoneDeepSeekHandler(BaseHTTPRequestHandler): - requests: list[dict] = [] - - def log_message(self, fmt: str, *args: object) -> None: - return - - def do_POST(self) -> None: - length = int(self.headers.get("Content-Length") or 0) - payload = json.loads(self.rfile.read(length).decode("utf-8")) - self.__class__.requests.append(payload) - - if payload.get("stream"): - self.send_response(200) - self.send_header("Content-Type", "text/event-stream") - self.end_headers() - chunks = [ - { - "id": "chatcmpl-stream-tool", - "object": "chat.completion.chunk", - "created": 1, - "model": "deepseek-v4-pro", - "choices": [ - { - "index": 0, - "delta": { - "role": "assistant", - "reasoning_content": "Streamed tool reasoning.", - "tool_calls": [ - { - "index": 0, - "id": "call_stream_tool", - "type": "function", - "function": { - "name": "lookup", - "arguments": "{}", - }, - } - ], - }, - "finish_reason": None, - } - ], - }, - { - "id": "chatcmpl-stream-tool", - "object": "chat.completion.chunk", - "created": 1, - "model": "deepseek-v4-pro", - "choices": [ - {"index": 0, "delta": {}, "finish_reason": "tool_calls"} - ], - }, - ] - for chunk in chunks: - self.wfile.write(f"data: {json.dumps(chunk)}\n\n".encode("utf-8")) - self.wfile.flush() - if chunk["choices"][0]["finish_reason"] is None: - time.sleep(0.2) - time.sleep(1) - self.wfile.write(b"data: [DONE]\n\n") - self.wfile.flush() - return - - messages = payload.get("messages", []) - if ( - len(messages) >= 2 - and messages[1].get("reasoning_content") == "Streamed tool reasoning." - ): - self._send_json(200, plain_response("stream follow-up accepted")) - return - self._send_json(400, {"error": {"message": "missing streamed reasoning"}}) - - def _send_json(self, status: int, payload: dict) -> None: - body = json.dumps(payload).encode("utf-8") - self.send_response(status) - self.send_header("Content-Type", "application/json") - self.send_header("Content-Length", str(len(body))) - self.end_headers() - self.wfile.write(body) - - -def tool_call_response() -> dict: - return { - "id": "chatcmpl-tool", - "object": "chat.completion", - "created": 1, - "model": "deepseek-v4-pro", - "choices": [ - { - "index": 0, - "finish_reason": "tool_calls", - "message": { - "role": "assistant", - "content": "", - "reasoning_content": TOOL_REASONING, - "tool_calls": [ - { - "id": "call_date", - "type": "function", - "function": {"name": "get_date", "arguments": "{}"}, - } - ], - }, - } - ], - "usage": { - "prompt_tokens": 20, - "completion_tokens": 5, - "total_tokens": 25, - "prompt_cache_hit_tokens": 12, - "prompt_cache_miss_tokens": 8, - "completion_tokens_details": {"reasoning_tokens": 3}, - }, - } - - -def interleaved_tool_call_response(thread_name: str) -> dict: - return { - "id": f"chatcmpl-tool-{thread_name}", - "object": "chat.completion", - "created": 1, - "model": "deepseek-v4-pro", - "choices": [ - { - "index": 0, - "finish_reason": "tool_calls", - "message": { - "role": "assistant", - "content": "", - "reasoning_content": f"tool reasoning for thread {thread_name}", - "tool_calls": [ - { - "id": "call_reused", - "type": "function", - "function": {"name": "lookup", "arguments": "{}"}, - } - ], - }, - } - ], - } - - -def interleaved_final_response(thread_name: str) -> dict: - return { - "id": f"chatcmpl-final-{thread_name}", - "object": "chat.completion", - "created": 2, - "model": "deepseek-v4-pro", - "choices": [ - { - "index": 0, - "finish_reason": "stop", - "message": { - "role": "assistant", - "content": f"final answer for thread {thread_name}", - "reasoning_content": f"final reasoning for thread {thread_name}", - }, - } - ], - } - - -def final_response() -> dict: - return { - "id": "chatcmpl-final", - "object": "chat.completion", - "created": 2, - "model": "deepseek-v4-pro", - "choices": [ - { - "index": 0, - "finish_reason": "stop", - "message": { - "role": "assistant", - "content": FINAL_CONTENT, - "reasoning_content": FINAL_REASONING, - }, - } - ], - } - - -def plain_response(content: str) -> dict: - return { - "id": "chatcmpl-plain", - "object": "chat.completion", - "created": 3, - "model": "deepseek-v4-pro", - "choices": [ - { - "index": 0, - "finish_reason": "stop", - "message": {"role": "assistant", "content": content}, - } - ], - } - - -class ServerFixture: - def __init__(self, server: ThreadingHTTPServer) -> None: - self.server = server - self.thread = threading.Thread(target=server.serve_forever, daemon=True) - - @property - def url(self) -> str: - host, port = self.server.server_address - return f"http://{host}:{port}" - - def start(self) -> "ServerFixture": - self.thread.start() - return self - - def close(self) -> None: - self.server.shutdown() - self.server.server_close() - self.thread.join(timeout=5) - - -def read_single_trace(session_dir: Path) -> dict: - deadline = time.monotonic() + 2 - trace_files = sorted(session_dir.glob("request-*.json")) - while not trace_files and time.monotonic() < deadline: - time.sleep(0.01) - trace_files = sorted(session_dir.glob("request-*.json")) - if len(trace_files) != 1: - raise AssertionError(f"expected one trace file, found {trace_files}") - return json.loads(trace_files[0].read_text(encoding="utf-8")) - - -class ProxyEndToEndTests(unittest.TestCase): - def setUp(self) -> None: - FakeDeepSeekHandler.requests = [] - FakeDeepSeekHandler.auth_headers = [] - self.upstream = ServerFixture( - ThreadingHTTPServer(("127.0.0.1", 0), FakeDeepSeekHandler) - ).start() - self.store = ReasoningStore(":memory:") - proxy = DeepSeekProxyServer(("127.0.0.1", 0), DeepSeekProxyHandler) - proxy.config = ProxyConfig( - upstream_base_url=self.upstream.url, - upstream_model="deepseek-v4-pro", - ) - proxy.reasoning_store = self.store - self.proxy = ServerFixture(proxy).start() - - def tearDown(self) -> None: - self.proxy.close() - self.upstream.close() - self.store.close() - - def test_missing_reasoning_reproduces_upstream_error_without_proxy_repair( - self, - ) -> None: - status, payload = post_json( - f"{self.upstream.url}/chat/completions", - second_cursor_request(include_reasoning=False), - ) - - self.assertEqual(status, 400) - self.assertIn("reasoning_content", payload["error"]["message"]) - - def test_proxy_repairs_cursor_style_multi_round_tool_call_history(self) -> None: - status, first = post_json( - f"{self.proxy.url}/v1/chat/completions", - first_cursor_request(), - ) - self.assertEqual(status, 200) - tool_call_message = first["choices"][0]["message"] - self.assertEqual(tool_call_message["reasoning_content"], TOOL_REASONING) - - status, second = post_json( - f"{self.proxy.url}/v1/chat/completions", - second_cursor_request(include_reasoning=False), - ) - self.assertEqual(status, 200) - self.assertEqual(second["choices"][0]["message"]["content"], FINAL_CONTENT) - - status, third = post_json( - f"{self.proxy.url}/v1/chat/completions", - third_cursor_request_missing_all_reasoning(), - ) - self.assertEqual(status, 200) - self.assertEqual( - third["choices"][0]["message"]["content"], "Follow-up accepted." - ) - - second_upstream_messages = FakeDeepSeekHandler.requests[1]["messages"] - self.assertEqual( - second_upstream_messages[1]["reasoning_content"], TOOL_REASONING - ) - - third_upstream_messages = FakeDeepSeekHandler.requests[2]["messages"] - self.assertEqual( - third_upstream_messages[1]["reasoning_content"], TOOL_REASONING - ) - self.assertEqual( - third_upstream_messages[3]["reasoning_content"], FINAL_REASONING - ) - - def test_proxy_forwards_cursor_bearer_token_to_deepseek(self) -> None: - status, _ = post_json( - f"{self.proxy.url}/v1/chat/completions", - first_cursor_request(), - api_key="sk-from-cursor", - ) - - self.assertEqual(status, 200) - self.assertEqual(FakeDeepSeekHandler.auth_headers[0], "Bearer sk-from-cursor") - - def test_normal_mode_logs_safe_request_progress_without_bodies(self) -> None: - with self.assertLogs("deepseek_cursor_proxy", level="INFO") as captured: - status, _ = post_json( - f"{self.proxy.url}/v1/chat/completions", - first_cursor_request(), - api_key="sk-from-cursor", - ) - - output = "\n".join(captured.output) - stage_records = [ - record - for record in captured.output - if any( - marker in record - for marker in ("┌ cursor", "├ context", "├ send", "└ stats") - ) - ] - self.assertEqual(status, 200) - self.assertEqual(len(stage_records), 4) - self.assertTrue(all("\n" not in record for record in stage_records)) - self.assertIn( - "┌ cursor model=deepseek-v4-pro messages=1 tools=1", - output, - ) - self.assertIn( - "├ context filled=0 missing=0 recovered=0 dropped=0 status=ok", - output, - ) - self.assertIn( - "├ send user_msgs=1 messages=1 tools=1 reasoning_content=0", - output, - ) - self.assertIn( - "└ stats prompt=20 output=5 reasoning=3 cache_hit=60.0%", - output, - ) - self.assertNotIn("What is tomorrow's date?", output) - self.assertNotIn("sk-from-cursor", output) - - def test_verbose_mode_logs_metadata_and_bodies_without_api_key(self) -> None: - self.proxy.server.config = replace(self.proxy.server.config, verbose=True) - - with self.assertLogs("deepseek_cursor_proxy", level="INFO") as captured: - status, _ = post_json( - f"{self.proxy.url}/v1/chat/completions", - first_cursor_request(), - api_key="sk-from-cursor", - ) - - output = "\n".join(captured.output) - self.assertEqual(status, 200) - self.assertIn("incoming POST /v1/chat/completions", output) - self.assertIn("upstream request metadata", output) - self.assertIn("cursor request body", output) - self.assertIn("upstream request body", output) - self.assertIn("What is tomorrow's date?", output) - self.assertNotIn("sk-from-cursor", output) - - def test_trace_captures_full_non_streaming_replay_without_api_key(self) -> None: - with TemporaryDirectory() as temp_dir: - writer = TraceWriter(temp_dir) - self.proxy.server.trace_writer = writer - - status, payload = post_json( - f"{self.proxy.url}/v1/chat/completions", - first_cursor_request(), - api_key="sk-from-cursor", - ) - - trace = read_single_trace(writer.session_dir) - serialized = json.dumps(trace) - - self.assertEqual(status, 200) - self.assertEqual( - payload["choices"][0]["message"]["tool_calls"][0]["id"], "call_date" - ) - self.assertEqual(trace["completion"]["status"], "completed") - self.assertEqual( - trace["request"]["body"]["messages"][0]["content"], - "What is tomorrow's date?", - ) - self.assertEqual( - trace["transform"]["upstream_request_body"]["model"], - "deepseek-v4-pro", - ) - self.assertEqual( - trace["upstream"]["response"]["body"]["json"]["choices"][0]["message"][ - "reasoning_content" - ], - TOOL_REASONING, - ) - self.assertEqual( - trace["cursor_response"]["body"]["json"]["choices"][0]["message"][ - "reasoning_content" - ], - TOOL_REASONING, - ) - self.assertNotIn("sk-from-cursor", serialized) - - def test_proxy_rejects_missing_cursor_bearer_token(self) -> None: - request = Request( - f"{self.proxy.url}/v1/chat/completions", - data=json.dumps(first_cursor_request()).encode("utf-8"), - method="POST", - headers={"Content-Type": "application/json"}, - ) - - with self.assertRaises(HTTPError) as caught: - urlopen(request, timeout=5) - - self.assertEqual(caught.exception.code, 401) - self.assertEqual(FakeDeepSeekHandler.requests, []) - - def test_proxy_rejects_oversized_request_body(self) -> None: - self.proxy.server.config = replace( - self.proxy.server.config, max_request_body_bytes=10 - ) - - status, payload = post_json( - f"{self.proxy.url}/v1/chat/completions", - first_cursor_request(), - ) - - self.assertEqual(status, 413) - self.assertIn("too large", payload["error"]["message"]) - self.assertEqual(FakeDeepSeekHandler.requests, []) - - def test_proxy_rejects_uncached_cursor_tool_history_in_strict_mode( - self, - ) -> None: - self.proxy.server.config = replace( - self.proxy.server.config, - missing_reasoning_strategy="reject", - ) - - status, payload = post_json( - f"{self.proxy.url}/v1/chat/completions", - second_cursor_request(include_reasoning=False), - ) - - self.assertEqual(status, 409) - self.assertEqual(payload["error"]["missing_reasoning_messages"], 1) - self.assertIn("1 assistant message", payload["error"]["message"]) - self.assertIn("strict missing-reasoning mode", payload["error"]["message"]) - self.assertIn( - "--missing-reasoning-strategy recover", payload["error"]["message"] - ) - self.assertEqual(FakeDeepSeekHandler.requests, []) - - def test_proxy_recovers_uncached_cursor_tool_history(self) -> None: - with self.assertLogs("deepseek_cursor_proxy", level="INFO") as captured: - status, payload = post_json( - f"{self.proxy.url}/v1/chat/completions", - third_cursor_request_missing_all_reasoning(), - ) - - self.assertEqual(status, 200) - self.assertTrue( - payload["choices"][0]["message"]["content"].startswith( - RECOVERY_NOTICE_CONTENT - ) - ) - self.assertEqual( - [ - message["role"] - for message in FakeDeepSeekHandler.requests[0]["messages"] - ], - ["system", "user"], - ) - self.assertIn( - "recovered this request", - FakeDeepSeekHandler.requests[0]["messages"][0]["content"], - ) - self.assertEqual( - FakeDeepSeekHandler.requests[0]["messages"][1], - {"role": "user", "content": "Thanks, now continue."}, - ) - self.assertIn( - "status=recovered", - "\n".join(captured.output), - ) - self.assertFalse( - any(record.startswith("WARNING:") for record in captured.output) - ) - - def test_trace_captures_recovery_diagnostics(self) -> None: - with TemporaryDirectory() as temp_dir: - writer = TraceWriter(temp_dir) - self.proxy.server.trace_writer = writer - - status, _ = post_json( - f"{self.proxy.url}/v1/chat/completions", - third_cursor_request_missing_all_reasoning(), - ) - - trace = read_single_trace(writer.session_dir) - - self.assertEqual(status, 200) - self.assertEqual(trace["transform"]["recovered_reasoning_messages"], 2) - self.assertEqual( - trace["transform"]["recovery_steps"][0]["strategy"], - "latest_user", - ) - missing_diagnostics = [ - item - for item in trace["transform"]["reasoning_diagnostics"] - if item["missing"] - ] - self.assertGreaterEqual(len(missing_diagnostics), 2) - self.assertIn("lookup_keys", missing_diagnostics[0]) - - def test_proxy_keeps_deepseek_context_after_recovery_boundary(self) -> None: - status, first = post_json( - f"{self.proxy.url}/v1/chat/completions", - third_cursor_request_missing_all_reasoning(), - ) - self.assertEqual(status, 200) - - recovered_assistant = dict(first["choices"][0]["message"]) - recovered_assistant.pop("reasoning_content", None) - payload = third_cursor_request_missing_all_reasoning() - payload["messages"].append(recovered_assistant) - payload["messages"].append( - { - "role": "tool", - "tool_call_id": "call_date", - "content": "2026-04-24", - } - ) - - status, second = post_json(f"{self.proxy.url}/v1/chat/completions", payload) - - self.assertEqual(status, 200) - self.assertFalse( - second["choices"][0]["message"]["content"].startswith( - RECOVERY_NOTICE_CONTENT - ) - ) - second_upstream_messages = FakeDeepSeekHandler.requests[1]["messages"] - self.assertEqual( - [message["role"] for message in second_upstream_messages], - ["system", "user", "assistant", "tool"], - ) - self.assertEqual( - second_upstream_messages[2]["reasoning_content"], TOOL_REASONING - ) - - -class InterleavedConversationTests(unittest.TestCase): - def setUp(self) -> None: - InterleavedFakeDeepSeekHandler.requests = [] - self.upstream = ServerFixture( - ThreadingHTTPServer(("127.0.0.1", 0), InterleavedFakeDeepSeekHandler) - ).start() - self.store = ReasoningStore(":memory:") - proxy = DeepSeekProxyServer(("127.0.0.1", 0), DeepSeekProxyHandler) - proxy.config = ProxyConfig( - upstream_base_url=self.upstream.url, - upstream_model="deepseek-v4-pro", - ) - proxy.reasoning_store = self.store - self.proxy = ServerFixture(proxy).start() - - def tearDown(self) -> None: - self.proxy.close() - self.upstream.close() - self.store.close() - - def test_one_proxy_repairs_two_interleaved_cursor_threads(self) -> None: - status, first_a = post_json( - f"{self.proxy.url}/v1/chat/completions", interleaved_first_request("A") - ) - self.assertEqual(status, 200) - - status, first_b = post_json( - f"{self.proxy.url}/v1/chat/completions", interleaved_first_request("B") - ) - self.assertEqual(status, 200) - - status, final_b = post_json( - f"{self.proxy.url}/v1/chat/completions", - interleaved_second_request("B", first_b["choices"][0]["message"]), - ) - self.assertEqual(status, 200) - - status, final_a = post_json( - f"{self.proxy.url}/v1/chat/completions", - interleaved_second_request("A", first_a["choices"][0]["message"]), - ) - self.assertEqual(status, 200) - - status, followup_a = post_json( - f"{self.proxy.url}/v1/chat/completions", - interleaved_third_request( - "A", first_a["choices"][0]["message"], final_a["choices"][0]["message"] - ), - ) - self.assertEqual(status, 200) - self.assertEqual( - followup_a["choices"][0]["message"]["content"], - "follow-up accepted for thread A", - ) - - status, followup_b = post_json( - f"{self.proxy.url}/v1/chat/completions", - interleaved_third_request( - "B", first_b["choices"][0]["message"], final_b["choices"][0]["message"] - ), - ) - self.assertEqual(status, 200) - self.assertEqual( - followup_b["choices"][0]["message"]["content"], - "follow-up accepted for thread B", - ) - - final_b_upstream_messages = InterleavedFakeDeepSeekHandler.requests[2][ - "messages" - ] - final_a_upstream_messages = InterleavedFakeDeepSeekHandler.requests[3][ - "messages" - ] - followup_a_upstream_messages = InterleavedFakeDeepSeekHandler.requests[4][ - "messages" - ] - followup_b_upstream_messages = InterleavedFakeDeepSeekHandler.requests[5][ - "messages" - ] - - self.assertEqual( - final_b_upstream_messages[1]["reasoning_content"], - "tool reasoning for thread B", - ) - self.assertEqual( - final_a_upstream_messages[1]["reasoning_content"], - "tool reasoning for thread A", - ) - self.assertEqual( - followup_a_upstream_messages[1]["reasoning_content"], - "tool reasoning for thread A", - ) - self.assertEqual( - followup_a_upstream_messages[3]["reasoning_content"], - "final reasoning for thread A", - ) - self.assertEqual( - followup_b_upstream_messages[1]["reasoning_content"], - "tool reasoning for thread B", - ) - self.assertEqual( - followup_b_upstream_messages[3]["reasoning_content"], - "final reasoning for thread B", - ) - - -class StreamingProxyTests(unittest.TestCase): - def setUp(self) -> None: - self.upstream = ServerFixture( - ThreadingHTTPServer(("127.0.0.1", 0), SlowAfterDoneStreamingDeepSeekHandler) - ).start() - self.store = ReasoningStore(":memory:") - proxy = DeepSeekProxyServer(("127.0.0.1", 0), DeepSeekProxyHandler) - proxy.config = ProxyConfig( - upstream_base_url=self.upstream.url, - upstream_model="deepseek-v4-pro", - ) - proxy.reasoning_store = self.store - self.proxy = ServerFixture(proxy).start() - - def tearDown(self) -> None: - self.proxy.close() - self.upstream.close() - self.store.close() - - def test_streaming_proxy_closes_after_done_even_if_upstream_stays_open( - self, - ) -> None: - request = Request( - f"{self.proxy.url}/v1/chat/completions", - data=json.dumps( - { - "model": "deepseek-v4-pro", - "stream": True, - "messages": [{"role": "user", "content": "stream"}], - } - ).encode("utf-8"), - method="POST", - headers={ - "Authorization": "Bearer sk-cursor-test", - "Content-Type": "application/json", - }, - ) - - started = time.monotonic() - with urlopen(request, timeout=1) as response: - body = response.read().decode("utf-8") - elapsed = time.monotonic() - started - - self.assertLess(elapsed, 1) - self.assertIn("data: [DONE]", body) - - -class ReasoningStreamingProxyTests(unittest.TestCase): - def setUp(self) -> None: - self.upstream = ServerFixture( - ThreadingHTTPServer(("127.0.0.1", 0), ReasoningStreamingDeepSeekHandler) - ).start() - self.store = ReasoningStore(":memory:") - proxy = DeepSeekProxyServer(("127.0.0.1", 0), DeepSeekProxyHandler) - proxy.config = ProxyConfig( - upstream_base_url=self.upstream.url, - upstream_model="deepseek-v4-pro", - ) - proxy.reasoning_store = self.store - self.proxy = ServerFixture(proxy).start() - - def tearDown(self) -> None: - self.proxy.close() - self.upstream.close() - self.store.close() - - def test_streaming_proxy_mirrors_reasoning_for_cursor_display( - self, - ) -> None: - request_messages = [{"role": "user", "content": "stream reasoning"}] - request = Request( - f"{self.proxy.url}/v1/chat/completions", - data=json.dumps( - { - "model": "deepseek-v4-pro", - "stream": True, - "messages": request_messages, - } - ).encode("utf-8"), - method="POST", - headers={ - "Authorization": "Bearer sk-cursor-test", - "Content-Type": "application/json", - }, - ) - - with urlopen(request, timeout=2) as response: - body = response.read().decode("utf-8") - - chunks = [ - json.loads(line.removeprefix("data: ")) - for line in body.splitlines() - if line.startswith("data: {") - ] - self.assertEqual( - chunks[0]["choices"][0]["delta"]["content"], - "
\nThinking\n\nNeed ", - ) - self.assertEqual(chunks[0]["choices"][0]["delta"]["reasoning_content"], "Need ") - self.assertEqual(chunks[1]["choices"][0]["delta"]["content"], "context.") - self.assertEqual( - chunks[2]["choices"][0]["delta"]["content"], - "\n
\n\n" + FINAL_CONTENT, - ) - - stored_message = { - "role": "assistant", - "content": FINAL_CONTENT, - "reasoning_content": "Need context.", - } - cache_namespace = reasoning_cache_namespace( - self.proxy.server.config, - "deepseek-v4-pro", - {"type": "enabled"}, - "high", - "Bearer sk-cursor-test", - ) - self.assertEqual( - self.store.get( - "scope:" - + conversation_scope(request_messages, cache_namespace) - + ":signature:" - + message_signature(stored_message) - ), - "Need context.", - ) - - def test_trace_captures_streaming_replay_chunks(self) -> None: - with TemporaryDirectory() as temp_dir: - writer = TraceWriter(temp_dir) - self.proxy.server.trace_writer = writer - request = Request( - f"{self.proxy.url}/v1/chat/completions", - data=json.dumps( - { - "model": "deepseek-v4-pro", - "stream": True, - "messages": [{"role": "user", "content": "stream reasoning"}], - } - ).encode("utf-8"), - method="POST", - headers={ - "Authorization": "Bearer sk-cursor-test", - "Content-Type": "application/json", - }, - ) - - with urlopen(request, timeout=2) as response: - response.read() - - trace = read_single_trace(writer.session_dir) - - self.assertEqual(trace["completion"]["status"], "completed") - self.assertIn( - "reasoning_content", - trace["upstream"]["stream"]["chunks"][0]["line"], - ) - self.assertIn( - "
", - trace["cursor_response"]["stream"]["chunks"][0]["line"], - ) - self.assertEqual( - trace["upstream"]["usage"]["completion_tokens_details"]["reasoning_tokens"], - 3, - ) - - def test_streaming_recovery_notice_is_visible_in_cursor_content(self) -> None: - payload = third_cursor_request_missing_all_reasoning() - payload["stream"] = True - request = Request( - f"{self.proxy.url}/v1/chat/completions", - data=json.dumps(payload).encode("utf-8"), - method="POST", - headers={ - "Authorization": "Bearer sk-cursor-test", - "Content-Type": "application/json", - }, - ) - - with urlopen(request, timeout=2) as response: - body = response.read().decode("utf-8") - - chunks = [ - json.loads(line.removeprefix("data: ")) - for line in body.splitlines() - if line.startswith("data: {") - ] - self.assertEqual( - chunks[2]["choices"][0]["delta"]["content"], - "\n
\n\n" + RECOVERY_NOTICE_CONTENT + FINAL_CONTENT, - ) - - -class StreamingToolRaceProxyTests(unittest.TestCase): - def setUp(self) -> None: - ToolCallStreamingBeforeDoneDeepSeekHandler.requests = [] - self.upstream = ServerFixture( - ThreadingHTTPServer( - ("127.0.0.1", 0), ToolCallStreamingBeforeDoneDeepSeekHandler - ) - ).start() - self.store = ReasoningStore(":memory:") - proxy = DeepSeekProxyServer(("127.0.0.1", 0), DeepSeekProxyHandler) - proxy.config = ProxyConfig( - upstream_base_url=self.upstream.url, - upstream_model="deepseek-v4-pro", - ) - proxy.reasoning_store = self.store - self.proxy = ServerFixture(proxy).start() - - def tearDown(self) -> None: - self.proxy.close() - self.upstream.close() - self.store.close() - - def test_streaming_tool_reasoning_is_available_before_done(self) -> None: - request_messages = [{"role": "user", "content": "stream tool"}] - request = Request( - f"{self.proxy.url}/v1/chat/completions", - data=json.dumps( - { - "model": "deepseek-v4-pro", - "stream": True, - "messages": request_messages, - "tools": [ - { - "type": "function", - "function": { - "name": "lookup", - "parameters": {"type": "object", "properties": {}}, - }, - } - ], - } - ).encode("utf-8"), - method="POST", - headers={ - "Authorization": "Bearer sk-cursor-test", - "Content-Type": "application/json", - }, - ) - - with urlopen(request, timeout=3) as response: - while True: - line = response.readline().decode("utf-8") - self.assertNotEqual(line, "") - if '"finish_reason":"tool_calls"' in line: - break - - status, payload = post_json( - f"{self.proxy.url}/v1/chat/completions", - { - "model": "deepseek-v4-pro", - "messages": [ - *request_messages, - { - "role": "assistant", - "content": "", - "tool_calls": [ - { - "id": "call_stream_tool", - "type": "function", - "function": { - "name": "lookup", - "arguments": "{}", - }, - } - ], - }, - { - "role": "tool", - "tool_call_id": "call_stream_tool", - "content": "tool result", - }, - ], - "tools": [ - { - "type": "function", - "function": { - "name": "lookup", - "parameters": {"type": "object", "properties": {}}, - }, - } - ], - }, - ) - response.read() - - self.assertEqual(status, 200, payload) - self.assertEqual( - payload["choices"][0]["message"]["content"], "stream follow-up accepted" - ) - - def test_streaming_tool_reasoning_is_available_before_finish_reason(self) -> None: - request_messages = [{"role": "user", "content": "stream tool"}] - request = Request( - f"{self.proxy.url}/v1/chat/completions", - data=json.dumps( - { - "model": "deepseek-v4-pro", - "stream": True, - "messages": request_messages, - "tools": [ - { - "type": "function", - "function": { - "name": "lookup", - "parameters": {"type": "object", "properties": {}}, - }, - } - ], - } - ).encode("utf-8"), - method="POST", - headers={ - "Authorization": "Bearer sk-cursor-test", - "Content-Type": "application/json", - }, - ) - - with urlopen(request, timeout=3) as response: - while True: - line = response.readline().decode("utf-8") - self.assertNotEqual(line, "") - if '"tool_calls"' in line: - break - - status, payload = post_json( - f"{self.proxy.url}/v1/chat/completions", - { - "model": "deepseek-v4-pro", - "messages": [ - *request_messages, - { - "role": "assistant", - "content": "", - "tool_calls": [ - { - "id": "call_stream_tool", - "type": "function", - "function": { - "name": "lookup", - "arguments": "{}", - }, - } - ], - }, - { - "role": "tool", - "tool_call_id": "call_stream_tool", - "content": "tool result", - }, - ], - "tools": [ - { - "type": "function", - "function": { - "name": "lookup", - "parameters": {"type": "object", "properties": {}}, - }, - } - ], - }, - ) - response.read() - - self.assertEqual(status, 200, payload) - self.assertEqual( - payload["choices"][0]["message"]["content"], "stream follow-up accepted" - ) - - -def first_cursor_request() -> dict: - return { - "model": "deepseek-v4-pro", - "messages": [{"role": "user", "content": "What is tomorrow's date?"}], - "tools": [ - { - "type": "function", - "function": { - "name": "get_date", - "description": "Get the current date", - "parameters": {"type": "object", "properties": {}}, - }, - } - ], - } - - -def second_cursor_request(include_reasoning: bool) -> dict: - assistant_message = { - "role": "assistant", - "content": "", - "tool_calls": [ - { - "id": "call_date", - "type": "function", - "function": {"name": "get_date", "arguments": "{}"}, - } - ], - } - if include_reasoning: - assistant_message["reasoning_content"] = TOOL_REASONING - return { - "model": "deepseek-v4-pro", - "messages": [ - {"role": "user", "content": "What is tomorrow's date?"}, - assistant_message, - {"role": "tool", "tool_call_id": "call_date", "content": "2026-04-24"}, - ], - "tools": first_cursor_request()["tools"], - } - - -def third_cursor_request_missing_all_reasoning() -> dict: - payload = second_cursor_request(include_reasoning=False) - payload["messages"].append({"role": "assistant", "content": FINAL_CONTENT}) - payload["messages"].append({"role": "user", "content": "Thanks, now continue."}) - return payload - - -def thread_name_from_messages(messages: list[dict]) -> str | None: - if not messages: - return None - content = str(messages[0].get("content") or "") - if "thread A" in content: - return "A" - if "thread B" in content: - return "B" - return None - - -def interleaved_first_request(thread_name: str) -> dict: - return { - "model": "deepseek-v4-pro", - "messages": [{"role": "user", "content": f"Start thread {thread_name}."}], - "tools": [ - { - "type": "function", - "function": { - "name": "lookup", - "description": "Return a value.", - "parameters": {"type": "object", "properties": {}}, - }, - } - ], - } - - -def interleaved_second_request(thread_name: str, assistant_message: dict) -> dict: - cursor_assistant = dict(assistant_message) - cursor_assistant.pop("reasoning_content", None) - return { - "model": "deepseek-v4-pro", - "messages": [ - {"role": "user", "content": f"Start thread {thread_name}."}, - cursor_assistant, - { - "role": "tool", - "tool_call_id": "call_reused", - "content": f"tool result for thread {thread_name}", - }, - ], - "tools": interleaved_first_request(thread_name)["tools"], - } - - -def interleaved_third_request( - thread_name: str, tool_assistant_message: dict, final_message: dict -) -> dict: - payload = interleaved_second_request(thread_name, tool_assistant_message) - cursor_final = dict(final_message) - cursor_final.pop("reasoning_content", None) - payload["messages"].append(cursor_final) - payload["messages"].append( - {"role": "user", "content": f"Follow up in thread {thread_name}."} - ) - return payload - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/test_server.py b/tests/test_server.py index 69dd7da..7788229 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -1,24 +1,45 @@ +"""Server boundary, CLI, and operational tests. + +Pure helper tests (gzip, summarize) and stub-handler tests (client +disconnect) live near the top. The bottom of the file boots a real proxy + +tiny upstream to exercise things that need the HTTP layer: bearer token +forwarding, oversized body, missing-bearer rejection, logging modes, and +streaming connection close. +""" + from __future__ import annotations +from dataclasses import replace +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from io import BytesIO import gzip import json from pathlib import Path +import threading +import time from types import SimpleNamespace import unittest import zlib +from urllib.error import HTTPError +from urllib.request import Request, urlopen from deepseek_cursor_proxy.config import ProxyConfig from deepseek_cursor_proxy.reasoning_store import ReasoningStore from deepseek_cursor_proxy.server import ( DeepSeekProxyHandler, + DeepSeekProxyServer, build_arg_parser, read_response_body, summarize_chat_payload, ) -class FakeResponse: +# --------------------------------------------------------------------------- +# Stubs for fast in-process tests of internal handler methods +# --------------------------------------------------------------------------- + + +class _FakeResponse: def __init__(self, body: bytes, encoding: str = "", status: int = 200) -> None: self._body = BytesIO(body) self.headers = {"Content-Encoding": encoding} if encoding else {} @@ -28,7 +49,7 @@ class FakeResponse: return self._body.read() -class FakeStreamingResponse: +class _FakeStreamingResponse: status = 200 headers = {"Content-Type": "text/event-stream"} @@ -43,7 +64,7 @@ class FakeStreamingResponse: return self._lines.pop(0) -class FailingStreamingResponse: +class _FailingStreamingResponse: status = 200 headers = {"Content-Type": "text/event-stream"} @@ -51,7 +72,7 @@ class FailingStreamingResponse: raise OSError("record layer failure") -class BrokenPipeWfile: +class _BrokenPipeWfile: def write(self, body: bytes) -> None: raise BrokenPipeError("test disconnect") @@ -59,10 +80,10 @@ class BrokenPipeWfile: raise BrokenPipeError("test disconnect") -def make_proxy_handler(wfile: object) -> DeepSeekProxyHandler: +def _make_handler_stub(wfile: object, **config: object) -> DeepSeekProxyHandler: handler = object.__new__(DeepSeekProxyHandler) handler.server = SimpleNamespace( - config=ProxyConfig(), + config=ProxyConfig(**config), reasoning_store=ReasoningStore(":memory:"), ) handler.wfile = wfile @@ -73,8 +94,13 @@ def make_proxy_handler(wfile: object) -> DeepSeekProxyHandler: return handler -class ServerTests(unittest.TestCase): - def test_cli_boolean_overrides_have_on_and_off_forms(self) -> None: +# --------------------------------------------------------------------------- +# CLI / pure helpers +# --------------------------------------------------------------------------- + + +class CliAndHelperTests(unittest.TestCase): + def test_cli_boolean_flags_have_on_and_off_forms(self) -> None: args = build_arg_parser().parse_args( [ "--no-ngrok", @@ -86,7 +112,6 @@ class ServerTests(unittest.TestCase): "/tmp/dcp-traces", ] ) - self.assertFalse(args.ngrok) self.assertFalse(args.verbose) self.assertFalse(args.display_reasoning) @@ -94,19 +119,17 @@ class ServerTests(unittest.TestCase): self.assertTrue(args.cors) self.assertEqual(args.trace_dir, Path("/tmp/dcp-traces")) - def test_read_response_body_handles_gzip(self) -> None: - body = gzip.compress(b'{"ok":true}') - - self.assertEqual(read_response_body(FakeResponse(body, "gzip")), b'{"ok":true}') - - def test_read_response_body_handles_deflate(self) -> None: - body = zlib.compress(b'{"ok":true}') - + def test_read_response_body_decodes_gzip_and_deflate(self) -> None: self.assertEqual( - read_response_body(FakeResponse(body, "deflate")), b'{"ok":true}' + read_response_body(_FakeResponse(gzip.compress(b'{"ok":1}'), "gzip")), + b'{"ok":1}', + ) + self.assertEqual( + read_response_body(_FakeResponse(zlib.compress(b'{"ok":1}'), "deflate")), + b'{"ok":1}', ) - def test_summarize_chat_payload_does_not_include_message_content(self) -> None: + def test_summarize_chat_payload_omits_message_content(self) -> None: summary = summarize_chat_payload( { "model": "deepseek-v4-pro", @@ -116,18 +139,22 @@ class ServerTests(unittest.TestCase): "tool_choice": "auto", } ) - self.assertIn("model='deepseek-v4-pro'", summary) - self.assertIn("stream=True", summary) self.assertIn("messages=1", summary) - self.assertIn("tools=1", summary) self.assertNotIn("secret prompt", summary) + +# --------------------------------------------------------------------------- +# Client-disconnect / upstream-failure stubs (no real HTTP needed) +# --------------------------------------------------------------------------- + + +class HandlerStubTests(unittest.TestCase): def test_regular_response_handles_client_disconnect(self) -> None: - handler = make_proxy_handler(BrokenPipeWfile()) + handler = _make_handler_stub(_BrokenPipeWfile()) body = json.dumps( { - "id": "chatcmpl-test", + "id": "x", "object": "chat.completion", "model": "deepseek-v4-pro", "choices": [ @@ -139,116 +166,324 @@ class ServerTests(unittest.TestCase): ], } ).encode("utf-8") - try: with self.assertLogs("deepseek_cursor_proxy", level="WARNING") as captured: - sent = handler._proxy_regular_response( - FakeResponse(body), + result = handler._proxy_regular_response( + _FakeResponse(body), "deepseek-v4-pro", [{"role": "user", "content": "hi"}], - "cache-namespace", + "ns", ) finally: handler.server.reasoning_store.close() - - self.assertFalse(sent.sent) + self.assertFalse(result.sent) self.assertIn("sending upstream response body", "\n".join(captured.output)) def test_streaming_response_stops_on_client_disconnect(self) -> None: - handler = make_proxy_handler(BrokenPipeWfile()) + handler = _make_handler_stub(_BrokenPipeWfile()) chunk = { - "id": "chatcmpl-stream", + "id": "stream", "model": "deepseek-v4-pro", - "choices": [ - { - "index": 0, - "delta": {"role": "assistant", "content": "hello"}, - } - ], + "choices": [{"index": 0, "delta": {"role": "assistant", "content": "hi"}}], } - response = FakeStreamingResponse( + response = _FakeStreamingResponse( [ f"data: {json.dumps(chunk)}\n\n".encode("utf-8"), b"data: [DONE]\n\n", ] ) - try: with self.assertLogs("deepseek_cursor_proxy", level="WARNING") as captured: - sent = handler._proxy_streaming_response( + result = handler._proxy_streaming_response( response, "deepseek-v4-pro", [{"role": "user", "content": "hi"}], - "cache-namespace", + "ns", ) finally: handler.server.reasoning_store.close() - - self.assertFalse(sent.sent) + self.assertFalse(result.sent) self.assertEqual(response.readline_calls, 1) self.assertIn("sending streaming response chunk", "\n".join(captured.output)) def test_streaming_response_handles_upstream_read_failure(self) -> None: - handler = make_proxy_handler(BytesIO()) - + handler = _make_handler_stub(BytesIO()) try: with self.assertLogs("deepseek_cursor_proxy", level="WARNING") as captured: - sent = handler._proxy_streaming_response( - FailingStreamingResponse(), + result = handler._proxy_streaming_response( + _FailingStreamingResponse(), "deepseek-v4-pro", [{"role": "user", "content": "hi"}], - "cache-namespace", + "ns", ) finally: handler.server.reasoning_store.close() - - self.assertFalse(sent.sent) + self.assertFalse(result.sent) self.assertIn( - "upstream streaming response read failed", - "\n".join(captured.output), + "upstream streaming response read failed", "\n".join(captured.output) ) - def test_collapsible_reasoning_has_no_effect_when_display_is_disabled( - self, - ) -> None: + def test_collapsible_reasoning_no_effect_when_display_disabled(self) -> None: wfile = BytesIO() - handler = make_proxy_handler(wfile) - handler.server.config = ProxyConfig( - display_reasoning=False, - collapsible_reasoning=True, + handler = _make_handler_stub( + wfile, display_reasoning=False, collapsible_reasoning=True ) chunk = { - "id": "chatcmpl-stream", + "id": "stream", "model": "deepseek-v4-pro", - "choices": [ - { - "index": 0, - "delta": {"reasoning_content": "Need context."}, - } - ], + "choices": [{"index": 0, "delta": {"reasoning_content": "Need context."}}], } - response = FakeStreamingResponse( + response = _FakeStreamingResponse( [ f"data: {json.dumps(chunk)}\n\n".encode("utf-8"), b"data: [DONE]\n\n", ] ) - try: - sent = handler._proxy_streaming_response( + handler._proxy_streaming_response( response, "deepseek-v4-pro", [{"role": "user", "content": "hi"}], - "cache-namespace", + "ns", ) finally: handler.server.reasoning_store.close() - body = wfile.getvalue().decode("utf-8") - self.assertTrue(sent.sent) self.assertIn("reasoning_content", body) self.assertNotIn("
", body) - self.assertNotIn("", body) + + +# --------------------------------------------------------------------------- +# HTTP-level boundary tests: real proxy + tiny upstream +# --------------------------------------------------------------------------- + + +class _PlainFakeUpstream(BaseHTTPRequestHandler): + """Returns a fixed plain response and records every request.""" + + requests: list[dict[str, object]] = [] + auth_headers: list[str] = [] + delay_after_done: float = 0.0 + response: dict[str, object] = {} + + def log_message(self, fmt: str, *args: object) -> None: + return + + def do_POST(self) -> None: + length = int(self.headers.get("Content-Length") or 0) + payload = json.loads(self.rfile.read(length).decode("utf-8")) + self.__class__.requests.append(payload) + self.__class__.auth_headers.append(self.headers.get("Authorization", "")) + + if payload.get("stream"): + self.send_response(200) + self.send_header("Content-Type", "text/event-stream") + self.end_headers() + self.wfile.write( + b'data: {"choices":[{"index":0,"delta":{"content":"x"}}]}\n\n' + ) + self.wfile.write(b"data: [DONE]\n\n") + self.wfile.flush() + if self.__class__.delay_after_done: + time.sleep(self.__class__.delay_after_done) + return + + body = json.dumps(self.__class__.response).encode("utf-8") + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + +_BASE_RESPONSE: dict[str, object] = { + "id": "x", + "object": "chat.completion", + "created": 1, + "model": "deepseek-v4-pro", + "choices": [ + { + "index": 0, + "finish_reason": "stop", + "message": {"role": "assistant", "content": "ok"}, + } + ], + "usage": { + "prompt_tokens": 20, + "completion_tokens": 5, + "total_tokens": 25, + "prompt_cache_hit_tokens": 12, + "prompt_cache_miss_tokens": 8, + "completion_tokens_details": {"reasoning_tokens": 3}, + }, +} + + +class _Fixture: + def __init__(self, server: ThreadingHTTPServer) -> None: + self.server = server + self.thread = threading.Thread(target=server.serve_forever, daemon=True) + self.thread.start() + + @property + def url(self) -> str: + host, port = self.server.server_address + return f"http://{host}:{port}" + + def close(self) -> None: + self.server.shutdown() + self.server.server_close() + self.thread.join(timeout=5) + + +def _post(url: str, payload: dict, api_key: str = "sk-test") -> tuple[int, dict]: + request = Request( + url, + data=json.dumps(payload).encode("utf-8"), + method="POST", + headers={ + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json", + }, + ) + try: + with urlopen(request, timeout=5) as response: + return response.status, json.loads(response.read().decode("utf-8")) + except HTTPError as exc: + return exc.code, json.loads(exc.read().decode("utf-8")) + + +class HttpBoundaryTests(unittest.TestCase): + """Real-HTTP tests that don't fit the protocol suite: things the proxy + must do at the HTTP boundary regardless of what DeepSeek answers.""" + + def setUp(self) -> None: + _PlainFakeUpstream.requests = [] + _PlainFakeUpstream.auth_headers = [] + _PlainFakeUpstream.delay_after_done = 0.0 + _PlainFakeUpstream.response = dict(_BASE_RESPONSE) + self.upstream = _Fixture( + ThreadingHTTPServer(("127.0.0.1", 0), _PlainFakeUpstream) + ) + self.store = ReasoningStore(":memory:") + proxy = DeepSeekProxyServer(("127.0.0.1", 0), DeepSeekProxyHandler) + proxy.config = ProxyConfig( + upstream_base_url=self.upstream.url, + upstream_model="deepseek-v4-pro", + ngrok=False, + ) + proxy.reasoning_store = self.store + self.proxy = _Fixture(proxy) + + def tearDown(self) -> None: + self.proxy.close() + self.upstream.close() + self.store.close() + + def _request(self) -> dict: + return { + "model": "deepseek-v4-pro", + "messages": [{"role": "user", "content": "hi"}], + } + + def test_rejects_missing_bearer_token(self) -> None: + request = Request( + f"{self.proxy.url}/v1/chat/completions", + data=json.dumps(self._request()).encode("utf-8"), + method="POST", + headers={"Content-Type": "application/json"}, + ) + with self.assertRaises(HTTPError) as caught: + urlopen(request, timeout=5) + self.assertEqual(caught.exception.code, 401) + self.assertEqual(_PlainFakeUpstream.requests, []) + + def test_rejects_oversized_request_body(self) -> None: + self.proxy.server.config = replace( + self.proxy.server.config, max_request_body_bytes=10 + ) + status, payload = _post( + f"{self.proxy.url}/v1/chat/completions", self._request() + ) + self.assertEqual(status, 413) + self.assertIn("too large", payload["error"]["message"]) + self.assertEqual(_PlainFakeUpstream.requests, []) + + def test_forwards_bearer_token_to_upstream(self) -> None: + status, _ = _post( + f"{self.proxy.url}/v1/chat/completions", + self._request(), + api_key="sk-from-cursor", + ) + self.assertEqual(status, 200) + self.assertEqual(_PlainFakeUpstream.auth_headers[0], "Bearer sk-from-cursor") + + def test_streaming_response_closes_after_done_when_upstream_lingers( + self, + ) -> None: + """Cursor relies on the proxy ending the SSE stream at [DONE], even + if the upstream socket stays open.""" + _PlainFakeUpstream.delay_after_done = 2.0 + request = Request( + f"{self.proxy.url}/v1/chat/completions", + data=json.dumps( + { + "model": "deepseek-v4-pro", + "stream": True, + "messages": [{"role": "user", "content": "stream"}], + } + ).encode("utf-8"), + method="POST", + headers={ + "Authorization": "Bearer sk-test", + "Content-Type": "application/json", + }, + ) + started = time.monotonic() + with urlopen(request, timeout=1) as response: + body = response.read().decode("utf-8") + self.assertLess(time.monotonic() - started, 1.0) + self.assertIn("data: [DONE]", body) + + def test_normal_logging_summarizes_without_bodies_or_keys(self) -> None: + with self.assertLogs("deepseek_cursor_proxy", level="INFO") as captured: + status, _ = _post( + f"{self.proxy.url}/v1/chat/completions", + self._request(), + api_key="sk-from-cursor", + ) + # `└ stats` is emitted on the handler thread *after* the response + # body hits the socket, so the client may return before it lands. + deadline = time.monotonic() + 2 + while time.monotonic() < deadline and not any( + "└ stats" in record for record in captured.output + ): + time.sleep(0.01) + output = "\n".join(captured.output) + self.assertEqual(status, 200) + # Single-line stage records keep the log readable. + for marker in ("┌ cursor", "├ context", "├ send", "└ stats"): + self.assertIn(marker, output) + self.assertNotIn("hi", output.split("┌ cursor")[1].split("\n")[0]) + self.assertNotIn("sk-from-cursor", output) + + def test_verbose_logging_includes_bodies_but_redacts_api_key(self) -> None: + self.proxy.server.config = replace(self.proxy.server.config, verbose=True) + with self.assertLogs("deepseek_cursor_proxy", level="INFO") as captured: + _post( + f"{self.proxy.url}/v1/chat/completions", + self._request(), + api_key="sk-from-cursor", + ) + output = "\n".join(captured.output) + self.assertIn("cursor request body", output) + self.assertIn("upstream request body", output) + self.assertNotIn("sk-from-cursor", output) + + def test_healthz_returns_ok(self) -> None: + with urlopen(f"{self.proxy.url}/healthz", timeout=2) as response: + self.assertEqual(response.status, 200) + self.assertEqual(json.loads(response.read())["ok"], True) if __name__ == "__main__": diff --git a/tests/test_streaming.py b/tests/test_streaming.py index d2019b1..9f594f7 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -6,6 +6,7 @@ from deepseek_cursor_proxy.reasoning_store import ReasoningStore, conversation_s from deepseek_cursor_proxy.streaming import ( CursorReasoningDisplayAdapter, StreamAccumulator, + fold_reasoning_into_content, ) @@ -430,5 +431,44 @@ class CursorReasoningDisplayAdapterTests(unittest.TestCase): self.assertIsNone(adapter.flush_chunk("deepseek-v4-pro")) +class FoldReasoningTests(unittest.TestCase): + def test_fold_reasoning_into_non_streaming_content(self) -> None: + """Non-streaming responses mirror reasoning_content into a visible +
block, matching the streaming layout.""" + payload = { + "choices": [ + { + "index": 0, + "message": { + "role": "assistant", + "content": "answer", + "reasoning_content": "thinking", + }, + } + ] + } + fold_reasoning_into_content(payload, collapsible=True) + self.assertEqual( + payload["choices"][0]["message"]["content"], + "
\nThinking\n\nthinking\n
\n\nanswer", + ) + + def test_fold_reasoning_skips_empty_reasoning(self) -> None: + payload = { + "choices": [ + { + "index": 0, + "message": { + "role": "assistant", + "content": "answer", + "reasoning_content": "", + }, + } + ] + } + fold_reasoning_into_content(payload, collapsible=True) + self.assertEqual(payload["choices"][0]["message"]["content"], "answer") + + if __name__ == "__main__": unittest.main() diff --git a/tests/test_trace.py b/tests/test_trace.py index d2be77b..261c35d 100644 --- a/tests/test_trace.py +++ b/tests/test_trace.py @@ -1,14 +1,25 @@ +"""Trace writer tests, both as a unit (writes/redacts files) and integrated +through the proxy (captures real request flow on disk).""" + from __future__ import annotations +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer import json +from pathlib import Path import stat +import threading from tempfile import TemporaryDirectory +import time import unittest +from urllib.request import Request, urlopen +from deepseek_cursor_proxy.config import ProxyConfig +from deepseek_cursor_proxy.reasoning_store import ReasoningStore +from deepseek_cursor_proxy.server import DeepSeekProxyHandler, DeepSeekProxyServer from deepseek_cursor_proxy.trace import TraceWriter -class TraceWriterTests(unittest.TestCase): +class TraceWriterUnitTests(unittest.TestCase): def test_writes_manifest_and_numbered_request_files(self) -> None: with TemporaryDirectory() as temp_dir: writer = TraceWriter(temp_dir) @@ -47,17 +58,244 @@ class TraceWriterTests(unittest.TestCase): headers={"Authorization": "Bearer sk-secret"}, ) trace.finish("completed", http_status=200) - - payload = json.loads(trace.path.read_text(encoding="utf-8")) - serialized = json.dumps(payload) - + serialized = trace.path.read_text(encoding="utf-8") self.assertNotIn("sk-secret", serialized) + payload = json.loads(serialized) self.assertEqual( - payload["request"]["headers"]["Authorization"]["present"], - True, + payload["request"]["headers"]["Authorization"]["present"], True ) self.assertIn("sha256", payload["request"]["headers"]["Authorization"]) +# --------------------------------------------------------------------------- +# Integration: trace writer attached to a running proxy. +# --------------------------------------------------------------------------- + + +class _CannedUpstream(BaseHTTPRequestHandler): + """Returns a tool-call response for the first POST and a streamed + reasoning response for the second.""" + + requests: list[dict[str, object]] = [] + + def log_message(self, fmt: str, *args: object) -> None: + return + + def do_POST(self) -> None: + length = int(self.headers.get("Content-Length") or 0) + payload = json.loads(self.rfile.read(length).decode("utf-8")) + self.__class__.requests.append(payload) + + if payload.get("stream"): + self.send_response(200) + self.send_header("Content-Type", "text/event-stream") + self.end_headers() + self.wfile.write( + b'data: {"id":"s","object":"chat.completion.chunk","choices":' + b'[{"index":0,"delta":{"role":"assistant","reasoning_content":"think"},' + b'"finish_reason":null}]}\n\n' + ) + self.wfile.write( + b'data: {"id":"s","object":"chat.completion.chunk","choices":' + b'[{"index":0,"delta":{"content":"answer"},"finish_reason":null}],' + b'"usage":{"completion_tokens_details":{"reasoning_tokens":1}}}\n\n' + ) + self.wfile.write( + b'data: {"id":"s","object":"chat.completion.chunk",' + b'"choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}\n\n' + ) + self.wfile.write(b"data: [DONE]\n\n") + self.wfile.flush() + return + + body = json.dumps( + { + "id": "tool", + "object": "chat.completion", + "model": "deepseek-v4-pro", + "choices": [ + { + "index": 0, + "finish_reason": "tool_calls", + "message": { + "role": "assistant", + "content": "", + "reasoning_content": "I need the date.", + "tool_calls": [ + { + "id": "call_date", + "type": "function", + "function": { + "name": "get_date", + "arguments": "{}", + }, + } + ], + }, + } + ], + } + ).encode("utf-8") + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + +class _Fixture: + def __init__(self, server: ThreadingHTTPServer) -> None: + self.server = server + self.thread = threading.Thread(target=server.serve_forever, daemon=True) + self.thread.start() + + @property + def url(self) -> str: + host, port = self.server.server_address + return f"http://{host}:{port}" + + def close(self) -> None: + self.server.shutdown() + self.server.server_close() + self.thread.join(timeout=5) + + +def _read_single_trace(session_dir: Path) -> dict: + deadline = time.monotonic() + 2 + files = sorted(session_dir.glob("request-*.json")) + while not files and time.monotonic() < deadline: + time.sleep(0.01) + files = sorted(session_dir.glob("request-*.json")) + if len(files) != 1: + raise AssertionError(f"expected one trace, found {files}") + return json.loads(files[0].read_text(encoding="utf-8")) + + +class TraceIntegrationTests(unittest.TestCase): + def setUp(self) -> None: + _CannedUpstream.requests = [] + self.upstream = _Fixture(ThreadingHTTPServer(("127.0.0.1", 0), _CannedUpstream)) + self.store = ReasoningStore(":memory:") + self.temp_dir = TemporaryDirectory() + self.writer = TraceWriter(self.temp_dir.name) + proxy = DeepSeekProxyServer(("127.0.0.1", 0), DeepSeekProxyHandler) + proxy.config = ProxyConfig( + upstream_base_url=self.upstream.url, + upstream_model="deepseek-v4-pro", + ngrok=False, + ) + proxy.reasoning_store = self.store + proxy.trace_writer = self.writer + self.proxy = _Fixture(proxy) + + def tearDown(self) -> None: + self.proxy.close() + self.upstream.close() + self.store.close() + self.temp_dir.cleanup() + + def _post(self, payload: dict) -> dict: + request = Request( + f"{self.proxy.url}/v1/chat/completions", + data=json.dumps(payload).encode("utf-8"), + method="POST", + headers={ + "Authorization": "Bearer sk-from-cursor", + "Content-Type": "application/json", + }, + ) + with urlopen(request, timeout=5) as response: + return json.loads(response.read()) + + def test_captures_non_streaming_replay_without_api_key(self) -> None: + self._post( + { + "model": "deepseek-v4-pro", + "messages": [{"role": "user", "content": "What is tomorrow's date?"}], + } + ) + trace = _read_single_trace(self.writer.session_dir) + serialized = json.dumps(trace) + self.assertEqual(trace["completion"]["status"], "completed") + self.assertEqual( + trace["request"]["body"]["messages"][0]["content"], + "What is tomorrow's date?", + ) + self.assertEqual( + trace["upstream"]["response"]["body"]["json"]["choices"][0]["message"][ + "reasoning_content" + ], + "I need the date.", + ) + self.assertNotIn("sk-from-cursor", serialized) + + def test_captures_streaming_replay_chunks(self) -> None: + request = Request( + f"{self.proxy.url}/v1/chat/completions", + data=json.dumps( + { + "model": "deepseek-v4-pro", + "stream": True, + "messages": [{"role": "user", "content": "stream"}], + } + ).encode("utf-8"), + method="POST", + headers={ + "Authorization": "Bearer sk-test", + "Content-Type": "application/json", + }, + ) + with urlopen(request, timeout=2) as response: + response.read() + trace = _read_single_trace(self.writer.session_dir) + self.assertEqual(trace["completion"]["status"], "completed") + self.assertIn( + "reasoning_content", + trace["upstream"]["stream"]["chunks"][0]["line"], + ) + self.assertIn( + "
", trace["cursor_response"]["stream"]["chunks"][0]["line"] + ) + + def test_captures_recovery_diagnostics(self) -> None: + """A request that triggers cold-cache recovery records the recovery + steps + diagnostic counters in the trace.""" + self._post( + { + "model": "deepseek-v4-pro", + "messages": [ + {"role": "user", "content": "old"}, + { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": "call_x", + "type": "function", + "function": {"name": "f", "arguments": "{}"}, + } + ], + }, + {"role": "tool", "tool_call_id": "call_x", "content": "result"}, + {"role": "user", "content": "new"}, + ], + } + ) + trace = _read_single_trace(self.writer.session_dir) + self.assertEqual( + trace["transform"]["recovery_steps"][0]["strategy"], "latest_user" + ) + self.assertGreaterEqual( + len( + [ + item + for item in trace["transform"]["reasoning_diagnostics"] + if item["missing"] + ] + ), + 1, + ) + + if __name__ == "__main__": unittest.main() diff --git a/tests/test_transform.py b/tests/test_transform.py index 6e8aac2..649a409 100644 --- a/tests/test_transform.py +++ b/tests/test_transform.py @@ -1,3 +1,11 @@ +"""Pure-function unit tests for transform.py. + +Anything that requires a fake DeepSeek upstream lives in test_protocol.py. +This file only exercises helpers that take dicts/strings and return +dicts/strings — content extraction, request normalization, response rewrite, +recovery-notice stripping, and warning behaviour for dropped fields. +""" + from __future__ import annotations import json @@ -11,1429 +19,239 @@ from deepseek_cursor_proxy.reasoning_store import ( ) from deepseek_cursor_proxy.transform import ( RECOVERY_NOTICE_CONTENT, + RECOVERY_NOTICE_TEXT, extract_text_content, + normalize_reasoning_effort, prepare_upstream_request, - reasoning_cache_namespace, rewrite_response_body, strip_cursor_thinking_blocks, + strip_recovery_notice_for_upstream, ) -DEFAULT_CONFIG = ProxyConfig() -DEFAULT_CACHE_NAMESPACE = reasoning_cache_namespace( - DEFAULT_CONFIG, - "deepseek-v4-pro", - {"type": "enabled"}, - "high", -) +class ContentHelpersTests(unittest.TestCase): + def test_extract_text_content_flattens_multipart_array(self) -> None: + content = [ + {"type": "text", "text": "hello"}, + {"type": "image_url", "image_url": {"url": "data:..."}}, + {"type": "input_text", "text": "world"}, + ] + self.assertEqual( + extract_text_content(content), + "hello\n[image_url omitted by DeepSeek text proxy]\nworld", + ) + + def test_extract_text_content_passes_through_string_and_none(self) -> None: + self.assertEqual(extract_text_content("plain"), "plain") + self.assertIsNone(extract_text_content(None)) + + def test_strip_cursor_thinking_blocks_removes_details_and_think(self) -> None: + self.assertEqual( + strip_cursor_thinking_blocks( + "
\nThinking\n\nplan\n
\n\nanswer" + ), + "answer", + ) + self.assertEqual( + strip_cursor_thinking_blocks("\nplan\n\n\nanswer"), + "answer", + ) + + def test_strip_cursor_thinking_blocks_preserves_unrelated_details(self) -> None: + kept = "
Diff\nrelevant\n
" + self.assertEqual(strip_cursor_thinking_blocks(kept), kept) + + def test_normalize_reasoning_effort_aliases(self) -> None: + self.assertEqual(normalize_reasoning_effort("low"), "high") + self.assertEqual(normalize_reasoning_effort("medium"), "high") + self.assertEqual(normalize_reasoning_effort("high"), "high") + self.assertEqual(normalize_reasoning_effort("max"), "max") + self.assertEqual(normalize_reasoning_effort("xhigh"), "max") + self.assertEqual(normalize_reasoning_effort("nonsense"), "high") -def cache_scope(messages: list[dict]) -> str: - return conversation_scope(messages, DEFAULT_CACHE_NAMESPACE) - - -class TransformTests(unittest.TestCase): +class RequestPreparationTests(unittest.TestCase): def setUp(self) -> None: self.store = ReasoningStore(":memory:") def tearDown(self) -> None: self.store.close() - def test_extracts_text_from_cursor_style_content_parts(self) -> None: - content = [ - {"type": "text", "text": "hello"}, - {"type": "image_url", "image_url": {"url": "data:image/png;base64,..."}}, - {"type": "input_text", "text": "world"}, - ] - - self.assertEqual( - extract_text_content(content), - "hello\n[image_url omitted by DeepSeek text proxy]\nworld", - ) - - def test_strips_cursor_display_thinking_blocks_from_assistant_content( - self, - ) -> None: - self.assertEqual( - strip_cursor_thinking_blocks( - "\nNeed context.\n\n\nFinal answer." - ), - "Final answer.", - ) - self.assertEqual( - strip_cursor_thinking_blocks( - "
\n" - "Thinking\n\n" - "Need context.\n" - "
\n\n" - "Final answer." - ), - "Final answer.", - ) - - def test_preserves_regular_markdown_details_in_assistant_content(self) -> None: - content = ( - "
\n" - "Example\n\n" - "Visible details.\n" - "
\n\n" - "Final answer." - ) - - self.assertEqual(strip_cursor_thinking_blocks(content), content) - - def test_prepares_assistant_content_without_mirrored_thinking_blocks( - self, - ) -> None: - payload = { - "model": "deepseek-v4-pro", - "messages": [ - {"role": "user", "content": "hello"}, - { - "role": "assistant", - "content": ( - "
\n" - "Thinking\n\n" - "Hidden.\n" - "
\n\n" - "Visible answer." - ), - }, - {"role": "user", "content": "continue"}, - ], - } - - prepared = prepare_upstream_request( - payload, - ProxyConfig(missing_reasoning_strategy="reject"), - self.store, - ) - - self.assertEqual(prepared.payload["messages"][1]["content"], "Visible answer.") - - def test_prepares_thinking_request_and_converts_legacy_functions(self) -> None: - payload = { - "model": "deepseek-v4-flash", - "messages": [{"role": "user", "content": "hi"}], - "functions": [{"name": "lookup", "parameters": {"type": "object"}}], - "function_call": {"name": "lookup"}, - "max_completion_tokens": 123, - "parallel_tool_calls": True, - } - config = ProxyConfig() - - prepared = prepare_upstream_request(payload, config, self.store) - - self.assertEqual(prepared.original_model, "deepseek-v4-flash") - self.assertEqual(prepared.upstream_model, "deepseek-v4-flash") - self.assertEqual(prepared.payload["model"], "deepseek-v4-flash") - self.assertEqual(prepared.payload["thinking"], {"type": "enabled"}) - self.assertEqual(prepared.payload["reasoning_effort"], "high") - self.assertEqual(prepared.payload["max_tokens"], 123) - self.assertEqual(prepared.payload["tools"][0]["type"], "function") - self.assertEqual( - prepared.payload["tool_choice"], - {"type": "function", "function": {"name": "lookup"}}, - ) - self.assertNotIn("parallel_tool_calls", prepared.payload) - - def test_uses_config_model_only_when_request_model_is_missing(self) -> None: - prepared = prepare_upstream_request( - {"messages": [{"role": "user", "content": "hi"}]}, - ProxyConfig(upstream_model="deepseek-v4-flash"), - self.store, - ) - - self.assertEqual(prepared.original_model, "deepseek-v4-flash") - self.assertEqual(prepared.upstream_model, "deepseek-v4-flash") - self.assertEqual(prepared.payload["model"], "deepseek-v4-flash") - - def test_streaming_requests_include_usage_for_runtime_stats(self) -> None: + def test_legacy_functions_field_is_converted_to_tools(self) -> None: prepared = prepare_upstream_request( { "model": "deepseek-v4-pro", - "stream": True, - "stream_options": {"include_usage": False}, "messages": [{"role": "user", "content": "hi"}], + "functions": [{"name": "lookup", "parameters": {"type": "object"}}], + "function_call": "auto", }, ProxyConfig(), self.store, ) + self.assertEqual(prepared.payload["tools"][0]["function"]["name"], "lookup") + self.assertEqual(prepared.payload["tool_choice"], "auto") + self.assertNotIn("functions", prepared.payload) + self.assertNotIn("function_call", prepared.payload) - self.assertEqual(prepared.payload["stream_options"]["include_usage"], True) - - def test_preserves_required_tool_choice(self) -> None: - payload = { - "model": "deepseek-v4-pro", - "messages": [{"role": "user", "content": "call a tool"}], - "tools": [{"type": "function", "function": {"name": "lookup"}}], - "tool_choice": "required", - } - + def test_named_function_call_becomes_named_tool_choice(self) -> None: prepared = prepare_upstream_request( - payload, - ProxyConfig(missing_reasoning_strategy="reject"), + { + "model": "deepseek-v4-pro", + "messages": [{"role": "user", "content": "hi"}], + "function_call": {"name": "lookup"}, + }, + ProxyConfig(), self.store, ) - - self.assertEqual(prepared.payload["tool_choice"], "required") - - def test_preserves_named_tool_choice(self) -> None: - payload = { - "model": "deepseek-v4-pro", - "messages": [{"role": "user", "content": "call lookup"}], - "tools": [{"type": "function", "function": {"name": "lookup"}}], - "tool_choice": { - "type": "function", - "function": {"name": "lookup"}, - }, - } - - prepared = prepare_upstream_request(payload, ProxyConfig(), self.store) - self.assertEqual( prepared.payload["tool_choice"], {"type": "function", "function": {"name": "lookup"}}, ) - def test_restores_reasoning_content_for_cached_tool_call(self) -> None: - prior_messages = [{"role": "user", "content": "read README"}] - assistant_message = { - "role": "assistant", - "content": "", - "reasoning_content": "Need the file contents before answering.", - "tool_calls": [ - { - "id": "call_123", - "type": "function", - "function": { - "name": "read_file", - "arguments": '{"path":"README.md"}', - }, - } - ], - } - self.store.store_assistant_message( - assistant_message, cache_scope(prior_messages) + def test_max_completion_tokens_is_aliased_to_max_tokens(self) -> None: + prepared = prepare_upstream_request( + { + "model": "deepseek-v4-pro", + "messages": [{"role": "user", "content": "hi"}], + "max_completion_tokens": 256, + }, + ProxyConfig(), + self.store, ) + self.assertEqual(prepared.payload["max_tokens"], 256) - payload = { - "model": "deepseek-v4-pro", - "messages": [ - {"role": "user", "content": "read README"}, + def test_standard_openai_fields_are_forwarded_without_warning(self) -> None: + # Cursor and the OpenAI SDK send these on every request; forward them + # so DeepSeek can use what it understands and ignore the rest. + with self.assertNoLogs("deepseek_cursor_proxy", level="WARNING"): + prepared = prepare_upstream_request( { - "role": "assistant", - "content": "", - "tool_calls": [ - { - "id": "call_123", - "type": "function", - "function": { - "name": "read_file", - "arguments": '{"path":"README.md"}', - }, - } - ], + "model": "deepseek-v4-pro", + "messages": [{"role": "user", "content": "hi"}], + "user": "user-abc", + "seed": 42, + "n": 1, + "logit_bias": {"50256": -100}, }, - {"role": "tool", "tool_call_id": "call_123", "content": "file text"}, - {"role": "user", "content": "continue"}, - ], - } - - prepared = prepare_upstream_request( - payload, - ProxyConfig(missing_reasoning_strategy="reject"), - self.store, - ) - - self.assertEqual(prepared.patched_reasoning_messages, 1) - self.assertEqual( - prepared.payload["messages"][1]["reasoning_content"], - "Need the file contents before answering.", - ) - - def test_accepts_empty_reasoning_content_when_present_for_tool_call( - self, - ) -> None: - payload = { - "model": "deepseek-v4-pro", - "messages": [ - {"role": "user", "content": "read README"}, - { - "role": "assistant", - "content": "", - "reasoning_content": "", - "tool_calls": [ - { - "id": "call_empty", - "type": "function", - "function": { - "name": "read_file", - "arguments": '{"path":"README.md"}', - }, - } - ], - }, - {"role": "tool", "tool_call_id": "call_empty", "content": "file text"}, - ], - } - - prepared = prepare_upstream_request(payload, ProxyConfig(), self.store) - - self.assertEqual(prepared.patched_reasoning_messages, 0) - self.assertEqual(prepared.missing_reasoning_messages, 0) - self.assertIn("reasoning_content", prepared.payload["messages"][1]) - self.assertEqual(prepared.payload["messages"][1]["reasoning_content"], "") - - def test_restores_empty_reasoning_content_from_cache(self) -> None: - prior_messages = [{"role": "user", "content": "read README"}] - tool_call = { - "id": "call_empty", - "type": "function", - "function": { - "name": "read_file", - "arguments": '{"path":"README.md"}', - }, - } - self.store.store_assistant_message( - { - "role": "assistant", - "content": "", - "reasoning_content": "", - "tool_calls": [tool_call], - }, - cache_scope(prior_messages), - ) - - prepared = prepare_upstream_request( - { - "model": "deepseek-v4-pro", - "messages": [ - *prior_messages, - {"role": "assistant", "content": "", "tool_calls": [tool_call]}, - { - "role": "tool", - "tool_call_id": "call_empty", - "content": "file text", - }, - ], - }, - ProxyConfig(), - self.store, - ) - - self.assertEqual(prepared.patched_reasoning_messages, 1) - self.assertEqual(prepared.missing_reasoning_messages, 0) - self.assertIn("reasoning_content", prepared.payload["messages"][1]) - self.assertEqual(prepared.payload["messages"][1]["reasoning_content"], "") - - def test_restores_reasoning_content_for_cached_final_tool_turn_message( - self, - ) -> None: - prior_messages = [ - {"role": "user", "content": "read README"}, - { - "role": "assistant", - "content": "", - "reasoning_content": "Need the file contents before answering.", - "tool_calls": [ - { - "id": "call_123", - "type": "function", - "function": { - "name": "read_file", - "arguments": '{"path":"README.md"}', - }, - } - ], - }, - {"role": "tool", "tool_call_id": "call_123", "content": "file text"}, - ] - assistant_message = { - "role": "assistant", - "content": "Final answer after using the tool.", - "reasoning_content": "The tool result is enough to answer.", - } - self.store.store_assistant_message( - assistant_message, cache_scope(prior_messages) - ) - - payload = { - "model": "deepseek-v4-pro", - "messages": [ - {"role": "user", "content": "read README"}, - { - "role": "assistant", - "content": "", - "tool_calls": [ - { - "id": "call_123", - "type": "function", - "function": { - "name": "read_file", - "arguments": '{"path":"README.md"}', - }, - } - ], - }, - {"role": "tool", "tool_call_id": "call_123", "content": "file text"}, - {"role": "assistant", "content": "Final answer after using the tool."}, - {"role": "user", "content": "another question"}, - ], - } - - prepared = prepare_upstream_request( - payload, - ProxyConfig(missing_reasoning_strategy="reject"), - self.store, - ) - - self.assertEqual(prepared.patched_reasoning_messages, 1) - self.assertEqual( - prepared.payload["messages"][3]["reasoning_content"], - "The tool result is enough to answer.", - ) - - def test_reasoning_cache_is_scoped_by_conversation_prefix(self) -> None: - tool_call = { - "id": "call_reused", - "type": "function", - "function": {"name": "lookup", "arguments": "{}"}, - } - assistant_a = { - "role": "assistant", - "content": "", - "reasoning_content": "Reasoning for thread A.", - "tool_calls": [tool_call], - } - assistant_b = { - "role": "assistant", - "content": "", - "reasoning_content": "Reasoning for thread B.", - "tool_calls": [tool_call], - } - prior_a = [{"role": "user", "content": "thread A"}] - prior_b = [{"role": "user", "content": "thread B"}] - - self.store.store_assistant_message(assistant_a, cache_scope(prior_a)) - self.store.store_assistant_message(assistant_b, cache_scope(prior_b)) - - payload_a = { - "model": "deepseek-v4-pro", - "messages": [ - *prior_a, - {"role": "assistant", "content": "", "tool_calls": [tool_call]}, - ], - } - payload_b = { - "model": "deepseek-v4-pro", - "messages": [ - *prior_b, - {"role": "assistant", "content": "", "tool_calls": [tool_call]}, - ], - } - - prepared_a = prepare_upstream_request(payload_a, ProxyConfig(), self.store) - prepared_b = prepare_upstream_request(payload_b, ProxyConfig(), self.store) - - self.assertEqual( - prepared_a.payload["messages"][1]["reasoning_content"], - "Reasoning for thread A.", - ) - self.assertEqual( - prepared_b.payload["messages"][1]["reasoning_content"], - "Reasoning for thread B.", - ) - - def test_exact_message_signature_wins_over_tool_call_id_fallback(self) -> None: - prior = [{"role": "user", "content": "same conversation prefix"}] - scope = cache_scope(prior) - first_tool_call = { - "id": "call_reused", - "type": "function", - "function": {"name": "lookup", "arguments": '{"value":"first"}'}, - } - second_tool_call = { - "id": "call_reused", - "type": "function", - "function": {"name": "lookup", "arguments": '{"value":"second"}'}, - } - self.store.store_assistant_message( - { - "role": "assistant", - "content": "", - "reasoning_content": "first reasoning", - "tool_calls": [first_tool_call], - }, - scope, - ) - self.store.store_assistant_message( - { - "role": "assistant", - "content": "", - "reasoning_content": "second reasoning", - "tool_calls": [second_tool_call], - }, - scope, - ) - - prepared = prepare_upstream_request( - { - "model": "deepseek-v4-pro", - "messages": [ - *prior, - { - "role": "assistant", - "content": "", - "tool_calls": [first_tool_call], - }, - ], - }, - ProxyConfig(), - self.store, - ) - - self.assertEqual( - prepared.payload["messages"][1]["reasoning_content"], "first reasoning" - ) - - def test_strict_hit_backfills_portable_cache_for_mode_switch(self) -> None: - agent_prior = [ - {"role": "system", "content": "Agent mode."}, - {"role": "user", "content": "set up the task"}, - {"role": "user", "content": "read README"}, - ] - plan_prior = [ - {"role": "system", "content": "Plan mode."}, - {"role": "user", "content": "set up the task"}, - {"role": "user", "content": "read README"}, - ] - tool_call = { - "id": "call_mode_switch", - "type": "function", - "function": {"name": "read_file", "arguments": '{"path":"README.md"}'}, - } - assistant_message = { - "role": "assistant", - "content": "", - "reasoning_content": "Need README before answering.", - "tool_calls": [tool_call], - } - self.store.store_assistant_message( - assistant_message, - cache_scope(agent_prior), - ) - - strict_prepared = prepare_upstream_request( - { - "model": "deepseek-v4-pro", - "messages": [ - *agent_prior, - {"role": "assistant", "content": "", "tool_calls": [tool_call]}, - ], - }, - ProxyConfig(), - self.store, - ) - portable_prepared = prepare_upstream_request( - { - "model": "deepseek-v4-pro", - "messages": [ - *plan_prior, - {"role": "assistant", "content": "", "tool_calls": [tool_call]}, - ], - }, - ProxyConfig(), - self.store, - ) - - self.assertEqual(strict_prepared.patched_reasoning_messages, 1) - self.assertEqual(portable_prepared.patched_reasoning_messages, 1) - self.assertEqual(portable_prepared.missing_reasoning_messages, 0) - self.assertEqual( - portable_prepared.payload["messages"][3]["reasoning_content"], - "Need README before answering.", - ) - self.assertTrue( - str(portable_prepared.reasoning_diagnostics[-1]["hit_kind"]).startswith( - "portable_" + ProxyConfig(), + self.store, ) - ) + self.assertEqual(prepared.payload["user"], "user-abc") + self.assertEqual(prepared.payload["seed"], 42) + self.assertEqual(prepared.payload["n"], 1) + self.assertEqual(prepared.payload["logit_bias"], {"50256": -100}) - def test_portable_turn_cache_restores_final_assistant_after_tool_result( - self, - ) -> None: - agent_user = {"role": "user", "content": "look up project state"} - plan_user = dict(agent_user) - tool_call = { - "id": "call_project_state", - "type": "function", - "function": {"name": "lookup", "arguments": '{"query":"state"}'}, - } - tool_result = { - "role": "tool", - "tool_call_id": "call_project_state", - "content": '{"state":"ready"}', - } - tool_assistant = { - "role": "assistant", - "content": "", - "reasoning_content": "Need the project state.", - "tool_calls": [tool_call], - } - final_assistant = { - "role": "assistant", - "content": "The project is ready.", - "reasoning_content": "The tool result is enough to answer.", - } - agent_initial_prior = [ - {"role": "system", "content": "Agent mode."}, - agent_user, - ] - agent_final_prior = [*agent_initial_prior, tool_assistant, tool_result] - self.store.store_assistant_message( - tool_assistant, - cache_scope(agent_initial_prior), - DEFAULT_CACHE_NAMESPACE, - agent_initial_prior, - ) - self.store.store_assistant_message( - final_assistant, - cache_scope(agent_final_prior), - DEFAULT_CACHE_NAMESPACE, - agent_final_prior, - ) - - prepared = prepare_upstream_request( - { - "model": "deepseek-v4-pro", - "messages": [ - {"role": "system", "content": "Plan mode."}, - plan_user, - {"role": "assistant", "content": "", "tool_calls": [tool_call]}, - tool_result, - {"role": "assistant", "content": "The project is ready."}, - {"role": "user", "content": "continue"}, - ], - }, - ProxyConfig(missing_reasoning_strategy="reject"), - self.store, - ) - - self.assertEqual(prepared.missing_reasoning_messages, 0) - self.assertEqual(prepared.patched_reasoning_messages, 2) - self.assertEqual( - prepared.payload["messages"][4]["reasoning_content"], - "The tool result is enough to answer.", - ) - - def test_portable_turn_cache_isolated_for_reused_tool_call_id(self) -> None: - tool_call = { - "id": "call_reused", - "type": "function", - "function": {"name": "lookup", "arguments": "{}"}, - } - assistant_a = { - "role": "assistant", - "content": "", - "reasoning_content": "Reasoning for thread A.", - "tool_calls": [tool_call], - } - assistant_b = { - "role": "assistant", - "content": "", - "reasoning_content": "Reasoning for thread B.", - "tool_calls": [tool_call], - } - prior_a = [ - {"role": "system", "content": "Agent mode."}, - {"role": "user", "content": "thread A"}, - ] - prior_b = [ - {"role": "system", "content": "Agent mode."}, - {"role": "user", "content": "thread B"}, - ] - self.store.store_assistant_message( - assistant_a, - cache_scope(prior_a), - DEFAULT_CACHE_NAMESPACE, - prior_a, - ) - self.store.store_assistant_message( - assistant_b, - cache_scope(prior_b), - DEFAULT_CACHE_NAMESPACE, - prior_b, - ) - - prepared = prepare_upstream_request( - { - "model": "deepseek-v4-pro", - "messages": [ - {"role": "system", "content": "Plan mode."}, - {"role": "user", "content": "thread A"}, - {"role": "assistant", "content": "", "tool_calls": [tool_call]}, - ], - }, - ProxyConfig(), - self.store, - ) - - self.assertEqual( - prepared.payload["messages"][2]["reasoning_content"], - "Reasoning for thread A.", - ) - - def test_restores_reasoning_when_cursor_drops_tool_call_id_but_keeps_function_call( - self, - ) -> None: - prior = [{"role": "user", "content": "inspect repo"}] - assistant_message = { - "role": "assistant", - "content": "", - "reasoning_content": "Need to call the file tool.", - "tool_calls": [ + def test_unknown_request_fields_are_dropped_with_warning(self) -> None: + with self.assertLogs("deepseek_cursor_proxy", level="WARNING") as captured: + prepared = prepare_upstream_request( { - "id": "call_original", - "type": "function", - "function": { - "name": "read_file", - "arguments": '{"path":"README.md"}', - }, - } - ], - } - self.store.store_assistant_message(assistant_message, cache_scope(prior)) - - payload = { - "model": "deepseek-v4-pro", - "messages": [ - *prior, - { - "role": "assistant", - "content": None, - "tool_calls": [ - { - "type": "function", - "function": { - "name": "read_file", - "arguments": '{"path":"README.md"}', - }, - } - ], + "model": "deepseek-v4-pro", + "messages": [{"role": "user", "content": "hi"}], + "parallel_tool_calls": True, + "service_tier": "fast", }, - ], - } + ProxyConfig(), + self.store, + ) + self.assertNotIn("parallel_tool_calls", prepared.payload) + self.assertNotIn("service_tier", prepared.payload) + log = "\n".join(captured.output) + self.assertIn("parallel_tool_calls", log) + self.assertIn("service_tier", log) - prepared = prepare_upstream_request(payload, ProxyConfig(), self.store) - - self.assertEqual(prepared.patched_reasoning_messages, 1) - self.assertEqual(prepared.payload["messages"][1]["content"], "") - self.assertEqual( - prepared.payload["messages"][1]["reasoning_content"], - "Need to call the file tool.", - ) - - def test_restores_reasoning_when_cursor_history_contains_mirrored_think_block( - self, - ) -> None: - prior = [{"role": "user", "content": "inspect repo"}] - tool_call = { - "id": "call_original", - "type": "function", - "function": { - "name": "read_file", - "arguments": '{"path":"README.md"}', - }, - } - self.store.store_assistant_message( - { - "role": "assistant", - "content": "", - "reasoning_content": "Need to call the file tool.", - "tool_calls": [tool_call], - }, - cache_scope(prior), - ) + def test_non_deepseek_model_is_rewritten_with_warning(self) -> None: + with self.assertLogs("deepseek_cursor_proxy", level="WARNING") as captured: + prepared = prepare_upstream_request( + { + "model": "gpt-4", + "messages": [{"role": "user", "content": "hi"}], + }, + ProxyConfig(upstream_model="deepseek-v4-pro"), + self.store, + ) + self.assertEqual(prepared.payload["model"], "deepseek-v4-pro") + self.assertIn("non-DeepSeek", "\n".join(captured.output)) + def test_thinking_disabled_strips_reasoning_from_assistant_history(self) -> None: prepared = prepare_upstream_request( { "model": "deepseek-v4-pro", "messages": [ - *prior, + {"role": "user", "content": "hi"}, { "role": "assistant", - "content": ( - "
\n" - "Thinking\n\n" - "Need to call the file tool.\n" - "
\n\n" - ), - "tool_calls": [tool_call], + "content": "answer", + "reasoning_content": "should be discarded", }, ], }, - ProxyConfig(), + ProxyConfig(thinking="disabled"), self.store, ) - - self.assertEqual(prepared.patched_reasoning_messages, 1) - self.assertEqual(prepared.payload["messages"][1]["content"], "") - self.assertEqual( - prepared.payload["messages"][1]["reasoning_content"], - "Need to call the file tool.", - ) - - def test_reports_missing_reasoning_for_uncached_assistant_tool_call(self) -> None: - payload = { - "model": "deepseek-v4-pro", - "messages": [ - {"role": "user", "content": "read README"}, - { - "role": "assistant", - "content": "", - "tool_calls": [ - { - "id": "call_uncached", - "type": "function", - "function": { - "name": "read_file", - "arguments": '{"path":"README.md"}', - }, - } - ], - }, - { - "role": "tool", - "tool_call_id": "call_uncached", - "content": "file text", - }, - ], - } - - prepared = prepare_upstream_request( - payload, - ProxyConfig(missing_reasoning_strategy="reject"), - self.store, - ) - - self.assertEqual(prepared.patched_reasoning_messages, 0) - self.assertEqual(prepared.missing_reasoning_messages, 1) + self.assertEqual(prepared.payload["thinking"], {"type": "disabled"}) self.assertNotIn("reasoning_content", prepared.payload["messages"][1]) - def test_can_recover_uncached_tool_history_from_latest_user(self) -> None: - payload = { - "model": "deepseek-v4-pro", - "messages": [ - {"role": "system", "content": "Follow project rules."}, - {"role": "user", "content": "read README"}, - { - "role": "assistant", - "content": "", - "tool_calls": [ - { - "id": "call_uncached", - "type": "function", - "function": { - "name": "read_file", - "arguments": '{"path":"README.md"}', - }, - } - ], - }, - { - "role": "tool", - "tool_call_id": "call_uncached", - "content": "file text", - }, - {"role": "user", "content": "continue with the summary"}, - ], - } - - prepared = prepare_upstream_request( - payload, - ProxyConfig(missing_reasoning_strategy="recover"), - self.store, - ) - - self.assertEqual(prepared.missing_reasoning_messages, 0) - self.assertEqual(prepared.recovered_reasoning_messages, 1) - self.assertEqual(prepared.recovery_dropped_messages, 3) - self.assertEqual(prepared.recovery_notice, RECOVERY_NOTICE_CONTENT) - self.assertEqual( - [message["role"] for message in prepared.payload["messages"]], - ["system", "system", "user"], - ) - self.assertIn( - "recovered this request", prepared.payload["messages"][1]["content"] - ) - self.assertEqual( - prepared.payload["messages"][2], - {"role": "user", "content": "continue with the summary"}, - ) - - def test_recovery_boundary_preserves_later_deepseek_tool_context(self) -> None: - old_tool_call = { - "id": "call_old", - "type": "function", - "function": { - "name": "read_file", - "arguments": '{"path":"README.md"}', - }, - } - new_tool_call = { - "id": "call_new", - "type": "function", - "function": { - "name": "lookup", - "arguments": '{"query":"new"}', - }, - } - first_recovered = prepare_upstream_request( - { - "model": "deepseek-v4-pro", - "messages": [ - {"role": "user", "content": "old model turn"}, - { - "role": "assistant", - "content": "", - "tool_calls": [old_tool_call], - }, - { - "role": "tool", - "tool_call_id": "call_old", - "content": "old result", - }, - {"role": "user", "content": "continue with DeepSeek"}, - ], - }, - ProxyConfig(missing_reasoning_strategy="recover"), - self.store, - ) - recovered_tool_message = { - "role": "assistant", - "content": RECOVERY_NOTICE_CONTENT, - "reasoning_content": "Need the new lookup.", - "tool_calls": [new_tool_call], - } - self.store.store_assistant_message( - recovered_tool_message, - conversation_scope( - first_recovered.payload["messages"], - first_recovered.cache_namespace, - ), - ) - + def test_plain_chat_history_does_not_require_reasoning(self) -> None: prepared = prepare_upstream_request( { "model": "deepseek-v4-pro", "messages": [ - {"role": "user", "content": "old model turn"}, - { - "role": "assistant", - "content": "", - "tool_calls": [old_tool_call], - }, - { - "role": "tool", - "tool_call_id": "call_old", - "content": "old result", - }, - {"role": "user", "content": "continue with DeepSeek"}, - { - "role": "assistant", - "content": RECOVERY_NOTICE_CONTENT, - "tool_calls": [new_tool_call], - }, - { - "role": "tool", - "tool_call_id": "call_new", - "content": "new result", - }, + {"role": "user", "content": "hi"}, + {"role": "assistant", "content": "hello"}, + {"role": "user", "content": "again"}, ], }, - ProxyConfig(missing_reasoning_strategy="recover"), + ProxyConfig(), self.store, ) - self.assertEqual(prepared.missing_reasoning_messages, 0) - self.assertEqual(prepared.recovered_reasoning_messages, 0) - self.assertEqual(prepared.recovery_dropped_messages, 0) - self.assertTrue(prepared.continued_recovery_boundary) - self.assertGreater(prepared.retired_prefix_messages, 0) - self.assertIsNone(prepared.recovery_notice) - self.assertEqual( - [message["role"] for message in prepared.payload["messages"]], - ["system", "user", "assistant", "tool"], - ) - self.assertEqual( - prepared.payload["messages"][2]["reasoning_content"], - "Need the new lookup.", - ) - self.assertEqual( - prepared.payload["messages"][3], - { - "role": "tool", - "tool_call_id": "call_new", - "content": "new result", - }, - ) - def test_recovered_response_is_recorded_under_pre_recovery_scope(self) -> None: - old_tool_call = { - "id": "call_old", - "type": "function", - "function": { - "name": "read_file", - "arguments": '{"path":"README.md"}', - }, - } - new_tool_call = { - "id": "call_new", - "type": "function", - "function": {"name": "lookup", "arguments": '{"query":"new"}'}, - } - first_payload = { - "model": "deepseek-v4-pro", - "messages": [ - {"role": "user", "content": "old model turn"}, - {"role": "assistant", "content": "", "tool_calls": [old_tool_call]}, - {"role": "tool", "tool_call_id": "call_old", "content": "old result"}, - {"role": "user", "content": "continue with DeepSeek"}, - ], - } - first_recovered = prepare_upstream_request( - first_payload, - ProxyConfig(missing_reasoning_strategy="recover"), - self.store, - ) - self.assertEqual(first_recovered.recovered_reasoning_messages, 1) - response_body = json.dumps( +class RecoveryNoticeStrippingTests(unittest.TestCase): + def test_strips_only_the_recovery_notice_prefix(self) -> None: + messages = [ + {"role": "user", "content": "hi"}, { - "id": "chatcmpl-test", - "object": "chat.completion", - "model": "deepseek-v4-pro", - "choices": [ - { - "index": 0, - "finish_reason": "tool_calls", - "message": { - "role": "assistant", - "content": "", - "reasoning_content": "Need the new lookup.", - "tool_calls": [new_tool_call], - }, - } - ], + "role": "assistant", + "content": RECOVERY_NOTICE_CONTENT + "real answer", + }, + {"role": "assistant", "content": "ordinary"}, + ] + result = strip_recovery_notice_for_upstream(messages) + self.assertEqual(result[1]["content"], "real answer") + self.assertEqual(result[2]["content"], "ordinary") + + def test_returns_a_copy_so_caller_keeps_with_prefix_messages(self) -> None: + original = [ + { + "role": "assistant", + "content": RECOVERY_NOTICE_CONTENT + "answer", } - ).encode() - rewritten = rewrite_response_body( - response_body, - "deepseek-v4-pro", - self.store, - first_recovered.payload["messages"], - first_recovered.cache_namespace, - content_prefix=first_recovered.recovery_notice, - recording_contexts=first_recovered.record_response_contexts, - ) - recovered_assistant = json.loads(rewritten)["choices"][0]["message"] - self.assertEqual(len(first_recovered.record_response_contexts), 2) - for scope, _messages in first_recovered.record_response_contexts: - self.assertEqual( - self.store.get( - f"scope:{scope}:signature:{message_signature(recovered_assistant)}" - ), - "Need the new lookup.", - ) - recovered_assistant.pop("reasoning_content", None) + ] + stripped = strip_recovery_notice_for_upstream(original) + # The cache scope is computed on the with-prefix history, so the + # caller's list must NOT be mutated in place. + self.assertEqual(original[0]["content"], RECOVERY_NOTICE_CONTENT + "answer") + self.assertEqual(stripped[0]["content"], "answer") + self.assertIsNot(stripped[0], original[0]) - second_payload = { - "model": "deepseek-v4-pro", - "messages": [ - *first_payload["messages"], - recovered_assistant, - {"role": "tool", "tool_call_id": "call_new", "content": "new result"}, - ], - } + def test_text_constant_matches_content_prefix(self) -> None: + # Sanity check that the user-visible text used as a boundary marker + # is consistent with the wire-format prefix. + self.assertTrue(RECOVERY_NOTICE_CONTENT.startswith(RECOVERY_NOTICE_TEXT)) - second_prepared = prepare_upstream_request( - second_payload, - ProxyConfig(missing_reasoning_strategy="recover"), - self.store, - ) - self.assertEqual(second_prepared.missing_reasoning_messages, 0) - self.assertEqual(second_prepared.recovered_reasoning_messages, 0) - self.assertEqual(second_prepared.recovery_dropped_messages, 0) - self.assertTrue(second_prepared.continued_recovery_boundary) - self.assertGreater(second_prepared.retired_prefix_messages, 0) - self.assertEqual( - second_prepared.payload["messages"][2]["reasoning_content"], - "Need the new lookup.", - ) +class ResponseRewriteTests(unittest.TestCase): + def setUp(self) -> None: + self.store = ReasoningStore(":memory:") - def test_recovery_boundary_accepts_legacy_notice_text(self) -> None: - legacy_recovery_notice = ( - "Note: recovered this DeepSeek chat because older tool-call reasoning " - "was unavailable; continuing with recent context only.\n\n" - ) - old_tool_call = { - "id": "call_old", - "type": "function", - "function": { - "name": "read_file", - "arguments": '{"path":"README.md"}', - }, - } - new_tool_call = { - "id": "call_new", - "type": "function", - "function": { - "name": "lookup", - "arguments": '{"query":"new"}', - }, - } - first_recovered = prepare_upstream_request( - { - "model": "deepseek-v4-pro", - "messages": [ - {"role": "user", "content": "old model turn"}, - { - "role": "assistant", - "content": "", - "tool_calls": [old_tool_call], - }, - { - "role": "tool", - "tool_call_id": "call_old", - "content": "old result", - }, - {"role": "user", "content": "continue with DeepSeek"}, - ], - }, - ProxyConfig(missing_reasoning_strategy="recover"), - self.store, - ) - self.store.store_assistant_message( - { - "role": "assistant", - "content": legacy_recovery_notice, - "reasoning_content": "Need the new lookup.", - "tool_calls": [new_tool_call], - }, - conversation_scope( - first_recovered.payload["messages"], - first_recovered.cache_namespace, - ), - ) + def tearDown(self) -> None: + self.store.close() - prepared = prepare_upstream_request( - { - "model": "deepseek-v4-pro", - "messages": [ - {"role": "user", "content": "old model turn"}, - { - "role": "assistant", - "content": "", - "tool_calls": [old_tool_call], - }, - { - "role": "tool", - "tool_call_id": "call_old", - "content": "old result", - }, - {"role": "user", "content": "continue with DeepSeek"}, - { - "role": "assistant", - "content": legacy_recovery_notice, - "tool_calls": [new_tool_call], - }, - { - "role": "tool", - "tool_call_id": "call_new", - "content": "new result", - }, - ], - }, - ProxyConfig(missing_reasoning_strategy="recover"), - self.store, - ) - - self.assertEqual(prepared.missing_reasoning_messages, 0) - self.assertEqual(prepared.recovered_reasoning_messages, 0) - self.assertEqual(prepared.recovery_dropped_messages, 0) - self.assertTrue(prepared.continued_recovery_boundary) - self.assertGreater(prepared.retired_prefix_messages, 0) - self.assertIsNone(prepared.recovery_notice) - self.assertEqual( - prepared.payload["messages"][2]["reasoning_content"], - "Need the new lookup.", - ) - - def test_reports_missing_reasoning_for_uncached_assistant_after_tool_result( - self, - ) -> None: - payload = { - "model": "deepseek-v4-pro", - "messages": [ - {"role": "user", "content": "read README"}, - { - "role": "assistant", - "content": "", - "reasoning_content": "Need file text.", - "tool_calls": [ - { - "id": "call_uncached", - "type": "function", - "function": { - "name": "read_file", - "arguments": '{"path":"README.md"}', - }, - } - ], - }, - { - "role": "tool", - "tool_call_id": "call_uncached", - "content": "file text", - }, - {"role": "assistant", "content": "Summary of file."}, - {"role": "user", "content": "continue"}, - ], - } - - prepared = prepare_upstream_request( - payload, - ProxyConfig(missing_reasoning_strategy="reject"), - self.store, - ) - - self.assertEqual(prepared.missing_reasoning_messages, 1) - self.assertNotIn("reasoning_content", prepared.payload["messages"][3]) - - def test_does_not_report_missing_reasoning_for_plain_chat_history(self) -> None: - payload = { - "model": "deepseek-v4-pro", - "messages": [ - {"role": "user", "content": "hello"}, - {"role": "assistant", "content": "hi"}, - {"role": "user", "content": "continue"}, - ], - } - - prepared = prepare_upstream_request(payload, ProxyConfig(), self.store) - - self.assertEqual(prepared.missing_reasoning_messages, 0) - self.assertNotIn("reasoning_content", prepared.payload["messages"][1]) - - def test_does_not_repair_reasoning_when_thinking_is_disabled(self) -> None: - payload = { - "model": "deepseek-v4-pro", - "messages": [ - {"role": "user", "content": "read README"}, - { - "role": "assistant", - "content": "", - "reasoning_content": "Should be removed in non-thinking mode.", - "tool_calls": [ - { - "id": "call_uncached", - "type": "function", - "function": { - "name": "read_file", - "arguments": '{"path":"README.md"}', - }, - } - ], - }, - { - "role": "tool", - "tool_call_id": "call_uncached", - "content": "file text", - }, - ], - } - - prepared = prepare_upstream_request( - payload, ProxyConfig(thinking="disabled"), self.store - ) - - self.assertEqual(prepared.missing_reasoning_messages, 0) - self.assertNotIn("reasoning_content", prepared.payload["messages"][1]) - - def test_reasoning_cache_is_namespaced_by_authorization(self) -> None: - config = ProxyConfig(missing_reasoning_strategy="reject") - prior = [{"role": "user", "content": "read README"}] - namespace_a = reasoning_cache_namespace( - config, - config.upstream_model, - {"type": "enabled"}, - "high", - "Bearer key-a", - ) - tool_call = { - "id": "call_123", - "type": "function", - "function": { - "name": "read_file", - "arguments": '{"path":"README.md"}', - }, - } - self.store.store_assistant_message( - { - "role": "assistant", - "content": "", - "reasoning_content": "Reasoning for key A.", - "tool_calls": [tool_call], - }, - conversation_scope(prior, namespace_a), - ) - - prepared = prepare_upstream_request( - { - "model": "deepseek-v4-pro", - "messages": [ - *prior, - {"role": "assistant", "content": "", "tool_calls": [tool_call]}, - ], - }, - config, - self.store, - authorization="Bearer key-b", - ) - - self.assertEqual(prepared.missing_reasoning_messages, 1) - self.assertNotIn("reasoning_content", prepared.payload["messages"][1]) - - def test_deepseek_pro_and_flash_share_reasoning_namespace(self) -> None: - config = ProxyConfig() - namespace_pro = reasoning_cache_namespace( - config, - "deepseek-v4-pro", - {"type": "enabled"}, - "high", - "Bearer key-a", - ) - namespace_flash = reasoning_cache_namespace( - config, - "deepseek-v4-flash", - {"type": "enabled"}, - "high", - "Bearer key-a", - ) - self.assertEqual(namespace_pro, namespace_flash) - - prior = [{"role": "user", "content": "read README"}] - tool_call = { - "id": "call_shared", - "type": "function", - "function": { - "name": "read_file", - "arguments": '{"path":"README.md"}', - }, - } - self.store.store_assistant_message( - { - "role": "assistant", - "content": "", - "reasoning_content": "Shared DeepSeek reasoning.", - "tool_calls": [tool_call], - }, - conversation_scope(prior, namespace_pro), - namespace_pro, - prior, - ) - - prepared = prepare_upstream_request( - { - "model": "deepseek-v4-flash", - "messages": [ - *prior, - {"role": "assistant", "content": "", "tool_calls": [tool_call]}, - ], - }, - config, - self.store, - authorization="Bearer key-a", - ) - - self.assertEqual(prepared.missing_reasoning_messages, 0) - self.assertEqual( - prepared.payload["messages"][1]["reasoning_content"], - "Shared DeepSeek reasoning.", - ) - - def test_converted_function_message_uses_tool_schema(self) -> None: - payload = { - "model": "deepseek-v4-pro", - "messages": [ - { - "role": "function", - "name": "lookup", - "tool_call_id": "call_1", - "content": {"ok": True}, - } - ], - } - - prepared = prepare_upstream_request(payload, ProxyConfig(), self.store) - - self.assertEqual( - prepared.payload["messages"][0], - {"role": "tool", "tool_call_id": "call_1", "content": '{"ok": true}'}, - ) - - def test_rewrite_response_records_reasoning_and_restores_model_name(self) -> None: + def test_records_reasoning_and_restores_original_model_name(self) -> None: body = json.dumps( { - "id": "chatcmpl-test", - "object": "chat.completion", - "model": "deepseek-v4-pro", - "choices": [ - { - "index": 0, - "finish_reason": "tool_calls", - "message": { - "role": "assistant", - "content": "", - "reasoning_content": "I need to inspect the repo.", - "tool_calls": [ - { - "id": "call_abc", - "type": "function", - "function": { - "name": "list_files", - "arguments": "{}", - }, - } - ], - }, - } - ], - } - ).encode() - - request_messages = [{"role": "user", "content": "inspect repo"}] - rewritten = rewrite_response_body( - body, "deepseek-v4-flash", self.store, request_messages - ) - payload = json.loads(rewritten) - - self.assertEqual(payload["model"], "deepseek-v4-flash") - self.assertEqual( - self.store.get( - f"scope:{conversation_scope(request_messages)}:tool_call:call_abc" - ), - "I need to inspect the repo.", - ) - - def test_rewrite_response_can_prefix_recovery_notice_before_storing( - self, - ) -> None: - body = json.dumps( - { - "id": "chatcmpl-test", + "id": "chatcmpl", "object": "chat.completion", "model": "deepseek-v4-pro", "choices": [ @@ -1442,48 +260,56 @@ class TransformTests(unittest.TestCase): "finish_reason": "stop", "message": { "role": "assistant", - "content": "Summary.", - "reasoning_content": "Tool result is enough.", + "content": "Final.", + "reasoning_content": "Done thinking.", }, } ], } ).encode() + request_messages = [{"role": "user", "content": "hi"}] + rewritten = rewrite_response_body( + body, "deepseek-v4-pro", self.store, request_messages + ) + payload = json.loads(rewritten) + self.assertEqual(payload["model"], "deepseek-v4-pro") + stored = self.store.get( + f"scope:{conversation_scope(request_messages)}:signature:" + f"{message_signature(payload['choices'][0]['message'])}" + ) + self.assertEqual(stored, "Done thinking.") - request_messages = [ - {"role": "user", "content": "read README"}, - {"role": "tool", "tool_call_id": "call_abc", "content": "file text"}, - ] + def test_recovery_notice_is_prefixed_into_response_content(self) -> None: + body = json.dumps( + { + "id": "chatcmpl", + "object": "chat.completion", + "model": "deepseek-v4-pro", + "choices": [ + { + "index": 0, + "finish_reason": "stop", + "message": {"role": "assistant", "content": "Final."}, + } + ], + } + ).encode() rewritten = rewrite_response_body( body, "deepseek-v4-pro", self.store, - request_messages, + [{"role": "user", "content": "hi"}], content_prefix=RECOVERY_NOTICE_CONTENT, ) - payload = json.loads(rewritten) - stored_message = { - "role": "assistant", - "content": RECOVERY_NOTICE_CONTENT + "Summary.", - "reasoning_content": "Tool result is enough.", - } - - self.assertEqual( - payload["choices"][0]["message"]["content"], - RECOVERY_NOTICE_CONTENT + "Summary.", - ) - self.assertEqual( - self.store.get( - f"scope:{conversation_scope(request_messages)}:signature:" - f"{message_signature(stored_message)}" - ), - "Tool result is enough.", + self.assertIn( + RECOVERY_NOTICE_CONTENT, + json.loads(rewritten)["choices"][0]["message"]["content"], ) - def test_rewrite_response_preserves_prompt_cache_usage_fields(self) -> None: + def test_preserves_prompt_cache_usage_fields(self) -> None: body = json.dumps( { - "id": "chatcmpl-test", + "id": "chatcmpl", "object": "chat.completion", "model": "deepseek-v4-pro", "choices": [ @@ -1502,12 +328,10 @@ class TransformTests(unittest.TestCase): }, } ).encode() - rewritten = rewrite_response_body(body, "deepseek-v4-flash", self.store, []) - payload = json.loads(rewritten) - - self.assertEqual(payload["usage"]["prompt_cache_hit_tokens"], 6) - self.assertEqual(payload["usage"]["prompt_cache_miss_tokens"], 4) + usage = json.loads(rewritten)["usage"] + self.assertEqual(usage["prompt_cache_hit_tokens"], 6) + self.assertEqual(usage["prompt_cache_miss_tokens"], 4) if __name__ == "__main__":