Gmail/proxy.py

266 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import sqlite3
import random
from contextlib import contextmanager
from typing import List, Optional, Dict
class ProxyManagerSQLite:
def __init__(self, db_path="proxies.db", debug=True):
self.db_path = db_path
self.debug = debug
self._initialize_db()
def debug_print(self, *args):
"""仅在 debug 模式下输出调试信息"""
if self.debug:
print(*args)
def _initialize_db(self):
"""初始化或检查数据库结构"""
with self._get_connection() as conn:
# 启用 WAL 模式
current_mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
if current_mode != "wal":
self.debug_print("切换到 WAL 模式...")
conn.execute("PRAGMA journal_mode=WAL")
# 检查表是否存在
cursor = conn.execute("PRAGMA table_info(proxies)")
columns = [row[1] for row in cursor.fetchall()]
required_columns = ["id", "host", "port", "user", "password", "protocol", "region"]
if not columns:
self.debug_print("表不存在,正在创建表...")
elif columns != required_columns:
self.debug_print(f"表结构不一致,当前列: {columns}, 期望列: {required_columns}")
self.debug_print("正在重建表...")
conn.execute("DROP TABLE IF EXISTS proxies")
else:
self.debug_print("表结构检查通过,无需更改。")
return
# 创建表
conn.execute("""
CREATE TABLE proxies (
id INTEGER PRIMARY KEY AUTOINCREMENT,
host TEXT NOT NULL,
port TEXT NOT NULL,
user TEXT NOT NULL,
password TEXT NOT NULL,
protocol TEXT DEFAULT 'http',
region TEXT NOT NULL
)
""")
self.debug_print("表已创建。")
@contextmanager
def _get_connection(self):
"""获取 SQLite 数据库连接"""
conn = sqlite3.connect(self.db_path)
try:
yield conn
finally:
conn.close()
def import_proxies_with_classifier(self, file_path: str, classifier, clear_before_import: bool = True):
"""
从文件导入代理列表并分类
:param file_path: 文件路径,格式为 host:port:user:password
:param classifier: 分类函数,接受代理行字符串,返回国家/地区代码
:param clear_before_import: 是否在导入前清空数据库,默认为 True
"""
# 根据 clear_before_import 参数决定是否清空数据库
if clear_before_import:
self.clear()
try:
with open(file_path, "r") as file:
lines = file.read().replace("\r\n", "\n").strip().split("\n")
proxies = []
for line in lines:
parts = line.split(":")
if len(parts) == 4:
proxy = {
"host": parts[0],
"port": parts[1],
"user": parts[2],
"password": parts[3],
"region": classifier(line),
"protocol": "http",
}
proxies.append(proxy)
with self._get_connection() as conn:
conn.executemany("""
INSERT INTO proxies (host, port, user, password, protocol, region)
VALUES (:host, :port, :user, :password, :protocol, :region)
""", proxies)
conn.commit()
self.debug_print(f"成功导入 {len(proxies)} 条代理数据!")
except Exception as e:
self.debug_print(f"Error importing proxies: {str(e)}")
raise
def get_random_proxy_by_region(self, region: Optional[str] = None, remove_after_fetch: bool = False) -> Optional[
Dict]:
"""
随机获取代理,支持按区域筛选
:param region: 国家/地区代码,若为 None 则随机选择
:param remove_after_fetch: 是否从数据库中删除取出的代理
:return: 随机选取的代理字典或 None
"""
with self._get_connection() as conn:
if region is None or region == "ALL":
cursor = conn.execute("SELECT * FROM proxies ORDER BY RANDOM() LIMIT 1")
else:
cursor = conn.execute("SELECT * FROM proxies WHERE region = ? ORDER BY RANDOM() LIMIT 1", (region,))
proxy = cursor.fetchone()
if proxy and remove_after_fetch:
conn.execute("DELETE FROM proxies WHERE id = ?", (proxy[0],))
conn.commit()
return dict(zip(["id", "host", "port", "user", "password", "protocol", "region"], proxy)) if proxy else None
def get_proxy_count(self, region: Optional[str] = "ALL") -> int:
"""
获取指定区域的代理数量
:param region: 区域代码,默认为 "ALL"
:return: 代理数量
"""
with self._get_connection() as conn:
if region == "ALL":
cursor = conn.execute("SELECT COUNT(*) FROM proxies")
else:
cursor = conn.execute("SELECT COUNT(*) FROM proxies WHERE region = ?", (region,))
return cursor.fetchone()[0]
def get_summary(self) -> Dict[str, int]:
"""
获取所有区域的代理统计数量
:return: 包含区域代码和代理数量的字典
"""
with self._get_connection() as conn:
cursor = conn.execute("SELECT region, COUNT(*) FROM proxies GROUP BY region")
return {row[0]: row[1] for row in cursor.fetchall()}
def export_proxies(self, file_path: str, serializer=None):
"""
导出代理到文件
:param file_path: 文件路径
:param serializer: 可选的序列化函数,接受代理字典,返回字符串
"""
if serializer is None:
serializer = serializer_smartproxy # 使用默认的序列化器
try:
with self._get_connection() as conn:
cursor = conn.execute("SELECT host, port, user, password, protocol, region FROM proxies")
proxies = cursor.fetchall()
with open(file_path, "w") as file:
for proxy in proxies:
proxy_dict = dict(zip(["host", "port", "user", "password", "protocol", "region"], proxy))
try:
line = serializer(proxy_dict)
file.write(line + "\n")
except Exception as e:
self.debug_print(f"序列化失败: {e}")
continue # 跳过错误的代理数据
self.debug_print(f"成功导出代理到 {file_path}")
except Exception as e:
self.debug_print(f"Error exporting proxies: {str(e)}")
raise
def get_proxies(self, region: Optional[str] = None) -> List[Dict]:
"""
获取指定区域的所有代理列表
:param region: 指定区域代码,如果为 None 或 "ALL",返回所有代理列表
:return: 包含代理信息的字典列表
"""
with self._get_connection() as conn:
if region is None or region == "ALL":
cursor = conn.execute("SELECT * FROM proxies")
else:
cursor = conn.execute("SELECT * FROM proxies WHERE region = ?", (region,))
rows = cursor.fetchall()
return [
dict(zip(["id", "host", "port", "user", "password", "protocol", "region"], row))
for row in rows
]
def clear(self):
"""清空数据库中的所有数据"""
with self._get_connection() as conn:
conn.execute("DELETE FROM proxies")
conn.commit()
self.debug_print("数据库已清空。")
def classifier_smartproxy(proxy_line):
"""
从代理行中提取区域代码
区域代码格式: "_area-XX_", 提取 "XX" 部分作为区域代码。
:param proxy_line: 代理行字符串
:return: 区域代码 (如 "PL") 或 "OTHER" 如果提取失败
"""
try:
# 找到 "_area-" 的起始位置
start_index = proxy_line.find("_area-")
if start_index == -1:
return "OTHER"
# 区域代码从 "_area-" 之后开始,到下一个 "_" 之前结束
start_index += len("_area-")
end_index = proxy_line.find("_", start_index)
if end_index == -1:
return "OTHER" # 无法找到结束符,返回 "OTHER"
# 提取区域代码并返回
region_code = proxy_line[start_index:end_index]
return region_code.upper() # 返回大写的区域代码
except Exception as e:
print(f"Error in region classification: {str(e)}")
return "OTHER"
def serializer_smartproxy(proxy: Dict) -> str:
"""
默认的代理导出序列化函数
:param proxy: 代理字典
:return: 格式化后的字符串,格式为 host:port:user:password 没有区域信息的原因在于smartproxy 的格式里user 字段就包含了区域信息
"""
try:
# 使用 "|" 分隔符标记区域,方便后续导入时解析
return f"{proxy['host']}:{proxy['port']}:{proxy['user']}:{proxy['password']}"
except KeyError as e:
raise ValueError(f"代理信息缺少必要字段: {e}")
if __name__ == "__main__":
manager = ProxyManagerSQLite()
# 导入代理
manager.import_proxies_with_classifier("IP.txt", classifier=classifier_smartproxy)
# 获取汇总统计
print("代理统计:", manager.get_summary())
# 获取随机代理
proxy = manager.get_random_proxy_by_region(region="PL", remove_after_fetch=True)
print(f"取出的代理: {proxy}")
# 获取代理数量
print("所有代理总数:", manager.get_proxy_count("ALL"))
print("PL 区域代理数:", manager.get_proxy_count("PL"))
print("PL 区域当前列表:", manager.get_proxies("PL"))
print("目前所有的可用代理列表:", manager.get_proxies("ALL"))
manager.export_proxies("剩下的可用IP.txt")