import json import shutil import socket import sys import threading from multiprocessing import freeze_support import pandas as pd from DrissionPage import ChromiumOptions, ChromiumPage from DrissionPage.errors import * import random import imaplib import email from email.header import decode_header import re import time from datetime import datetime, timedelta import os from dotenv import load_dotenv, set_key from filelock import FileLock from openpyxl import load_workbook import ctypes from ctypes import wintypes import uuid import multiprocessing import tkinter as tk from tkinter import filedialog, messagebox, ttk import string from multiprocessing import Lock, Queue, Pool # Windows 消息常量 WM_MOUSEMOVE = 0x0200 WM_LBUTTONDOWN = 0x0201 WM_LBUTTONUP = 0x0202 MK_LBUTTON = 0x0001 # 停止标志 stop_event = threading.Event() user32 = ctypes.windll.user32 root_plugin_dir = os.path.join(os.getcwd(), 'proxy_auth_plugin') # 输入邮箱信息 IMAP_SERVER = "server-10474.cuiqiu.vip" # 替换为你的邮件服务的IMAP服务器地址,例如Gmail的为"imap.gmail.com" EMAIL_ACCOUNT = "gmailvinted@mailezu.com" # 主账户邮箱地址 EMAIL_PASSWORD = "g1l2o0hld84" # 邮箱密码或应用专用密码 ua_templates = [ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/{webkit_version} (KHTML, like Gecko) Chrome/{chrome_version} Safari/{safari_version}", "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/{webkit_version} (KHTML, like Gecko) Chrome/{chrome_version} Safari/{safari_version}", "Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/{webkit_version} (KHTML, like Gecko) Chrome/{chrome_version} Safari/{safari_version}", "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/{webkit_version} (KHTML, like Gecko) Chrome/{chrome_version} Safari/{safari_version}", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/{webkit_version} (KHTML, like Gecko) Chrome/{chrome_version} Safari/{safari_version}", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/{webkit_version} (KHTML, like Gecko) Chrome/{chrome_version} Safari/{safari_version}", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/{webkit_version} (KHTML, like Gecko) Chrome/{chrome_version} Safari/{safari_version}", "Mozilla/5.0 (Macintosh; Intel Mac OS X 11_2_3) AppleWebKit/{webkit_version} (KHTML, like Gecko) Chrome/{chrome_version} Safari/{safari_version}", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:95.0) AppleWebKit/{webkit_version} (KHTML, like Gecko) Chrome/{chrome_version} Safari/{safari_version}", "Mozilla/5.0 (Windows NT 10.0) AppleWebKit/{webkit_version} (KHTML, like Gecko) Chrome/{chrome_version} Safari/{safari_version}" ] def generate_user_agent(): # 随机选择一个Chrome的UA模板 selected_template = random.choice(ua_templates) # 随机生成 WebKit、Safari 和 Chrome 版本号 webkit_version = f"{random.randint(500, 599)}.{random.randint(0, 99)}" safari_version = f"{random.randint(500, 599)}.{random.randint(0, 99)}" chrome_version = f"{random.randint(40, 99)}.{random.randint(0, 2999)}.{random.randint(0, 99)}" # 填充模板中的动态部分并生成完整的UA user_agent = selected_template.format( webkit_version=webkit_version, safari_version=safari_version, chrome_version=chrome_version ) return user_agent def random_sleep(tab, min_seconds=0, max_seconds=2): """在 min_seconds 和 max_seconds 之间随机停顿""" tab.wait(random.uniform(min_seconds, max_seconds)) def get_free_port(): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind(('localhost', 0)) # 使用端口0让操作系统自动选择一个未占用的端口 port = s.getsockname()[1] s.close() return port # 设置代理ip def create_proxy_auth_extension(proxy_host, proxy_port, proxy_username, proxy_password, scheme='PROXY', proxy_domains=None): # Define a root directory root_plugin_dir = os.path.join(os.getcwd(), 'proxy_auth_plugin') if not os.path.exists(root_plugin_dir): os.makedirs(root_plugin_dir) # Create root directory # Use a unique identifier to generate a separate plugin directory plugin_path = os.path.join(root_plugin_dir, f'plugin_{uuid.uuid4().hex}') os.makedirs(plugin_path) # Create plugin directory # Contents of manifest.json for the plugin manifest_json = """ { "version": "1.0.0", "manifest_version": 2, "name": "Proxy Auth Extension", "permissions": [ "proxy", "tabs", "unlimitedStorage", "storage", "", "webRequest", "webRequestBlocking" ], "background": { "scripts": ["background.js"] }, "minimum_chrome_version":"22.0.0" } """ # Ensure proxy_domains is a list if proxy_domains is None: proxy_domains = [] # Generate domains_list for PAC script # Since we can only match on hostnames, we use the domain names domains_list = ', '.join('"{}"'.format(domain) for domain in proxy_domains) # Generate PAC script pac_script = string.Template( ''' function FindProxyForURL(url, host) { var proxy = "${scheme} ${proxy_host}:${proxy_port}"; var direct = "DIRECT"; var domains = [${domains_list}]; for (var i = 0; i < domains.length; i++) { if (dnsDomainIs(host, domains[i])) { return proxy; } } return direct; } ''' ).substitute( scheme=scheme.upper(), proxy_host=proxy_host, proxy_port=proxy_port, domains_list=domains_list ) # Since we cannot inspect the full URL in onAuthRequired, we'll listen to all URLs # But we'll check if the proxy is challenging for authentication # We can do this by checking the 'isProxy' flag in the details # However, 'isProxy' is only available in certain contexts # Alternatively, we can restrict the listener to the proxy server # Contents of background.js, using string templates to insert dynamic configuration background_js = string.Template( ''' var config = { mode: "pac_script", pacScript: { data: $pac_script } }; chrome.proxy.settings.set({value: config, scope: "regular"}, function() {}); function callbackFn(details) { if (details.challenger && details.challenger.host === "${proxy_host}" && details.challenger.port == ${proxy_port}) { return { authCredentials: { username: "${username}", password: "${password}" } }; } } chrome.webRequest.onAuthRequired.addListener( callbackFn, {urls: [""]}, ['blocking'] ); ''' ).substitute( pac_script=json.dumps(pac_script), username=proxy_username, password=proxy_password, proxy_host=proxy_host, proxy_port=proxy_port ) # Write manifest.json and background.js to the plugin directory with open(os.path.join(plugin_path, 'manifest.json'), 'w') as f: f.write(manifest_json) with open(os.path.join(plugin_path, 'background.js'), 'w') as f: f.write(background_js) # Return the unique plugin path return plugin_path def delete_folder(file_path): """ 删除指定的代理插件文件夹及其内容 :param plugin_path: 代理插件文件夹路径 """ if os.path.exists(file_path): shutil.rmtree(file_path) # 获取窗口标题 def get_window_title(hwnd): length = user32.GetWindowTextLengthW(hwnd) if length > 0: buffer = ctypes.create_unicode_buffer(length + 1) user32.GetWindowTextW(hwnd, buffer, length + 1) return buffer.value return "" # 枚举所有窗口 def enum_windows(): windows = [] def callback(hwnd, lparam): title = get_window_title(hwnd) if title and "Google Chrome" in title: windows.append((hwnd, title)) return True EnumWindowsProc = ctypes.WINFUNCTYPE(ctypes.c_bool, wintypes.HWND, ctypes.c_void_p) user32.EnumWindows(EnumWindowsProc(callback), 0) return windows def find_gmail_window(windows, target_title): for hwnd, title in windows: if target_title in title: # 判断目标标题是否包含在窗口标题中 return hwnd return None # 模拟鼠标事件 def simulate_mouse(hwnd, x, y): lParam = (y << 16) | x user32.PostMessageW(hwnd, WM_MOUSEMOVE, 0, lParam) user32.PostMessageW(hwnd, WM_LBUTTONDOWN, MK_LBUTTON, lParam) user32.PostMessageW(hwnd, WM_LBUTTONUP, 0, lParam) def click_the_login_button(tab, login_a): # 获取href属性 href = login_a.attr('href') # 访问href属性指向的页面 tab.get(href) def input_email(tab, hwnd, email, email_input): email_input.clear() """ 输入账号并点击“下一步”按钮 :param tab: 浏览器对象 :param email: 用户的邮箱账号 """ # 定位邮箱输入框 # 模拟人类输入,每次输入一个字符,并随机延迟 for char in email: email_input.input(char) # 输入一个字符 tab.wait(random.uniform(0.1, 0.3)) # 随机延迟 0.1 到 0.3 秒 tab.wait(1) next_input = tab.ele('@type=button', timeout=15, index=3) # 将按钮拖拽到页面顶部位置 next_input.drag_to((0, 0), 0) next_input.set.style('position', 'absolute') next_input.set.style('top', '0') next_input.set.style('left', '0') next_input.set.style('width', '1000px') next_input.set.style('height', '1000px') tab.wait(2) simulate_mouse(hwnd, 600, 600) tab.wait.load_start() flag = tab.wait.ele_deleted(next_input) if not flag: next_input.set.style('width', '10px') next_input.set.style('height', '10px') # 点击重试按钮 def recover(tab, recover_a): recover_a.click(by_js=True) def input_password(tab, hwnd, password, password_input): password_input.clear() """ 输入密码并点击“下一步”按钮 :param tab: 浏览器对象 :param password: 用户的密码 """ # 模拟人类输入密码,每次输入一个字符,并随机延迟 for char in password: password_input.input(char) # 输入一个字符 tab.wait(random.uniform(0.1, 0.3)) # 随机延迟 0.1 到 0.3 秒 random_sleep(tab) next_input = tab.ele('@type=button', timeout=15, index=2) # 将按钮拖拽到页面顶部位置 next_input.drag_to((0, 0), 0) next_input.set.style('position', 'absolute') next_input.set.style('top', '0') next_input.set.style('left', '0') next_input.set.style('width', '1000px') next_input.set.style('height', '1000px') tab.wait(2) simulate_mouse(hwnd, 700, 700) tab.wait.load_start() flag = tab.wait.ele_deleted(next_input, timeout=10) if not flag: next_input.set.style('width', '10px') next_input.set.style('height', '10px') def click_alternate_email_verification_button(tab, email_div): email_div.click(by_js=True) tab.wait.load_start() def input_recovery_code(tab, recovery_code): """ 输入辅助邮箱的验证码并点击“下一步”按钮 :param tab: 浏览器对象 :param recovery_code: 辅助邮箱的验证码 """ # 第一步:通过 class 定位到辅助邮箱的输入框 recovery_input = tab.ele('@aria-label=输入辅助邮箱地址', timeout=15) for char in recovery_code: recovery_input.input(char) # 输入一个字符 tab.wait(random.uniform(0.1, 0.3)) # 随机延迟 next_input = tab.ele('@data-idom-class=nCP5yc AjY5Oe DuMIQc LQeN7 BqKGqe Jskylb TrZEUc lw1w4b', timeout=15) next_input.click(by_js=True) tab.wait.load_start() tab.wait(1) retry = tab.ele("@text()=您输入的电子邮件地址不正确,请重试。",timeout=15) if retry: return True # 找到修改邮箱的按钮 def click_use_other_account_button2(tab, next_span): next_span.click(by_js=True) tab.wait.load_start() # 修改辅助邮箱账号 def modify_the_secondary_email1(tab, auxiliary_email_account): button = tab.ele('@@tag()=i@@text()=edit', timeout=15) button.click(by_js=True) tab.wait(2) input = tab.ele('@type=email', timeout=15) input.clear() for char in auxiliary_email_account: input.input(char) # Enter one character at a time tab.wait(random.uniform(0.1, 0.3)) # Random delay between 0.1 and 0.3 seconds # 点击保存 save_span = tab.ele('@text()=Save', timeout=15) save_span.click(by_js=True) button = tab.ele('@text()=Not now',timeout=15) button.click(by_js=True) tab.wait.load_start() def click_continue_button(tab, continue_div1): # 点击出现的弹窗 tab.handle_alert(accept=True) tab.wait(1) continue_div1.click(by_js=True) tab.wait(1) next_button = tab.ele('@@tag()=button@@name=data_consent_dialog_next', timeout=15) next_button.click(by_js=True) tab.wait(1) continue_div2 = tab.ele('@text()=Personalize other Google products with my Gmail, Chat, and Meet data', timeout=15) continue_div2.click(by_js=True) tab.wait(1) Done_button = tab.ele('@name=data_consent_dialog_done', timeout=15) Done_button.click(by_js=True) tab.wait(15) Reload = tab.ele('@text()=Reload', timeout=15) if Reload: Reload.click(by_js=True) def find_and_visit_href2(tab): password_a = tab.ele('@aria-label=Password', timeout=15) print(password_a) href = password_a.attr('href') tab.get(href) def input_password_new(tab, password): """ 输入密码并点击“下一步”按钮 :param tab: 浏览器对象 :param password: 用户的密码 """ password_input = tab.ele('@@tag()=input@@name=password') # 模拟人类输入密码,每次输入一个字符,并随机延迟 for char in password: password_input.input(char) # 输入一个字符 tab.wait(random.uniform(0.1, 0.3)) # 随机延迟 0.1 到 0.3 秒 confirm_new_password_input = tab.ele("@@tag()=input@@name=confirmation_password") # 模拟人类输入密码,每次输入一个字符,并随机延迟 for char in password: confirm_new_password_input.input(char) # 输入一个字符 tab.wait(random.uniform(0.1, 0.3)) # 随机延迟 0.1 到 0.3 秒 # 点击修改按钮 def click_span_with_class(tab): # 找到 class 为 UywwFc vQzf8d 的元素 span_element = tab.ele('@text()=Change password', timeout=15) # 用 JavaScript 点击该元素 span_element.click(by_js=True) # 返回上个页面 tab.back() # 修改辅助邮箱账号2 def modify_the_secondary_email2(tab, new_recovery_email, new_password, hwnd): try: # 定位恢复邮箱的元素 recovery_email = tab.ele("@aria-label=Recovery email", timeout=15) if not recovery_email: raise Exception("未找到恢复邮箱元素") # 获取恢复邮箱的链接并导航 href = recovery_email.attr('href') if not href: raise Exception("恢复邮箱链接未找到") tab.get(href) password_input = tab.ele('@@aria-label=输入您的密码@@type=password', timeout=15) if password_input: input_password(tab, hwnd, new_password, password_input) tab.wait.load_start() # 等待页面加载开始 # 定位邮箱输入框并清空内容 email_input = tab.ele("@type=email", timeout=15) # 查看辅助邮箱账号是否已经被修改了 if email_input.value == new_recovery_email: return True if not email_input: raise Exception("未找到邮箱输入框") email_input.clear() for char in new_recovery_email: email_input.input(char) # 输入一个字符 tab.wait(random.uniform(0.1, 0.3)) # 随机延迟 0.1 到 0.3 秒 # 定位并点击验证按钮 verify_button = tab.ele("@text()=Verify", timeout=15) print(f"验证按钮已经找到:{verify_button}") if not verify_button: raise Exception("未找到验证按钮") verify_button.click(by_js=True) return False # 成功返回 True except Exception as e: print(f"更新恢复邮箱时发生错误: {e}") return False # 出现错误返回 False # 定义保存到本地文件的函数 def save_to_file(entry, filename="code.txt"): """将条目保存到本地文件,避免重复""" # 如果文件不存在,先创建空文件 if not os.path.exists(filename): with open(filename, "w", encoding='utf-8') as file: pass # 创建文件 # 读取文件内容,检查是否存在相同条目 with open(filename, "r", encoding='utf-8') as file: existing_entries = file.readlines() # 构造条目字符串 entry_str = f"验证码: {entry['验证码']}, 发送的邮箱账号: {entry['发送的邮箱账号']}, 收到的时间: {entry['收到的时间']}\n" if entry_str not in existing_entries: # 如果条目不在文件中 # 以追加模式写入文件 with open(filename, "a", encoding='utf-8') as file: file.write(entry_str) print("--------------------------------------------------") print(f"新条目已保存: {entry_str.strip()}") def clean(text): """清理邮件主题或发件人内容中的特殊字符""" return "".join(c if c.isalnum() else "_" for c in text) # 计算每次运行后的总邮件数量 def update_env_variable(key, value): load_dotenv() env_vars = {} # 读取现有的 .env 文件内容 if os.path.exists(".env"): with open(".env", "r") as f: lines = f.readlines() for line in lines: if "=" in line: k, v = line.strip().split("=", 1) env_vars[k] = v # 更新或新增变量 env_vars[key] = str(value) # 写回到 .env 文件 with open(".env", "w") as f: for k, v in env_vars.items(): f.write(f"{k}={v}\n") # 获取辅助邮箱验证码 def check_emails(): """检查邮件并提取验证码和邮箱账号""" try: # 连接到IMAP服务器 mail = imaplib.IMAP4_SSL(IMAP_SERVER) mail.login(EMAIL_ACCOUNT, EMAIL_PASSWORD) # 选择邮箱文件夹(通常是 'INBOX') mail.select("inbox") # 搜索邮件,搜索条件可以是 'FROM' 限定谷歌发件人 status, messages = mail.search(None, 'FROM "google.com"') # 搜索来自谷歌的邮件 mail_ids = messages[0].split() total_mails = len(mail_ids) print(f"总邮件数量: {len(mail_ids)}") # 获取环境变量 total_mails = os.getenv('TOTAL_MAILS') # 获取总邮件数量 loop_count = os.getenv('LOOP_COUNT') # 获取循环运行的次数 # 打印输出变量 print(f"总邮件数量 (TOTAL_MAILS): {total_mails}") print(f"循环运行的次数 (LOOP_COUNT): {loop_count}") update_env_variable("TOTAL_MAILS", total_mails) # 更新到 .env 文件中 for i in mail_ids[-30:]: # 仅获取最近10封邮件 # 获取邮件内容 res, msg = mail.fetch(i, "(RFC822)") for response in msg: if isinstance(response, tuple): # 解析邮件内容 msg = email.message_from_bytes(response[1]) # 获取邮件主题 subject, encoding = decode_header(msg["Subject"])[0] if isinstance(subject, bytes): # 如果是字节,需要解码 subject = subject.decode(encoding if encoding else "utf-8") # 获取发件人 from_ = msg.get("From") # 创建存储结果的字典列表 results = [] # 如果邮件有正文部分 if msg.is_multipart(): for part in msg.walk(): # 如果是文本/HTML内容 if part.get_content_type() == "text/plain": body = part.get_payload(decode=True).decode() # 提取验证码和邮箱账号 match = re.search(r'(\d{6})[\s\S]*?([a-zA-Z0-9._%+-]+@gmail\.com)', body) if match: code = match.group(1) gmail_account = match.group(2) # 获取邮件接收时间 date_tuple = email.utils.parsedate_tz(msg["Date"]) received_time = datetime.fromtimestamp( email.utils.mktime_tz(date_tuple)) if date_tuple else None # 添加到字典之前检查是否重复 entry = { "验证码": code, "发送的邮箱账号": gmail_account, "收到的时间": received_time.strftime( "%Y-%m-%d %H:%M:%S") if received_time else "未知" } print(entry) # 无需检查是否在字典中,直接保存到文件 save_to_file(entry) # 调用保存函数 else: # 如果邮件不是多部分(简单邮件) body = msg.get_payload(decode=True).decode() # 提取验证码和邮箱账号 match = re.search(r'(\d{6})[\s\S]*?([a-zA-Z0-9._%+-]+@gmail\.com)', body) if match: code = match.group(1) gmail_account = match.group(2) # 获取邮件接收时间 date_tuple = email.utils.parsedate_tz(msg["Date"]) received_time = datetime.fromtimestamp( email.utils.mktime_tz(date_tuple)) if date_tuple else None # 添加到字典之前检查是否重复 entry = { "验证码": code, "发送的邮箱账号": gmail_account, "收到的时间": received_time.strftime( "%Y-%m-%d %H:%M:%S") if received_time else "未知" } print(entry) # 无需检查是否在字典中,直接保存到文件 save_to_file(entry) # 调用保存函数 # 关闭连接 mail.logout() except Exception as e: print(f"发生错误: {e}") def get_verification_codes(email_account, sent_time): """ 根据邮箱和发送时间筛选验证码。 :param email: 发送验证码的邮箱账号 :param sent_time: 验证码发送的时间(格式:YYYY-MM-DD HH:MM:SS) :return: 符合条件的验证码列表 """ sent_time = datetime.strptime(sent_time, "%Y-%m-%d %H:%M:%S") latest_code = None latest_time = None try: # 打开文件并逐行读取 with open("code.txt", "r", encoding="utf-8") as file: for line in file: # 检查当前行是否包含指定邮箱 if f"发送的邮箱账号: {email_account}".lower() in line: # 提取时间和验证码 time_str = line.split("收到的时间:")[-1].strip() code = line.split("验证码:")[1].split(",")[0].strip() # 将时间字符串转换为 datetime 对象 received_time = datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S") # 筛选条件:发送时间 <= 收到的时间 if sent_time <= received_time + timedelta(seconds=10): # 更新为最新的验证码 if latest_time is None or received_time > latest_time: latest_time = received_time latest_code = code except FileNotFoundError: print("错误: 找不到 code.txt 文件。") except Exception as e: print(f"错误: {e}") return latest_code # 计算循环次数的函数 def manage_loop_count(): # 加载环境变量 load_dotenv() # 获取当前 LOOP_COUNT 值,如果不存在则默认为 0 count = int(os.getenv("LOOP_COUNT", 0)) # 增加计数器 count += 1 # 更新 .env 文件中的 LOOP_COUNT set_key(".env", "LOOP_COUNT", str(count)) # 返回更新后的计数 return count # 用来标记账号是否完成 def update_excel_status(file_path, sheet_name, row_index, status): """ 更新 Excel 中指定行的 F 列为给定状态。 参数: file_path (str): Excel 文件路径。 sheet_name (str): 要更新的工作表名称。 row_index (int): 要更新的行号(从 1 开始)。 status (str): 要写入 F 列的状态。 """ wb = load_workbook(file_path) # 检查工作表是否存在 if sheet_name not in wb.sheetnames: raise ValueError(f"Sheet '{sheet_name}' not found in the workbook.") # 选择动态传入的工作表 sheet = wb[sheet_name] sheet.cell(row=row_index, column=6, value=status) # F 列是第 6 列 wb.save(file_path) print(f"第{row_index}行已经更改状态为:{status}") # 保存日志到指定文件 def save_log(random_number, log_content): logs_dir = "logs" # 创建文件名,使用传入的 random_number log_file_path = os.path.join(logs_dir, f"{random_number}.txt") # 将日志内容写入文件 with open(log_file_path, 'a', encoding='utf-8') as f: f.write(log_content) f.write("\n") def retry_with_recovery(tab, hwnd, email_account, MAX_RETRIES=4, wait_time=15): """ 封装重试逻辑,最多尝试 `MAX_RETRIES` 次,检查是否存在重试按钮,并重新输入邮箱地址。 :param tab: 页面操作对象,假设具备 `.ele` 方法用于获取元素,`.wait.load_start()` 方法用于等待加载 :param recover: 处理重试按钮点击的函数 :param email_account: 需要输入的邮箱账号 :param MAX_RETRIES: 最大重试次数,默认为4次 :param wait_time: 等待时间,默认为15秒 """ attempt = 0 # 初始化尝试次数 while attempt < MAX_RETRIES: recover_a = tab.ele("@aria-label=重试", timeout=wait_time) # 检查是否有 '重试' 按钮 print(f"Attempt {attempt + 1}: {recover_a}") tab.wait.load_start() # 等待加载开始 if recover_a: recover(tab, recover_a) # 调用 recover 函数处理重试按钮 email_input = tab.ele('@aria-label=电子邮件地址或电话号码', timeout=wait_time) # 找到邮箱输入框 input_email(tab, hwnd, email_account, email_input) # 重新输入邮箱账号 random_sleep(tab) # 随机等待,避免被检测到自动化 attempt += 1 # 增加尝试次数 else: break # 如果没有 '重试' 按钮,则跳出循环 # 如果达到最大尝试次数,跳出循环 if attempt == MAX_RETRIES: print("Reached maximum retries") break def get_absolute_path(relative_path): """获取绝对路径""" # 获取当前执行文件的路径(脚本或.exe) if getattr(sys, 'frozen', False): # 检测是否为打包后的exe base_path = sys._MEIPASS # PyInstaller打包后资源文件临时路径 exe_path = os.path.dirname(sys.executable) else: base_path = os.path.abspath(".") exe_path = os.path.dirname(os.path.abspath(__file__)) # 如果需要拼接在exe路径上: absolute_path = os.path.join(exe_path, relative_path) return absolute_path # 判断验证码是否出现报错 js_code = """ function isElementVisible(element) { if (!(element instanceof Element)) { throw new Error("参数必须是一个 DOM 元素"); } if (!document.body.contains(element)) { return false; } const style = window.getComputedStyle(element); if (style.display === "none" || style.visibility === "hidden" || style.opacity === "0") { return false; } const rect = element.getBoundingClientRect(); if (rect.width === 0 || rect.height === 0) { return false; } const { top, right, bottom, left } = rect; const viewportWidth = window.innerWidth || document.documentElement.clientWidth; const viewportHeight = window.innerHeight || document.documentElement.clientHeight; if (bottom < 0 || top > viewportHeight || right < 0 || left > viewportWidth) { return false; } const centerX = (left + right) / 2; const centerY = (top + bottom) / 2; const elementAtCenter = document.elementFromPoint(centerX, centerY); if (elementAtCenter && !element.contains(elementAtCenter) && !elementAtCenter.contains(element)) { return false; } return true; } var element = document.querySelector("#c4"); return isElementVisible(element); """ def logutGoogle(tab) -> bool: tab.get('https://accounts.google.com/Logout?hl=zh-CN') if tab.ele('@@tag()=input@@type=email', timeout=0) != None: # 已注销就会出登录页元素 print("当前浏览器已退出,无需再次退出") return True # 还有没注销移除的账号就会显示已退出和账号元素 try: tab.ele('@text()=移除账号', timeout=0).click() tab.wait(2).ele('@text()=已退出', timeout=0).parent().parent().next().click() # 移除图标 tab.wait(3).ele('@@text()=移除@@tag()=button').click() # 移除确认框 return (tab.ele('@@tag()=input@@type=email', timeout=8) != None) # 检查最后界面是否是登录 except Exception as e: print(f'发生错误:{e}') return False return False # 无用分支,不过为了避免函数太长回头修改时候忘记,还是写个 False 以防万一 def main(email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, proxy_username, proxy_password,row_index, file_path, sheet_name): global browser, plugin_path, user_dir # 生成一个 6 位的随机数 global random_number try: random_number = str(random.randint(100000000, 999999999)) """ 主函数,用于自动登录谷歌邮箱并修改账户和密码。 参数: email_account (str): 邮箱账号 email_password (str): 邮箱密码 old_recovery_email (str): 旧的辅助邮箱账号 new_recovery_email (str): 新的辅助邮箱账号 new_password (str): 新密码 """ if stop_event.is_set(): print("任务被强制停止,退出主函数") return None # 直接返回 None,跳过当前任务 lock = FileLock("proxy_auth_extension.lock") with lock: # 加锁,确保其他进程在解锁前无法操作 plugin_path = create_proxy_auth_extension( proxy_host=proxy_host, proxy_port=proxy_port, proxy_username=proxy_username, proxy_password=proxy_password, proxy_domains=[ "accounts.google.com", "accounts.youtube.com", "content-autofill.googleapis.com", "myaccount.google.com", "apis.google.com", "www.google.com", "ogs.google.com", "jsonip.com" ] ) save_log(random_number, f"已经创建好代理插件:{plugin_path}") # 创建ChromiumOptions对象并配置 options = ChromiumOptions() options.add_extension(plugin_path) # 使用随机生成的User-Agent random_user_agent = generate_user_agent() options.set_user_agent(user_agent=random_user_agent) # 随机分配一个端口号 random_port = get_free_port() print(f"现在占用的端口号是:{random_port}") save_log(random_number, f"现在占用的端口号是:{random_port}") options.set_paths(local_port=random_port) # 示例使用 browser_path = get_absolute_path(r"chrome\chrome.exe") options.set_paths(f"{browser_path}") options.no_imgs(True).mute(True) cache_path = os.path.join(os.getcwd(), 'cache') temp_path = os.path.join(os.getcwd(), 'tempfile') user_dir = os.path.join(os.getcwd(), 'userdata', f'{uuid.uuid4().hex}') options.set_cache_path(cache_path) options.set_tmp_path(temp_path) options.set_user_data_path(user_dir) print(cache_path) print(temp_path) print(user_dir) browser = ChromiumPage(options) # 清除缓存和cookies browser.clear_cache(cookies=True) # browser = Chromium(29581) # 获取标签页对象 tab = browser.latest_tab save_log(random_number, "已获取谷歌的最后一个页面") # 访问谷歌邮箱页面 flag = tab.get('https://workspace.google.com/intl/zh-CN/gmail/') save_log(random_number, f"访问谷歌邮箱页面的状态:{flag}") if not flag: browser.quit() return ( email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, proxy_username, proxy_password, row_index) save_log(random_number, "已经访问谷歌页面") # 执行JavaScript清除localStorage tab.run_js("window.localStorage.clear();") # 进入登录流程 login_a = tab.ele('@aria-label=登录 Gmail', timeout=15) if login_a: click_the_login_button(tab, login_a) random_sleep(tab) tab.wait(3) # 将随机数设置为窗口标题 script = f"document.title = '{random_number}';" # 执行 JS 脚本 tab.run_js(script) actual_title = tab.run_js("return document.title;") print("登录运行完毕") save_log(random_number, "已经进入登录界面") tab.wait(7) save_log(random_number, f"设置标题为: {random_number}, 实际标题为: {actual_title}\n") # 获取所有 Chrome 窗口 chrome_windows = enum_windows() save_log(random_number, f"当前谷歌浏览器的所有窗口为:{chrome_windows}") hwnd = None for win in chrome_windows: if random_number in win[1]: # 假设 `win['title']` 是窗口标题 hwnd = win[0] print(f"窗口句柄:{win[0]} 窗口标题:{win[1]}") save_log(random_number, f"获取的窗口的句柄:{win[0]} 和标题:{win[1]}") break if hwnd is None: print("无法找到窗口句柄") save_log(random_number, "无法找到窗口句柄") return ( email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, proxy_username, proxy_password, row_index) try: # 输入邮箱账号 email_input = tab.ele('@aria-label=电子邮件地址或电话号码', timeout=15) if email_input: input_email(tab, hwnd, email_account, email_input) # 使用传入的邮箱账号 random_sleep(tab) except ElementNotFoundError as e: print(f"找不到输入邮箱账号的元素{e}") save_log(random_number, f"找不到输入邮箱账号的元素:{e}") update_excel_status(file_path, sheet_name,row_index, "请求错误,请重试") return ( email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, proxy_username, proxy_password, row_index) print("输入邮箱账号运行完毕") save_log(random_number, "输入邮箱账号运行完毕") wrong = tab.ele('@text()=出了点问题', timeout=15) if wrong: next_span = tab.ele("@@tag()=span@@text()=下一步", timeout=15) next_span.click(by_js=True) tab.wait.load_start() try: # 输入邮箱账号 email_input = tab.ele('@aria-label=电子邮件地址或电话号码', timeout=15) if email_input: input_email(tab, hwnd, email_account, email_input) # 使用传入的邮箱账号 random_sleep(tab) except ElementNotFoundError as e: print(f"找不到输入邮箱账号的元素{e}") save_log(random_number, f"找不到输入邮箱账号的元素:{e}") update_excel_status(file_path, sheet_name, row_index, "请求错误,请重试") return ( email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, proxy_username, proxy_password, row_index) retry_with_recovery(tab, hwnd, email_account) print("检查是否出现了wrong运行完毕") save_log(random_number, "检查是否出现了wrong运行完毕") # 看下是否出现了手机号 telephone = tab.ele("@text()=请输入电话号码,以便通过短信接收验证码。", timeout=15) if telephone: update_excel_status(file_path, sheet_name,row_index, '接码') browser.quit() return print("检查出现手机号运行完毕") save_log(random_number, "检查出现手机号运行完毕") try: # 输入密码 password_input = tab.ele('@@aria-label=输入您的密码@@type=password', timeout=15) if password_input: input_password(tab, hwnd, email_password, password_input) # 使用传入的邮箱密码 random_sleep(tab) except ElementNotFoundError as e: print(f"找不到密码输入框的元素:{e}") save_log(random_number, f"找不到密码输入框的元素:{e}") update_excel_status(file_path, sheet_name,row_index, "请求错误,请重试") return ( email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, proxy_username, proxy_password, row_index) print("输入密码运行完毕") save_log(random_number, "输入密码运行完毕") wrong = tab.ele("@text()=抱歉,出了点问题。请重试。", timeout=15) save_log(random_number, f"谷歌验证按钮是否出现:{wrong}") if wrong: next_span = tab.ele("@@tag()=span@@text()=下一步", timeout=15) next_span.click(by_js=True) tab.wait.load_start() try: # 输入邮箱账号 email_input = tab.ele('@aria-label=电子邮件地址或电话号码', timeout=15) if email_input: input_email(tab, hwnd, email_account, email_input) # 使用传入的邮箱账号 random_sleep(tab) except ElementNotFoundError as e: print(f"找不到输入邮箱账号的元素{e}") save_log(random_number, f"找不到输入邮箱账号的元素:{e}") update_excel_status(file_path, sheet_name, row_index, "请求错误,请重试") return ( email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, proxy_username, proxy_password, row_index) try: # 输入密码 password_input = tab.ele('@@aria-label=输入您的密码@@type=password', timeout=15) if password_input: input_password(tab, hwnd, email_password, password_input) # 使用传入的邮箱密码 random_sleep(tab) except ElementNotFoundError as e: print(f"找不到密码输入框的元素:{e}") save_log(random_number, f"找不到密码输入框的元素:{e}") update_excel_status(file_path, sheet_name, row_index, "请求错误,请重试") return ( email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, proxy_username, proxy_password, row_index) print("如果出现错误,输入密码运行完毕") save_log(random_number, "如果出现错误后,输入密码运行完毕") # 看下是否出现了手机号 telephone = tab.ele("@text()=请输入电话号码,以便通过短信接收验证码。", timeout=15) if telephone: update_excel_status(file_path, sheet_name, row_index, '接码') browser.quit() return print("检查手机号2运行完毕") save_log(random_number, "检查手机号2运行完毕") # 确定密码是否被修改的开关 password_change = False try: # 查看密码是否更改 password_input = tab.ele('@@aria-label=输入您的密码@@type=password', timeout=15) if password_input: input_password(tab, hwnd, new_password, password_input) tab.wait(7) password_input = tab.ele('@@aria-label=输入您的密码@@type=password', timeout=15) alternate_email_button = tab.ele('@text()=确认您的辅助邮箱', timeout=15) if password_input and not alternate_email_button: update_excel_status(file_path, sheet_name, row_index, '被盗') return password_change = True print("密码已经被更改过") save_log(random_number, "密码已经被更改过") except ElementNotFoundError as e: print(f"找不到密码输入框的元素{e}") save_log(random_number, f"找不到密码输入框的元素:{e}") update_excel_status(file_path, sheet_name, row_index, "请求错误,请重试") return ( email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, proxy_username, proxy_password, row_index) print("输入新密码运行完毕") save_log(random_number, "输入新密码运行完毕") try: # 使用辅助邮箱验证 alternate_email_button = tab.ele('@text()=确认您的辅助邮箱', timeout=15) if alternate_email_button: click_alternate_email_verification_button(tab, alternate_email_button) random_sleep(tab) except ElementNotFoundError as e: print(f"找不到输入辅助邮箱的元素:{e}" ) save_log(random_number, f"找不到输入辅助邮箱的元素:{e}") update_excel_status(file_path, sheet_name, row_index, "请求错误,请重试") return ( email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, proxy_username, proxy_password, row_index) print("点击辅助邮箱验证运行完毕") save_log(random_number, "点击辅助邮箱验证运行完毕") # 确定辅助邮箱账号是否被更改 auxiliary_email_account_change = False # 输入辅助邮箱账号 recovery_input = tab.ele('@aria-label=输入辅助邮箱地址', timeout=15) if recovery_input: auxiliary_email_account_change = input_recovery_code(tab, old_recovery_email) # 使用传入的旧辅助邮箱 tab.wait(5) # 如果没有被修改则进行修改 if not auxiliary_email_account_change: try: next_span = tab.ele("@@tag()=span@@text()=下一步", timeout=15) if next_span: click_use_other_account_button2(tab, next_span) tab.wait(3) modify_the_secondary_email1(tab, new_recovery_email) # 使用传入的新辅助邮箱 auxiliary_email_account_change = True except ElementNotFoundError as e: # 捕获并打印异常,不中断后续代码执行 print(f"修改辅助邮箱的时候找不到元素: {e}") save_log(random_number, f"修改辅助邮箱的时候找不到元素: {e}") update_excel_status(file_path, sheet_name, row_index, "请求错误,请重试") return ( email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, proxy_username, proxy_password, row_index) else: print("辅助邮箱账号已经被更改过") save_log(random_number, "辅助邮箱账号已经被更改过") recovery_input.clear() input_recovery_code(tab, new_recovery_email) print("输入辅助邮箱运行完毕") save_log(random_number, "输入辅助邮箱运行完毕") if password_change and auxiliary_email_account_change: update_excel_status(file_path, sheet_name, row_index, '已更改') logutGoogle(tab) browser.quit() return tab.handle_alert(accept=True) # 点击继续按钮 continue_div1 = tab.ele('@text()=Continue with smart features', timeout=15) if continue_div1: try: click_continue_button(tab, continue_div1) except ElementNotFoundError as e: print(f"找不到继续按钮的元素:{e}") save_log(random_number, f"找不到继续按钮的元素:{e}") update_excel_status(file_path, sheet_name, row_index, "请求错误,请重试") return ( email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, proxy_username, proxy_password, row_index) try: # 点击头像进入邮箱的安全设置 tab.handle_alert(accept=True) tab.get("https://ogs.google.com/u/0/widget/account?cn=account") tab.handle_alert(accept=True) tab.get("https://myaccount.google.com/security?gar=WzEyMF0") tab.handle_alert(accept=True) except ElementNotFoundError as e: print(f"找不到头像的元素:{e}") save_log(random_number, f"找不到继续按钮的元素:{e}") update_excel_status(file_path,sheet_name, row_index, "请求错误,请重试") return ( email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, proxy_username, proxy_password, row_index) tab.wait(3) try: # 如果密码没被修改 if not password_change: # 找到修改密码的按钮 find_and_visit_href2(tab) # 修改密码 input_password_new(tab, new_password) # 使用传入的新密码 # 点击确认修改 click_span_with_class(tab) except ElementNotFoundError as e: print(f"找不到修改密码的元素:{e}") save_log(random_number, f"找不到修改密码的元素:{e}") update_excel_status(file_path,sheet_name, row_index, "请求错误,请重试") return ( email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, proxy_username, proxy_password, row_index) tab.wait(1) try: if auxiliary_email_account_change: update_excel_status(file_path, sheet_name, row_index, '已更改') return # 修改辅助邮箱2 flag = modify_the_secondary_email2(tab, new_recovery_email, new_password, hwnd) save_log(random_number, f"辅助邮箱是否被修改:{flag}") if flag: browser.quit() update_excel_status(file_path, sheet_name, row_index, '已更改') return else: sent_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(sent_time) save_log(random_number, sent_time) # 清空.env文件内容 with open(".env", "w") as f: pass count = 0 while True: print("检查新邮件...") save_log(random_number, "检查新邮件...") check_emails() code = get_verification_codes(email_account, sent_time) if code: verification_input = tab.eles("@type=text", timeout=15) for char in code: if len(verification_input) > 1: verification_input[1].input(char) tab.wait(random.uniform(0.1, 0.3)) # 随机延迟 0.1 到 0.3 秒 verify_button = tab.eles("@text()=Verify", timeout=15) print(verify_button) verify_button[1].click(by_js=True) tab.wait(5) is_visible = tab.run_js(js_code) if is_visible: sent_code_button = tab.ele('@text():Send a new code.', timeout=15) print(f"重新发送邮件的按钮为:{sent_code_button}") save_log(random_number, f"重新发送邮件的按钮为:{sent_code_button}") sent_code_button.click(by_js=True) continue else: print("辅助邮箱账号已经更改完毕") save_log(random_number, "辅助邮箱账号已经更改完毕") update_excel_status(file_path, sheet_name, row_index, '已更改') logutGoogle(tab) return count += 1 print(f"已运行 {count} 次") save_log(random_number, f"已运行 {count} 次") if count == 24: sent_code_button = tab.ele('@text():Send a new code.', timeout=15) print(f"重新发送邮件的按钮为:{sent_code_button}") save_log(random_number, f"重新发送邮件的按钮为:{sent_code_button}") sent_code_button.click(by_js=True) elif count == 48: sent_code_button = tab.ele('@text():Send a new code.', timeout=15) print(f"重新发送邮件的按钮为:{sent_code_button}") save_log(random_number, f"重新发送邮件的按钮为:{sent_code_button}") sent_code_button.click(by_js=True) elif count == 60: print("验证码超过四分钟没接收到,自动退出") save_log(random_number, "验证码超过四分钟没接收到,自动退出") update_excel_status(file_path, sheet_name, row_index, '未更改(验证码没收到)') return tab.wait(5) # 每5秒循环一次 except ElementNotFoundError as e: print(f"更改辅助邮箱账号的时候找不到元素:{e}") save_log(random_number, f"更改辅助邮箱账号的时候找不到元素:{e}") update_excel_status(file_path, sheet_name, row_index, "请求错误,请重试") return ( email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, proxy_username, proxy_password, row_index) except Exception as e: print(f"出现未知错误:{e}") save_log(random_number, f"出现未知错误:{e}") update_excel_status(file_path, sheet_name, row_index, f'出现未知错误:{e}请重试') return ( email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, proxy_username, proxy_password, row_index) finally: browser.quit() time.sleep(5) delete_folder(plugin_path) delete_folder(user_dir) def run_gui(): stop_flag = threading.Event() # 用于控制任务中止的标志位 exec_thread = None # 用于存储后台线程的引用 def handle_file_select(): file_path = filedialog.askopenfilename( title="选择 Excel 文件", filetypes=[("Excel 文件", "*.xlsx"), ("所有文件", "*.*")] ) if file_path: entry_file_path.delete(0, tk.END) entry_file_path.insert(0, file_path) def start_processing(): file_path = entry_file_path.get() sheet_name = entry_sheet_name.get() max_concurrency = entry_concurrency.get() max_retries = entry_retries.get() # 检查文件路径和 Sheet 名称是否填写完整 if not file_path or not sheet_name or not max_concurrency or not max_retries: messagebox.showwarning("警告", "请填写完整的文件路径、表格名称、并发数和重试次数!") return # 检查文件路径是否存在 if not os.path.exists(file_path): if not file_path.endswith(".xlsx"): messagebox.showerror("错误", "文件的格式必须是.xlsx") else: messagebox.showerror("错误", "没有找到该路径的文件,请检查文件是否存在!") return # 检查文件是否正在被打开 try: with open(file_path, 'r+b') as file: pass # 成功打开说明未被占用 except PermissionError: messagebox.showerror("错误", "文件正在被占用,请先保存并关闭 Excel 程序!") return # 验证 Sheet 名称是否存在 try: wb = load_workbook(file_path, read_only=True) if sheet_name not in wb.sheetnames: messagebox.showerror("错误", f"“{sheet_name}”表格名字不存在,请打开 Excel 文件检查!") return except Exception as e: messagebox.showerror("错误", f"无法打开文件:{e}") return # 验证并发数 try: max_concurrency = int(max_concurrency) if max_concurrency <= 0: raise ValueError("并发数必须是大于0的整数!") except ValueError as e: messagebox.showerror("错误", f"无效的并发数:{e}") return # 验证重试次数 try: max_retries = int(max_retries) if max_retries < 0: raise ValueError("重试次数必须是大于或等于0的整数!") except ValueError as e: messagebox.showerror("错误", f"无效的重试次数:{e}") return # 初始化 Excel 数据和进度条状态 all_rows = read_excel_data_for_main(file_path, sheet_name) # 读取 Excel 数据 total_tasks = len(all_rows) completed_count = 0 failed_count = 0 # 设置进度条和状态显示 progress_bar['maximum'] = total_tasks progress_bar['value'] = completed_count # 初始化为0 lbl_progress_status.config( text=f"完成:{completed_count}/{total_tasks},失败:{failed_count}" ) stop_flag.clear() # 清除停止标志 exec_thread = threading.Thread(target=parallel_execution, args=(file_path, sheet_name, max_concurrency, max_retries)) exec_thread.daemon = True # 设置为守护线程 exec_thread.start() def stop_processing(): stop_flag.set() # 设置标志位为 True,通知停止 messagebox.showinfo("提示", "等待当前任务完成,程序即将停止!") def on_closing(): stop_flag.set() # 设置标志位为 True,通知停止 if exec_thread and exec_thread.is_alive(): exec_thread.join(timeout=5) # 等待线程结束 root.destroy() # 销毁主窗口 sys.exit(0) # 强制退出程序 def parallel_execution(file_path, sheet_name, max_concurrency, max_retries): progress_queue = Queue() all_rows = read_excel_data_for_main(file_path, sheet_name) failed_rows = [] total_rows = len(all_rows) completed_count = 0 failed_count = 0 # 更新总条目数 progress_bar['maximum'] = total_rows with Pool(processes=max_concurrency) as pool: retry_count = 0 while all_rows or failed_rows: if stop_flag.is_set(): print("停止信号已接收,中止任务!") break rows_to_process = all_rows + failed_rows results = [] for i, row in enumerate(rows_to_process): if stop_flag.is_set(): print("停止信号已接收,中止任务!") break # 传递锁给子进程 result = pool.apply_async( main, args=(*row, file_path, sheet_name), callback=lambda result: progress_queue.put(result) ) results.append(result) all_rows = [] failed_rows = [] for result in results: try: retry_row = result.get() print(f"main函数返回的值: {retry_row}") if retry_row: # 如果返回需要重试的数据 failed_rows.append(retry_row) failed_count += 1 else: completed_count += 1 progress_bar['value'] = completed_count # 更新进度条 lbl_progress_status.config( text=f"完成:{completed_count}/{total_rows},失败:{failed_count}" ) except Exception as e: print(f"任务执行时发生错误: {e}") failed_count += 1 lbl_progress_status.config( text=f"完成:{completed_count}/{total_rows},失败:{failed_count}" ) messagebox.showwarning("任务执行错误", f"任务执行时发生错误: {e}") if stop_flag.is_set(): print("停止信号已接收,中止任务!") break if len(failed_rows) > 0 and retry_count < max_retries: retry_count += 1 all_rows = failed_rows failed_rows = [] elif retry_count >= max_retries: print("达到最大重试次数,停止重试。") break else: print("所有任务已完成。") messagebox.showinfo('所有任务已经完成') def read_excel_data_for_main(file_path, sheet_name): df = pd.read_excel(file_path, sheet_name=sheet_name) skip_keywords = {"已更改", "接码", "被盗"} data = [] global total_tasks, completed_tasks, failed_tasks # 全局变量以更新状态 completed_tasks = sum( df["是否更改完成"].isin(skip_keywords) ) # 统计已完成的 failed_tasks = len(df) - completed_tasks - df["是否更改完成"].isna().sum() total_tasks = len(df) - completed_tasks for i in range(len(df)): if pd.isna(df.iloc[i, 0]): break if pd.notna(df.iloc[i, 6]) and df.iloc[i, 5] not in skip_keywords and ':' in df.iloc[i, 6]: proxy_parts = df.iloc[i, 6].split(':') if len(proxy_parts) == 4: proxy_host, proxy_port, proxy_user, proxy_pass = proxy_parts data.append(( df.iloc[i, 0], df.iloc[i, 1], df.iloc[i, 2], df.iloc[i, 4], df.iloc[i, 3], proxy_host, proxy_port, proxy_user, proxy_pass, i + 2 )) return data root = tk.Tk() root.title("程序控制界面") tk.Label(root, text="Excel 文件路径:").grid(row=0, column=0, padx=10, pady=10) entry_file_path = tk.Entry(root, width=50) entry_file_path.grid(row=0, column=1, padx=10, pady=10) btn_select_file = tk.Button(root, text="选择文件", command=handle_file_select) btn_select_file.grid(row=0, column=2, padx=10, pady=10) tk.Label(root, text="Sheet 名称:").grid(row=1, column=0, padx=10, pady=10) entry_sheet_name = tk.Entry(root, width=50) entry_sheet_name.grid(row=1, column=1, padx=10, pady=10) entry_sheet_name.insert(0, "修改邮箱密码和辅助邮箱") tk.Label(root, text="并发数:").grid(row=2, column=0, padx=10, pady=10) entry_concurrency = tk.Entry(root, width=50) entry_concurrency.grid(row=2, column=1, padx=10, pady=10) entry_concurrency.insert(0, str(multiprocessing.cpu_count())) tk.Label(root, text="重试次数:").grid(row=3, column=0, padx=10, pady=10) entry_retries = tk.Entry(root, width=50) entry_retries.grid(row=3, column=1, padx=10, pady=10) entry_retries.insert(0, "3") # 进度条 progress_bar = ttk.Progressbar(root, orient="horizontal", length=400, mode="determinate") progress_bar.grid(row=4, column=0, columnspan=2, padx=10, pady=10) # 进度状态显示 lbl_progress_status = tk.Label(root, text="完成:0/0,失败:0") lbl_progress_status.grid(row=5, column=0, columnspan=2, padx=10, pady=10) btn_start = tk.Button(root, text="开始处理", command=start_processing) btn_start.grid(row=6, column=0, columnspan=2, padx=20, pady=20, sticky="ew") btn_stop = tk.Button(root, text="停止处理", command=stop_processing) btn_stop.grid(row=7, column=0, columnspan=2, padx=20, pady=20, sticky="ew") root.protocol("WM_DELETE_WINDOW", on_closing) # 绑定窗口关闭事件 root.mainloop() # 示例运行方式 if __name__ == "__main__": logs_dir = "logs" # 确保 logs 文件夹存在,如果不存在则创建 if not os.path.exists(logs_dir): os.makedirs(logs_dir) # 清空 logs 文件夹中的所有文件 for filename in os.listdir("logs"): file_path = os.path.join("logs", filename) if os.path.isfile(file_path): os.remove(file_path) freeze_support() run_gui()