From 750390f8624046dbbd9509d6ba42af957bf79215 Mon Sep 17 00:00:00 2001 From: dghc2023 <3053714263@qq.com> Date: Sat, 23 Nov 2024 14:27:50 +0000 Subject: [PATCH] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E6=96=87=E4=BB=B6=E8=87=B3?= =?UTF-8?q?=20/?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- account.py | 199 +++++++++++++++++++++++++++++++++++++++++++++ export_to_excel.py | 41 ++++++++++ main.py | 191 +++++++++++++++++++++++++------------------ proxy.py | 107 ++++++++++++++++++++++++ 4 files changed, 459 insertions(+), 79 deletions(-) create mode 100644 account.py create mode 100644 export_to_excel.py create mode 100644 proxy.py diff --git a/account.py b/account.py new file mode 100644 index 0000000..37e2a57 --- /dev/null +++ b/account.py @@ -0,0 +1,199 @@ +import sqlite3 +import pandas as pd +from contextlib import contextmanager +from dataclasses import dataclass +from typing import List, Optional + +@dataclass +class AccountData: + email: str # 邮箱 + original_password: str # 原密码 + original_aux_email: str # 原辅助邮箱 + new_password: str # 新密码 + new_aux_email: str # 新辅助邮箱 + change_status: str # 是否更改完成 + +class AccountManagerSQLite: + def __init__(self, db_path="accounts.db"): + self.db_path = db_path + self._initialize_db() + + def _initialize_db(self): + """初始化数据库结构""" + with self._get_connection() as conn: + conn.execute(""" + CREATE TABLE IF NOT EXISTS accounts ( + email TEXT PRIMARY KEY, + original_password TEXT, + original_aux_email TEXT, + new_password TEXT, + new_aux_email TEXT, + change_status TEXT + ) + """) + + @contextmanager + def _get_connection(self): + """获取 SQLite 数据库连接""" + conn = sqlite3.connect(self.db_path) + try: + yield conn + finally: + conn.close() + + def clear(self): + """清空数据库中的所有数据""" + with self._get_connection() as conn: + try: + conn.execute("DELETE FROM accounts") + conn.commit() + print("数据库已清空。") + except sqlite3.Error as e: + conn.rollback() + print(f"清空数据库失败:{e}") + raise + + def import_data(self, account_list: List[AccountData]): + """批量导入数据""" + with self._get_connection() as conn: + try: + conn.executemany(""" + INSERT OR REPLACE INTO accounts ( + email, original_password, original_aux_email, + new_password, new_aux_email, change_status + ) VALUES (?, ?, ?, ?, ?, ?) + """, [ + ( + account.email, account.original_password, account.original_aux_email, + account.new_password, account.new_aux_email, account.change_status + ) for account in account_list + ]) + conn.commit() + except sqlite3.Error as e: + conn.rollback() + print(f"Error importing data: {e}") + raise + + def export_data(self) -> List[AccountData]: + """导出所有数据""" + with self._get_connection() as conn: + cursor = conn.execute("SELECT * FROM accounts") + rows = cursor.fetchall() + return [AccountData(*row) for row in rows] + + def query(self, **kwargs) -> List[AccountData]: + """查询数据""" + query = "SELECT * FROM accounts WHERE " + " AND ".join([f"{key} = ?" for key in kwargs]) + values = tuple(kwargs.values()) + + with self._get_connection() as conn: + cursor = conn.execute(query, values) + rows = cursor.fetchall() + return [AccountData(*row) for row in rows] + + def update_record(self, email: str, **kwargs): + """ + 更新记录的指定字段 + :param email: 要更新的记录的邮箱 + :param kwargs: 要更新的字段和值,键为字段名,值为更新的值 + """ + if not kwargs: + raise ValueError("没有指定任何更新的字段") + + # 动态生成 SQL 的 SET 子句 + set_clause = ", ".join([f"{key} = ?" for key in kwargs.keys()]) + values = list(kwargs.values()) + values.append(email) # 将 email 添加到参数列表的最后 + + query = f""" + UPDATE accounts + SET {set_clause} + WHERE email = ? + """ + + with self._get_connection() as conn: + try: + conn.execute(query, values) + conn.commit() + print(f"成功更新记录: {email}") + except sqlite3.Error as e: + conn.rollback() + print(f"更新记录失败:{e}") + raise + + + def delete_account(self, email: str): + """删除某个账户""" + with self._get_connection() as conn: + try: + conn.execute("DELETE FROM accounts WHERE email = ?", (email,)) + conn.commit() + except sqlite3.Error as e: + conn.rollback() + print(f"Error deleting account: {e}") + raise + + def import_from_excel(self, excel_path: str, clear_old: bool = False): + """ + 从 Excel 文件导入数据 + :param excel_path: Excel 文件路径 + :param clear_old: 是否清空旧数据 + """ + try: + # 如果 clear_old 为 True,先清空数据库 + if clear_old: + self.clear() + + # 读取 Excel 文件的第一个工作簿 + df = pd.read_excel(excel_path, sheet_name=0) + + # 校验表格格式 + required_columns = ["邮箱", "原密码", "原辅助邮箱", "新密码", "新辅助邮箱", "是否更改完成"] + if not all(col in df.columns for col in required_columns): + raise ValueError(f"表格缺少必要的列:{required_columns}") + + # 将数据转换为 AccountData 对象 + account_list = [ + AccountData( + email=row["邮箱"], + original_password=row["原密码"], + original_aux_email=row["原辅助邮箱"], + new_password=row["新密码"], + new_aux_email=row["新辅助邮箱"], + change_status=row["是否更改完成"] + ) + for _, row in df.iterrows() + ] + + # 批量导入数据到数据库 + self.import_data(account_list) + print(f"成功导入 {len(account_list)} 条数据!") + except Exception as e: + print(f"导入失败:{e}") + raise + + def export_to_excel(self, excel_path: str): + """ + 导出数据到 Excel 文件 + :param excel_path: Excel 文件路径 + """ + try: + # 从数据库中获取所有数据 + accounts = self.export_data() + + # 转换为 DataFrame + df = pd.DataFrame([{ + "邮箱": account.email, + "原密码": account.original_password, + "原辅助邮箱": account.original_aux_email, + "新密码": account.new_password, + "新辅助邮箱": account.new_aux_email, + "是否更改完成": account.change_status + } for account in accounts]) + + # 写入 Excel 文件 + df.to_excel(excel_path, index=False, sheet_name="Accounts") + print(f"成功导出数据到 {excel_path}!") + except Exception as e: + print(f"导出失败:{e}") + raise diff --git a/export_to_excel.py b/export_to_excel.py new file mode 100644 index 0000000..178e154 --- /dev/null +++ b/export_to_excel.py @@ -0,0 +1,41 @@ +import os +import datetime + +from account import AccountManagerSQLite + + +def export_database_to_excel(output_directory=".", file_name_prefix="exported_accounts"): + """ + 从数据库导出数据到 Excel 文件。 + + 参数: + output_directory (str): 输出文件夹路径,默认为当前目录。 + file_name_prefix (str): 文件名前缀,默认是 'exported_accounts'。 + """ + try: + # 确保输出目录存在 + if not os.path.exists(output_directory): + os.makedirs(output_directory) + + # 生成 Excel 文件名,包含时间戳 + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + file_name = f"{file_name_prefix}_{timestamp}.xlsx" + excel_path = os.path.join(output_directory, file_name) + + # 创建数据库管理实例并导出数据 + db_manager = AccountManagerSQLite() + db_manager.export_to_excel(excel_path) + + print(f"数据成功导出到: {excel_path}") + return excel_path + except Exception as e: + print(f"导出失败: {e}") + raise + +# 如果此文件作为主程序运行 +if __name__ == "__main__": + # 设置导出文件夹(可以修改为你想要的路径) + output_directory = "./exports" + + # 调用导出功能 + export_database_to_excel(output_directory) diff --git a/main.py b/main.py index fd4a71a..b3f7e55 100644 --- a/main.py +++ b/main.py @@ -4,7 +4,6 @@ import socket import sys import threading from multiprocessing import freeze_support -import pandas as pd from DrissionPage import ChromiumOptions, ChromiumPage from DrissionPage.errors import * import random @@ -25,7 +24,10 @@ import multiprocessing import tkinter as tk from tkinter import filedialog, messagebox, ttk import string -from multiprocessing import Lock, Queue, Pool +from multiprocessing import Queue, Pool +from account import AccountManagerSQLite +from typing import List, Tuple +from proxy import ProxyManager # Windows 消息常量 WM_MOUSEMOVE = 0x0200 @@ -725,26 +727,19 @@ def manage_loop_count(): # 返回更新后的计数 return count -# 用来标记账号是否完成 -def update_excel_status(file_path, sheet_name, row_index, status): - """ - 更新 Excel 中指定行的 F 列为给定状态。 - 参数: - file_path (str): Excel 文件路径。 - sheet_name (str): 要更新的工作表名称。 - row_index (int): 要更新的行号(从 1 开始)。 - status (str): 要写入 F 列的状态。 +# 用来标记账号是否完成 +def update_status_in_db(email: str, status: str): """ - wb = load_workbook(file_path) - # 检查工作表是否存在 - if sheet_name not in wb.sheetnames: - raise ValueError(f"Sheet '{sheet_name}' not found in the workbook.") - # 选择动态传入的工作表 - sheet = wb[sheet_name] - sheet.cell(row=row_index, column=6, value=status) # F 列是第 6 列 - wb.save(file_path) - print(f"第{row_index}行已经更改状态为:{status}") + 更新数据库中指定账户的状态。 + """ + db_manager = AccountManagerSQLite(db_path="accounts.db") + + # 更新数据库中对应账户的状态 + db_manager.update_record(email=email, change_status=status) + + # 输出日志信息 + print(f"账号 {email} 的状态已更新为:{status}") @@ -872,7 +867,7 @@ def logutGoogle(tab) -> bool: return False # 无用分支,不过为了避免函数太长回头修改时候忘记,还是写个 False 以防万一 -def main(email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, proxy_username, proxy_password,row_index, file_path, sheet_name): +def main(email_account, email_password, old_recovery_email, new_password, new_recovery_email, proxy_host, proxy_port, proxy_username, proxy_password,row_index, file_path): global browser, plugin_path, user_dir # 生成一个 6 位的随机数 global random_number @@ -1024,7 +1019,7 @@ def main(email_account, email_password, old_recovery_email, new_recovery_email, except ElementNotFoundError as e: print(f"找不到输入邮箱账号的元素{e}") save_log(random_number, f"找不到输入邮箱账号的元素:{e}") - update_excel_status(file_path, sheet_name,row_index, "请求错误,请重试") + update_status_in_db(file_path, "请求错误,请重试") return ( email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, proxy_username, proxy_password, row_index) @@ -1046,7 +1041,7 @@ def main(email_account, email_password, old_recovery_email, new_recovery_email, except ElementNotFoundError as e: print(f"找不到输入邮箱账号的元素{e}") save_log(random_number, f"找不到输入邮箱账号的元素:{e}") - update_excel_status(file_path, sheet_name, row_index, "请求错误,请重试") + update_status_in_db(file_path, "请求错误,请重试") return ( email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, @@ -1060,9 +1055,9 @@ def main(email_account, email_password, old_recovery_email, new_recovery_email, # 看下是否出现了手机号 telephone = tab.ele("@text()=请输入电话号码,以便通过短信接收验证码。", timeout=15) if telephone: - update_excel_status(file_path, sheet_name,row_index, '接码') + update_status_in_db(file_path, '接码') browser.quit() - return + return False print("检查出现手机号运行完毕") save_log(random_number, "检查出现手机号运行完毕") @@ -1076,7 +1071,7 @@ def main(email_account, email_password, old_recovery_email, new_recovery_email, except ElementNotFoundError as e: print(f"找不到密码输入框的元素:{e}") save_log(random_number, f"找不到密码输入框的元素:{e}") - update_excel_status(file_path, sheet_name,row_index, "请求错误,请重试") + update_status_in_db(file_path, "请求错误,请重试") return ( email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, proxy_username, proxy_password, row_index) @@ -1100,7 +1095,7 @@ def main(email_account, email_password, old_recovery_email, new_recovery_email, except ElementNotFoundError as e: print(f"找不到输入邮箱账号的元素{e}") save_log(random_number, f"找不到输入邮箱账号的元素:{e}") - update_excel_status(file_path, sheet_name, row_index, "请求错误,请重试") + update_status_in_db(file_path, "请求错误,请重试") return ( email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, @@ -1115,7 +1110,7 @@ def main(email_account, email_password, old_recovery_email, new_recovery_email, except ElementNotFoundError as e: print(f"找不到密码输入框的元素:{e}") save_log(random_number, f"找不到密码输入框的元素:{e}") - update_excel_status(file_path, sheet_name, row_index, "请求错误,请重试") + update_status_in_db(file_path, "请求错误,请重试") return ( email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, @@ -1128,9 +1123,9 @@ def main(email_account, email_password, old_recovery_email, new_recovery_email, # 看下是否出现了手机号 telephone = tab.ele("@text()=请输入电话号码,以便通过短信接收验证码。", timeout=15) if telephone: - update_excel_status(file_path, sheet_name, row_index, '接码') + update_status_in_db(file_path, '接码') browser.quit() - return + return email_account print("检查手机号2运行完毕") save_log(random_number, "检查手机号2运行完毕") @@ -1148,15 +1143,15 @@ def main(email_account, email_password, old_recovery_email, new_recovery_email, password_input = tab.ele('@@aria-label=输入您的密码@@type=password', timeout=15) alternate_email_button = tab.ele('@text()=确认您的辅助邮箱', timeout=15) if password_input and not alternate_email_button: - update_excel_status(file_path, sheet_name, row_index, '被盗') - return + update_status_in_db(file_path, '被盗') + return email_account password_change = True print("密码已经被更改过") save_log(random_number, "密码已经被更改过") except ElementNotFoundError as e: print(f"找不到密码输入框的元素{e}") save_log(random_number, f"找不到密码输入框的元素:{e}") - update_excel_status(file_path, sheet_name, row_index, "请求错误,请重试") + update_status_in_db(file_path, "请求错误,请重试") return ( email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, proxy_username, proxy_password, row_index) @@ -1174,7 +1169,7 @@ def main(email_account, email_password, old_recovery_email, new_recovery_email, except ElementNotFoundError as e: print(f"找不到输入辅助邮箱的元素:{e}" ) save_log(random_number, f"找不到输入辅助邮箱的元素:{e}") - update_excel_status(file_path, sheet_name, row_index, "请求错误,请重试") + update_status_in_db(file_path, "请求错误,请重试") return ( email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, proxy_username, proxy_password, row_index) @@ -1204,7 +1199,7 @@ def main(email_account, email_password, old_recovery_email, new_recovery_email, # 捕获并打印异常,不中断后续代码执行 print(f"修改辅助邮箱的时候找不到元素: {e}") save_log(random_number, f"修改辅助邮箱的时候找不到元素: {e}") - update_excel_status(file_path, sheet_name, row_index, "请求错误,请重试") + update_status_in_db(file_path, "请求错误,请重试") return ( email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, proxy_username, proxy_password, row_index) @@ -1218,10 +1213,10 @@ def main(email_account, email_password, old_recovery_email, new_recovery_email, save_log(random_number, "输入辅助邮箱运行完毕") if password_change and auxiliary_email_account_change: - update_excel_status(file_path, sheet_name, row_index, '已更改') + update_status_in_db(file_path, '已更改') logutGoogle(tab) browser.quit() - return + return email_account tab.handle_alert(accept=True) @@ -1233,7 +1228,7 @@ def main(email_account, email_password, old_recovery_email, new_recovery_email, except ElementNotFoundError as e: print(f"找不到继续按钮的元素:{e}") save_log(random_number, f"找不到继续按钮的元素:{e}") - update_excel_status(file_path, sheet_name, row_index, "请求错误,请重试") + update_status_in_db(file_path, "请求错误,请重试") return ( email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, @@ -1250,7 +1245,7 @@ def main(email_account, email_password, old_recovery_email, new_recovery_email, except ElementNotFoundError as e: print(f"找不到头像的元素:{e}") save_log(random_number, f"找不到继续按钮的元素:{e}") - update_excel_status(file_path,sheet_name, row_index, "请求错误,请重试") + update_status_in_db(file_path,"请求错误,请重试") return ( email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, proxy_username, proxy_password, row_index) @@ -1269,7 +1264,7 @@ def main(email_account, email_password, old_recovery_email, new_recovery_email, except ElementNotFoundError as e: print(f"找不到修改密码的元素:{e}") save_log(random_number, f"找不到修改密码的元素:{e}") - update_excel_status(file_path,sheet_name, row_index, "请求错误,请重试") + update_status_in_db(file_path, "请求错误,请重试") return ( email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, proxy_username, proxy_password, row_index) @@ -1278,15 +1273,15 @@ def main(email_account, email_password, old_recovery_email, new_recovery_email, try: if auxiliary_email_account_change: - update_excel_status(file_path, sheet_name, row_index, '已更改') - return + update_status_in_db(file_path, '已更改') + return email_account # 修改辅助邮箱2 flag = modify_the_secondary_email2(tab, new_recovery_email, new_password, hwnd) save_log(random_number, f"辅助邮箱是否被修改:{flag}") if flag: browser.quit() - update_excel_status(file_path, sheet_name, row_index, '已更改') - return + update_status_in_db(file_path, '已更改') + return email_account else: sent_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(sent_time) @@ -1321,9 +1316,9 @@ def main(email_account, email_password, old_recovery_email, new_recovery_email, else: print("辅助邮箱账号已经更改完毕") save_log(random_number, "辅助邮箱账号已经更改完毕") - update_excel_status(file_path, sheet_name, row_index, '已更改') + update_status_in_db(file_path, '已更改') logutGoogle(tab) - return + return email_account count += 1 print(f"已运行 {count} 次") save_log(random_number, f"已运行 {count} 次") @@ -1340,20 +1335,20 @@ def main(email_account, email_password, old_recovery_email, new_recovery_email, elif count == 60: print("验证码超过四分钟没接收到,自动退出") save_log(random_number, "验证码超过四分钟没接收到,自动退出") - update_excel_status(file_path, sheet_name, row_index, '未更改(验证码没收到)') + update_status_in_db(file_path, '未更改(验证码没收到)') return tab.wait(5) # 每5秒循环一次 except ElementNotFoundError as e: print(f"更改辅助邮箱账号的时候找不到元素:{e}") save_log(random_number, f"更改辅助邮箱账号的时候找不到元素:{e}") - update_excel_status(file_path, sheet_name, row_index, "请求错误,请重试") + update_status_in_db(file_path, "请求错误,请重试") return ( email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, proxy_username, proxy_password, row_index) except Exception as e: print(f"出现未知错误:{e}") save_log(random_number, f"出现未知错误:{e}") - update_excel_status(file_path, sheet_name, row_index, f'出现未知错误:{e}请重试') + update_status_in_db(file_path, f'出现未知错误:{e}请重试') return ( email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, proxy_username, proxy_password, row_index) @@ -1435,7 +1430,7 @@ def run_gui(): return # 初始化 Excel 数据和进度条状态 - all_rows = read_excel_data_for_main(file_path, sheet_name) # 读取 Excel 数据 + all_rows = read_data_from_db(file_path) # 读取 Excel 数据 total_tasks = len(all_rows) completed_count = 0 failed_count = 0 @@ -1448,7 +1443,7 @@ def run_gui(): ) stop_flag.clear() # 清除停止标志 - exec_thread = threading.Thread(target=parallel_execution, + exec_thread = threading.Thread(target=parallel_execution_with_db, args=(file_path, sheet_name, max_concurrency, max_retries)) exec_thread.daemon = True # 设置为守护线程 exec_thread.start() @@ -1464,14 +1459,14 @@ def run_gui(): root.destroy() # 销毁主窗口 sys.exit(0) # 强制退出程序 - def parallel_execution(file_path, sheet_name, max_concurrency, max_retries): + def parallel_execution_with_db(file_path, sheet_name, max_concurrency, max_retries): progress_queue = Queue() - all_rows = read_excel_data_for_main(file_path, sheet_name) + all_rows = read_data_from_db(file_path) failed_rows = [] total_rows = len(all_rows) completed_count = 0 failed_count = 0 - + failed_emails = set() # 用于存储唯一的失败邮箱 # 更新总条目数 progress_bar['maximum'] = total_rows @@ -1492,7 +1487,7 @@ def run_gui(): # 传递锁给子进程 result = pool.apply_async( - main, args=(*row, file_path, sheet_name), + main, args=(*row, file_path), callback=lambda result: progress_queue.put(result) ) results.append(result) @@ -1504,15 +1499,31 @@ def run_gui(): try: retry_row = result.get() print(f"main函数返回的值: {retry_row}") - if retry_row: # 如果返回需要重试的数据 - failed_rows.append(retry_row) - failed_count += 1 - else: - completed_count += 1 - progress_bar['value'] = completed_count # 更新进度条 + + if isinstance(retry_row, tuple): # 返回的是元组,表示需要重试 + email = retry_row[0] # 假设元组的第一个元素是邮箱地址 + if email not in failed_emails: # 检查是否是新的失败邮箱 + failed_emails.add(email) # 添加到失败集合 + failed_count += 1 # 增加失败计数 + failed_rows.append(retry_row) # 添加到重试列表 + lbl_progress_status.config( text=f"完成:{completed_count}/{total_rows},失败:{failed_count}" ) + else: # 返回的是单个 account,表示成功 + completed_count += 1 + + # 更新数据库中记录的状态 + email = retry_row # 假设 retry_row 是邮箱地址 + update_status_in_db(email, "已完成") + + progress_bar['value'] = completed_count # 更新进度条值(百分比) + lbl_progress_status.config( + text=f"完成:{completed_count}/{total_rows},失败:{failed_count}" + ) + + time.sleep(2) # 模拟耗时操作 + except Exception as e: print(f"任务执行时发生错误: {e}") failed_count += 1 @@ -1534,32 +1545,56 @@ def run_gui(): break else: print("所有任务已完成。") - messagebox.showinfo('所有任务已经完成') + messagebox.showinfo('运行结束', '所有任务已经完成') - def read_excel_data_for_main(file_path, sheet_name): - df = pd.read_excel(file_path, sheet_name=sheet_name) + def read_data_from_db(file_path: str) -> List[Tuple]: + """ + 从数据库读取数据,同时保持与旧的接口和逻辑一致。 + """ + # 初始化数据库管理器 + db_manager = AccountManagerSQLite(db_path="accounts.db") skip_keywords = {"已更改", "接码", "被盗"} data = [] + + # 创建代理管理器 + proxy_manager = ProxyManager() + + # 导入 Excel 数据到数据库,清空旧数据 + db_manager.import_from_excel(file_path, clear_old=True) + + # 从数据库中查询所有数据 + accounts = db_manager.export_data() + global total_tasks, completed_tasks, failed_tasks # 全局变量以更新状态 - completed_tasks = sum( - df["是否更改完成"].isin(skip_keywords) - ) # 统计已完成的 - failed_tasks = len(df) - completed_tasks - df["是否更改完成"].isna().sum() - total_tasks = len(df) - completed_tasks + completed_tasks = sum(account.change_status in skip_keywords for account in accounts) + failed_tasks = len(accounts) - completed_tasks - sum(account.change_status is None for account in accounts) + total_tasks = len(accounts) - completed_tasks - for i in range(len(df)): - if pd.isna(df.iloc[i, 0]): + for i, account in enumerate(accounts): + if not account.email: break - if pd.notna(df.iloc[i, 6]) and df.iloc[i, 5] not in skip_keywords and ':' in df.iloc[i, 6]: - proxy_parts = df.iloc[i, 6].split(':') - if len(proxy_parts) == 4: - proxy_host, proxy_port, proxy_user, proxy_pass = proxy_parts + if account.change_status not in skip_keywords: + print(f'测试是否是空的:{proxy_manager.is_empty()}') + print(proxy_manager.import_proxies('IP.txt')) + print(f'再测试是否是空的:{proxy_manager.is_empty()}') + + proxy = proxy_manager.get_random_proxy() + print(f"获取到的随机代理:{proxy}") + + if proxy: # 如果代理测试成功 + print( + f"分配的代理信息:Host: {proxy['host']}, Port: {proxy['port']}, User: {proxy['user']}, Password: {proxy['password']}") + proxy_host, proxy_port, proxy_user, proxy_pass = proxy['host'], proxy['port'], proxy['user'], proxy[ + 'password'] data.append(( - df.iloc[i, 0], df.iloc[i, 1], df.iloc[i, 2], - df.iloc[i, 4], df.iloc[i, 3], proxy_host, + account.email, account.original_password, account.original_aux_email, + account.new_password, account.new_aux_email, proxy_host, proxy_port, proxy_user, proxy_pass, i + 2 )) + else: + # 如果代理无效,则跳过该账号的代理分配 + print(f"代理测试失败,跳过账号 {account.email} 的代理分配。") return data root = tk.Tk() @@ -1600,8 +1635,6 @@ def run_gui(): btn_stop = tk.Button(root, text="停止处理", command=stop_processing) btn_stop.grid(row=7, column=0, columnspan=2, padx=20, pady=20, sticky="ew") - - root.protocol("WM_DELETE_WINDOW", on_closing) # 绑定窗口关闭事件 root.mainloop() diff --git a/proxy.py b/proxy.py new file mode 100644 index 0000000..6329c0c --- /dev/null +++ b/proxy.py @@ -0,0 +1,107 @@ +import random +import requests + +class ProxyManager: + def __init__(self): + self.proxies = [] + + def import_proxies(self, file_path): + """ + 导入代理列表,支持文件路径,格式为 host:port:user:password + """ + try: + with open(file_path, 'r') as file: + content = file.read().replace('\r\n', '\n') # 替换 Windows 风格换行符 + lines = content.strip().split('\n') + for line in lines: + parts = line.split(':') + if len(parts) == 4: # 确保格式正确 + proxy = { + 'host': parts[0], + 'port': parts[1], + 'user': parts[2], + 'password': parts[3], + 'protocol': 'http' + } + self.proxies.append(proxy) + except FileNotFoundError: + print(f"Error: File not found at {file_path}.") + return False + except Exception as e: + print(f"Error: {str(e)}") + return False + + return True + + def get_random_proxy(self): + """ + 随机获取一个代理 + """ + if not self.proxies: + print("No proxies available.") + return None + return random.choice(self.proxies) + + def test_proxy(self, proxy): + """ + 测试代理的对外 IP + :param proxy: 格式为 {'host': '...', 'port': '...', 'user': '...', 'password': '...', 'protocol': '...'} + """ + if not proxy: + print("Invalid proxy provided.") + return None + + proxy_url = f"{proxy['protocol']}://{proxy['user']}:{proxy['password']}@{proxy['host']}:{proxy['port']}" + proxies = {'http': proxy_url, 'https': proxy_url} + + try: + response = requests.get("http://jsonip.com", proxies=proxies, timeout=5) + if response.status_code == 200: + return response.json() + else: + print(f"Failed with status code: {response.status_code}") + return None + except requests.RequestException as e: + print(f"Request failed: {str(e)}") + return None + + def is_empty(self): + return not self.proxies + + def get_and_test_random_proxy(self): + """ + 从代理列表中随机获取一个代理,测试联通性,并从列表中移除 + """ + if not self.proxies: + print("No proxies available.") + return None + + proxy = random.choice(self.proxies) + test_result = self.test_proxy(proxy) + if test_result: + print(f"Proxy works: {test_result}") + self.proxies.remove(proxy) # 移除成功的代理 + return proxy, test_result + else: + print("Proxy failed. Removing from the list.") + self.proxies.remove(proxy) # 移除失败的代理 + return None, None + + +if __name__ == "__main__": + manager = ProxyManager() + print(f'测试是否是空的:{manager.is_empty()}') + print(manager.import_proxies('IP.txt')) + print(f'再测试是否是空的:{manager.is_empty()}') + + random_proxy = manager.get_random_proxy() + print(f"获取到的随机代理:{random_proxy}") + + test_result = manager.test_proxy(random_proxy) + print(f"随机代理的出口 IP:{test_result}") + + proxy, result = manager.get_and_test_random_proxy() + if result: + print(f"测试成功 : {result},取得的代理是:{proxy},这个代理已经从代理管理器里移除。") + else: + print("测试失败.")