"""Server boundary, CLI, and operational tests. Pure helper tests (gzip, summarize) and stub-handler tests (client disconnect) live near the top. The bottom of the file boots a real proxy + tiny upstream to exercise things that need the HTTP layer: bearer token forwarding, oversized body, missing-bearer rejection, logging modes, and streaming connection close. """ from __future__ import annotations from dataclasses import replace from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from io import BytesIO import gzip import json import logging from pathlib import Path import re import threading import time from types import SimpleNamespace import unittest import zlib from urllib.error import HTTPError from urllib.request import Request, urlopen from deepseek_cursor_proxy.config import ProxyConfig from deepseek_cursor_proxy.logging import ( ConsoleLogFormatter, TerminalSpinner, ) from deepseek_cursor_proxy.reasoning_store import ReasoningStore from deepseek_cursor_proxy.server import ( DeepSeekProxyHandler, DeepSeekProxyServer, build_arg_parser, read_response_body, summarize_chat_payload, ) # --------------------------------------------------------------------------- # Stubs for fast in-process tests of internal handler methods # --------------------------------------------------------------------------- class _FakeResponse: def __init__(self, body: bytes, encoding: str = "", status: int = 200) -> None: self._body = BytesIO(body) self.headers = {"Content-Encoding": encoding} if encoding else {} self.status = status def read(self) -> bytes: return self._body.read() class _FakeStreamingResponse: status = 200 headers = {"Content-Type": "text/event-stream"} def __init__(self, lines: list[bytes]) -> None: self._lines = lines self.readline_calls = 0 def readline(self) -> bytes: self.readline_calls += 1 if not self._lines: return b"" return self._lines.pop(0) class _FailingStreamingResponse: status = 200 headers = {"Content-Type": "text/event-stream"} def readline(self) -> bytes: raise OSError("record layer failure") class _BrokenPipeWfile: def write(self, body: bytes) -> None: raise BrokenPipeError("test disconnect") def flush(self) -> None: raise BrokenPipeError("test disconnect") class _FakeConsole: def __init__(self, *, tty: bool) -> None: self.tty = tty self.writes: list[str] = [] def isatty(self) -> bool: return self.tty def write(self, text: str) -> None: self.writes.append(text) def flush(self) -> None: return def _make_handler_stub(wfile: object, **config: object) -> DeepSeekProxyHandler: handler = object.__new__(DeepSeekProxyHandler) handler.server = SimpleNamespace( config=ProxyConfig(**config), reasoning_store=ReasoningStore(":memory:"), ) handler.wfile = wfile handler.close_connection = False handler.send_response = lambda status: None handler.send_header = lambda name, value: None handler.end_headers = lambda: None return handler # --------------------------------------------------------------------------- # CLI / pure helpers # --------------------------------------------------------------------------- class CliAndHelperTests(unittest.TestCase): def test_cli_boolean_flags_have_on_and_off_forms(self) -> None: args = build_arg_parser().parse_args( [ "--no-ngrok", "--no-verbose", "--no-display-reasoning", "--no-collasible-resoning", "--cors", "--trace-dir", "/tmp/dcp-traces", ] ) self.assertFalse(args.ngrok) self.assertFalse(args.verbose) self.assertFalse(args.display_reasoning) self.assertFalse(args.collapsible_reasoning) self.assertTrue(args.cors) self.assertEqual(args.trace_dir, Path("/tmp/dcp-traces")) def test_cli_accepts_ngrok_url(self) -> None: args = build_arg_parser().parse_args( ["--ngrok-url", "https://example.ngrok.app"] ) self.assertEqual(args.ngrok_url, "https://example.ngrok.app") def test_default_console_logging_hides_info_prefix_and_timestamp(self) -> None: formatter = ConsoleLogFormatter(verbose=False) info_record = logging.LogRecord( "deepseek_cursor_proxy", logging.INFO, __file__, 1, "listening on %s", ("http://127.0.0.1:9000/v1",), None, ) warning_record = logging.LogRecord( "deepseek_cursor_proxy", logging.WARNING, __file__, 1, "trace logging enabled", (), None, ) self.assertEqual( formatter.format(info_record), "listening on http://127.0.0.1:9000/v1", ) self.assertEqual( formatter.format(warning_record), "WARNING trace logging enabled" ) def test_verbose_console_logging_shows_timestamp_and_level(self) -> None: formatter = ConsoleLogFormatter(verbose=True) record = logging.LogRecord( "deepseek_cursor_proxy", logging.INFO, __file__, 1, "listening on %s", ("http://127.0.0.1:9000/v1",), None, ) self.assertRegex( formatter.format(record), re.compile( r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3} INFO listening on " ), ) def test_terminal_spinner_animates_only_for_tty(self) -> None: tty = _FakeConsole(tty=True) spinner = TerminalSpinner( enabled=True, text="└ {frame}", stream=tty, interval=0.001 ).start() deadline = time.monotonic() + 0.2 while time.monotonic() < deadline and not tty.writes: time.sleep(0.001) spinner.stop() output = "".join(tty.writes) self.assertIn(TerminalSpinner.hide_cursor, output) self.assertIn("└ ⠋", output) self.assertIn(TerminalSpinner.show_cursor, output) self.assertTrue(output.endswith(TerminalSpinner.show_cursor)) non_tty = _FakeConsole(tty=False) TerminalSpinner( enabled=True, text="└ {frame}", stream=non_tty, interval=0.001 ).start().stop() self.assertEqual(non_tty.writes, []) def test_read_response_body_decodes_gzip_and_deflate(self) -> None: self.assertEqual( read_response_body(_FakeResponse(gzip.compress(b'{"ok":1}'), "gzip")), b'{"ok":1}', ) self.assertEqual( read_response_body(_FakeResponse(zlib.compress(b'{"ok":1}'), "deflate")), b'{"ok":1}', ) def test_summarize_chat_payload_omits_message_content(self) -> None: summary = summarize_chat_payload( { "model": "deepseek-v4-pro", "stream": True, "messages": [{"role": "user", "content": "secret prompt"}], "tools": [{"type": "function"}], "tool_choice": "auto", } ) self.assertIn("model='deepseek-v4-pro'", summary) self.assertIn("messages=1", summary) self.assertNotIn("secret prompt", summary) # --------------------------------------------------------------------------- # Client-disconnect / upstream-failure stubs (no real HTTP needed) # --------------------------------------------------------------------------- class HandlerStubTests(unittest.TestCase): def test_regular_response_handles_client_disconnect(self) -> None: handler = _make_handler_stub(_BrokenPipeWfile()) body = json.dumps( { "id": "x", "object": "chat.completion", "model": "deepseek-v4-pro", "choices": [ { "index": 0, "finish_reason": "stop", "message": {"role": "assistant", "content": "ok"}, } ], } ).encode("utf-8") try: with self.assertLogs("deepseek_cursor_proxy", level="WARNING") as captured: result = handler._proxy_regular_response( _FakeResponse(body), "deepseek-v4-pro", [{"role": "user", "content": "hi"}], "ns", ) finally: handler.server.reasoning_store.close() self.assertFalse(result.sent) self.assertIn("sending upstream response body", "\n".join(captured.output)) def test_streaming_response_stops_on_client_disconnect(self) -> None: handler = _make_handler_stub(_BrokenPipeWfile()) chunk = { "id": "stream", "model": "deepseek-v4-pro", "choices": [{"index": 0, "delta": {"role": "assistant", "content": "hi"}}], } response = _FakeStreamingResponse( [ f"data: {json.dumps(chunk)}\n\n".encode("utf-8"), b"data: [DONE]\n\n", ] ) try: with self.assertLogs("deepseek_cursor_proxy", level="WARNING") as captured: result = handler._proxy_streaming_response( response, "deepseek-v4-pro", [{"role": "user", "content": "hi"}], "ns", ) finally: handler.server.reasoning_store.close() self.assertFalse(result.sent) self.assertEqual(response.readline_calls, 1) self.assertIn("sending streaming response chunk", "\n".join(captured.output)) def test_streaming_response_handles_upstream_read_failure(self) -> None: handler = _make_handler_stub(BytesIO()) try: with self.assertLogs("deepseek_cursor_proxy", level="WARNING") as captured: result = handler._proxy_streaming_response( _FailingStreamingResponse(), "deepseek-v4-pro", [{"role": "user", "content": "hi"}], "ns", ) finally: handler.server.reasoning_store.close() self.assertFalse(result.sent) self.assertIn( "upstream streaming response read failed", "\n".join(captured.output) ) def test_collapsible_reasoning_no_effect_when_display_disabled(self) -> None: wfile = BytesIO() handler = _make_handler_stub( wfile, display_reasoning=False, collapsible_reasoning=True ) chunk = { "id": "stream", "model": "deepseek-v4-pro", "choices": [{"index": 0, "delta": {"reasoning_content": "Need context."}}], } response = _FakeStreamingResponse( [ f"data: {json.dumps(chunk)}\n\n".encode("utf-8"), b"data: [DONE]\n\n", ] ) try: handler._proxy_streaming_response( response, "deepseek-v4-pro", [{"role": "user", "content": "hi"}], "ns", ) finally: handler.server.reasoning_store.close() body = wfile.getvalue().decode("utf-8") self.assertIn("reasoning_content", body) self.assertNotIn("
", body) # --------------------------------------------------------------------------- # HTTP-level boundary tests: real proxy + tiny upstream # --------------------------------------------------------------------------- class _PlainFakeUpstream(BaseHTTPRequestHandler): """Returns a fixed plain response and records every request.""" requests: list[dict[str, object]] = [] auth_headers: list[str] = [] delay_after_done: float = 0.0 response: dict[str, object] = {} def log_message(self, fmt: str, *args: object) -> None: return def do_POST(self) -> None: length = int(self.headers.get("Content-Length") or 0) payload = json.loads(self.rfile.read(length).decode("utf-8")) self.__class__.requests.append(payload) self.__class__.auth_headers.append(self.headers.get("Authorization", "")) if payload.get("stream"): self.send_response(200) self.send_header("Content-Type", "text/event-stream") self.end_headers() self.wfile.write( b'data: {"choices":[{"index":0,"delta":{"content":"x"}}]}\n\n' ) self.wfile.write(b"data: [DONE]\n\n") self.wfile.flush() if self.__class__.delay_after_done: time.sleep(self.__class__.delay_after_done) return body = json.dumps(self.__class__.response).encode("utf-8") self.send_response(200) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) _BASE_RESPONSE: dict[str, object] = { "id": "x", "object": "chat.completion", "created": 1, "model": "deepseek-v4-pro", "choices": [ { "index": 0, "finish_reason": "stop", "message": {"role": "assistant", "content": "ok"}, } ], "usage": { "prompt_tokens": 20, "completion_tokens": 5, "total_tokens": 25, "prompt_cache_hit_tokens": 12, "prompt_cache_miss_tokens": 8, "completion_tokens_details": {"reasoning_tokens": 3}, }, } class _Fixture: def __init__(self, server: ThreadingHTTPServer) -> None: self.server = server self.thread = threading.Thread(target=server.serve_forever, daemon=True) self.thread.start() @property def url(self) -> str: host, port = self.server.server_address return f"http://{host}:{port}" def close(self) -> None: self.server.shutdown() self.server.server_close() self.thread.join(timeout=5) def _post(url: str, payload: dict, api_key: str = "sk-test") -> tuple[int, dict]: request = Request( url, data=json.dumps(payload).encode("utf-8"), method="POST", headers={ "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", }, ) try: with urlopen(request, timeout=5) as response: return response.status, json.loads(response.read().decode("utf-8")) except HTTPError as exc: return exc.code, json.loads(exc.read().decode("utf-8")) class HttpBoundaryTests(unittest.TestCase): """Real-HTTP tests that don't fit the protocol suite: things the proxy must do at the HTTP boundary regardless of what DeepSeek answers.""" def setUp(self) -> None: _PlainFakeUpstream.requests = [] _PlainFakeUpstream.auth_headers = [] _PlainFakeUpstream.delay_after_done = 0.0 _PlainFakeUpstream.response = dict(_BASE_RESPONSE) self.upstream = _Fixture( ThreadingHTTPServer(("127.0.0.1", 0), _PlainFakeUpstream) ) self.store = ReasoningStore(":memory:") proxy = DeepSeekProxyServer(("127.0.0.1", 0), DeepSeekProxyHandler) proxy.config = ProxyConfig( upstream_base_url=self.upstream.url, upstream_model="deepseek-v4-pro", ngrok=False, ) proxy.reasoning_store = self.store self.proxy = _Fixture(proxy) def tearDown(self) -> None: self.proxy.close() self.upstream.close() self.store.close() def _request(self) -> dict: return { "model": "deepseek-v4-pro", "messages": [{"role": "user", "content": "hi"}], } def test_rejects_missing_bearer_token(self) -> None: request = Request( f"{self.proxy.url}/v1/chat/completions", data=json.dumps(self._request()).encode("utf-8"), method="POST", headers={"Content-Type": "application/json"}, ) with self.assertRaises(HTTPError) as caught: urlopen(request, timeout=5) self.assertEqual(caught.exception.code, 401) self.assertEqual(_PlainFakeUpstream.requests, []) def test_rejects_oversized_request_body(self) -> None: self.proxy.server.config = replace( self.proxy.server.config, max_request_body_bytes=10 ) status, payload = _post( f"{self.proxy.url}/v1/chat/completions", self._request() ) self.assertEqual(status, 413) self.assertIn("too large", payload["error"]["message"]) self.assertEqual(_PlainFakeUpstream.requests, []) def test_forwards_bearer_token_to_upstream(self) -> None: status, _ = _post( f"{self.proxy.url}/v1/chat/completions", self._request(), api_key="sk-from-cursor", ) self.assertEqual(status, 200) self.assertEqual(_PlainFakeUpstream.auth_headers[0], "Bearer sk-from-cursor") def test_streaming_response_closes_after_done_when_upstream_lingers( self, ) -> None: """Cursor relies on the proxy ending the SSE stream at [DONE], even if the upstream socket stays open.""" _PlainFakeUpstream.delay_after_done = 2.0 request = Request( f"{self.proxy.url}/v1/chat/completions", data=json.dumps( { "model": "deepseek-v4-pro", "stream": True, "messages": [{"role": "user", "content": "stream"}], } ).encode("utf-8"), method="POST", headers={ "Authorization": "Bearer sk-test", "Content-Type": "application/json", }, ) started = time.monotonic() with urlopen(request, timeout=1) as response: body = response.read().decode("utf-8") self.assertLess(time.monotonic() - started, 1.0) self.assertIn("data: [DONE]", body) def test_normal_logging_summarizes_without_bodies_or_keys(self) -> None: with self.assertLogs("deepseek_cursor_proxy", level="INFO") as captured: status, _ = _post( f"{self.proxy.url}/v1/chat/completions", self._request(), api_key="sk-from-cursor", ) # `└ stats` is emitted on the handler thread *after* the response # body hits the socket, so the client may return before it lands. deadline = time.monotonic() + 2 while time.monotonic() < deadline and not any( "└ stats" in record for record in captured.output ): time.sleep(0.01) output = "\n".join(captured.output) self.assertEqual(status, 200) self.assertIn("┌ request model=deepseek-v4-pro effort=max messages=1", output) self.assertIn("├ context status=ok reasoning_context=0", output) self.assertIn("└ stats", output) self.assertNotIn(" tools=", output) self.assertNotIn("├ send", output) self.assertNotIn("hi", output.split("┌ request")[1].split("\n")[0]) self.assertNotIn("sk-from-cursor", output) def test_verbose_logging_includes_bodies_but_redacts_api_key(self) -> None: self.proxy.server.config = replace(self.proxy.server.config, verbose=True) with self.assertLogs("deepseek_cursor_proxy", level="INFO") as captured: _post( f"{self.proxy.url}/v1/chat/completions", self._request(), api_key="sk-from-cursor", ) output = "\n".join(captured.output) self.assertIn("cursor request body", output) self.assertIn("upstream request body", output) self.assertNotIn("sk-from-cursor", output) def test_healthz_returns_ok(self) -> None: with urlopen(f"{self.proxy.url}/healthz", timeout=2) as response: self.assertEqual(response.status, 200) self.assertEqual(json.loads(response.read())["ok"], True) if __name__ == "__main__": unittest.main()