import sqlite3 import random from contextlib import contextmanager from typing import List, Optional, Dict class ProxyManagerSQLite: def __init__(self, db_path="proxies.db", debug=True): self.db_path = db_path self.debug = debug self._initialize_db() def debug_print(self, *args): """仅在 debug 模式下输出调试信息""" if self.debug: print(*args) def _initialize_db(self): """初始化或检查数据库结构""" with self._get_connection() as conn: # 启用 WAL 模式 current_mode = conn.execute("PRAGMA journal_mode").fetchone()[0] if current_mode != "wal": self.debug_print("切换到 WAL 模式...") conn.execute("PRAGMA journal_mode=WAL") # 检查表是否存在 cursor = conn.execute("PRAGMA table_info(proxies)") columns = [row[1] for row in cursor.fetchall()] required_columns = ["id", "host", "port", "user", "password", "protocol", "region"] if not columns: self.debug_print("表不存在,正在创建表...") elif columns != required_columns: self.debug_print(f"表结构不一致,当前列: {columns}, 期望列: {required_columns}") self.debug_print("正在重建表...") conn.execute("DROP TABLE IF EXISTS proxies") else: self.debug_print("表结构检查通过,无需更改。") return # 创建表 conn.execute(""" CREATE TABLE proxies ( id INTEGER PRIMARY KEY AUTOINCREMENT, host TEXT NOT NULL, port TEXT NOT NULL, user TEXT NOT NULL, password TEXT NOT NULL, protocol TEXT DEFAULT 'http', region TEXT NOT NULL ) """) self.debug_print("表已创建。") @contextmanager def _get_connection(self): """获取 SQLite 数据库连接""" conn = sqlite3.connect(self.db_path) try: yield conn finally: conn.close() def import_proxies_with_classifier(self, file_path: str, classifier, clear_before_import: bool = True): """ 从文件导入代理列表并分类 :param file_path: 文件路径,格式为 host:port:user:password :param classifier: 分类函数,接受代理行字符串,返回国家/地区代码 :param clear_before_import: 是否在导入前清空数据库,默认为 True """ # 根据 clear_before_import 参数决定是否清空数据库 if clear_before_import: self.clear() try: with open(file_path, "r") as file: lines = file.read().replace("\r\n", "\n").strip().split("\n") proxies = [] for line in lines: parts = line.split(":") if len(parts) == 4: proxy = { "host": parts[0], "port": parts[1], "user": parts[2], "password": parts[3], "region": classifier(line), "protocol": "http", } proxies.append(proxy) with self._get_connection() as conn: conn.executemany(""" INSERT INTO proxies (host, port, user, password, protocol, region) VALUES (:host, :port, :user, :password, :protocol, :region) """, proxies) conn.commit() self.debug_print(f"成功导入 {len(proxies)} 条代理数据!") except Exception as e: self.debug_print(f"Error importing proxies: {str(e)}") raise def get_random_proxy_by_region(self, region: Optional[str] = None, remove_after_fetch: bool = False) -> Optional[ Dict]: """ 随机获取代理,支持按区域筛选 :param region: 国家/地区代码,若为 None 则随机选择 :param remove_after_fetch: 是否从数据库中删除取出的代理 :return: 随机选取的代理字典或 None """ with self._get_connection() as conn: if region is None or region == "ALL": cursor = conn.execute("SELECT * FROM proxies ORDER BY RANDOM() LIMIT 1") else: cursor = conn.execute("SELECT * FROM proxies WHERE region = ? ORDER BY RANDOM() LIMIT 1", (region,)) proxy = cursor.fetchone() if proxy and remove_after_fetch: conn.execute("DELETE FROM proxies WHERE id = ?", (proxy[0],)) conn.commit() return dict(zip(["id", "host", "port", "user", "password", "protocol", "region"], proxy)) if proxy else None def get_proxy_count(self, region: Optional[str] = "ALL") -> int: """ 获取指定区域的代理数量 :param region: 区域代码,默认为 "ALL" :return: 代理数量 """ with self._get_connection() as conn: if region == "ALL": cursor = conn.execute("SELECT COUNT(*) FROM proxies") else: cursor = conn.execute("SELECT COUNT(*) FROM proxies WHERE region = ?", (region,)) return cursor.fetchone()[0] def get_summary(self) -> Dict[str, int]: """ 获取所有区域的代理统计数量 :return: 包含区域代码和代理数量的字典 """ with self._get_connection() as conn: cursor = conn.execute("SELECT region, COUNT(*) FROM proxies GROUP BY region") return {row[0]: row[1] for row in cursor.fetchall()} def export_proxies(self, file_path: str, serializer=None): """ 导出代理到文件 :param file_path: 文件路径 :param serializer: 可选的序列化函数,接受代理字典,返回字符串 """ if serializer is None: serializer = serializer_smartproxy # 使用默认的序列化器 try: with self._get_connection() as conn: cursor = conn.execute("SELECT host, port, user, password, protocol, region FROM proxies") proxies = cursor.fetchall() with open(file_path, "w") as file: for proxy in proxies: proxy_dict = dict(zip(["host", "port", "user", "password", "protocol", "region"], proxy)) try: line = serializer(proxy_dict) file.write(line + "\n") except Exception as e: self.debug_print(f"序列化失败: {e}") continue # 跳过错误的代理数据 self.debug_print(f"成功导出代理到 {file_path}!") except Exception as e: self.debug_print(f"Error exporting proxies: {str(e)}") raise def get_proxies(self, region: Optional[str] = None) -> List[Dict]: """ 获取指定区域的所有代理列表 :param region: 指定区域代码,如果为 None 或 "ALL",返回所有代理列表 :return: 包含代理信息的字典列表 """ with self._get_connection() as conn: if region is None or region == "ALL": cursor = conn.execute("SELECT * FROM proxies") else: cursor = conn.execute("SELECT * FROM proxies WHERE region = ?", (region,)) rows = cursor.fetchall() return [ dict(zip(["id", "host", "port", "user", "password", "protocol", "region"], row)) for row in rows ] def clear(self): """清空数据库中的所有数据""" with self._get_connection() as conn: conn.execute("DELETE FROM proxies") conn.commit() self.debug_print("数据库已清空。") def classifier_smartproxy(proxy_line): """ 从代理行中提取区域代码 区域代码格式: "_area-XX_", 提取 "XX" 部分作为区域代码。 :param proxy_line: 代理行字符串 :return: 区域代码 (如 "PL") 或 "OTHER" 如果提取失败 """ try: # 找到 "_area-" 的起始位置 start_index = proxy_line.find("_area-") if start_index == -1: return "OTHER" # 区域代码从 "_area-" 之后开始,到下一个 "_" 之前结束 start_index += len("_area-") end_index = proxy_line.find("_", start_index) if end_index == -1: return "OTHER" # 无法找到结束符,返回 "OTHER" # 提取区域代码并返回 region_code = proxy_line[start_index:end_index] return region_code.upper() # 返回大写的区域代码 except Exception as e: print(f"Error in region classification: {str(e)}") return "OTHER" def serializer_smartproxy(proxy: Dict) -> str: """ 默认的代理导出序列化函数 :param proxy: 代理字典 :return: 格式化后的字符串,格式为 host:port:user:password 没有区域信息的原因在于,smartproxy 的格式里user 字段就包含了区域信息 """ try: # 使用 "|" 分隔符标记区域,方便后续导入时解析 return f"{proxy['host']}:{proxy['port']}:{proxy['user']}:{proxy['password']}" except KeyError as e: raise ValueError(f"代理信息缺少必要字段: {e}") if __name__ == "__main__": manager = ProxyManagerSQLite() # 导入代理 manager.import_proxies_with_classifier("IP.txt", classifier=classifier_smartproxy) # 获取汇总统计 print("代理统计:", manager.get_summary()) # 获取随机代理 proxy = manager.get_random_proxy_by_region(region="PL", remove_after_fetch=True) print(f"取出的代理: {proxy}") # 获取代理数量 print("所有代理总数:", manager.get_proxy_count("ALL")) print("PL 区域代理数:", manager.get_proxy_count("PL")) print("PL 区域当前列表:", manager.get_proxies("PL")) print("目前所有的可用代理列表:", manager.get_proxies("ALL")) manager.export_proxies("剩下的可用IP.txt")