上传文件至 /

修改了一些bug,比如修改辅助邮箱的时候点不到,优化登录后直接跳转到设置等等
main
dghc2023 2024-12-01 06:03:41 +00:00
parent 7d7c5d9ead
commit 2930f7d935
3 changed files with 154 additions and 80 deletions

24
mail.py
View File

@ -80,7 +80,6 @@ def parse_email_date(date_str, default_tz=timezone.utc):
print(f"Warning: Failed to parse date '{date_str}': {e}") print(f"Warning: Failed to parse date '{date_str}': {e}")
return None return None
class EmailClient: class EmailClient:
def __init__(self, host, username, password): def __init__(self, host, username, password):
self.host = host self.host = host
@ -93,6 +92,21 @@ class EmailClient:
self.connection.login(self.username, self.password) self.connection.login(self.username, self.password)
self.connection.select("inbox") self.connection.select("inbox")
def ensure_connection(self):
"""
确保 IMAP 连接是活跃的如果断开或未连接则重连
"""
if self.connection is None:
print("Connection is not established. Reconnecting...")
self.connect() # 如果连接不存在,则进行连接
return
try:
self.connection.noop() # 发送 NOOP 命令以检查连接是否有效
except (imaplib.IMAP4.abort, imaplib.IMAP4.error):
print("Connection lost. Reconnecting...")
self.connect()
def disconnect(self): def disconnect(self):
if self.connection: if self.connection:
self.connection.logout() self.connection.logout()
@ -112,6 +126,7 @@ class EmailClient:
Raises: Raises:
Exception: 如果搜索失败 Exception: 如果搜索失败
""" """
self.ensure_connection() # 确保连接
result, data = self.connection.search(None, search_criteria) result, data = self.connection.search(None, search_criteria)
if result != "OK": if result != "OK":
raise Exception(f"Failed to search emails with criteria: {search_criteria}") raise Exception(f"Failed to search emails with criteria: {search_criteria}")
@ -139,6 +154,7 @@ class EmailClient:
返回 返回
list: 符合条件的邮件 ID 列表按接收时间倒序排列 list: 符合条件的邮件 ID 列表按接收时间倒序排列
""" """
self.ensure_connection() # 确保连接
search_criteria = build_search_criteria( search_criteria = build_search_criteria(
from_email=from_email, from_email=from_email,
subject=subject, subject=subject,
@ -168,6 +184,7 @@ class EmailClient:
keyword_regex = re.compile(keyword_pattern) if keyword_pattern else None keyword_regex = re.compile(keyword_pattern) if keyword_pattern else None
subject_regex = re.compile(subject_pattern) if subject_pattern else None subject_regex = re.compile(subject_pattern) if subject_pattern else None
all_matched_emails = [] all_matched_emails = []
self.ensure_connection() # 确保连接
for email_id in email_ids: for email_id in email_ids:
try: try:
@ -380,13 +397,14 @@ def codeTest():
password = "g1l2o0hld84" password = "g1l2o0hld84"
client = EmailClient(server, username, password) client = EmailClient(server, username, password)
client.connect() # client.connect()
code_receiver = GoogleCodeReceiver(client) code_receiver = GoogleCodeReceiver(client)
code = '' code = ''
try: try:
code = code_receiver.wait_code( code = code_receiver.wait_code(
username="RibeAchour875@gmail.com", timeout=2, interval=1, username="elenagrosu265@gmail.com", timeout=2, interval=1,
start_time=datetime(2024, 11, 10)) start_time=datetime(2024, 11, 10))
print(f"收到谷歌验证码: {code}") print(f"收到谷歌验证码: {code}")
except Exception as e: except Exception as e:

173
main.py
View File

@ -21,7 +21,7 @@ import string
from multiprocessing import Queue, Pool from multiprocessing import Queue, Pool
from account import AccountManagerSQLite from account import AccountManagerSQLite
from typing import List, Tuple from typing import List, Tuple
import win32gui
from mail import GoogleCodeReceiver, EmailClient from mail import GoogleCodeReceiver, EmailClient
from proxy import ProxyManagerSQLite, classifier_smartproxy from proxy import ProxyManagerSQLite, classifier_smartproxy
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
@ -46,8 +46,6 @@ root_plugin_dir = os.path.join(os.getcwd(), 'proxy_auth_plugin')
proxy_manager = ProxyManagerSQLite(db_path="proxies.db") proxy_manager = ProxyManagerSQLite(db_path="proxies.db")
db_manager = AccountManagerSQLite(db_path="accounts.db") db_manager = AccountManagerSQLite(db_path="accounts.db")
proxy_manager.import_proxies_with_classifier( "Ip.txt",classifier=classifier_smartproxy)
def init_worker(): def init_worker():
global proxy_manager, db_manager global proxy_manager, db_manager
@ -72,6 +70,20 @@ ua_templates = [
"Mozilla/5.0 (Windows NT 10.0) AppleWebKit/{webkit_version} (KHTML, like Gecko) Chrome/{chrome_version} Safari/{safari_version}" "Mozilla/5.0 (Windows NT 10.0) AppleWebKit/{webkit_version} (KHTML, like Gecko) Chrome/{chrome_version} Safari/{safari_version}"
] ]
# 自定义异常
class WindowMinimizedError(Exception):
def __init__(self, message="浏览器窗口已最小化,无法执行操作。"):
self.message = message
super().__init__(self.message)
class GoogleCaptchaError(Exception):
"""出现谷歌文字验证错误"""
pass
def is_window_minimized(hwnd):
"""检查窗口是否最小化"""
return win32gui.IsIconic(hwnd)
def generate_user_agent(): def generate_user_agent():
# 随机选择一个Chrome的UA模板 # 随机选择一个Chrome的UA模板
@ -92,7 +104,7 @@ def generate_user_agent():
return user_agent return user_agent
def random_sleep(tab, min_seconds=0, max_seconds=2): def random_sleep(tab, min_seconds=1, max_seconds=3):
"""在 min_seconds 和 max_seconds 之间随机停顿""" """在 min_seconds 和 max_seconds 之间随机停顿"""
tab.wait(random.uniform(min_seconds, max_seconds)) tab.wait(random.uniform(min_seconds, max_seconds))
@ -289,6 +301,9 @@ def input_email(tab, hwnd, email, email_input, max_retries=3):
:param email_input: 输入框对象 :param email_input: 输入框对象
:param max_retries: 最大尝试次数 :param max_retries: 最大尝试次数
""" """
if is_window_minimized(hwnd):
raise WindowMinimizedError("浏览器窗口已最小化,无法执行操作。")
email_input.clear() email_input.clear()
# 模拟人类输入,每次输入一个字符,并随机延迟 # 模拟人类输入,每次输入一个字符,并随机延迟
@ -327,12 +342,12 @@ def input_email(tab, hwnd, email, email_input, max_retries=3):
retry_count += 1 # 增加尝试次数 retry_count += 1 # 增加尝试次数
print(f"{retry_count} 次尝试失败,按钮未消失,重新尝试...") print(f"{retry_count} 次尝试失败,按钮未消失,重新尝试...")
if retry_count >= max_retries: if retry_count >= max_retries:
# 两次尝试都失败调整按钮大小为10px # 两次尝试都失败调整按钮大小为10px
next_input.set.style('width', '10px') next_input.set.style('width', '10px')
next_input.set.style('height', '10px') next_input.set.style('height', '10px')
print(f"尝试 {max_retries} 次仍未成功,调整按钮大小并退出。") print(f"尝试 {max_retries} 次仍未成功,调整按钮大小并退出。")
return False # 超过最大尝试次数后退出函数 raise GoogleCaptchaError("出现谷歌文字验证")
# 点击重试按钮 # 点击重试按钮
@ -341,6 +356,10 @@ def recover(tab, recover_a):
def input_password(tab, hwnd, password, password_input, max_retries=2): def input_password(tab, hwnd, password, password_input, max_retries=2):
# 检查窗口是否最小化
if is_window_minimized(hwnd):
raise WindowMinimizedError("浏览器窗口已最小化,无法执行操作。")
password_input.clear() password_input.clear()
""" """
输入密码并点击下一步按钮 输入密码并点击下一步按钮
@ -400,7 +419,7 @@ def input_password(tab, hwnd, password, password_input, max_retries=2):
next_input.set.style('width', '10px') next_input.set.style('width', '10px')
next_input.set.style('height', '10px') next_input.set.style('height', '10px')
print(f"尝试 {max_retries} 次仍未成功,调整按钮大小并退出。") print(f"尝试 {max_retries} 次仍未成功,调整按钮大小并退出。")
return False # 超过最大尝试次数后退出函数 raise GoogleCaptchaError("输入密码后无法点击下一步")
@ -448,11 +467,10 @@ def click_use_other_account_button2(tab, next_span):
# 修改辅助邮箱账号 # 修改辅助邮箱账号
def modify_the_secondary_email1(tab, auxiliary_email_account): def modify_the_secondary_email1(tab, button, auxiliary_email_account):
button = tab.ele('@@tag()=i@@text()=edit', timeout=15)
button.click(by_js=True) button.click(by_js=True)
tab.wait(2) tab.wait(2)
input = tab.ele('@type=email', timeout=15) input = button
input.clear() input.clear()
for char in auxiliary_email_account: for char in auxiliary_email_account:
input.input(char) # Enter one character at a time input.input(char) # Enter one character at a time
@ -701,7 +719,7 @@ def logutGoogle(tab) -> bool:
def main(email_account, email_password, old_recovery_email, new_password, new_recovery_email, proxy_host, proxy_port, proxy_username, proxy_password,region): def main(email_account, email_password, old_recovery_email, new_password, new_recovery_email, proxy_host, proxy_port, proxy_username, proxy_password,region):
update_status_in_db(email_account, '开始处理,没有改好') update_status_in_db(email_account, '初始化中')
print("邮箱账户:", email_account) # 邮箱账户 print("邮箱账户:", email_account) # 邮箱账户
print("邮箱密码:", email_password) # 邮箱密码 print("邮箱密码:", email_password) # 邮箱密码
@ -834,20 +852,26 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re
click_the_login_button(tab, login_a) click_the_login_button(tab, login_a)
random_sleep(tab) random_sleep(tab)
tab.wait(3) login = tab.ele('@@tag()=span@@text()=登录', timeout=10)
if login:
random_sleep(tab)
# 将随机数设置为窗口标题
script = f"document.title = '{random_number}';"
# 执行 JS 脚本
tab.run_js(script)
print("登录运行完毕")
save_log(random_number, "已经进入登录界面")
else:
update_status_in_db(email_account, '登录超时')
return (
email_account, email_password, old_recovery_email, new_password, new_recovery_email, proxy_host,
proxy_port,
proxy_username, proxy_password, region)
random_sleep(tab)
# 将随机数设置为窗口标题
script = f"document.title = '{random_number}';"
# 执行 JS 脚本
tab.run_js(script)
actual_title = tab.run_js("return document.title;") actual_title = tab.run_js("return document.title;")
print("登录运行完毕")
save_log(random_number, "已经进入登录界面")
tab.wait(7)
save_log(random_number, f"设置标题为: {random_number}, 实际标题为: {actual_title}\n") save_log(random_number, f"设置标题为: {random_number}, 实际标题为: {actual_title}\n")
# 获取所有 Chrome 窗口 # 获取所有 Chrome 窗口
@ -876,7 +900,7 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re
if email_input: if email_input:
flag = input_email(tab, hwnd, email_account, email_input) # 使用传入的邮箱账号 flag = input_email(tab, hwnd, email_account, email_input) # 使用传入的邮箱账号
if not flag: if not flag:
update_status_in_db(random_number, "输入完邮箱后点击下一步出错") update_status_in_db(random_number, "出现谷歌文字验证")
return ( return (
email_account, email_password, old_recovery_email, new_password, new_recovery_email, proxy_host, email_account, email_password, old_recovery_email, new_password, new_recovery_email, proxy_host,
proxy_port, proxy_port,
@ -992,18 +1016,6 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re
print("如果出现错误,输入密码运行完毕") print("如果出现错误,输入密码运行完毕")
save_log(random_number, "如果出现错误后,输入密码运行完毕") save_log(random_number, "如果出现错误后,输入密码运行完毕")
# 看下是否出现了手机号
telephone = tab.ele("@text()=请输入电话号码,以便通过短信接收验证码。", timeout=5)
if telephone:
save_log(random_number, '接码')
update_status_in_db(email_account, '接码')
browser.quit()
return email_account
print("检查手机号2运行完毕")
save_log(random_number, "检查手机号2运行完毕")
# 确定密码是否被修改的开关 # 确定密码是否被修改的开关
password_change = False password_change = False
@ -1015,7 +1027,7 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re
input_password(tab, hwnd, new_password, password_input) input_password(tab, hwnd, new_password, password_input)
save_log(random_number, f"输入新密码完毕") save_log(random_number, f"输入新密码完毕")
tab.wait(7) tab.wait(7)
if tab.wait.ele_deleted('#passwordNext', timeout=5) == False: if tab.wait.ele_deleted('#passwordNext', timeout=10) == False:
update_status_in_db(email_account, '被盗') update_status_in_db(email_account, '被盗')
return email_account return email_account
password_change = True password_change = True
@ -1033,10 +1045,31 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re
print("输入新密码运行完毕") print("输入新密码运行完毕")
save_log(random_number, "输入新密码运行完毕") save_log(random_number, "输入新密码运行完毕")
# 看下是否出现了手机号
telephone = tab.ele("@text()=请输入电话号码,以便通过短信接收验证码。", timeout=5)
if telephone:
save_log(random_number, '接码')
update_status_in_db(email_account, '接码')
return email_account
print("检查手机号2运行完毕")
save_log(random_number, "检查手机号2运行完毕")
wrong = tab.ele('@@tag()=a@@text()=了解详情', timeout=5)
alternate_email_button = tab.ele('@text()=确认您的辅助邮箱', timeout=5)
if wrong and not alternate_email_button:
save_log(random_number, '出现账号状态异常')
update_status_in_db(email_account, '出现账号状态异常')
return (
email_account, email_password, old_recovery_email, new_password, new_recovery_email, proxy_host,
proxy_port,
proxy_username, proxy_password, region)
print("检查谷歌账号是否存在异常活动完毕")
save_log(random_number, "检查谷歌账号是否存在异常活动完毕")
try: try:
# 使用辅助邮箱验证
alternate_email_button = tab.ele('@text()=确认您的辅助邮箱', timeout=15)
if alternate_email_button: if alternate_email_button:
click_alternate_email_verification_button(tab, alternate_email_button) click_alternate_email_verification_button(tab, alternate_email_button)
random_sleep(tab) random_sleep(tab)
@ -1067,10 +1100,11 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re
print("我要开始修改辅助邮箱账号了") print("我要开始修改辅助邮箱账号了")
try: try:
button = tab.ele('@@tag()=i@@text()=edit', timeout=15) button = tab.ele('@type=email', timeout=15)
if button: if button:
modify_the_secondary_email1(tab, new_recovery_email) # 使用传入的新辅助邮箱 modify_the_secondary_email1(tab,button, new_recovery_email) # 使用传入的新辅助邮箱
print("修改完成") print("修改完成")
auxiliary_email_account_change = True
else: else:
print("没有修改辅助邮箱的界面,跳过") print("没有修改辅助邮箱的界面,跳过")
# auxiliary_email_account_change = True # auxiliary_email_account_change = True
@ -1094,36 +1128,35 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re
if password_change and auxiliary_email_account_change: if password_change and auxiliary_email_account_change:
logutGoogle(tab) logutGoogle(tab)
tab.wait(3)
update_status_in_db(email_account, '已更改') update_status_in_db(email_account, '已更改')
return email_account return email_account
tab.handle_alert(accept=True) tab.handle_alert(accept=True)
# 点击继续按钮 # # 点击继续按钮
continue_div1 = tab.ele('@text()=Continue with smart features', timeout=15) # continue_div1 = tab.ele('@text()=Continue with smart features', timeout=15)
if continue_div1: # if continue_div1:
try: # try:
click_continue_button(tab, continue_div1) # click_continue_button(tab, continue_div1)
except ElementNotFoundError as e: # except ElementNotFoundError as e:
print(f"找不到继续按钮的元素:{e}") # print(f"找不到继续按钮的元素:{e}")
save_log(random_number, f"找不到继续按钮的元素:{e}") # save_log(random_number, f"找不到继续按钮的元素:{e}")
update_status_in_db(email_account, "请求错误,请重试") # update_status_in_db(email_account, "请求错误,请重试")
return ( # return (
email_account, email_password, old_recovery_email, new_password, new_recovery_email, proxy_host, # email_account, email_password, old_recovery_email, new_password, new_recovery_email, proxy_host,
proxy_port, # proxy_port,
proxy_username, proxy_password, region) # proxy_username, proxy_password, region)
try: try:
# 点击头像进入邮箱的安全设置 # 安全设置
tab.handle_alert(accept=True) tab.handle_alert(accept=True)
tab.get("https://ogs.google.com/u/0/widget/account?cn=account") tab.get("https://ogs.google.com/u/0/widget/account?cn=account")
tab.handle_alert(accept=True) tab.handle_alert(accept=True)
tab.get("https://myaccount.google.com/security?gar=WzEyMF0") tab.get("https://myaccount.google.com/security?gar=WzEyMF0")
tab.handle_alert(accept=True)
except ElementNotFoundError as e: except ElementNotFoundError as e:
print(f"找不到头像的元素:{e}") save_log(random_number, f"进入安全设置后出错:{e}")
save_log(random_number, f"找不到继续按钮的元素:{e}") update_status_in_db(email_account,"进入安全设置的时候请求错误,请重试")
update_status_in_db(email_account,"请求错误,请重试")
return ( return (
email_account, email_password, old_recovery_email, new_password, new_recovery_email, proxy_host, email_account, email_password, old_recovery_email, new_password, new_recovery_email, proxy_host,
proxy_port, proxy_port,
@ -1131,6 +1164,16 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re
tab.wait(3) tab.wait(3)
wrong = tab.ele("@text()=要查看和调整您的安全设置并获取有助于确保您账号安全的建议,请登录您的账号", timeout=5)
if wrong:
print("进入安全设置后仍然未登录")
save_log(random_number, '进入安全设置后仍然未登录')
update_status_in_db(email_account, '登录失败')
return (
email_account, email_password, old_recovery_email, new_password, new_recovery_email, proxy_host,
proxy_port,
proxy_username, proxy_password, region)
try: try:
# 如果密码没被修改 # 如果密码没被修改
if not password_change: if not password_change:
@ -1154,6 +1197,7 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re
try: try:
if auxiliary_email_account_change: if auxiliary_email_account_change:
logutGoogle(tab) logutGoogle(tab)
tab.wait(3)
update_status_in_db(email_account, '已更改') update_status_in_db(email_account, '已更改')
return email_account return email_account
# 修改辅助邮箱2 # 修改辅助邮箱2
@ -1161,6 +1205,7 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re
save_log(random_number, f"辅助邮箱是否被修改:{flag}") save_log(random_number, f"辅助邮箱是否被修改:{flag}")
if flag: if flag:
logutGoogle(tab) logutGoogle(tab)
tab.wait(3)
update_status_in_db(email_account, '已更改') update_status_in_db(email_account, '已更改')
return email_account return email_account
else: else:
@ -1222,8 +1267,10 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re
print("辅助邮箱账号已经更改完毕") print("辅助邮箱账号已经更改完毕")
save_log(random_number, "辅助邮箱账号已经更改完毕") save_log(random_number, "辅助邮箱账号已经更改完毕")
logutGoogle(tab) logutGoogle(tab)
tab.wait(3)
update_status_in_db(email_account, "已更改") update_status_in_db(email_account, "已更改")
break return email_account
else: else:
# 如果未找到验证码,点击重新发送按钮并进入下一次循环 # 如果未找到验证码,点击重新发送按钮并进入下一次循环
print("未找到验证码,点击重新发送按钮") print("未找到验证码,点击重新发送按钮")
@ -1390,6 +1437,8 @@ def run_gui():
return return
db_manager.import_from_excel(file_path, clear_old=True) db_manager.import_from_excel(file_path, clear_old=True)
proxy_manager.import_proxies_with_classifier("Ip.txt", classifier=classifier_smartproxy)
# 初始化 Excel 数据和进度条状态 # 初始化 Excel 数据和进度条状态
all_rows = read_data_from_db() # 读取 Excel 数据 all_rows = read_data_from_db() # 读取 Excel 数据
total_tasks = len(all_rows) total_tasks = len(all_rows)

View File

@ -5,7 +5,7 @@ from typing import List, Optional, Dict
class ProxyManagerSQLite: class ProxyManagerSQLite:
def __init__(self, db_path="proxies.db", debug=False): def __init__(self, db_path="proxies.db", debug=True):
self.db_path = db_path self.db_path = db_path
self.debug = debug self.debug = debug
self._initialize_db() self._initialize_db()
@ -27,8 +27,8 @@ class ProxyManagerSQLite:
# 检查表是否存在 # 检查表是否存在
cursor = conn.execute("PRAGMA table_info(proxies)") cursor = conn.execute("PRAGMA table_info(proxies)")
columns = [row[1] for row in cursor.fetchall()] columns = [row[1] for row in cursor.fetchall()]
required_columns = ["host", "port", "user", "password", "protocol", "region"] required_columns = ["id", "host", "port", "user", "password", "protocol", "region"]
if not columns: if not columns:
self.debug_print("表不存在,正在创建表...") self.debug_print("表不存在,正在创建表...")
@ -63,16 +63,21 @@ class ProxyManagerSQLite:
finally: finally:
conn.close() conn.close()
def import_proxies_with_classifier(self, file_path: str, classifier): def import_proxies_with_classifier(self, file_path: str, classifier, clear_before_import: bool = True):
""" """
从文件导入代理列表并分类 从文件导入代理列表并分类
:param file_path: 文件路径格式为 host:port:user:password :param file_path: 文件路径格式为 host:port:user:password
:param classifier: 分类函数接受代理行字符串返回国家/地区代码 :param classifier: 分类函数接受代理行字符串返回国家/地区代码
:param clear_before_import: 是否在导入前清空数据库默认为 True
""" """
# 根据 clear_before_import 参数决定是否清空数据库
if clear_before_import:
self.clear()
try: try:
with open(file_path, "r") as file: with open(file_path, "r") as file:
lines = file.read().replace("\r\n", "\n").strip().split("\n") lines = file.read().replace("\r\n", "\n").strip().split("\n")
proxies = [] proxies = []
for line in lines: for line in lines:
parts = line.split(":") parts = line.split(":")
@ -99,7 +104,8 @@ class ProxyManagerSQLite:
self.debug_print(f"Error importing proxies: {str(e)}") self.debug_print(f"Error importing proxies: {str(e)}")
raise raise
def get_random_proxy_by_region(self, region: Optional[str] = None, remove_after_fetch: bool = False) -> Optional[Dict]: def get_random_proxy_by_region(self, region: Optional[str] = None, remove_after_fetch: bool = False) -> Optional[
Dict]:
""" """
随机获取代理支持按区域筛选 随机获取代理支持按区域筛选
:param region: 国家/地区代码若为 None 则随机选择 :param region: 国家/地区代码若为 None 则随机选择
@ -111,7 +117,7 @@ class ProxyManagerSQLite:
cursor = conn.execute("SELECT * FROM proxies ORDER BY RANDOM() LIMIT 1") cursor = conn.execute("SELECT * FROM proxies ORDER BY RANDOM() LIMIT 1")
else: else:
cursor = conn.execute("SELECT * FROM proxies WHERE region = ? ORDER BY RANDOM() LIMIT 1", (region,)) cursor = conn.execute("SELECT * FROM proxies WHERE region = ? ORDER BY RANDOM() LIMIT 1", (region,))
proxy = cursor.fetchone() proxy = cursor.fetchone()
if proxy and remove_after_fetch: if proxy and remove_after_fetch:
conn.execute("DELETE FROM proxies WHERE id = ?", (proxy[0],)) conn.execute("DELETE FROM proxies WHERE id = ?", (proxy[0],))
@ -164,7 +170,7 @@ class ProxyManagerSQLite:
except Exception as e: except Exception as e:
self.debug_print(f"序列化失败: {e}") self.debug_print(f"序列化失败: {e}")
continue # 跳过错误的代理数据 continue # 跳过错误的代理数据
self.debug_print(f"成功导出代理到 {file_path}") self.debug_print(f"成功导出代理到 {file_path}")
except Exception as e: except Exception as e:
self.debug_print(f"Error exporting proxies: {str(e)}") self.debug_print(f"Error exporting proxies: {str(e)}")
@ -181,7 +187,7 @@ class ProxyManagerSQLite:
cursor = conn.execute("SELECT * FROM proxies") cursor = conn.execute("SELECT * FROM proxies")
else: else:
cursor = conn.execute("SELECT * FROM proxies WHERE region = ?", (region,)) cursor = conn.execute("SELECT * FROM proxies WHERE region = ?", (region,))
rows = cursor.fetchall() rows = cursor.fetchall()
return [ return [
dict(zip(["id", "host", "port", "user", "password", "protocol", "region"], row)) dict(zip(["id", "host", "port", "user", "password", "protocol", "region"], row))
@ -195,6 +201,7 @@ class ProxyManagerSQLite:
conn.commit() conn.commit()
self.debug_print("数据库已清空。") self.debug_print("数据库已清空。")
def classifier_smartproxy(proxy_line): def classifier_smartproxy(proxy_line):
""" """
从代理行中提取区域代码 从代理行中提取区域代码
@ -207,20 +214,21 @@ def classifier_smartproxy(proxy_line):
start_index = proxy_line.find("_area-") start_index = proxy_line.find("_area-")
if start_index == -1: if start_index == -1:
return "OTHER" return "OTHER"
# 区域代码从 "_area-" 之后开始,到下一个 "_" 之前结束 # 区域代码从 "_area-" 之后开始,到下一个 "_" 之前结束
start_index += len("_area-") start_index += len("_area-")
end_index = proxy_line.find("_", start_index) end_index = proxy_line.find("_", start_index)
if end_index == -1: if end_index == -1:
return "OTHER" # 无法找到结束符,返回 "OTHER" return "OTHER" # 无法找到结束符,返回 "OTHER"
# 提取区域代码并返回 # 提取区域代码并返回
region_code = proxy_line[start_index:end_index] region_code = proxy_line[start_index:end_index]
return region_code.upper() # 返回大写的区域代码 return region_code.upper() # 返回大写的区域代码
except Exception as e: except Exception as e:
print(f"Error in region classification: {str(e)}") print(f"Error in region classification: {str(e)}")
return "OTHER" return "OTHER"
def serializer_smartproxy(proxy: Dict) -> str: def serializer_smartproxy(proxy: Dict) -> str:
""" """
默认的代理导出序列化函数 默认的代理导出序列化函数
@ -250,9 +258,8 @@ if __name__ == "__main__":
print("所有代理总数:", manager.get_proxy_count("ALL")) print("所有代理总数:", manager.get_proxy_count("ALL"))
print("PL 区域代理数:", manager.get_proxy_count("PL")) print("PL 区域代理数:", manager.get_proxy_count("PL"))
print("PL 区域当前列表:",manager.get_proxies("PL")) print("PL 区域当前列表:", manager.get_proxies("PL"))
print("目前所有的可用代理列表:",manager.get_proxies("ALL")) print("目前所有的可用代理列表:", manager.get_proxies("ALL"))
manager.export_proxies("剩下的可用IP.txt") manager.export_proxies("剩下的可用IP.txt")