from __future__ import annotations from dataclasses import replace from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer import json import threading import time import unittest from urllib.error import HTTPError from urllib.request import Request, urlopen from deepseek_cursor_proxy.config import ProxyConfig from deepseek_cursor_proxy.reasoning_store import ( ReasoningStore, conversation_scope, message_signature, ) from deepseek_cursor_proxy.server import DeepSeekProxyHandler, DeepSeekProxyServer TOOL_REASONING = "I need the current date before answering." FINAL_REASONING = "The tool result gives the date, so I can answer." FINAL_CONTENT = "Final answer after using the tool." def post_json( url: str, payload: dict, api_key: str = "sk-cursor-test" ) -> tuple[int, dict]: body = json.dumps(payload).encode("utf-8") request = Request( url, data=body, method="POST", headers={ "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", }, ) try: response = urlopen(request, timeout=5) with response: return response.status, json.loads(response.read().decode("utf-8")) except HTTPError as exc: return exc.code, json.loads(exc.read().decode("utf-8")) class FakeDeepSeekHandler(BaseHTTPRequestHandler): requests: list[dict] = [] auth_headers: list[str] = [] def log_message(self, fmt: str, *args: object) -> None: return def do_POST(self) -> None: length = int(self.headers.get("Content-Length") or 0) payload = json.loads(self.rfile.read(length).decode("utf-8")) self.__class__.requests.append(payload) self.__class__.auth_headers.append(self.headers.get("Authorization", "")) for index, message in enumerate(payload.get("messages", [])): if not isinstance(message, dict) or message.get("role") != "assistant": continue requires_reasoning = ( bool(message.get("tool_calls")) or message.get("content") == FINAL_CONTENT ) if requires_reasoning and not message.get("reasoning_content"): self._send_json( 400, { "error": { "message": "The reasoning_content in the thinking mode must be passed back to the API.", "type": "invalid_request_error", "param": None, "code": "invalid_request_error", "missing_index": index, } }, ) return call_number = len(self.__class__.requests) if call_number == 1: self._send_json(200, tool_call_response()) elif call_number == 2: self._send_json(200, final_response()) else: self._send_json(200, plain_response("Follow-up accepted.")) def _send_json(self, status: int, payload: dict) -> None: body = json.dumps(payload).encode("utf-8") self.send_response(status) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) class InterleavedFakeDeepSeekHandler(BaseHTTPRequestHandler): requests: list[dict] = [] def log_message(self, fmt: str, *args: object) -> None: return def do_POST(self) -> None: length = int(self.headers.get("Content-Length") or 0) payload = json.loads(self.rfile.read(length).decode("utf-8")) self.__class__.requests.append(payload) messages = payload.get("messages", []) thread_name = thread_name_from_messages(messages) if thread_name not in {"A", "B"}: self._send_json(400, {"error": {"message": "unknown test thread"}}) return expected_tool_reasoning = f"tool reasoning for thread {thread_name}" expected_final_reasoning = f"final reasoning for thread {thread_name}" final_content = f"final answer for thread {thread_name}" for index, message in enumerate(messages): if not isinstance(message, dict) or message.get("role") != "assistant": continue if ( message.get("tool_calls") and message.get("reasoning_content") != expected_tool_reasoning ): self._send_missing_reasoning(index) return if ( message.get("content") == final_content and message.get("reasoning_content") != expected_final_reasoning ): self._send_missing_reasoning(index) return if len(messages) == 1: self._send_json(200, interleaved_tool_call_response(thread_name)) elif len(messages) == 3: self._send_json(200, interleaved_final_response(thread_name)) else: self._send_json( 200, plain_response(f"follow-up accepted for thread {thread_name}") ) def _send_missing_reasoning(self, index: int) -> None: self._send_json( 400, { "error": { "message": "The reasoning_content in the thinking mode must be passed back to the API.", "type": "invalid_request_error", "param": None, "code": "invalid_request_error", "missing_index": index, } }, ) def _send_json(self, status: int, payload: dict) -> None: body = json.dumps(payload).encode("utf-8") self.send_response(status) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) class SlowAfterDoneStreamingDeepSeekHandler(BaseHTTPRequestHandler): def log_message(self, fmt: str, *args: object) -> None: return def do_POST(self) -> None: self.send_response(200) self.send_header("Content-Type", "text/event-stream") self.end_headers() chunk = { "id": "chatcmpl-stream", "object": "chat.completion.chunk", "model": "deepseek-v4-pro", "choices": [ { "index": 0, "delta": {"role": "assistant", "content": "streamed"}, "finish_reason": None, } ], } self.wfile.write(f"data: {json.dumps(chunk)}\n\n".encode("utf-8")) self.wfile.write(b"data: [DONE]\n\n") self.wfile.flush() time.sleep(2) class ReasoningStreamingDeepSeekHandler(BaseHTTPRequestHandler): def log_message(self, fmt: str, *args: object) -> None: return def do_POST(self) -> None: self.send_response(200) self.send_header("Content-Type", "text/event-stream") self.end_headers() chunks = [ { "id": "chatcmpl-stream", "object": "chat.completion.chunk", "created": 1, "model": "deepseek-v4-pro", "choices": [ { "index": 0, "delta": {"role": "assistant", "reasoning_content": "Need "}, "finish_reason": None, } ], }, { "id": "chatcmpl-stream", "object": "chat.completion.chunk", "created": 1, "model": "deepseek-v4-pro", "choices": [ { "index": 0, "delta": {"reasoning_content": "context."}, "finish_reason": None, } ], }, { "id": "chatcmpl-stream", "object": "chat.completion.chunk", "created": 1, "model": "deepseek-v4-pro", "choices": [ { "index": 0, "delta": {"content": FINAL_CONTENT}, "finish_reason": None, } ], }, { "id": "chatcmpl-stream", "object": "chat.completion.chunk", "created": 1, "model": "deepseek-v4-pro", "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}], }, ] for chunk in chunks: self.wfile.write(f"data: {json.dumps(chunk)}\n\n".encode("utf-8")) self.wfile.write(b"data: [DONE]\n\n") self.wfile.flush() def tool_call_response() -> dict: return { "id": "chatcmpl-tool", "object": "chat.completion", "created": 1, "model": "deepseek-v4-pro", "choices": [ { "index": 0, "finish_reason": "tool_calls", "message": { "role": "assistant", "content": "", "reasoning_content": TOOL_REASONING, "tool_calls": [ { "id": "call_date", "type": "function", "function": {"name": "get_date", "arguments": "{}"}, } ], }, } ], } def interleaved_tool_call_response(thread_name: str) -> dict: return { "id": f"chatcmpl-tool-{thread_name}", "object": "chat.completion", "created": 1, "model": "deepseek-v4-pro", "choices": [ { "index": 0, "finish_reason": "tool_calls", "message": { "role": "assistant", "content": "", "reasoning_content": f"tool reasoning for thread {thread_name}", "tool_calls": [ { "id": "call_reused", "type": "function", "function": {"name": "lookup", "arguments": "{}"}, } ], }, } ], } def interleaved_final_response(thread_name: str) -> dict: return { "id": f"chatcmpl-final-{thread_name}", "object": "chat.completion", "created": 2, "model": "deepseek-v4-pro", "choices": [ { "index": 0, "finish_reason": "stop", "message": { "role": "assistant", "content": f"final answer for thread {thread_name}", "reasoning_content": f"final reasoning for thread {thread_name}", }, } ], } def final_response() -> dict: return { "id": "chatcmpl-final", "object": "chat.completion", "created": 2, "model": "deepseek-v4-pro", "choices": [ { "index": 0, "finish_reason": "stop", "message": { "role": "assistant", "content": FINAL_CONTENT, "reasoning_content": FINAL_REASONING, }, } ], } def plain_response(content: str) -> dict: return { "id": "chatcmpl-plain", "object": "chat.completion", "created": 3, "model": "deepseek-v4-pro", "choices": [ { "index": 0, "finish_reason": "stop", "message": {"role": "assistant", "content": content}, } ], } class ServerFixture: def __init__(self, server: ThreadingHTTPServer) -> None: self.server = server self.thread = threading.Thread(target=server.serve_forever, daemon=True) @property def url(self) -> str: host, port = self.server.server_address return f"http://{host}:{port}" def start(self) -> "ServerFixture": self.thread.start() return self def close(self) -> None: self.server.shutdown() self.server.server_close() self.thread.join(timeout=5) class ProxyEndToEndTests(unittest.TestCase): def setUp(self) -> None: FakeDeepSeekHandler.requests = [] FakeDeepSeekHandler.auth_headers = [] self.upstream = ServerFixture( ThreadingHTTPServer(("127.0.0.1", 0), FakeDeepSeekHandler) ).start() self.store = ReasoningStore(":memory:") proxy = DeepSeekProxyServer(("127.0.0.1", 0), DeepSeekProxyHandler) proxy.config = ProxyConfig( upstream_base_url=self.upstream.url, upstream_model="deepseek-v4-pro", ) proxy.reasoning_store = self.store self.proxy = ServerFixture(proxy).start() def tearDown(self) -> None: self.proxy.close() self.upstream.close() self.store.close() def test_missing_reasoning_reproduces_upstream_error_without_proxy_repair( self, ) -> None: status, payload = post_json( f"{self.upstream.url}/chat/completions", second_cursor_request(include_reasoning=False), ) self.assertEqual(status, 400) self.assertIn("reasoning_content", payload["error"]["message"]) def test_proxy_repairs_cursor_style_multi_round_tool_call_history(self) -> None: status, first = post_json( f"{self.proxy.url}/v1/chat/completions", first_cursor_request(), ) self.assertEqual(status, 200) tool_call_message = first["choices"][0]["message"] self.assertEqual(tool_call_message["reasoning_content"], TOOL_REASONING) status, second = post_json( f"{self.proxy.url}/v1/chat/completions", second_cursor_request(include_reasoning=False), ) self.assertEqual(status, 200) self.assertEqual(second["choices"][0]["message"]["content"], FINAL_CONTENT) status, third = post_json( f"{self.proxy.url}/v1/chat/completions", third_cursor_request_missing_all_reasoning(), ) self.assertEqual(status, 200) self.assertEqual( third["choices"][0]["message"]["content"], "Follow-up accepted." ) second_upstream_messages = FakeDeepSeekHandler.requests[1]["messages"] self.assertEqual( second_upstream_messages[1]["reasoning_content"], TOOL_REASONING ) third_upstream_messages = FakeDeepSeekHandler.requests[2]["messages"] self.assertEqual( third_upstream_messages[1]["reasoning_content"], TOOL_REASONING ) self.assertEqual( third_upstream_messages[3]["reasoning_content"], FINAL_REASONING ) def test_proxy_forwards_cursor_bearer_token_to_deepseek(self) -> None: status, _ = post_json( f"{self.proxy.url}/v1/chat/completions", first_cursor_request(), api_key="sk-from-cursor", ) self.assertEqual(status, 200) self.assertEqual(FakeDeepSeekHandler.auth_headers[0], "Bearer sk-from-cursor") def test_normal_mode_logs_safe_request_progress_without_bodies(self) -> None: with self.assertLogs("deepseek_cursor_proxy", level="INFO") as captured: status, _ = post_json( f"{self.proxy.url}/v1/chat/completions", first_cursor_request(), api_key="sk-from-cursor", ) output = "\n".join(captured.output) self.assertEqual(status, 200) self.assertIn("cursor request: model='deepseek-v4-pro'", output) self.assertIn("request complete status=200", output) self.assertNotIn("What is tomorrow's date?", output) self.assertNotIn("sk-from-cursor", output) def test_verbose_mode_logs_metadata_and_bodies_without_api_key(self) -> None: self.proxy.server.config = replace(self.proxy.server.config, verbose=True) with self.assertLogs("deepseek_cursor_proxy", level="INFO") as captured: status, _ = post_json( f"{self.proxy.url}/v1/chat/completions", first_cursor_request(), api_key="sk-from-cursor", ) output = "\n".join(captured.output) self.assertEqual(status, 200) self.assertIn("incoming POST /v1/chat/completions", output) self.assertIn("upstream request metadata", output) self.assertIn("cursor request body", output) self.assertIn("upstream request body", output) self.assertIn("What is tomorrow's date?", output) self.assertNotIn("sk-from-cursor", output) def test_proxy_rejects_missing_cursor_bearer_token(self) -> None: request = Request( f"{self.proxy.url}/v1/chat/completions", data=json.dumps(first_cursor_request()).encode("utf-8"), method="POST", headers={"Content-Type": "application/json"}, ) with self.assertRaises(HTTPError) as caught: urlopen(request, timeout=5) self.assertEqual(caught.exception.code, 401) self.assertEqual(FakeDeepSeekHandler.requests, []) def test_proxy_adds_fallback_reasoning_for_uncached_cursor_tool_history( self, ) -> None: status, _ = post_json( f"{self.proxy.url}/v1/chat/completions", second_cursor_request(include_reasoning=False), ) self.assertEqual(status, 200) upstream_messages = FakeDeepSeekHandler.requests[0]["messages"] self.assertIn("reasoning_content", upstream_messages[1]) class InterleavedConversationTests(unittest.TestCase): def setUp(self) -> None: InterleavedFakeDeepSeekHandler.requests = [] self.upstream = ServerFixture( ThreadingHTTPServer(("127.0.0.1", 0), InterleavedFakeDeepSeekHandler) ).start() self.store = ReasoningStore(":memory:") proxy = DeepSeekProxyServer(("127.0.0.1", 0), DeepSeekProxyHandler) proxy.config = ProxyConfig( upstream_base_url=self.upstream.url, upstream_model="deepseek-v4-pro", ) proxy.reasoning_store = self.store self.proxy = ServerFixture(proxy).start() def tearDown(self) -> None: self.proxy.close() self.upstream.close() self.store.close() def test_one_proxy_repairs_two_interleaved_cursor_threads(self) -> None: status, first_a = post_json( f"{self.proxy.url}/v1/chat/completions", interleaved_first_request("A") ) self.assertEqual(status, 200) status, first_b = post_json( f"{self.proxy.url}/v1/chat/completions", interleaved_first_request("B") ) self.assertEqual(status, 200) status, final_b = post_json( f"{self.proxy.url}/v1/chat/completions", interleaved_second_request("B", first_b["choices"][0]["message"]), ) self.assertEqual(status, 200) status, final_a = post_json( f"{self.proxy.url}/v1/chat/completions", interleaved_second_request("A", first_a["choices"][0]["message"]), ) self.assertEqual(status, 200) status, followup_a = post_json( f"{self.proxy.url}/v1/chat/completions", interleaved_third_request( "A", first_a["choices"][0]["message"], final_a["choices"][0]["message"] ), ) self.assertEqual(status, 200) self.assertEqual( followup_a["choices"][0]["message"]["content"], "follow-up accepted for thread A", ) status, followup_b = post_json( f"{self.proxy.url}/v1/chat/completions", interleaved_third_request( "B", first_b["choices"][0]["message"], final_b["choices"][0]["message"] ), ) self.assertEqual(status, 200) self.assertEqual( followup_b["choices"][0]["message"]["content"], "follow-up accepted for thread B", ) final_b_upstream_messages = InterleavedFakeDeepSeekHandler.requests[2][ "messages" ] final_a_upstream_messages = InterleavedFakeDeepSeekHandler.requests[3][ "messages" ] followup_a_upstream_messages = InterleavedFakeDeepSeekHandler.requests[4][ "messages" ] followup_b_upstream_messages = InterleavedFakeDeepSeekHandler.requests[5][ "messages" ] self.assertEqual( final_b_upstream_messages[1]["reasoning_content"], "tool reasoning for thread B", ) self.assertEqual( final_a_upstream_messages[1]["reasoning_content"], "tool reasoning for thread A", ) self.assertEqual( followup_a_upstream_messages[1]["reasoning_content"], "tool reasoning for thread A", ) self.assertEqual( followup_a_upstream_messages[3]["reasoning_content"], "final reasoning for thread A", ) self.assertEqual( followup_b_upstream_messages[1]["reasoning_content"], "tool reasoning for thread B", ) self.assertEqual( followup_b_upstream_messages[3]["reasoning_content"], "final reasoning for thread B", ) class StreamingProxyTests(unittest.TestCase): def setUp(self) -> None: self.upstream = ServerFixture( ThreadingHTTPServer(("127.0.0.1", 0), SlowAfterDoneStreamingDeepSeekHandler) ).start() self.store = ReasoningStore(":memory:") proxy = DeepSeekProxyServer(("127.0.0.1", 0), DeepSeekProxyHandler) proxy.config = ProxyConfig( upstream_base_url=self.upstream.url, upstream_model="deepseek-v4-pro", ) proxy.reasoning_store = self.store self.proxy = ServerFixture(proxy).start() def tearDown(self) -> None: self.proxy.close() self.upstream.close() self.store.close() def test_streaming_proxy_closes_after_done_even_if_upstream_stays_open( self, ) -> None: request = Request( f"{self.proxy.url}/v1/chat/completions", data=json.dumps( { "model": "deepseek-v4-pro", "stream": True, "messages": [{"role": "user", "content": "stream"}], } ).encode("utf-8"), method="POST", headers={ "Authorization": "Bearer sk-cursor-test", "Content-Type": "application/json", }, ) started = time.monotonic() with urlopen(request, timeout=1) as response: body = response.read().decode("utf-8") elapsed = time.monotonic() - started self.assertLess(elapsed, 1) self.assertIn("data: [DONE]", body) class ReasoningStreamingProxyTests(unittest.TestCase): def setUp(self) -> None: self.upstream = ServerFixture( ThreadingHTTPServer(("127.0.0.1", 0), ReasoningStreamingDeepSeekHandler) ).start() self.store = ReasoningStore(":memory:") proxy = DeepSeekProxyServer(("127.0.0.1", 0), DeepSeekProxyHandler) proxy.config = ProxyConfig( upstream_base_url=self.upstream.url, upstream_model="deepseek-v4-pro", ) proxy.reasoning_store = self.store self.proxy = ServerFixture(proxy).start() def tearDown(self) -> None: self.proxy.close() self.upstream.close() self.store.close() def test_streaming_proxy_mirrors_reasoning_for_cursor_display( self, ) -> None: request_messages = [{"role": "user", "content": "stream reasoning"}] request = Request( f"{self.proxy.url}/v1/chat/completions", data=json.dumps( { "model": "deepseek-v4-pro", "stream": True, "messages": request_messages, } ).encode("utf-8"), method="POST", headers={ "Authorization": "Bearer sk-cursor-test", "Content-Type": "application/json", }, ) with urlopen(request, timeout=2) as response: body = response.read().decode("utf-8") chunks = [ json.loads(line.removeprefix("data: ")) for line in body.splitlines() if line.startswith("data: {") ] self.assertEqual(chunks[0]["choices"][0]["delta"]["content"], "\nNeed ") self.assertEqual(chunks[0]["choices"][0]["delta"]["reasoning_content"], "Need ") self.assertEqual(chunks[1]["choices"][0]["delta"]["content"], "context.") self.assertEqual( chunks[2]["choices"][0]["delta"]["content"], "\n\n\n" + FINAL_CONTENT, ) stored_message = { "role": "assistant", "content": FINAL_CONTENT, "reasoning_content": "Need context.", } self.assertEqual( self.store.get( "scope:" + conversation_scope(request_messages) + ":signature:" + message_signature(stored_message) ), "Need context.", ) def first_cursor_request() -> dict: return { "model": "deepseek-v4-pro", "messages": [{"role": "user", "content": "What is tomorrow's date?"}], "tools": [ { "type": "function", "function": { "name": "get_date", "description": "Get the current date", "parameters": {"type": "object", "properties": {}}, }, } ], } def second_cursor_request(include_reasoning: bool) -> dict: assistant_message = { "role": "assistant", "content": "", "tool_calls": [ { "id": "call_date", "type": "function", "function": {"name": "get_date", "arguments": "{}"}, } ], } if include_reasoning: assistant_message["reasoning_content"] = TOOL_REASONING return { "model": "deepseek-v4-pro", "messages": [ {"role": "user", "content": "What is tomorrow's date?"}, assistant_message, {"role": "tool", "tool_call_id": "call_date", "content": "2026-04-24"}, ], "tools": first_cursor_request()["tools"], } def third_cursor_request_missing_all_reasoning() -> dict: payload = second_cursor_request(include_reasoning=False) payload["messages"].append({"role": "assistant", "content": FINAL_CONTENT}) payload["messages"].append({"role": "user", "content": "Thanks, now continue."}) return payload def thread_name_from_messages(messages: list[dict]) -> str | None: if not messages: return None content = str(messages[0].get("content") or "") if "thread A" in content: return "A" if "thread B" in content: return "B" return None def interleaved_first_request(thread_name: str) -> dict: return { "model": "deepseek-v4-pro", "messages": [{"role": "user", "content": f"Start thread {thread_name}."}], "tools": [ { "type": "function", "function": { "name": "lookup", "description": "Return a value.", "parameters": {"type": "object", "properties": {}}, }, } ], } def interleaved_second_request(thread_name: str, assistant_message: dict) -> dict: cursor_assistant = dict(assistant_message) cursor_assistant.pop("reasoning_content", None) return { "model": "deepseek-v4-pro", "messages": [ {"role": "user", "content": f"Start thread {thread_name}."}, cursor_assistant, { "role": "tool", "tool_call_id": "call_reused", "content": f"tool result for thread {thread_name}", }, ], "tools": interleaved_first_request(thread_name)["tools"], } def interleaved_third_request( thread_name: str, tool_assistant_message: dict, final_message: dict ) -> dict: payload = interleaved_second_request(thread_name, tool_assistant_message) cursor_final = dict(final_message) cursor_final.pop("reasoning_content", None) payload["messages"].append(cursor_final) payload["messages"].append( {"role": "user", "content": f"Follow up in thread {thread_name}."} ) return payload if __name__ == "__main__": unittest.main()