From e6842c6d7f270424256e6e865088f5d2fae707e3 Mon Sep 17 00:00:00 2001 From: dghc2023 <3053714263@qq.com> Date: Tue, 26 Nov 2024 16:30:00 +0000 Subject: [PATCH] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E6=96=87=E4=BB=B6=E8=87=B3?= =?UTF-8?q?=20/?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- account.py | 127 ++++++---- mail.py | 434 ++++++++++++++++++++++++++++++++++ main.py | 677 +++++++++++++++++++++-------------------------------- proxy.py | 335 ++++++++++++++++++-------- 4 files changed, 1027 insertions(+), 546 deletions(-) create mode 100644 mail.py diff --git a/account.py b/account.py index 37e2a57..deb7191 100644 --- a/account.py +++ b/account.py @@ -12,25 +12,63 @@ class AccountData: new_password: str # 新密码 new_aux_email: str # 新辅助邮箱 change_status: str # 是否更改完成 + proxy: str # 代理 + region: Optional[str] = None # 区域(可选) class AccountManagerSQLite: - def __init__(self, db_path="accounts.db"): + def __init__(self, db_path="accounts.db", debug=False): self.db_path = db_path + self.debug = debug self._initialize_db() + def debug_print(self, *args): + """仅在 debug 模式下输出调试信息""" + if self.debug: + print(*args) + def _initialize_db(self): - """初始化数据库结构""" + """初始化或检查数据库结构""" with self._get_connection() as conn: + # 启用 WAL 模式 + current_mode = conn.execute("PRAGMA journal_mode").fetchone()[0] + if current_mode != "wal": + self.debug_print("切换到 WAL 模式...") + conn.execute("PRAGMA journal_mode=WAL") + + # 检查表是否存在 + cursor = conn.execute("PRAGMA table_info(accounts)") + columns = [row[1] for row in cursor.fetchall()] + + # 定义表结构 + required_columns = [ + "email", "original_password", "original_aux_email", + "new_password", "new_aux_email", "change_status", "proxy", "region" + ] + + if not columns: # 表不存在 + self.debug_print("表不存在,正在创建表...") + elif columns != required_columns: # 表存在但结构不一致 + self.debug_print(f"表结构不一致,当前列: {columns}, 期望列: {required_columns}") + self.debug_print("正在重建表...") + conn.execute("DROP TABLE IF EXISTS accounts") + else: # 表存在且结构一致 + self.debug_print("表结构检查通过,无需更改。") + return + + # 创建表 conn.execute(""" - CREATE TABLE IF NOT EXISTS accounts ( + CREATE TABLE accounts ( email TEXT PRIMARY KEY, original_password TEXT, original_aux_email TEXT, new_password TEXT, new_aux_email TEXT, - change_status TEXT + change_status TEXT, + proxy TEXT, + region TEXT ) """) + self.debug_print("表已创建。") @contextmanager def _get_connection(self): @@ -47,10 +85,10 @@ class AccountManagerSQLite: try: conn.execute("DELETE FROM accounts") conn.commit() - print("数据库已清空。") + self.debug_print("数据库已清空。") except sqlite3.Error as e: conn.rollback() - print(f"清空数据库失败:{e}") + self.debug_print(f"清空数据库失败:{e}") raise def import_data(self, account_list: List[AccountData]): @@ -60,18 +98,20 @@ class AccountManagerSQLite: conn.executemany(""" INSERT OR REPLACE INTO accounts ( email, original_password, original_aux_email, - new_password, new_aux_email, change_status - ) VALUES (?, ?, ?, ?, ?, ?) + new_password, new_aux_email, change_status, proxy, region + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, [ ( account.email, account.original_password, account.original_aux_email, - account.new_password, account.new_aux_email, account.change_status + account.new_password, account.new_aux_email, account.change_status, + account.proxy, account.region ) for account in account_list ]) conn.commit() + self.debug_print(f"成功导入 {len(account_list)} 条数据!") except sqlite3.Error as e: conn.rollback() - print(f"Error importing data: {e}") + self.debug_print(f"Error importing data: {e}") raise def export_data(self) -> List[AccountData]: @@ -92,11 +132,7 @@ class AccountManagerSQLite: return [AccountData(*row) for row in rows] def update_record(self, email: str, **kwargs): - """ - 更新记录的指定字段 - :param email: 要更新的记录的邮箱 - :param kwargs: 要更新的字段和值,键为字段名,值为更新的值 - """ + """更新记录的指定字段""" if not kwargs: raise ValueError("没有指定任何更新的字段") @@ -115,44 +151,50 @@ class AccountManagerSQLite: try: conn.execute(query, values) conn.commit() - print(f"成功更新记录: {email}") + self.debug_print(f"成功更新记录: {email}") except sqlite3.Error as e: conn.rollback() - print(f"更新记录失败:{e}") + self.debug_print(f"更新记录失败:{e}") raise - def delete_account(self, email: str): """删除某个账户""" with self._get_connection() as conn: try: conn.execute("DELETE FROM accounts WHERE email = ?", (email,)) conn.commit() + self.debug_print(f"成功删除账户: {email}") except sqlite3.Error as e: conn.rollback() - print(f"Error deleting account: {e}") + self.debug_print(f"Error deleting account: {e}") raise def import_from_excel(self, excel_path: str, clear_old: bool = False): - """ - 从 Excel 文件导入数据 - :param excel_path: Excel 文件路径 - :param clear_old: 是否清空旧数据 - """ + """从 Excel 文件导入数据""" try: - # 如果 clear_old 为 True,先清空数据库 if clear_old: self.clear() - # 读取 Excel 文件的第一个工作簿 + # 读取 Excel 文件 df = pd.read_excel(excel_path, sheet_name=0) # 校验表格格式 - required_columns = ["邮箱", "原密码", "原辅助邮箱", "新密码", "新辅助邮箱", "是否更改完成"] - if not all(col in df.columns for col in required_columns): - raise ValueError(f"表格缺少必要的列:{required_columns}") + required_columns = [ + "邮箱", "原密码", "原辅助邮箱", "新密码", "新辅助邮箱", "是否更改完成", "代理" + ] + optional_columns = ["区域"] + all_columns = required_columns + optional_columns - # 将数据转换为 AccountData 对象 + missing_columns = [col for col in required_columns if col not in df.columns] + if missing_columns: + raise ValueError(f"表格缺少必要的列:{missing_columns}") + + # 添加缺失的可选列并填充默认值 + for col in optional_columns: + if col not in df.columns: + df[col] = None + + # 转换数据为 AccountData 对象 account_list = [ AccountData( email=row["邮箱"], @@ -160,40 +202,35 @@ class AccountManagerSQLite: original_aux_email=row["原辅助邮箱"], new_password=row["新密码"], new_aux_email=row["新辅助邮箱"], - change_status=row["是否更改完成"] + change_status=row["是否更改完成"], + proxy=row["代理"], + region=row.get("区域", None) ) for _, row in df.iterrows() ] - # 批量导入数据到数据库 self.import_data(account_list) - print(f"成功导入 {len(account_list)} 条数据!") except Exception as e: - print(f"导入失败:{e}") + self.debug_print(f"导入失败:{e}") raise def export_to_excel(self, excel_path: str): - """ - 导出数据到 Excel 文件 - :param excel_path: Excel 文件路径 - """ + """导出数据到 Excel 文件""" try: - # 从数据库中获取所有数据 accounts = self.export_data() - - # 转换为 DataFrame df = pd.DataFrame([{ "邮箱": account.email, "原密码": account.original_password, "原辅助邮箱": account.original_aux_email, "新密码": account.new_password, "新辅助邮箱": account.new_aux_email, - "是否更改完成": account.change_status + "是否更改完成": account.change_status, + "代理": account.proxy, + "区域": account.region } for account in accounts]) - # 写入 Excel 文件 df.to_excel(excel_path, index=False, sheet_name="Accounts") - print(f"成功导出数据到 {excel_path}!") + self.debug_print(f"成功导出数据到 {excel_path}!") except Exception as e: - print(f"导出失败:{e}") + self.debug_print(f"导出失败:{e}") raise diff --git a/mail.py b/mail.py new file mode 100644 index 0000000..851a050 --- /dev/null +++ b/mail.py @@ -0,0 +1,434 @@ +import imaplib +import email +import re +from email.header import decode_header +from datetime import datetime, timezone +from email.utils import parsedate_to_datetime +import time +from datetime import datetime, timedelta + + +def to_utf7_imap(text): + """ + 将文本转换为 IMAP UTF-7 编码格式。 + + 参数: + text (str): 要转换的文本。 + + 返回: + str: 转换后的 UTF-7 编码文本。 + """ + if not text: + return "" + + # 匹配所有非 ASCII 字符 + def encode_match(match): + return "&" + match.group(0).encode("utf-16be").hex().upper() + "-" + + # 替换非 ASCII 字符为 UTF-7 格式 + return re.sub(r"[^\x20-\x7E]", encode_match, text) + + +def build_search_criteria(from_email=None, subject=None, body_keyword=None, since=None, before=None, unseen=False): + """ + 构建 IMAP 搜索条件字符串,支持简单查询逻辑。 + + 参数: + from_email (str): 发件人邮箱地址。 + subject (str): 邮件标题的关键字。 + body_keyword (str): 邮件正文的关键字。 + since (datetime): 起始时间,筛选此时间之后的邮件。 + before (datetime): 截止时间,筛选此时间之前的邮件。 + unseen (bool): 是否仅筛选未读邮件。 + + 返回: + str: 构建的 IMAP 搜索条件字符串。 + """ + criteria = [] + + if unseen: + criteria.append("UNSEEN") + + if from_email: + criteria.append(f'FROM "{to_utf7_imap(from_email)}"') + + if subject: + criteria.append(f'SUBJECT "{to_utf7_imap(subject)}"') + + if body_keyword: + criteria.append(f'BODY "{to_utf7_imap(body_keyword)}"') + + if since: + criteria.append(f'SINCE {since.strftime("%d-%b-%Y")}') + + if before: + criteria.append(f'BEFORE {before.strftime("%d-%b-%Y")}') + + # 用空格拼接所有条件(IMAP 默认 AND 逻辑) + return " ".join(criteria) if criteria else "ALL" + + +def parse_email_date(date_str, default_tz=timezone.utc): + """ + 安全解析邮件日期。 + """ + if not date_str: + return None + try: + email_date = parsedate_to_datetime(date_str) + if email_date and email_date.tzinfo is None: + email_date = email_date.replace(tzinfo=default_tz) + return email_date + except Exception as e: + print(f"Warning: Failed to parse date '{date_str}': {e}") + return None + + +class EmailClient: + def __init__(self, host, username, password): + self.host = host + self.username = username + self.password = password + self.connection = None + + def connect(self): + self.connection = imaplib.IMAP4(self.host) + self.connection.login(self.username, self.password) + self.connection.select("inbox") + + def disconnect(self): + if self.connection: + self.connection.logout() + + def search_emails(self, search_criteria): + """ + 使用自由构建的搜索条件执行 IMAP 搜索。 + + 参数: + search_criteria (str): 用于 IMAP 搜索的条件字符串。 + + 返回: + tuple: 搜索结果 (result, data),其中: + - result (str): 搜索状态 ("OK" 表示成功)。 + - data (list): 搜索到的邮件 ID 列表。 + + Raises: + Exception: 如果搜索失败。 + """ + result, data = self.connection.search(None, search_criteria) + if result != "OK": + raise Exception(f"Failed to search emails with criteria: {search_criteria}") + + email_ids = data[0].split() + if not email_ids: + print("Debug: No matching emails found.") + return result, [] # 返回空列表以便后续处理 + + return result, data + + def fetch_recent_emails(self, from_email=None, subject=None, body_keyword=None, since=None, before=None, + unseen=False, max_count=100): + """ + 使用构建的搜索条件查询最近的邮件。 + + 参数: + from_email (str): 发件人邮箱地址。 + subject (str): 邮件标题关键字。 + body_keyword (str): 邮件正文关键字。 + since (datetime): 起始时间。 + before (datetime): 截止时间。 + unseen (bool): 是否只查询未读邮件。 + max_count (int): 返回的邮件数量上限。 + + 返回: + list: 符合条件的邮件 ID 列表,按接收时间倒序排列。 + """ + search_criteria = build_search_criteria( + from_email=from_email, + subject=subject, + body_keyword=body_keyword, + since=since, + before=before, + unseen=unseen + ) + result, data = self.search_emails(search_criteria) + + # 检查 `data` 是否有效 + if not data or not data[0]: + print("Debug: No emails found matching the criteria.") + return [] # 返回空列表 + + # 正常处理邮件 ID + email_ids = data[0].split()[-max_count:] + return list(reversed(email_ids)) + + def fetch_all_matching_emails(self, email_ids, sender_pattern=None, keyword_pattern=None, subject_pattern=None, + start_time=None, max_results=None): + if max_results is not None and max_results <= 0: + raise ValueError("max_results must be a positive integer or None") + + sender_regex = re.compile(sender_pattern) if sender_pattern else None + keyword_regex = re.compile(keyword_pattern) if keyword_pattern else None + subject_regex = re.compile(subject_pattern) if subject_pattern else None + all_matched_emails = [] + + for email_id in email_ids: + try: + result, data = self.connection.fetch(email_id, "(RFC822)") + if result != "OK": + print(f"Warning: Failed to fetch email with ID {email_id}") + continue + + msg = email.message_from_bytes(data[0][1]) + + from_email = email.utils.parseaddr(msg["From"])[1] if msg["From"] else "" + if sender_regex and not sender_regex.search(from_email): + continue + + subject, encoding = decode_header(msg["Subject"])[0] + subject = subject.decode(encoding or "utf-8") if isinstance(subject, bytes) else subject + if subject_regex and not subject_regex.search(subject): + continue + + print("1没报错") + + content = "" + for part in msg.walk(): + content_type = part.get_content_type() + print("2没报错") + try: + if content_type == "text/plain": + content += part.get_payload(decode=True).decode(part.get_content_charset() or "utf-8") + print("3没报错") + elif content_type == "text/html" and not content: + content += part.get_payload(decode=True).decode(part.get_content_charset() or "utf-8") + print("4没报错") + except Exception as e: + print(f"Warning: Failed to decode {content_type} content: {e}") + + if keyword_regex and not keyword_regex.search(content): + continue + + + date_str = msg.get("Date") + email_date = parse_email_date(date_str) if date_str else None + # print('email_date:',email_date) + # print('start_time:',start_time) + if start_time and email_date and email_date < start_time: + continue + + matched_email = { + "subject": subject, + "from": from_email, + "content": content, + "date": email_date + } + all_matched_emails.append(matched_email) + + if max_results and len(all_matched_emails) >= max_results: + break + + except Exception as e: + print(f"Error: Failed to process email ID {email_id}: {e}") + + return all_matched_emails + + def filter_emails_by_sender_and_keyword(self, email_ids, sender_pattern=None, keyword_pattern=None, + subject_pattern=None, start_time=None): + matched_emails = self.fetch_all_matching_emails( + email_ids=email_ids, + sender_pattern=sender_pattern, + keyword_pattern=keyword_pattern, + subject_pattern=subject_pattern, + start_time=start_time, + max_results=1 + ) + return matched_emails[0] if matched_emails else None + + +class GoogleCodeReceiver: + def __init__(self, email_client): + self.email_client = email_client + self.last_timestamp = datetime.now(timezone.utc) + + def _update_last_timestamp(self): + self.last_timestamp = datetime.now(timezone.utc) + + def wait_code(self, username, timeout=60, interval=3, start_time=None): + """ + 等待 Google 验证码邮件。 + + 参数: + username (str): 用户名,用于在正文中检索匹配邮件。 + timeout (int): 最大等待时间,单位为秒。 + interval (int): 轮询间隔,单位为秒。 + start_time (datetime): 开始检索邮件的时间。默认值为当前时间。 + + 返回: + str: 提取的验证码。 + """ + # 如果未指定 `start_time`,使用当前时间 + if start_time is None: + start_time = datetime.now(timezone.utc) # 默认使用当前 UTC 时间 + elif start_time.tzinfo is None: + # 如果 `start_time` 没有时区信息,假定为本地时间,并转为 UTC + local_time = start_time.astimezone() + start_time = local_time.astimezone(timezone.utc) + else: + # 如果已经有时区信息,统一转为 UTC + start_time = start_time.astimezone(timezone.utc) + + # 确定 `since` 时间,取 `start_time` 往前推 2 天 + since = start_time - timedelta(days=2) + + # 更新 `last_timestamp` + self._update_last_timestamp() + + end_time = datetime.now(timezone.utc) + timedelta(seconds=timeout) + subject_pattern = r"(?:Email verification code|电子邮件验证码)[::]?\s*(\d{6})" + # sender_pattern = r"noreply@google\.com" + # keyword_pattern = re.escape(username) + + while datetime.now(timezone.utc) < end_time: + try: + # Fetch recent emails with the adjusted `since` + email_ids = self.email_client.fetch_recent_emails( + max_count=10, + from_email="noreply@google.com", + body_keyword=username, + since=since # 从 `start_time` 往前推 2 天开始检索 + ) + + matched_email = self.email_client.filter_emails_by_sender_and_keyword( + email_ids=email_ids, + # sender_pattern=sender_pattern, + # keyword_pattern=keyword_pattern, + subject_pattern=subject_pattern, + start_time=start_time # 确保只检索 `start_time` 之后的邮件 + ) + + if matched_email: + match = re.search(subject_pattern, matched_email["subject"]) + if match: + return match.group(1) + + except Exception as e: + print(f"Warning: Error occurred while fetching code: {e}") + + time.sleep(interval) + + raise TimeoutError("Timeout waiting for verification code.") + + +# 老的通用邮箱测试 +def mailTest(): + server = "server-10474.cuiqiu.vip" # 替换为你的 IMAP 服务器地址 + username = "gmailvinted@mailezu.com" + password = "g1l2o0hld84" + client = EmailClient(server, username, password) + client.connect() + start_time = datetime(2024, 11, 22) + sender_pattern = r".*google.*" # 使用正则表达式匹配发件人邮箱 + keyword_pattern = r".*" # 替换为你想要匹配的关键字或正则表达式 + try: + + email_ids = client.fetch_recent_emails( + max_count=10, + from_email="noreply@google.com", + # subject='Email verification code',#中文邮件叫 '电子邮件验证码‘ + body_keyword='RibeAchour875@gmail.com', + since=start_time, + ) + + # 获取时间上最新的匹配项,应用起始时间过滤器 + latest_matched_email = client.filter_emails_by_sender_and_keyword(email_ids, sender_pattern, keyword_pattern) + if latest_matched_email: + print("\n时间上最新的匹配邮件:") + print("主题:", latest_matched_email["subject"]) + print("发件人:", latest_matched_email["from"]) + print("内容:", latest_matched_email["content"]) + print("时间:", latest_matched_email["date"]) + else: + print("没有符合条件的时间上最新的匹配邮件") + + # print(f"ids:{email_ids}") + # 获取所有匹配的邮件,应用起始时间过滤器 + all_matched_emails = client.fetch_all_matching_emails(email_ids, sender_pattern, keyword_pattern) + if all_matched_emails: + print("\n所有匹配的邮件:") + for idx, email in enumerate(all_matched_emails): + print(f"邮件 {idx + 1}:") + print("主题:", email["subject"]) + print("发件人:", email["from"]) + print("内容:", email["content"], "\n") # 显示内容 + print("时间:", email["date"]) + else: + print("没有符合条件的所有匹配邮件") + + finally: + client.disconnect() + + +def codeTest(): + server = "server-10474.cuiqiu.vip" # 替换为你的 IMAP 服务器地址 + username = "gmailvinted@mailezu.com" + password = "g1l2o0hld84" + + client = EmailClient(server, username, password) + client.connect() + try: + code_receiver = GoogleCodeReceiver(client) + # 这里改成要捕获的目标邮件地址 + code = code_receiver.wait_code( + username="RibeAchour875@gmail.com", timeout=300, interval=5, + start_time=datetime(2024, 11, 10)) + # code = code_receiver.wait_code(username="RibeAchour875@gmail.com", timeout=300, interval=5) + + print(f"收到谷歌验证码: {code}") + except TimeoutError as e: + print(e) + except Exception as e: + print(f"An unexpected error occurred: {e}") + finally: + client.disconnect() + + +def test3(): + server = "server-10474.cuiqiu.vip" # 替换为你的 IMAP 服务器地址 + username = "gmailvinted@mailezu.com" + password = "g1l2o0hld84" + + # server = "imap.qq.com" + # username = "bigemon@foxmail.com" + # password = "ejudkkdfiuemcaaj" + + client = EmailClient(server, username, password) + client.connect() + + # mailTest() + + ok, email_ids = client.search_emails('FROM "noreply@google.com" BODY "RibeAchour875@gmail.com" SINCE 11-Nov-2024') + if email_ids: + print(email_ids) + sender_pattern = r".*google.*" # 使用正则表达式匹配发件人邮箱 + keyword_pattern = r".*" # 替换为你想要匹配的关键字或正则表达式 + all_matched_emails = client.fetch_all_matching_emails(email_ids) + if all_matched_emails: + print("\n所有匹配的邮件:") + for idx, email in enumerate(all_matched_emails): + print(f"邮件 {idx + 1}:") + print("主题:", email["subject"]) + print("发件人:", email["from"]) + print("内容:", email["content"], "\n") # 显示内容 + print("时间:", email["date"]) + else: + print("没有符合条件的所有匹配邮件") + else: + print("查不到") + + +# 使用示例 +if __name__ == "__main__": + # test3() + codeTest() + diff --git a/main.py b/main.py index b3f7e55..c8c4a57 100644 --- a/main.py +++ b/main.py @@ -7,14 +7,8 @@ from multiprocessing import freeze_support from DrissionPage import ChromiumOptions, ChromiumPage from DrissionPage.errors import * import random -import imaplib -import email -from email.header import decode_header -import re import time -from datetime import datetime, timedelta import os -from dotenv import load_dotenv, set_key from filelock import FileLock from openpyxl import load_workbook import ctypes @@ -27,7 +21,10 @@ import string from multiprocessing import Queue, Pool from account import AccountManagerSQLite from typing import List, Tuple -from proxy import ProxyManager + +from mail import GoogleCodeReceiver, EmailClient +from proxy import ProxyManagerSQLite, classifier_smartproxy +from datetime import datetime, timedelta, timezone # Windows 消息常量 WM_MOUSEMOVE = 0x0200 @@ -35,6 +32,10 @@ WM_LBUTTONDOWN = 0x0201 WM_LBUTTONUP = 0x0202 MK_LBUTTON = 0x0001 +server = "server-10474.cuiqiu.vip" # 替换为你的 IMAP 服务器地址 +username = "gmailvinted@mailezu.com" +password = "g1l2o0hld84" + # 停止标志 stop_event = threading.Event() @@ -42,10 +43,14 @@ user32 = ctypes.windll.user32 root_plugin_dir = os.path.join(os.getcwd(), 'proxy_auth_plugin') -# 输入邮箱信息 -IMAP_SERVER = "server-10474.cuiqiu.vip" # 替换为你的邮件服务的IMAP服务器地址,例如Gmail的为"imap.gmail.com" -EMAIL_ACCOUNT = "gmailvinted@mailezu.com" # 主账户邮箱地址 -EMAIL_PASSWORD = "g1l2o0hld84" # 邮箱密码或应用专用密码 +# 初始化数据库 +db_manager = AccountManagerSQLite(db_path="accounts.db") +proxy_manager = ProxyManagerSQLite() + +proxy_manager.import_proxies_with_classifier("IP.txt", classifier=classifier_smartproxy) + +# 全局写入锁 +write_lock = multiprocessing.Lock() ua_templates = [ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/{webkit_version} (KHTML, like Gecko) Chrome/{chrome_version} Safari/{safari_version}", @@ -85,6 +90,7 @@ def random_sleep(tab, min_seconds=0, max_seconds=2): tab.wait(random.uniform(min_seconds, max_seconds)) + def get_free_port(): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind(('localhost', 0)) # 使用端口0让操作系统自动选择一个未占用的端口 @@ -365,9 +371,11 @@ def input_recovery_code(tab, recovery_code): recovery_input.input(char) # 输入一个字符 tab.wait(random.uniform(0.1, 0.3)) # 随机延迟 - next_input = tab.ele('@data-idom-class=nCP5yc AjY5Oe DuMIQc LQeN7 BqKGqe Jskylb TrZEUc lw1w4b', timeout=15) + next_input = tab.ele('@@tag()=span@@text()=下一步', timeout=15) next_input.click(by_js=True) + print("到这里我已经点完下一步,进入修改辅助邮箱的地方了") + tab.wait.load_start() tab.wait(1) @@ -376,6 +384,8 @@ def input_recovery_code(tab, recovery_code): if retry: return True + return False + # 找到修改邮箱的按钮 def click_use_other_account_button2(tab, next_span): @@ -386,14 +396,14 @@ def click_use_other_account_button2(tab, next_span): # 修改辅助邮箱账号 def modify_the_secondary_email1(tab, auxiliary_email_account): - button = tab.ele('@@tag()=i@@text()=edit', timeout=15) - button.click(by_js=True) - tab.wait(2) - input = tab.ele('@type=email', timeout=15) - input.clear() - for char in auxiliary_email_account: - input.input(char) # Enter one character at a time - tab.wait(random.uniform(0.1, 0.3)) # Random delay between 0.1 and 0.3 seconds + # button = tab.ele('@@tag()=i@@text()=edit', timeout=15) + # button.click(by_js=True) + # tab.wait(2) + # input = tab.ele('@type=email', timeout=15) + # input.clear() + # for char in auxiliary_email_account: + # input.input(char) # Enter one character at a time + # tab.wait(random.uniform(0.1, 0.3)) # Random delay between 0.1 and 0.3 seconds # 点击保存 save_span = tab.ele('@text()=Save', timeout=15) @@ -543,189 +553,6 @@ def clean(text): return "".join(c if c.isalnum() else "_" for c in text) -# 计算每次运行后的总邮件数量 -def update_env_variable(key, value): - load_dotenv() - - env_vars = {} - - # 读取现有的 .env 文件内容 - if os.path.exists(".env"): - with open(".env", "r") as f: - lines = f.readlines() - for line in lines: - if "=" in line: - k, v = line.strip().split("=", 1) - env_vars[k] = v - - # 更新或新增变量 - env_vars[key] = str(value) - - # 写回到 .env 文件 - with open(".env", "w") as f: - for k, v in env_vars.items(): - f.write(f"{k}={v}\n") - - -# 获取辅助邮箱验证码 -def check_emails(): - """检查邮件并提取验证码和邮箱账号""" - try: - # 连接到IMAP服务器 - mail = imaplib.IMAP4_SSL(IMAP_SERVER) - mail.login(EMAIL_ACCOUNT, EMAIL_PASSWORD) - - # 选择邮箱文件夹(通常是 'INBOX') - mail.select("inbox") - - # 搜索邮件,搜索条件可以是 'FROM' 限定谷歌发件人 - status, messages = mail.search(None, 'FROM "google.com"') # 搜索来自谷歌的邮件 - mail_ids = messages[0].split() - total_mails = len(mail_ids) - - print(f"总邮件数量: {len(mail_ids)}") - - # 获取环境变量 - total_mails = os.getenv('TOTAL_MAILS') # 获取总邮件数量 - loop_count = os.getenv('LOOP_COUNT') # 获取循环运行的次数 - - # 打印输出变量 - print(f"总邮件数量 (TOTAL_MAILS): {total_mails}") - print(f"循环运行的次数 (LOOP_COUNT): {loop_count}") - - update_env_variable("TOTAL_MAILS", total_mails) # 更新到 .env 文件中 - - for i in mail_ids[-30:]: # 仅获取最近10封邮件 - # 获取邮件内容 - res, msg = mail.fetch(i, "(RFC822)") - for response in msg: - if isinstance(response, tuple): - # 解析邮件内容 - msg = email.message_from_bytes(response[1]) - # 获取邮件主题 - subject, encoding = decode_header(msg["Subject"])[0] - if isinstance(subject, bytes): - # 如果是字节,需要解码 - subject = subject.decode(encoding if encoding else "utf-8") - - # 获取发件人 - from_ = msg.get("From") - - # 创建存储结果的字典列表 - results = [] - - # 如果邮件有正文部分 - if msg.is_multipart(): - for part in msg.walk(): - # 如果是文本/HTML内容 - if part.get_content_type() == "text/plain": - body = part.get_payload(decode=True).decode() - # 提取验证码和邮箱账号 - match = re.search(r'(\d{6})[\s\S]*?([a-zA-Z0-9._%+-]+@gmail\.com)', body) - if match: - code = match.group(1) - gmail_account = match.group(2) - # 获取邮件接收时间 - date_tuple = email.utils.parsedate_tz(msg["Date"]) - received_time = datetime.fromtimestamp( - email.utils.mktime_tz(date_tuple)) if date_tuple else None - # 添加到字典之前检查是否重复 - entry = { - "验证码": code, - "发送的邮箱账号": gmail_account, - "收到的时间": received_time.strftime( - "%Y-%m-%d %H:%M:%S") if received_time else "未知" - } - print(entry) - - # 无需检查是否在字典中,直接保存到文件 - save_to_file(entry) # 调用保存函数 - else: - # 如果邮件不是多部分(简单邮件) - body = msg.get_payload(decode=True).decode() - # 提取验证码和邮箱账号 - match = re.search(r'(\d{6})[\s\S]*?([a-zA-Z0-9._%+-]+@gmail\.com)', body) - if match: - code = match.group(1) - gmail_account = match.group(2) - # 获取邮件接收时间 - date_tuple = email.utils.parsedate_tz(msg["Date"]) - received_time = datetime.fromtimestamp( - email.utils.mktime_tz(date_tuple)) if date_tuple else None - # 添加到字典之前检查是否重复 - entry = { - "验证码": code, - "发送的邮箱账号": gmail_account, - "收到的时间": received_time.strftime( - "%Y-%m-%d %H:%M:%S") if received_time else "未知" - } - print(entry) - - # 无需检查是否在字典中,直接保存到文件 - save_to_file(entry) # 调用保存函数 - - # 关闭连接 - mail.logout() - - except Exception as e: - print(f"发生错误: {e}") - - -def get_verification_codes(email_account, sent_time): - """ - 根据邮箱和发送时间筛选验证码。 - - :param email: 发送验证码的邮箱账号 - :param sent_time: 验证码发送的时间(格式:YYYY-MM-DD HH:MM:SS) - :return: 符合条件的验证码列表 - """ - sent_time = datetime.strptime(sent_time, "%Y-%m-%d %H:%M:%S") - latest_code = None - latest_time = None - - try: - # 打开文件并逐行读取 - with open("code.txt", "r", encoding="utf-8") as file: - for line in file: - # 检查当前行是否包含指定邮箱 - if f"发送的邮箱账号: {email_account}".lower() in line: - # 提取时间和验证码 - time_str = line.split("收到的时间:")[-1].strip() - code = line.split("验证码:")[1].split(",")[0].strip() - - # 将时间字符串转换为 datetime 对象 - received_time = datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S") - - # 筛选条件:发送时间 <= 收到的时间 - if sent_time <= received_time + timedelta(seconds=10): - # 更新为最新的验证码 - if latest_time is None or received_time > latest_time: - latest_time = received_time - latest_code = code - except FileNotFoundError: - print("错误: 找不到 code.txt 文件。") - except Exception as e: - print(f"错误: {e}") - - return latest_code - - -# 计算循环次数的函数 -def manage_loop_count(): - # 加载环境变量 - load_dotenv() - - # 获取当前 LOOP_COUNT 值,如果不存在则默认为 0 - count = int(os.getenv("LOOP_COUNT", 0)) - - # 增加计数器 - count += 1 - - # 更新 .env 文件中的 LOOP_COUNT - set_key(".env", "LOOP_COUNT", str(count)) - - # 返回更新后的计数 - return count # 用来标记账号是否完成 @@ -803,51 +630,6 @@ def get_absolute_path(relative_path): return absolute_path -# 判断验证码是否出现报错 -js_code = """ -function isElementVisible(element) { - if (!(element instanceof Element)) { - throw new Error("参数必须是一个 DOM 元素"); - } - - if (!document.body.contains(element)) { - return false; - } - - const style = window.getComputedStyle(element); - if (style.display === "none" || style.visibility === "hidden" || style.opacity === "0") { - return false; - } - - const rect = element.getBoundingClientRect(); - if (rect.width === 0 || rect.height === 0) { - return false; - } - - const { top, right, bottom, left } = rect; - const viewportWidth = window.innerWidth || document.documentElement.clientWidth; - const viewportHeight = window.innerHeight || document.documentElement.clientHeight; - - if (bottom < 0 || top > viewportHeight || right < 0 || left > viewportWidth) { - return false; - } - - const centerX = (left + right) / 2; - const centerY = (top + bottom) / 2; - const elementAtCenter = document.elementFromPoint(centerX, centerY); - - if (elementAtCenter && !element.contains(elementAtCenter) && !elementAtCenter.contains(element)) { - return false; - } - - return true; -} - -var element = document.querySelector("#c4"); -return isElementVisible(element); -""" - - def logutGoogle(tab) -> bool: tab.get('https://accounts.google.com/Logout?hl=zh-CN') if tab.ele('@@tag()=input@@type=email', timeout=0) != None: # 已注销就会出登录页元素 @@ -867,8 +649,24 @@ def logutGoogle(tab) -> bool: return False # 无用分支,不过为了避免函数太长回头修改时候忘记,还是写个 False 以防万一 -def main(email_account, email_password, old_recovery_email, new_password, new_recovery_email, proxy_host, proxy_port, proxy_username, proxy_password,row_index, file_path): - global browser, plugin_path, user_dir +def main(email_account, email_password, old_recovery_email, new_password, new_recovery_email, proxy_host, proxy_port, proxy_username, proxy_password,region, row_index): + # 初始化 + client = EmailClient(server, username, password) + client.connect() + code_receiver = GoogleCodeReceiver(client) + + print("邮箱账户:", email_account) # 邮箱账户 + print("邮箱密码:", email_password) # 邮箱密码 + print("旧的恢复邮箱:", old_recovery_email) # 旧的恢复邮箱 + print("新的密码:", new_password) # 新的密码 + print("新的恢复邮箱:", new_recovery_email) # 新的恢复邮箱 + print("代理主机:", proxy_host) # 代理主机 + print("代理端口:", proxy_port) # 代理端口 + print("代理用户名:", proxy_username) # 代理用户名 + print("代理密码:", proxy_password) # 代理密码 + print("代理区域:", region) + print("行索引:", row_index) # 行索引 + global browser, plugin_path, user_dir, attempt # 生成一个 6 位的随机数 global random_number @@ -942,6 +740,13 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re print(temp_path) print(user_dir) + options.set_pref(arg='profile.default_content_settings.popups', value='0') + options.set_pref('translate.enabled', False) # 禁用翻译功能 + options.set_pref('translate.newtld', False) # 禁用新的翻译扩展功能 + + # 确保不显示翻译提示 + options.set_pref('profile.content_settings.exceptions.translate', '{"pattern":"*","setting":2}') + browser = ChromiumPage(options) # 清除缓存和cookies @@ -958,13 +763,12 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re if not flag: browser.quit() return ( - email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, + email_account, email_password, old_recovery_email, new_password, new_recovery_email, proxy_host, proxy_port, - proxy_username, proxy_password, row_index) + proxy_username, proxy_password, region, row_index) save_log(random_number, "已经访问谷歌页面") - # 执行JavaScript清除localStorage tab.run_js("window.localStorage.clear();") @@ -1006,9 +810,9 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re print("无法找到窗口句柄") save_log(random_number, "无法找到窗口句柄") return ( - email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, + email_account, email_password, old_recovery_email, new_password, new_recovery_email, proxy_host, proxy_port, - proxy_username, proxy_password, row_index) + proxy_username, proxy_password, region, row_index) try: # 输入邮箱账号 @@ -1019,10 +823,11 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re except ElementNotFoundError as e: print(f"找不到输入邮箱账号的元素{e}") save_log(random_number, f"找不到输入邮箱账号的元素:{e}") - update_status_in_db(file_path, "请求错误,请重试") + update_status_in_db(email_account, "请求错误,请重试") return ( - email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, - proxy_username, proxy_password, row_index) + email_account, email_password, old_recovery_email, new_password, new_recovery_email, proxy_host, + proxy_port, + proxy_username, proxy_password, region, row_index) print("输入邮箱账号运行完毕") save_log(random_number, "输入邮箱账号运行完毕") @@ -1041,11 +846,11 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re except ElementNotFoundError as e: print(f"找不到输入邮箱账号的元素{e}") save_log(random_number, f"找不到输入邮箱账号的元素:{e}") - update_status_in_db(file_path, "请求错误,请重试") + update_status_in_db(email_account, "请求错误,请重试") return ( - email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, + email_account, email_password, old_recovery_email, new_password, new_recovery_email, proxy_host, proxy_port, - proxy_username, proxy_password, row_index) + proxy_username, proxy_password, region, row_index) retry_with_recovery(tab, hwnd, email_account) @@ -1055,9 +860,9 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re # 看下是否出现了手机号 telephone = tab.ele("@text()=请输入电话号码,以便通过短信接收验证码。", timeout=15) if telephone: - update_status_in_db(file_path, '接码') + update_status_in_db(email_account, '接码') browser.quit() - return False + return print("检查出现手机号运行完毕") save_log(random_number, "检查出现手机号运行完毕") @@ -1071,14 +876,14 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re except ElementNotFoundError as e: print(f"找不到密码输入框的元素:{e}") save_log(random_number, f"找不到密码输入框的元素:{e}") - update_status_in_db(file_path, "请求错误,请重试") + update_status_in_db(email_account, "请求错误,请重试") return ( - email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, - proxy_username, proxy_password, row_index) + email_account, email_password, old_recovery_email, new_password, new_recovery_email, proxy_host, + proxy_port, + proxy_username, proxy_password, region, row_index) - - print("输入密码运行完毕") - save_log(random_number, "输入密码运行完毕") + print("输入旧密码运行完毕") + save_log(random_number, "输入旧密码运行完毕") wrong = tab.ele("@text()=抱歉,出了点问题。请重试。", timeout=15) save_log(random_number, f"谷歌验证按钮是否出现:{wrong}") @@ -1095,7 +900,7 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re except ElementNotFoundError as e: print(f"找不到输入邮箱账号的元素{e}") save_log(random_number, f"找不到输入邮箱账号的元素:{e}") - update_status_in_db(file_path, "请求错误,请重试") + update_status_in_db(email_account, "请求错误,请重试") return ( email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, @@ -1110,11 +915,11 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re except ElementNotFoundError as e: print(f"找不到密码输入框的元素:{e}") save_log(random_number, f"找不到密码输入框的元素:{e}") - update_status_in_db(file_path, "请求错误,请重试") + update_status_in_db(email_account, "请求错误,请重试") return ( - email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, + email_account, email_password, old_recovery_email, new_password, new_recovery_email, proxy_host, proxy_port, - proxy_username, proxy_password, row_index) + proxy_username, proxy_password, region, row_index) print("如果出现错误,输入密码运行完毕") save_log(random_number, "如果出现错误后,输入密码运行完毕") @@ -1123,7 +928,7 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re # 看下是否出现了手机号 telephone = tab.ele("@text()=请输入电话号码,以便通过短信接收验证码。", timeout=15) if telephone: - update_status_in_db(file_path, '接码') + update_status_in_db(email_account, '接码') browser.quit() return email_account @@ -1135,15 +940,14 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re try: # 查看密码是否更改 - password_input = tab.ele('@@aria-label=输入您的密码@@type=password', timeout=15) - if password_input: - - input_password(tab, hwnd, new_password, password_input) - tab.wait(7) + if tab.wait.ele_deleted('#passwordNext', timeout=5) == False: + save_log(random_number, f"旧密码出错,使用新密码") password_input = tab.ele('@@aria-label=输入您的密码@@type=password', timeout=15) - alternate_email_button = tab.ele('@text()=确认您的辅助邮箱', timeout=15) - if password_input and not alternate_email_button: - update_status_in_db(file_path, '被盗') + input_password(tab, hwnd, new_password, password_input) + save_log(random_number, f"输入新密码完毕") + tab.wait(7) + if tab.wait.ele_deleted('#passwordNext', timeout=5) == False: + update_status_in_db(email_account, '被盗') return email_account password_change = True print("密码已经被更改过") @@ -1151,10 +955,11 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re except ElementNotFoundError as e: print(f"找不到密码输入框的元素{e}") save_log(random_number, f"找不到密码输入框的元素:{e}") - update_status_in_db(file_path, "请求错误,请重试") + update_status_in_db(email_account, "请求错误,请重试") return ( - email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, - proxy_username, proxy_password, row_index) + email_account, email_password, old_recovery_email, new_password, new_recovery_email, proxy_host, + proxy_port, + proxy_username, proxy_password, region, row_index) print("输入新密码运行完毕") save_log(random_number, "输入新密码运行完毕") @@ -1169,10 +974,11 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re except ElementNotFoundError as e: print(f"找不到输入辅助邮箱的元素:{e}" ) save_log(random_number, f"找不到输入辅助邮箱的元素:{e}") - update_status_in_db(file_path, "请求错误,请重试") + update_status_in_db(email_account, "请求错误,请重试") return ( - email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, - proxy_username, proxy_password, row_index) + email_account, email_password, old_recovery_email, new_password, new_recovery_email, proxy_host, + proxy_port, + proxy_username, proxy_password, region, row_index) print("点击辅助邮箱验证运行完毕") save_log(random_number, "点击辅助邮箱验证运行完毕") @@ -1184,36 +990,37 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re recovery_input = tab.ele('@aria-label=输入辅助邮箱地址', timeout=15) if recovery_input: auxiliary_email_account_change = input_recovery_code(tab, old_recovery_email) # 使用传入的旧辅助邮箱 + print(f"是否有修改辅助邮箱账号:{auxiliary_email_account_change}") tab.wait(5) # 如果没有被修改则进行修改 if not auxiliary_email_account_change: + print("我要开始修改辅助邮箱账号了") + try: - next_span = tab.ele("@@tag()=span@@text()=下一步", timeout=15) - if next_span: - click_use_other_account_button2(tab, next_span) - tab.wait(3) - modify_the_secondary_email1(tab, new_recovery_email) # 使用传入的新辅助邮箱 - auxiliary_email_account_change = True + modify_the_secondary_email1(tab, new_recovery_email) # 使用传入的新辅助邮箱 + print("修改完成") + # auxiliary_email_account_change = True except ElementNotFoundError as e: # 捕获并打印异常,不中断后续代码执行 print(f"修改辅助邮箱的时候找不到元素: {e}") save_log(random_number, f"修改辅助邮箱的时候找不到元素: {e}") - update_status_in_db(file_path, "请求错误,请重试") + update_status_in_db(email_account, "请求错误,请重试") return ( - email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, - proxy_port, proxy_username, proxy_password, row_index) + email_account, email_password, old_recovery_email, new_password, new_recovery_email, proxy_host, + proxy_port, + proxy_username, proxy_password, row_index) else: print("辅助邮箱账号已经被更改过") save_log(random_number, "辅助邮箱账号已经被更改过") recovery_input.clear() input_recovery_code(tab, new_recovery_email) - print("输入辅助邮箱运行完毕") - save_log(random_number, "输入辅助邮箱运行完毕") + print("修改辅助邮箱1运行完毕") + save_log(random_number, "修改辅助邮箱1运行完毕") if password_change and auxiliary_email_account_change: - update_status_in_db(file_path, '已更改') + update_status_in_db(email_account, '已更改') logutGoogle(tab) browser.quit() return email_account @@ -1228,12 +1035,11 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re except ElementNotFoundError as e: print(f"找不到继续按钮的元素:{e}") save_log(random_number, f"找不到继续按钮的元素:{e}") - update_status_in_db(file_path, "请求错误,请重试") + update_status_in_db(email_account, "请求错误,请重试") return ( - email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, + email_account, email_password, old_recovery_email, new_password, new_recovery_email, proxy_host, proxy_port, - proxy_username, proxy_password, row_index) - + proxy_username, proxy_password, region, row_index) try: # 点击头像进入邮箱的安全设置 @@ -1245,10 +1051,11 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re except ElementNotFoundError as e: print(f"找不到头像的元素:{e}") save_log(random_number, f"找不到继续按钮的元素:{e}") - update_status_in_db(file_path,"请求错误,请重试") + update_status_in_db(email_account,"请求错误,请重试") return ( - email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, - proxy_username, proxy_password, row_index) + email_account, email_password, old_recovery_email, new_password, new_recovery_email, proxy_host, + proxy_port, + proxy_username, proxy_password, region, row_index) tab.wait(3) @@ -1264,102 +1071,115 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re except ElementNotFoundError as e: print(f"找不到修改密码的元素:{e}") save_log(random_number, f"找不到修改密码的元素:{e}") - update_status_in_db(file_path, "请求错误,请重试") + update_status_in_db(email_account, "请求错误,请重试") return ( - email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, - proxy_username, proxy_password, row_index) + email_account, email_password, old_recovery_email, new_password, new_recovery_email, proxy_host, + proxy_port, + proxy_username, proxy_password, region, row_index) tab.wait(1) try: if auxiliary_email_account_change: - update_status_in_db(file_path, '已更改') + update_status_in_db(email_account, '已更改') return email_account # 修改辅助邮箱2 flag = modify_the_secondary_email2(tab, new_recovery_email, new_password, hwnd) save_log(random_number, f"辅助邮箱是否被修改:{flag}") if flag: browser.quit() - update_status_in_db(file_path, '已更改') + update_status_in_db(email_account, '已更改') return email_account else: - sent_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - print(sent_time) - save_log(random_number, sent_time) + # 最多尝试3次循环 + for attempt in range(3): + print(f"第 {attempt + 1} 次尝试检查验证码...") - # 清空.env文件内容 - with open(".env", "w") as f: - pass - count = 0 - while True: - print("检查新邮件...") - save_log(random_number, "检查新邮件...") - check_emails() - code = get_verification_codes(email_account, sent_time) - if code: - verification_input = tab.eles("@type=text", timeout=15) - for char in code: - if len(verification_input) > 1: - verification_input[1].input(char) - tab.wait(random.uniform(0.1, 0.3)) # 随机延迟 0.1 到 0.3 秒 - verify_button = tab.eles("@text()=Verify", timeout=15) - print(verify_button) - verify_button[1].click(by_js=True) - tab.wait(5) - is_visible = tab.run_js(js_code) - if is_visible: - sent_code_button = tab.ele('@text():Send a new code.', timeout=15) - print(f"重新发送邮件的按钮为:{sent_code_button}") - save_log(random_number, f"重新发送邮件的按钮为:{sent_code_button}") - sent_code_button.click(by_js=True) - continue + # 获取当前程序运行的时间 + current_time = datetime.now(timezone.utc) + + # 检查验证码,超时时间为300秒,使用当前时间作为 start_time + try: + code = code_receiver.wait_code(username=f"{email_account}", timeout=300, interval=5, + start_time=current_time) + print(f"检查的验证码是{code}") + save_log(random_number, f"检查的验证码是{code}") + if code: + # 如果找到验证码,填入验证码 + verification_input = tab.eles("@type=text", timeout=15) + if len(verification_input) >= 1: + verification_input[1].clear() + for char in code: + verification_input[1].input(char) + tab.wait(random.uniform(0.1, 0.3)) # 随机延迟 0.1 到 0.3 秒 + + print("输入完验证码了") + save_log(random_number, "输入完验证码了") + + # 点击 Verify 按钮 + verify_button = tab.eles("@text()=Verify", timeout=15) + if verify_button and len(verify_button) > 1: + verify_button[1].click(by_js=True) + tab.wait(5) + + print("点击确定") + + # 检查是否有报错提示 + is_visible = tab.ele('@@id=c4@@tag()=p@@role=alert', timeout=1) + save_log(random_number, f"是否有报错提示:{is_visible}") + if is_visible: + # 如果有报错提示,点击重新发送按钮并进入下一次循环 + sent_code_button = tab.ele('@text():Send a new code.', timeout=15) + if sent_code_button: + print(f"重新发送邮件的按钮为:{sent_code_button}") + sent_code_button.click(by_js=True) + continue + else: + # 如果没有报错,退出 + print("辅助邮箱账号已经更改完毕") + break else: - print("辅助邮箱账号已经更改完毕") - save_log(random_number, "辅助邮箱账号已经更改完毕") - update_status_in_db(file_path, '已更改') - logutGoogle(tab) - return email_account - count += 1 - print(f"已运行 {count} 次") - save_log(random_number, f"已运行 {count} 次") - if count == 24: + # 如果未找到验证码,点击重新发送按钮并进入下一次循环 + print("未找到验证码,点击重新发送按钮") + sent_code_button = tab.ele('@text():Send a new code.', timeout=15) + if sent_code_button: + sent_code_button.click(by_js=True) + continue + except TimeoutError: + print("超时未收到验证码,重新发送") sent_code_button = tab.ele('@text():Send a new code.', timeout=15) - print(f"重新发送邮件的按钮为:{sent_code_button}") - save_log(random_number, f"重新发送邮件的按钮为:{sent_code_button}") - sent_code_button.click(by_js=True) - elif count == 48: - sent_code_button = tab.ele('@text():Send a new code.', timeout=15) - print(f"重新发送邮件的按钮为:{sent_code_button}") - save_log(random_number, f"重新发送邮件的按钮为:{sent_code_button}") - sent_code_button.click(by_js=True) - elif count == 60: - print("验证码超过四分钟没接收到,自动退出") - save_log(random_number, "验证码超过四分钟没接收到,自动退出") - update_status_in_db(file_path, '未更改(验证码没收到)') - return - tab.wait(5) # 每5秒循环一次 + if sent_code_button: + sent_code_button.click(by_js=True) + + except ElementNotFoundError as e: print(f"更改辅助邮箱账号的时候找不到元素:{e}") save_log(random_number, f"更改辅助邮箱账号的时候找不到元素:{e}") - update_status_in_db(file_path, "请求错误,请重试") + update_status_in_db(email_account, "请求错误,请重试") return ( - email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, - proxy_username, proxy_password, row_index) + email_account, email_password, old_recovery_email, new_password, new_recovery_email, proxy_host, + proxy_port, + proxy_username, proxy_password, region, row_index) except Exception as e: print(f"出现未知错误:{e}") save_log(random_number, f"出现未知错误:{e}") - update_status_in_db(file_path, f'出现未知错误:{e}请重试') + update_status_in_db(email_account, f'出现未知错误:{e}请重试') return ( - email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, - proxy_port, proxy_username, proxy_password, row_index) + email_account, email_password, old_recovery_email, new_password, new_recovery_email, proxy_host, + proxy_port, + proxy_username, proxy_password, region, row_index) + finally: - browser.quit() + # browser.quit() + # 释放资源 time.sleep(5) delete_folder(plugin_path) delete_folder(user_dir) + + def run_gui(): stop_flag = threading.Event() # 用于控制任务中止的标志位 exec_thread = None # 用于存储后台线程的引用 @@ -1373,6 +1193,19 @@ def run_gui(): entry_file_path.delete(0, tk.END) entry_file_path.insert(0, file_path) + def update_proxy_stats(): + try: + # 从 ProxyManager 获取代理总数 + total_proxies = proxy_manager.get_proxy_count("ALL") + print(f"更新代理信息的总数:{total_proxies}") + + # 更新标签内容 + proxy_stats = f"总数: {total_proxies}" # 只显示总代表数量 + lbl_proxy_stats.config(text=proxy_stats) + except Exception as e: + print(f"更新代理统计信息时出错: {e}") + lbl_proxy_stats.config(text="获取代理统计信息失败") + def start_processing(): file_path = entry_file_path.get() @@ -1380,6 +1213,8 @@ def run_gui(): max_concurrency = entry_concurrency.get() max_retries = entry_retries.get() + + # 检查文件路径和 Sheet 名称是否填写完整 if not file_path or not sheet_name or not max_concurrency or not max_retries: messagebox.showwarning("警告", "请填写完整的文件路径、表格名称、并发数和重试次数!") @@ -1444,7 +1279,7 @@ def run_gui(): stop_flag.clear() # 清除停止标志 exec_thread = threading.Thread(target=parallel_execution_with_db, - args=(file_path, sheet_name, max_concurrency, max_retries)) + args=(file_path, max_concurrency, max_retries)) exec_thread.daemon = True # 设置为守护线程 exec_thread.start() @@ -1459,17 +1294,20 @@ def run_gui(): root.destroy() # 销毁主窗口 sys.exit(0) # 强制退出程序 - def parallel_execution_with_db(file_path, sheet_name, max_concurrency, max_retries): + def parallel_execution_with_db(file_path, max_concurrency, max_retries): progress_queue = Queue() - all_rows = read_data_from_db(file_path) + all_rows = read_data_from_db(file_path) # 从数据库中读取数据 + print(f"数据库中的数据:{all_rows}") failed_rows = [] total_rows = len(all_rows) completed_count = 0 failed_count = 0 failed_emails = set() # 用于存储唯一的失败邮箱 - # 更新总条目数 + # 更新进度条的总条目数 progress_bar['maximum'] = total_rows + + with Pool(processes=max_concurrency) as pool: retry_count = 0 while all_rows or failed_rows: @@ -1477,7 +1315,7 @@ def run_gui(): print("停止信号已接收,中止任务!") break - rows_to_process = all_rows + failed_rows + rows_to_process = all_rows + failed_rows # 处理所有待处理和失败的行 results = [] for i, row in enumerate(rows_to_process): @@ -1485,17 +1323,51 @@ def run_gui(): print("停止信号已接收,中止任务!") break - # 传递锁给子进程 + # 获取邮箱和区域信息 + account_email = row[0] + region = row[9] # 假设第6列存储的是账号的区域信息 + + if not region: + region = "ALL" # 如果没有区域信息,则使用默认区域 + + # 获取一个随机代理并传递给当前账号 + proxy = proxy_manager.get_random_proxy_by_region(region=region, remove_after_fetch=True) + + if proxy: # 如果分配成功 + proxy_host, proxy_port, proxy_user, proxy_pass = proxy['host'], proxy['port'], proxy[ + 'user'], proxy['password'] + print( + f"为账号 {account_email} 分配代理:{proxy_host}:{proxy_port}:{proxy_user}:{proxy_pass}") + + # 将分配的代理信息传递到数据库中输入邮箱账号运行完毕 + db_manager.update_record(account_email, + proxy=f"{proxy_host}:{proxy_port}:{proxy_user}:{proxy_pass}") + row = row[:5] + (proxy_host, proxy_port, proxy_user, proxy_pass) + row[9:] # 将代理信息加入到数据行中 + else: + print(f"为账号 {account_email} 分配代理失败,跳过此账号。") + + + # 使用写入锁保护 export_proxies + with write_lock: + try: + print("更新剩余可用IP") + proxy_manager.export_proxies("剩下的可用IP.txt") + except Exception as e: + print(f"导出代理时发生错误: {e}") + + update_proxy_stats() + + # 传递锁给子进程,并将当前账号数据传递给 main 函数 result = pool.apply_async( - main, args=(*row, file_path), + main, args=(row), callback=lambda result: progress_queue.put(result) ) results.append(result) - all_rows = [] failed_rows = [] for result in results: + try: retry_row = result.get() print(f"main函数返回的值: {retry_row}") @@ -1510,18 +1382,15 @@ def run_gui(): lbl_progress_status.config( text=f"完成:{completed_count}/{total_rows},失败:{failed_count}" ) + else: # 返回的是单个 account,表示成功 completed_count += 1 - - # 更新数据库中记录的状态 - email = retry_row # 假设 retry_row 是邮箱地址 - update_status_in_db(email, "已完成") - - progress_bar['value'] = completed_count # 更新进度条值(百分比) + progress_bar['value'] = completed_count # 更新进度条值(百分比) lbl_progress_status.config( text=f"完成:{completed_count}/{total_rows},失败:{failed_count}" ) + time.sleep(2) # 模拟耗时操作 except Exception as e: @@ -1550,15 +1419,13 @@ def run_gui(): def read_data_from_db(file_path: str) -> List[Tuple]: """ 从数据库读取数据,同时保持与旧的接口和逻辑一致。 + 根据账号区域自动分配代理,并导入到数据库。 + (去除代理分配和数据库更新代理信息的部分) """ # 初始化数据库管理器 - db_manager = AccountManagerSQLite(db_path="accounts.db") skip_keywords = {"已更改", "接码", "被盗"} data = [] - # 创建代理管理器 - proxy_manager = ProxyManager() - # 导入 Excel 数据到数据库,清空旧数据 db_manager.import_from_excel(file_path, clear_old=True) @@ -1567,6 +1434,7 @@ def run_gui(): global total_tasks, completed_tasks, failed_tasks # 全局变量以更新状态 + # 计算任务数量 completed_tasks = sum(account.change_status in skip_keywords for account in accounts) failed_tasks = len(accounts) - completed_tasks - sum(account.change_status is None for account in accounts) total_tasks = len(accounts) - completed_tasks @@ -1574,27 +1442,13 @@ def run_gui(): for i, account in enumerate(accounts): if not account.email: break - if account.change_status not in skip_keywords: - print(f'测试是否是空的:{proxy_manager.is_empty()}') - print(proxy_manager.import_proxies('IP.txt')) - print(f'再测试是否是空的:{proxy_manager.is_empty()}') + if account.change_status not in skip_keywords: + # 不再分配代理,只是记录账号信息 + data.append(( + account.email, account.original_password, account.original_aux_email, + account.new_password, account.new_aux_email, "", "", "", "",account.region, i + 2 + )) - proxy = proxy_manager.get_random_proxy() - print(f"获取到的随机代理:{proxy}") - - if proxy: # 如果代理测试成功 - print( - f"分配的代理信息:Host: {proxy['host']}, Port: {proxy['port']}, User: {proxy['user']}, Password: {proxy['password']}") - proxy_host, proxy_port, proxy_user, proxy_pass = proxy['host'], proxy['port'], proxy['user'], proxy[ - 'password'] - data.append(( - account.email, account.original_password, account.original_aux_email, - account.new_password, account.new_aux_email, proxy_host, - proxy_port, proxy_user, proxy_pass, i + 2 - )) - else: - # 如果代理无效,则跳过该账号的代理分配 - print(f"代理测试失败,跳过账号 {account.email} 的代理分配。") return data root = tk.Tk() @@ -1635,6 +1489,11 @@ def run_gui(): btn_stop = tk.Button(root, text="停止处理", command=stop_processing) btn_stop.grid(row=7, column=0, columnspan=2, padx=20, pady=20, sticky="ew") + # 添加显示代理统计信息的标签 + tk.Label(root, text="代理统计信息:").grid(row=8, column=0, padx=10, pady=10) + lbl_proxy_stats = tk.Label(root, text="代理统计信息未加载") + lbl_proxy_stats.grid(row=8, column=1, padx=10, pady=10, columnspan=2) + root.protocol("WM_DELETE_WINDOW", on_closing) # 绑定窗口关闭事件 root.mainloop() diff --git a/proxy.py b/proxy.py index 6329c0c..1042ea5 100644 --- a/proxy.py +++ b/proxy.py @@ -1,107 +1,258 @@ +import sqlite3 import random -import requests +from contextlib import contextmanager +from typing import List, Optional, Dict -class ProxyManager: - def __init__(self): - self.proxies = [] - def import_proxies(self, file_path): - """ - 导入代理列表,支持文件路径,格式为 host:port:user:password - """ - try: - with open(file_path, 'r') as file: - content = file.read().replace('\r\n', '\n') # 替换 Windows 风格换行符 - lines = content.strip().split('\n') - for line in lines: - parts = line.split(':') - if len(parts) == 4: # 确保格式正确 - proxy = { - 'host': parts[0], - 'port': parts[1], - 'user': parts[2], - 'password': parts[3], - 'protocol': 'http' - } - self.proxies.append(proxy) - except FileNotFoundError: - print(f"Error: File not found at {file_path}.") - return False - except Exception as e: - print(f"Error: {str(e)}") - return False - - return True +class ProxyManagerSQLite: + def __init__(self, db_path="proxies.db", debug=False): + self.db_path = db_path + self.debug = debug + self._initialize_db() - def get_random_proxy(self): - """ - 随机获取一个代理 - """ - if not self.proxies: - print("No proxies available.") - return None - return random.choice(self.proxies) + def debug_print(self, *args): + """仅在 debug 模式下输出调试信息""" + if self.debug: + print(*args) - def test_proxy(self, proxy): - """ - 测试代理的对外 IP - :param proxy: 格式为 {'host': '...', 'port': '...', 'user': '...', 'password': '...', 'protocol': '...'} - """ - if not proxy: - print("Invalid proxy provided.") - return None + def _initialize_db(self): + """初始化或检查数据库结构""" + with self._get_connection() as conn: + # 启用 WAL 模式 + current_mode = conn.execute("PRAGMA journal_mode").fetchone()[0] + if current_mode != "wal": + self.debug_print("切换到 WAL 模式...") + conn.execute("PRAGMA journal_mode=WAL") - proxy_url = f"{proxy['protocol']}://{proxy['user']}:{proxy['password']}@{proxy['host']}:{proxy['port']}" - proxies = {'http': proxy_url, 'https': proxy_url} + # 检查表是否存在 + cursor = conn.execute("PRAGMA table_info(proxies)") + columns = [row[1] for row in cursor.fetchall()] + + required_columns = ["host", "port", "user", "password", "protocol", "region"] - try: - response = requests.get("http://jsonip.com", proxies=proxies, timeout=5) - if response.status_code == 200: - return response.json() + if not columns: + self.debug_print("表不存在,正在创建表...") + elif columns != required_columns: + self.debug_print(f"表结构不一致,当前列: {columns}, 期望列: {required_columns}") + self.debug_print("正在重建表...") + conn.execute("DROP TABLE IF EXISTS proxies") else: - print(f"Failed with status code: {response.status_code}") - return None - except requests.RequestException as e: - print(f"Request failed: {str(e)}") - return None - - def is_empty(self): - return not self.proxies - - def get_and_test_random_proxy(self): - """ - 从代理列表中随机获取一个代理,测试联通性,并从列表中移除 - """ - if not self.proxies: - print("No proxies available.") - return None + self.debug_print("表结构检查通过,无需更改。") + return - proxy = random.choice(self.proxies) - test_result = self.test_proxy(proxy) - if test_result: - print(f"Proxy works: {test_result}") - self.proxies.remove(proxy) # 移除成功的代理 - return proxy, test_result - else: - print("Proxy failed. Removing from the list.") - self.proxies.remove(proxy) # 移除失败的代理 - return None, None + # 创建表 + conn.execute(""" + CREATE TABLE proxies ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + host TEXT NOT NULL, + port TEXT NOT NULL, + user TEXT NOT NULL, + password TEXT NOT NULL, + protocol TEXT DEFAULT 'http', + region TEXT NOT NULL + ) + """) + self.debug_print("表已创建。") + + @contextmanager + def _get_connection(self): + """获取 SQLite 数据库连接""" + conn = sqlite3.connect(self.db_path) + try: + yield conn + finally: + conn.close() + + def import_proxies_with_classifier(self, file_path: str, classifier): + """ + 从文件导入代理列表并分类 + :param file_path: 文件路径,格式为 host:port:user:password + :param classifier: 分类函数,接受代理行字符串,返回国家/地区代码 + """ + try: + with open(file_path, "r") as file: + lines = file.read().replace("\r\n", "\n").strip().split("\n") + + proxies = [] + for line in lines: + parts = line.split(":") + if len(parts) == 4: + proxy = { + "host": parts[0], + "port": parts[1], + "user": parts[2], + "password": parts[3], + "region": classifier(line), + "protocol": "http", + } + proxies.append(proxy) + + with self._get_connection() as conn: + conn.executemany(""" + INSERT INTO proxies (host, port, user, password, protocol, region) + VALUES (:host, :port, :user, :password, :protocol, :region) + """, proxies) + conn.commit() + + self.debug_print(f"成功导入 {len(proxies)} 条代理数据!") + except Exception as e: + self.debug_print(f"Error importing proxies: {str(e)}") + raise + + def get_random_proxy_by_region(self, region: Optional[str] = None, remove_after_fetch: bool = False) -> Optional[Dict]: + """ + 随机获取代理,支持按区域筛选 + :param region: 国家/地区代码,若为 None 则随机选择 + :param remove_after_fetch: 是否从数据库中删除取出的代理 + :return: 随机选取的代理字典或 None + """ + with self._get_connection() as conn: + if region is None or region == "ALL": + cursor = conn.execute("SELECT * FROM proxies ORDER BY RANDOM() LIMIT 1") + else: + cursor = conn.execute("SELECT * FROM proxies WHERE region = ? ORDER BY RANDOM() LIMIT 1", (region,)) + + proxy = cursor.fetchone() + if proxy and remove_after_fetch: + conn.execute("DELETE FROM proxies WHERE id = ?", (proxy[0],)) + conn.commit() + + return dict(zip(["id", "host", "port", "user", "password", "protocol", "region"], proxy)) if proxy else None + + def get_proxy_count(self, region: Optional[str] = "ALL") -> int: + """ + 获取指定区域的代理数量 + :param region: 区域代码,默认为 "ALL" + :return: 代理数量 + """ + with self._get_connection() as conn: + if region == "ALL": + cursor = conn.execute("SELECT COUNT(*) FROM proxies") + else: + cursor = conn.execute("SELECT COUNT(*) FROM proxies WHERE region = ?", (region,)) + return cursor.fetchone()[0] + + def get_summary(self) -> Dict[str, int]: + """ + 获取所有区域的代理统计数量 + :return: 包含区域代码和代理数量的字典 + """ + with self._get_connection() as conn: + cursor = conn.execute("SELECT region, COUNT(*) FROM proxies GROUP BY region") + return {row[0]: row[1] for row in cursor.fetchall()} + + def export_proxies(self, file_path: str, serializer=None): + """ + 导出代理到文件 + :param file_path: 文件路径 + :param serializer: 可选的序列化函数,接受代理字典,返回字符串 + """ + if serializer is None: + serializer = serializer_smartproxy # 使用默认的序列化器 + + try: + with self._get_connection() as conn: + cursor = conn.execute("SELECT host, port, user, password, protocol, region FROM proxies") + proxies = cursor.fetchall() + + with open(file_path, "w") as file: + for proxy in proxies: + proxy_dict = dict(zip(["host", "port", "user", "password", "protocol", "region"], proxy)) + try: + line = serializer(proxy_dict) + file.write(line + "\n") + except Exception as e: + self.debug_print(f"序列化失败: {e}") + continue # 跳过错误的代理数据 + + self.debug_print(f"成功导出代理到 {file_path}!") + except Exception as e: + self.debug_print(f"Error exporting proxies: {str(e)}") + raise + + def get_proxies(self, region: Optional[str] = None) -> List[Dict]: + """ + 获取指定区域的所有代理列表 + :param region: 指定区域代码,如果为 None 或 "ALL",返回所有代理列表 + :return: 包含代理信息的字典列表 + """ + with self._get_connection() as conn: + if region is None or region == "ALL": + cursor = conn.execute("SELECT * FROM proxies") + else: + cursor = conn.execute("SELECT * FROM proxies WHERE region = ?", (region,)) + + rows = cursor.fetchall() + return [ + dict(zip(["id", "host", "port", "user", "password", "protocol", "region"], row)) + for row in rows + ] + + def clear(self): + """清空数据库中的所有数据""" + with self._get_connection() as conn: + conn.execute("DELETE FROM proxies") + conn.commit() + self.debug_print("数据库已清空。") + +def classifier_smartproxy(proxy_line): + """ + 从代理行中提取区域代码 + 区域代码格式: "_area-XX_", 提取 "XX" 部分作为区域代码。 + :param proxy_line: 代理行字符串 + :return: 区域代码 (如 "PL") 或 "OTHER" 如果提取失败 + """ + try: + # 找到 "_area-" 的起始位置 + start_index = proxy_line.find("_area-") + if start_index == -1: + return "OTHER" + + # 区域代码从 "_area-" 之后开始,到下一个 "_" 之前结束 + start_index += len("_area-") + end_index = proxy_line.find("_", start_index) + if end_index == -1: + return "OTHER" # 无法找到结束符,返回 "OTHER" + + # 提取区域代码并返回 + region_code = proxy_line[start_index:end_index] + return region_code.upper() # 返回大写的区域代码 + except Exception as e: + print(f"Error in region classification: {str(e)}") + return "OTHER" + +def serializer_smartproxy(proxy: Dict) -> str: + """ + 默认的代理导出序列化函数 + :param proxy: 代理字典 + :return: 格式化后的字符串,格式为 host:port:user:password 没有区域信息的原因在于,smartproxy 的格式里user 字段就包含了区域信息 + """ + try: + # 使用 "|" 分隔符标记区域,方便后续导入时解析 + return f"{proxy['host']}:{proxy['port']}:{proxy['user']}:{proxy['password']}" + except KeyError as e: + raise ValueError(f"代理信息缺少必要字段: {e}") if __name__ == "__main__": - manager = ProxyManager() - print(f'测试是否是空的:{manager.is_empty()}') - print(manager.import_proxies('IP.txt')) - print(f'再测试是否是空的:{manager.is_empty()}') + manager = ProxyManagerSQLite() + # 导入代理 + manager.import_proxies_with_classifier("IP.txt", classifier=classifier_smartproxy) - random_proxy = manager.get_random_proxy() - print(f"获取到的随机代理:{random_proxy}") + # 获取汇总统计 + print("代理统计:", manager.get_summary()) - test_result = manager.test_proxy(random_proxy) - print(f"随机代理的出口 IP:{test_result}") + # 获取随机代理 + proxy = manager.get_random_proxy_by_region(region="PL", remove_after_fetch=True) + print(f"取出的代理: {proxy}") - proxy, result = manager.get_and_test_random_proxy() - if result: - print(f"测试成功 : {result},取得的代理是:{proxy},这个代理已经从代理管理器里移除。") - else: - print("测试失败.") + # 获取代理数量 + print("所有代理总数:", manager.get_proxy_count("ALL")) + print("PL 区域代理数:", manager.get_proxy_count("PL")) + + print("PL 区域当前列表:",manager.get_proxies("PL")) + + print("目前所有的可用代理列表:",manager.get_proxies("ALL")) + + manager.export_proxies("剩下的可用IP.txt") + \ No newline at end of file