Gmail/main.py

1626 lines
65 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import json
import shutil
import socket
import sys
import threading
from multiprocessing import freeze_support
import pandas as pd
from DrissionPage import ChromiumOptions, ChromiumPage
from DrissionPage.errors import *
import random
import imaplib
import email
from email.header import decode_header
import re
import time
from datetime import datetime, timedelta
import os
from dotenv import load_dotenv, set_key
from filelock import FileLock
from openpyxl import load_workbook
import ctypes
from ctypes import wintypes
import uuid
import multiprocessing
import tkinter as tk
from tkinter import filedialog, messagebox, ttk
import string
from multiprocessing import Lock, Queue, Pool
# Windows 消息常量
WM_MOUSEMOVE = 0x0200
WM_LBUTTONDOWN = 0x0201
WM_LBUTTONUP = 0x0202
MK_LBUTTON = 0x0001
# 停止标志
stop_event = threading.Event()
user32 = ctypes.windll.user32
root_plugin_dir = os.path.join(os.getcwd(), 'proxy_auth_plugin')
# 输入邮箱信息
IMAP_SERVER = "server-10474.cuiqiu.vip" # 替换为你的邮件服务的IMAP服务器地址例如Gmail的为"imap.gmail.com"
EMAIL_ACCOUNT = "gmailvinted@mailezu.com" # 主账户邮箱地址
EMAIL_PASSWORD = "g1l2o0hld84" # 邮箱密码或应用专用密码
ua_templates = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/{webkit_version} (KHTML, like Gecko) Chrome/{chrome_version} Safari/{safari_version}",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/{webkit_version} (KHTML, like Gecko) Chrome/{chrome_version} Safari/{safari_version}",
"Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/{webkit_version} (KHTML, like Gecko) Chrome/{chrome_version} Safari/{safari_version}",
"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/{webkit_version} (KHTML, like Gecko) Chrome/{chrome_version} Safari/{safari_version}",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/{webkit_version} (KHTML, like Gecko) Chrome/{chrome_version} Safari/{safari_version}",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/{webkit_version} (KHTML, like Gecko) Chrome/{chrome_version} Safari/{safari_version}",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/{webkit_version} (KHTML, like Gecko) Chrome/{chrome_version} Safari/{safari_version}",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 11_2_3) AppleWebKit/{webkit_version} (KHTML, like Gecko) Chrome/{chrome_version} Safari/{safari_version}",
"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:95.0) AppleWebKit/{webkit_version} (KHTML, like Gecko) Chrome/{chrome_version} Safari/{safari_version}",
"Mozilla/5.0 (Windows NT 10.0) AppleWebKit/{webkit_version} (KHTML, like Gecko) Chrome/{chrome_version} Safari/{safari_version}"
]
def generate_user_agent():
# 随机选择一个Chrome的UA模板
selected_template = random.choice(ua_templates)
# 随机生成 WebKit、Safari 和 Chrome 版本号
webkit_version = f"{random.randint(500, 599)}.{random.randint(0, 99)}"
safari_version = f"{random.randint(500, 599)}.{random.randint(0, 99)}"
chrome_version = f"{random.randint(40, 99)}.{random.randint(0, 2999)}.{random.randint(0, 99)}"
# 填充模板中的动态部分并生成完整的UA
user_agent = selected_template.format(
webkit_version=webkit_version,
safari_version=safari_version,
chrome_version=chrome_version
)
return user_agent
def random_sleep(tab, min_seconds=0, max_seconds=2):
"""在 min_seconds 和 max_seconds 之间随机停顿"""
tab.wait(random.uniform(min_seconds, max_seconds))
def get_free_port():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('localhost', 0)) # 使用端口0让操作系统自动选择一个未占用的端口
port = s.getsockname()[1]
s.close()
return port
# 设置代理ip
def create_proxy_auth_extension(proxy_host, proxy_port, proxy_username, proxy_password, scheme='PROXY', proxy_domains=None):
# Define a root directory
root_plugin_dir = os.path.join(os.getcwd(), 'proxy_auth_plugin')
if not os.path.exists(root_plugin_dir):
os.makedirs(root_plugin_dir) # Create root directory
# Use a unique identifier to generate a separate plugin directory
plugin_path = os.path.join(root_plugin_dir, f'plugin_{uuid.uuid4().hex}')
os.makedirs(plugin_path) # Create plugin directory
# Contents of manifest.json for the plugin
manifest_json = """
{
"version": "1.0.0",
"manifest_version": 2,
"name": "Proxy Auth Extension",
"permissions": [
"proxy",
"tabs",
"unlimitedStorage",
"storage",
"<all_urls>",
"webRequest",
"webRequestBlocking"
],
"background": {
"scripts": ["background.js"]
},
"minimum_chrome_version":"22.0.0"
}
"""
# Ensure proxy_domains is a list
if proxy_domains is None:
proxy_domains = []
# Generate domains_list for PAC script
# Since we can only match on hostnames, we use the domain names
domains_list = ', '.join('"{}"'.format(domain) for domain in proxy_domains)
# Generate PAC script
pac_script = string.Template(
'''
function FindProxyForURL(url, host) {
var proxy = "${scheme} ${proxy_host}:${proxy_port}";
var direct = "DIRECT";
var domains = [${domains_list}];
for (var i = 0; i < domains.length; i++) {
if (dnsDomainIs(host, domains[i])) {
return proxy;
}
}
return direct;
}
'''
).substitute(
scheme=scheme.upper(),
proxy_host=proxy_host,
proxy_port=proxy_port,
domains_list=domains_list
)
# Since we cannot inspect the full URL in onAuthRequired, we'll listen to all URLs
# But we'll check if the proxy is challenging for authentication
# We can do this by checking the 'isProxy' flag in the details
# However, 'isProxy' is only available in certain contexts
# Alternatively, we can restrict the listener to the proxy server
# Contents of background.js, using string templates to insert dynamic configuration
background_js = string.Template(
'''
var config = {
mode: "pac_script",
pacScript: {
data: $pac_script
}
};
chrome.proxy.settings.set({value: config, scope: "regular"}, function() {});
function callbackFn(details) {
if (details.challenger && details.challenger.host === "${proxy_host}" && details.challenger.port == ${proxy_port}) {
return {
authCredentials: {
username: "${username}",
password: "${password}"
}
};
}
}
chrome.webRequest.onAuthRequired.addListener(
callbackFn,
{urls: ["<all_urls>"]},
['blocking']
);
'''
).substitute(
pac_script=json.dumps(pac_script),
username=proxy_username,
password=proxy_password,
proxy_host=proxy_host,
proxy_port=proxy_port
)
# Write manifest.json and background.js to the plugin directory
with open(os.path.join(plugin_path, 'manifest.json'), 'w') as f:
f.write(manifest_json)
with open(os.path.join(plugin_path, 'background.js'), 'w') as f:
f.write(background_js)
# Return the unique plugin path
return plugin_path
def delete_folder(file_path):
"""
删除指定的代理插件文件夹及其内容
:param plugin_path: 代理插件文件夹路径
"""
if os.path.exists(file_path):
shutil.rmtree(file_path)
# 获取窗口标题
def get_window_title(hwnd):
length = user32.GetWindowTextLengthW(hwnd)
if length > 0:
buffer = ctypes.create_unicode_buffer(length + 1)
user32.GetWindowTextW(hwnd, buffer, length + 1)
return buffer.value
return ""
# 枚举所有窗口
def enum_windows():
windows = []
def callback(hwnd, lparam):
title = get_window_title(hwnd)
if title and "Google Chrome" in title:
windows.append((hwnd, title))
return True
EnumWindowsProc = ctypes.WINFUNCTYPE(ctypes.c_bool, wintypes.HWND, ctypes.c_void_p)
user32.EnumWindows(EnumWindowsProc(callback), 0)
return windows
def find_gmail_window(windows, target_title):
for hwnd, title in windows:
if target_title in title: # 判断目标标题是否包含在窗口标题中
return hwnd
return None
# 模拟鼠标事件
def simulate_mouse(hwnd, x, y):
lParam = (y << 16) | x
user32.PostMessageW(hwnd, WM_MOUSEMOVE, 0, lParam)
user32.PostMessageW(hwnd, WM_LBUTTONDOWN, MK_LBUTTON, lParam)
user32.PostMessageW(hwnd, WM_LBUTTONUP, 0, lParam)
def click_the_login_button(tab, login_a):
# 获取href属性
href = login_a.attr('href')
# 访问href属性指向的页面
tab.get(href)
def input_email(tab, hwnd, email, email_input):
email_input.clear()
"""
输入账号并点击“下一步”按钮
:param tab: 浏览器对象
:param email: 用户的邮箱账号
"""
# 定位邮箱输入框
# 模拟人类输入,每次输入一个字符,并随机延迟
for char in email:
email_input.input(char) # 输入一个字符
tab.wait(random.uniform(0.1, 0.3)) # 随机延迟 0.1 到 0.3 秒
tab.wait(1)
next_input = tab.ele('@type=button', timeout=15, index=3)
# 将按钮拖拽到页面顶部位置
next_input.drag_to((0, 0), 0)
next_input.set.style('position', 'absolute')
next_input.set.style('top', '0')
next_input.set.style('left', '0')
next_input.set.style('width', '1000px')
next_input.set.style('height', '1000px')
tab.wait(2)
simulate_mouse(hwnd, 600, 600)
tab.wait.load_start()
flag = tab.wait.ele_deleted(next_input)
if not flag:
next_input.set.style('width', '10px')
next_input.set.style('height', '10px')
# 点击重试按钮
def recover(tab, recover_a):
recover_a.click(by_js=True)
def input_password(tab, hwnd, password, password_input):
password_input.clear()
"""
输入密码并点击“下一步”按钮
:param tab: 浏览器对象
:param password: 用户的密码
"""
# 模拟人类输入密码,每次输入一个字符,并随机延迟
for char in password:
password_input.input(char) # 输入一个字符
tab.wait(random.uniform(0.1, 0.3)) # 随机延迟 0.1 到 0.3 秒
random_sleep(tab)
next_input = tab.ele('@type=button', timeout=15, index=2)
# 将按钮拖拽到页面顶部位置
next_input.drag_to((0, 0), 0)
next_input.set.style('position', 'absolute')
next_input.set.style('top', '0')
next_input.set.style('left', '0')
next_input.set.style('width', '1000px')
next_input.set.style('height', '1000px')
tab.wait(2)
simulate_mouse(hwnd, 700, 700)
tab.wait.load_start()
flag = tab.wait.ele_deleted(next_input, timeout=10)
if not flag:
next_input.set.style('width', '10px')
next_input.set.style('height', '10px')
def click_alternate_email_verification_button(tab, email_div):
email_div.click(by_js=True)
tab.wait.load_start()
def input_recovery_code(tab, recovery_code):
"""
输入辅助邮箱的验证码并点击“下一步”按钮
:param tab: 浏览器对象
:param recovery_code: 辅助邮箱的验证码
"""
# 第一步:通过 class 定位到辅助邮箱的输入框
recovery_input = tab.ele('@aria-label=输入辅助邮箱地址', timeout=15)
for char in recovery_code:
recovery_input.input(char) # 输入一个字符
tab.wait(random.uniform(0.1, 0.3)) # 随机延迟
next_input = tab.ele('@data-idom-class=nCP5yc AjY5Oe DuMIQc LQeN7 BqKGqe Jskylb TrZEUc lw1w4b', timeout=15)
next_input.click(by_js=True)
tab.wait.load_start()
tab.wait(1)
retry = tab.ele("@text()=您输入的电子邮件地址不正确,请重试。",timeout=15)
if retry:
return True
# 找到修改邮箱的按钮
def click_use_other_account_button2(tab, next_span):
next_span.click(by_js=True)
tab.wait.load_start()
# 修改辅助邮箱账号
def modify_the_secondary_email1(tab, auxiliary_email_account):
button = tab.ele('@@tag()=i@@text()=edit', timeout=15)
button.click(by_js=True)
tab.wait(2)
input = tab.ele('@type=email', timeout=15)
input.clear()
for char in auxiliary_email_account:
input.input(char) # Enter one character at a time
tab.wait(random.uniform(0.1, 0.3)) # Random delay between 0.1 and 0.3 seconds
# 点击保存
save_span = tab.ele('@text()=Save', timeout=15)
save_span.click(by_js=True)
button = tab.ele('@text()=Not now',timeout=15)
button.click(by_js=True)
tab.wait.load_start()
def click_continue_button(tab, continue_div1):
# 点击出现的弹窗
tab.handle_alert(accept=True)
tab.wait(1)
continue_div1.click(by_js=True)
tab.wait(1)
next_button = tab.ele('@@tag()=button@@name=data_consent_dialog_next', timeout=15)
next_button.click(by_js=True)
tab.wait(1)
continue_div2 = tab.ele('@text()=Personalize other Google products with my Gmail, Chat, and Meet data', timeout=15)
continue_div2.click(by_js=True)
tab.wait(1)
Done_button = tab.ele('@name=data_consent_dialog_done', timeout=15)
Done_button.click(by_js=True)
tab.wait(15)
Reload = tab.ele('@text()=Reload', timeout=15)
if Reload:
Reload.click(by_js=True)
def find_and_visit_href2(tab):
password_a = tab.ele('@aria-label=Password', timeout=15)
print(password_a)
href = password_a.attr('href')
tab.get(href)
def input_password_new(tab, password):
"""
输入密码并点击“下一步”按钮
:param tab: 浏览器对象
:param password: 用户的密码
"""
password_input = tab.ele('@@tag()=input@@name=password')
# 模拟人类输入密码,每次输入一个字符,并随机延迟
for char in password:
password_input.input(char) # 输入一个字符
tab.wait(random.uniform(0.1, 0.3)) # 随机延迟 0.1 到 0.3 秒
confirm_new_password_input = tab.ele("@@tag()=input@@name=confirmation_password")
# 模拟人类输入密码,每次输入一个字符,并随机延迟
for char in password:
confirm_new_password_input.input(char) # 输入一个字符
tab.wait(random.uniform(0.1, 0.3)) # 随机延迟 0.1 到 0.3 秒
# 点击修改按钮
def click_span_with_class(tab):
# 找到 class 为 UywwFc vQzf8d 的元素
span_element = tab.ele('@text()=Change password', timeout=15)
# 用 JavaScript 点击该元素
span_element.click(by_js=True)
# 返回上个页面
tab.back()
# 修改辅助邮箱账号2
def modify_the_secondary_email2(tab, new_recovery_email, new_password, hwnd):
try:
# 定位恢复邮箱的元素
recovery_email = tab.ele("@aria-label=Recovery email", timeout=15)
if not recovery_email:
raise Exception("未找到恢复邮箱元素")
# 获取恢复邮箱的链接并导航
href = recovery_email.attr('href')
if not href:
raise Exception("恢复邮箱链接未找到")
tab.get(href)
password_input = tab.ele('@@aria-label=输入您的密码@@type=password', timeout=15)
if password_input:
input_password(tab, hwnd, new_password, password_input)
tab.wait.load_start() # 等待页面加载开始
# 定位邮箱输入框并清空内容
email_input = tab.ele("@type=email", timeout=15)
# 查看辅助邮箱账号是否已经被修改了
if email_input.value == new_recovery_email:
return True
if not email_input:
raise Exception("未找到邮箱输入框")
email_input.clear()
for char in new_recovery_email:
email_input.input(char) # 输入一个字符
tab.wait(random.uniform(0.1, 0.3)) # 随机延迟 0.1 到 0.3 秒
# 定位并点击验证按钮
verify_button = tab.ele("@text()=Verify", timeout=15)
print(f"验证按钮已经找到:{verify_button}")
if not verify_button:
raise Exception("未找到验证按钮")
verify_button.click(by_js=True)
return False # 成功返回 True
except Exception as e:
print(f"更新恢复邮箱时发生错误: {e}")
return False # 出现错误返回 False
# 定义保存到本地文件的函数
def save_to_file(entry, filename="code.txt"):
"""将条目保存到本地文件,避免重复"""
# 如果文件不存在,先创建空文件
if not os.path.exists(filename):
with open(filename, "w", encoding='utf-8') as file:
pass # 创建文件
# 读取文件内容,检查是否存在相同条目
with open(filename, "r", encoding='utf-8') as file:
existing_entries = file.readlines()
# 构造条目字符串
entry_str = f"验证码: {entry['验证码']}, 发送的邮箱账号: {entry['发送的邮箱账号']}, 收到的时间: {entry['收到的时间']}\n"
if entry_str not in existing_entries: # 如果条目不在文件中
# 以追加模式写入文件
with open(filename, "a", encoding='utf-8') as file:
file.write(entry_str)
print("--------------------------------------------------")
print(f"新条目已保存: {entry_str.strip()}")
def clean(text):
"""清理邮件主题或发件人内容中的特殊字符"""
return "".join(c if c.isalnum() else "_" for c in text)
# 计算每次运行后的总邮件数量
def update_env_variable(key, value):
load_dotenv()
env_vars = {}
# 读取现有的 .env 文件内容
if os.path.exists(".env"):
with open(".env", "r") as f:
lines = f.readlines()
for line in lines:
if "=" in line:
k, v = line.strip().split("=", 1)
env_vars[k] = v
# 更新或新增变量
env_vars[key] = str(value)
# 写回到 .env 文件
with open(".env", "w") as f:
for k, v in env_vars.items():
f.write(f"{k}={v}\n")
# 获取辅助邮箱验证码
def check_emails():
"""检查邮件并提取验证码和邮箱账号"""
try:
# 连接到IMAP服务器
mail = imaplib.IMAP4_SSL(IMAP_SERVER)
mail.login(EMAIL_ACCOUNT, EMAIL_PASSWORD)
# 选择邮箱文件夹(通常是 'INBOX'
mail.select("inbox")
# 搜索邮件,搜索条件可以是 'FROM' 限定谷歌发件人
status, messages = mail.search(None, 'FROM "google.com"') # 搜索来自谷歌的邮件
mail_ids = messages[0].split()
total_mails = len(mail_ids)
print(f"总邮件数量: {len(mail_ids)}")
# 获取环境变量
total_mails = os.getenv('TOTAL_MAILS') # 获取总邮件数量
loop_count = os.getenv('LOOP_COUNT') # 获取循环运行的次数
# 打印输出变量
print(f"总邮件数量 (TOTAL_MAILS): {total_mails}")
print(f"循环运行的次数 (LOOP_COUNT): {loop_count}")
update_env_variable("TOTAL_MAILS", total_mails) # 更新到 .env 文件中
for i in mail_ids[-30:]: # 仅获取最近10封邮件
# 获取邮件内容
res, msg = mail.fetch(i, "(RFC822)")
for response in msg:
if isinstance(response, tuple):
# 解析邮件内容
msg = email.message_from_bytes(response[1])
# 获取邮件主题
subject, encoding = decode_header(msg["Subject"])[0]
if isinstance(subject, bytes):
# 如果是字节,需要解码
subject = subject.decode(encoding if encoding else "utf-8")
# 获取发件人
from_ = msg.get("From")
# 创建存储结果的字典列表
results = []
# 如果邮件有正文部分
if msg.is_multipart():
for part in msg.walk():
# 如果是文本/HTML内容
if part.get_content_type() == "text/plain":
body = part.get_payload(decode=True).decode()
# 提取验证码和邮箱账号
match = re.search(r'(\d{6})[\s\S]*?([a-zA-Z0-9._%+-]+@gmail\.com)', body)
if match:
code = match.group(1)
gmail_account = match.group(2)
# 获取邮件接收时间
date_tuple = email.utils.parsedate_tz(msg["Date"])
received_time = datetime.fromtimestamp(
email.utils.mktime_tz(date_tuple)) if date_tuple else None
# 添加到字典之前检查是否重复
entry = {
"验证码": code,
"发送的邮箱账号": gmail_account,
"收到的时间": received_time.strftime(
"%Y-%m-%d %H:%M:%S") if received_time else "未知"
}
print(entry)
# 无需检查是否在字典中,直接保存到文件
save_to_file(entry) # 调用保存函数
else:
# 如果邮件不是多部分(简单邮件)
body = msg.get_payload(decode=True).decode()
# 提取验证码和邮箱账号
match = re.search(r'(\d{6})[\s\S]*?([a-zA-Z0-9._%+-]+@gmail\.com)', body)
if match:
code = match.group(1)
gmail_account = match.group(2)
# 获取邮件接收时间
date_tuple = email.utils.parsedate_tz(msg["Date"])
received_time = datetime.fromtimestamp(
email.utils.mktime_tz(date_tuple)) if date_tuple else None
# 添加到字典之前检查是否重复
entry = {
"验证码": code,
"发送的邮箱账号": gmail_account,
"收到的时间": received_time.strftime(
"%Y-%m-%d %H:%M:%S") if received_time else "未知"
}
print(entry)
# 无需检查是否在字典中,直接保存到文件
save_to_file(entry) # 调用保存函数
# 关闭连接
mail.logout()
except Exception as e:
print(f"发生错误: {e}")
def get_verification_codes(email_account, sent_time):
"""
根据邮箱和发送时间筛选验证码。
:param email: 发送验证码的邮箱账号
:param sent_time: 验证码发送的时间格式YYYY-MM-DD HH:MM:SS
:return: 符合条件的验证码列表
"""
sent_time = datetime.strptime(sent_time, "%Y-%m-%d %H:%M:%S")
latest_code = None
latest_time = None
try:
# 打开文件并逐行读取
with open("code.txt", "r", encoding="utf-8") as file:
for line in file:
# 检查当前行是否包含指定邮箱
if f"发送的邮箱账号: {email_account}".lower() in line:
# 提取时间和验证码
time_str = line.split("收到的时间:")[-1].strip()
code = line.split("验证码:")[1].split(",")[0].strip()
# 将时间字符串转换为 datetime 对象
received_time = datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
# 筛选条件:发送时间 <= 收到的时间
if sent_time <= received_time + timedelta(seconds=10):
# 更新为最新的验证码
if latest_time is None or received_time > latest_time:
latest_time = received_time
latest_code = code
except FileNotFoundError:
print("错误: 找不到 code.txt 文件。")
except Exception as e:
print(f"错误: {e}")
return latest_code
# 计算循环次数的函数
def manage_loop_count():
# 加载环境变量
load_dotenv()
# 获取当前 LOOP_COUNT 值,如果不存在则默认为 0
count = int(os.getenv("LOOP_COUNT", 0))
# 增加计数器
count += 1
# 更新 .env 文件中的 LOOP_COUNT
set_key(".env", "LOOP_COUNT", str(count))
# 返回更新后的计数
return count
# 用来标记账号是否完成
def update_excel_status(file_path, sheet_name, row_index, status):
"""
更新 Excel 中指定行的 F 列为给定状态。
参数:
file_path (str): Excel 文件路径。
sheet_name (str): 要更新的工作表名称。
row_index (int): 要更新的行号(从 1 开始)。
status (str): 要写入 F 列的状态。
"""
wb = load_workbook(file_path)
# 检查工作表是否存在
if sheet_name not in wb.sheetnames:
raise ValueError(f"Sheet '{sheet_name}' not found in the workbook.")
# 选择动态传入的工作表
sheet = wb[sheet_name]
sheet.cell(row=row_index, column=6, value=status) # F 列是第 6 列
wb.save(file_path)
print(f"{row_index}行已经更改状态为:{status}")
# 保存日志到指定文件
def save_log(random_number, log_content):
logs_dir = "logs"
# 创建文件名,使用传入的 random_number
log_file_path = os.path.join(logs_dir, f"{random_number}.txt")
# 将日志内容写入文件
with open(log_file_path, 'a', encoding='utf-8') as f:
f.write(log_content)
f.write("\n")
def retry_with_recovery(tab, hwnd, email_account, MAX_RETRIES=4, wait_time=15):
"""
封装重试逻辑,最多尝试 `MAX_RETRIES` 次,检查是否存在重试按钮,并重新输入邮箱地址。
:param tab: 页面操作对象,假设具备 `.ele` 方法用于获取元素,`.wait.load_start()` 方法用于等待加载
:param recover: 处理重试按钮点击的函数
:param email_account: 需要输入的邮箱账号
:param MAX_RETRIES: 最大重试次数默认为4次
:param wait_time: 等待时间默认为15秒
"""
attempt = 0 # 初始化尝试次数
while attempt < MAX_RETRIES:
recover_a = tab.ele("@aria-label=重试", timeout=wait_time) # 检查是否有 '重试' 按钮
print(f"Attempt {attempt + 1}: {recover_a}")
tab.wait.load_start() # 等待加载开始
if recover_a:
recover(tab, recover_a) # 调用 recover 函数处理重试按钮
email_input = tab.ele('@aria-label=电子邮件地址或电话号码', timeout=wait_time) # 找到邮箱输入框
input_email(tab, hwnd, email_account, email_input) # 重新输入邮箱账号
random_sleep(tab) # 随机等待,避免被检测到自动化
attempt += 1 # 增加尝试次数
else:
break # 如果没有 '重试' 按钮,则跳出循环
# 如果达到最大尝试次数,跳出循环
if attempt == MAX_RETRIES:
print("Reached maximum retries")
break
def get_absolute_path(relative_path):
"""获取绝对路径"""
# 获取当前执行文件的路径(脚本或.exe
if getattr(sys, 'frozen', False): # 检测是否为打包后的exe
base_path = sys._MEIPASS # PyInstaller打包后资源文件临时路径
exe_path = os.path.dirname(sys.executable)
else:
base_path = os.path.abspath(".")
exe_path = os.path.dirname(os.path.abspath(__file__))
# 如果需要拼接在exe路径上
absolute_path = os.path.join(exe_path, relative_path)
return absolute_path
# 判断验证码是否出现报错
js_code = """
function isElementVisible(element) {
if (!(element instanceof Element)) {
throw new Error("参数必须是一个 DOM 元素");
}
if (!document.body.contains(element)) {
return false;
}
const style = window.getComputedStyle(element);
if (style.display === "none" || style.visibility === "hidden" || style.opacity === "0") {
return false;
}
const rect = element.getBoundingClientRect();
if (rect.width === 0 || rect.height === 0) {
return false;
}
const { top, right, bottom, left } = rect;
const viewportWidth = window.innerWidth || document.documentElement.clientWidth;
const viewportHeight = window.innerHeight || document.documentElement.clientHeight;
if (bottom < 0 || top > viewportHeight || right < 0 || left > viewportWidth) {
return false;
}
const centerX = (left + right) / 2;
const centerY = (top + bottom) / 2;
const elementAtCenter = document.elementFromPoint(centerX, centerY);
if (elementAtCenter && !element.contains(elementAtCenter) && !elementAtCenter.contains(element)) {
return false;
}
return true;
}
var element = document.querySelector("#c4");
return isElementVisible(element);
"""
def logutGoogle(tab) -> bool:
tab.get('https://accounts.google.com/Logout?hl=zh-CN')
if tab.ele('@@tag()=input@@type=email', timeout=0) != None: # 已注销就会出登录页元素
print("当前浏览器已退出,无需再次退出")
return True
# 还有没注销移除的账号就会显示已退出和账号元素
try:
tab.ele('@text()=移除账号', timeout=0).click()
tab.wait(2).ele('@text()=已退出', timeout=0).parent().parent().next().click() # 移除图标
tab.wait(3).ele('@@text()=移除@@tag()=button').click() # 移除确认框
return (tab.ele('@@tag()=input@@type=email', timeout=8) != None) # 检查最后界面是否是登录
except Exception as e:
print(f'发生错误:{e}')
return False
return False # 无用分支,不过为了避免函数太长回头修改时候忘记,还是写个 False 以防万一
def main(email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port, proxy_username, proxy_password,row_index, file_path, sheet_name):
global browser, plugin_path, user_dir
# 生成一个 6 位的随机数
global random_number
try:
random_number = str(random.randint(100000000, 999999999))
"""
主函数,用于自动登录谷歌邮箱并修改账户和密码。
参数:
email_account (str): 邮箱账号
email_password (str): 邮箱密码
old_recovery_email (str): 旧的辅助邮箱账号
new_recovery_email (str): 新的辅助邮箱账号
new_password (str): 新密码
"""
if stop_event.is_set():
print("任务被强制停止,退出主函数")
return None # 直接返回 None跳过当前任务
lock = FileLock("proxy_auth_extension.lock")
with lock: # 加锁,确保其他进程在解锁前无法操作
plugin_path = create_proxy_auth_extension(
proxy_host=proxy_host,
proxy_port=proxy_port,
proxy_username=proxy_username,
proxy_password=proxy_password,
proxy_domains=[
"accounts.google.com",
"accounts.youtube.com",
"content-autofill.googleapis.com",
"myaccount.google.com",
"apis.google.com",
"www.google.com",
"ogs.google.com",
"jsonip.com"
]
)
save_log(random_number, f"已经创建好代理插件:{plugin_path}")
# 创建ChromiumOptions对象并配置
options = ChromiumOptions()
options.add_extension(plugin_path)
# 使用随机生成的User-Agent
random_user_agent = generate_user_agent()
options.set_user_agent(user_agent=random_user_agent)
# 随机分配一个端口号
random_port = get_free_port()
print(f"现在占用的端口号是:{random_port}")
save_log(random_number, f"现在占用的端口号是:{random_port}")
options.set_paths(local_port=random_port)
# 示例使用
browser_path = get_absolute_path(r"chrome\chrome.exe")
options.set_paths(f"{browser_path}")
options.no_imgs(True).mute(True)
cache_path = os.path.join(os.getcwd(), 'cache')
temp_path = os.path.join(os.getcwd(), 'tempfile')
user_dir = os.path.join(os.getcwd(), 'userdata', f'{uuid.uuid4().hex}')
options.set_cache_path(cache_path)
options.set_tmp_path(temp_path)
options.set_user_data_path(user_dir)
print(cache_path)
print(temp_path)
print(user_dir)
browser = ChromiumPage(options)
# 清除缓存和cookies
browser.clear_cache(cookies=True)
# browser = Chromium(29581)
# 获取标签页对象
tab = browser.latest_tab
save_log(random_number, "已获取谷歌的最后一个页面")
# 访问谷歌邮箱页面
flag = tab.get('https://workspace.google.com/intl/zh-CN/gmail/')
save_log(random_number, f"访问谷歌邮箱页面的状态:{flag}")
if not flag:
browser.quit()
return (
email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host,
proxy_port,
proxy_username, proxy_password, row_index)
save_log(random_number, "已经访问谷歌页面")
# 执行JavaScript清除localStorage
tab.run_js("window.localStorage.clear();")
# 进入登录流程
login_a = tab.ele('@aria-label=登录 Gmail', timeout=15)
if login_a:
click_the_login_button(tab, login_a)
random_sleep(tab)
tab.wait(3)
# 将随机数设置为窗口标题
script = f"document.title = '{random_number}';"
# 执行 JS 脚本
tab.run_js(script)
actual_title = tab.run_js("return document.title;")
print("登录运行完毕")
save_log(random_number, "已经进入登录界面")
tab.wait(7)
save_log(random_number, f"设置标题为: {random_number}, 实际标题为: {actual_title}\n")
# 获取所有 Chrome 窗口
chrome_windows = enum_windows()
save_log(random_number, f"当前谷歌浏览器的所有窗口为:{chrome_windows}")
hwnd = None
for win in chrome_windows:
if random_number in win[1]: # 假设 `win['title']` 是窗口标题
hwnd = win[0]
print(f"窗口句柄:{win[0]} 窗口标题:{win[1]}")
save_log(random_number, f"获取的窗口的句柄:{win[0]} 和标题:{win[1]}")
break
if hwnd is None:
print("无法找到窗口句柄")
save_log(random_number, "无法找到窗口句柄")
return (
email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host,
proxy_port,
proxy_username, proxy_password, row_index)
try:
# 输入邮箱账号
email_input = tab.ele('@aria-label=电子邮件地址或电话号码', timeout=15)
if email_input:
input_email(tab, hwnd, email_account, email_input) # 使用传入的邮箱账号
random_sleep(tab)
except ElementNotFoundError as e:
print(f"找不到输入邮箱账号的元素{e}")
save_log(random_number, f"找不到输入邮箱账号的元素:{e}")
update_excel_status(file_path, sheet_name,row_index, "请求错误,请重试")
return (
email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port,
proxy_username, proxy_password, row_index)
print("输入邮箱账号运行完毕")
save_log(random_number, "输入邮箱账号运行完毕")
wrong = tab.ele('@text()=出了点问题', timeout=15)
if wrong:
next_span = tab.ele("@@tag()=span@@text()=下一步", timeout=15)
next_span.click(by_js=True)
tab.wait.load_start()
try:
# 输入邮箱账号
email_input = tab.ele('@aria-label=电子邮件地址或电话号码', timeout=15)
if email_input:
input_email(tab, hwnd, email_account, email_input) # 使用传入的邮箱账号
random_sleep(tab)
except ElementNotFoundError as e:
print(f"找不到输入邮箱账号的元素{e}")
save_log(random_number, f"找不到输入邮箱账号的元素:{e}")
update_excel_status(file_path, sheet_name, row_index, "请求错误,请重试")
return (
email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host,
proxy_port,
proxy_username, proxy_password, row_index)
retry_with_recovery(tab, hwnd, email_account)
print("检查是否出现了wrong运行完毕")
save_log(random_number, "检查是否出现了wrong运行完毕")
# 看下是否出现了手机号
telephone = tab.ele("@text()=请输入电话号码,以便通过短信接收验证码。", timeout=15)
if telephone:
update_excel_status(file_path, sheet_name,row_index, '接码')
browser.quit()
return
print("检查出现手机号运行完毕")
save_log(random_number, "检查出现手机号运行完毕")
try:
# 输入密码
password_input = tab.ele('@@aria-label=输入您的密码@@type=password', timeout=15)
if password_input:
input_password(tab, hwnd, email_password, password_input) # 使用传入的邮箱密码
random_sleep(tab)
except ElementNotFoundError as e:
print(f"找不到密码输入框的元素:{e}")
save_log(random_number, f"找不到密码输入框的元素:{e}")
update_excel_status(file_path, sheet_name,row_index, "请求错误,请重试")
return (
email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port,
proxy_username, proxy_password, row_index)
print("输入密码运行完毕")
save_log(random_number, "输入密码运行完毕")
wrong = tab.ele("@text()=抱歉,出了点问题。请重试。", timeout=15)
save_log(random_number, f"谷歌验证按钮是否出现:{wrong}")
if wrong:
next_span = tab.ele("@@tag()=span@@text()=下一步", timeout=15)
next_span.click(by_js=True)
tab.wait.load_start()
try:
# 输入邮箱账号
email_input = tab.ele('@aria-label=电子邮件地址或电话号码', timeout=15)
if email_input:
input_email(tab, hwnd, email_account, email_input) # 使用传入的邮箱账号
random_sleep(tab)
except ElementNotFoundError as e:
print(f"找不到输入邮箱账号的元素{e}")
save_log(random_number, f"找不到输入邮箱账号的元素:{e}")
update_excel_status(file_path, sheet_name, row_index, "请求错误,请重试")
return (
email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host,
proxy_port,
proxy_username, proxy_password, row_index)
try:
# 输入密码
password_input = tab.ele('@@aria-label=输入您的密码@@type=password', timeout=15)
if password_input:
input_password(tab, hwnd, email_password, password_input) # 使用传入的邮箱密码
random_sleep(tab)
except ElementNotFoundError as e:
print(f"找不到密码输入框的元素:{e}")
save_log(random_number, f"找不到密码输入框的元素:{e}")
update_excel_status(file_path, sheet_name, row_index, "请求错误,请重试")
return (
email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host,
proxy_port,
proxy_username, proxy_password, row_index)
print("如果出现错误,输入密码运行完毕")
save_log(random_number, "如果出现错误后,输入密码运行完毕")
# 看下是否出现了手机号
telephone = tab.ele("@text()=请输入电话号码,以便通过短信接收验证码。", timeout=15)
if telephone:
update_excel_status(file_path, sheet_name, row_index, '接码')
browser.quit()
return
print("检查手机号2运行完毕")
save_log(random_number, "检查手机号2运行完毕")
# 确定密码是否被修改的开关
password_change = False
try:
# 查看密码是否更改
password_input = tab.ele('@@aria-label=输入您的密码@@type=password', timeout=15)
if password_input:
input_password(tab, hwnd, new_password, password_input)
tab.wait(7)
password_input = tab.ele('@@aria-label=输入您的密码@@type=password', timeout=15)
alternate_email_button = tab.ele('@text()=确认您的辅助邮箱', timeout=15)
if password_input and not alternate_email_button:
update_excel_status(file_path, sheet_name, row_index, '被盗')
return
password_change = True
print("密码已经被更改过")
save_log(random_number, "密码已经被更改过")
except ElementNotFoundError as e:
print(f"找不到密码输入框的元素{e}")
save_log(random_number, f"找不到密码输入框的元素:{e}")
update_excel_status(file_path, sheet_name, row_index, "请求错误,请重试")
return (
email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port,
proxy_username, proxy_password, row_index)
print("输入新密码运行完毕")
save_log(random_number, "输入新密码运行完毕")
try:
# 使用辅助邮箱验证
alternate_email_button = tab.ele('@text()=确认您的辅助邮箱', timeout=15)
if alternate_email_button:
click_alternate_email_verification_button(tab, alternate_email_button)
random_sleep(tab)
except ElementNotFoundError as e:
print(f"找不到输入辅助邮箱的元素:{e}" )
save_log(random_number, f"找不到输入辅助邮箱的元素:{e}")
update_excel_status(file_path, sheet_name, row_index, "请求错误,请重试")
return (
email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port,
proxy_username, proxy_password, row_index)
print("点击辅助邮箱验证运行完毕")
save_log(random_number, "点击辅助邮箱验证运行完毕")
# 确定辅助邮箱账号是否被更改
auxiliary_email_account_change = False
# 输入辅助邮箱账号
recovery_input = tab.ele('@aria-label=输入辅助邮箱地址', timeout=15)
if recovery_input:
auxiliary_email_account_change = input_recovery_code(tab, old_recovery_email) # 使用传入的旧辅助邮箱
tab.wait(5)
# 如果没有被修改则进行修改
if not auxiliary_email_account_change:
try:
next_span = tab.ele("@@tag()=span@@text()=下一步", timeout=15)
if next_span:
click_use_other_account_button2(tab, next_span)
tab.wait(3)
modify_the_secondary_email1(tab, new_recovery_email) # 使用传入的新辅助邮箱
auxiliary_email_account_change = True
except ElementNotFoundError as e:
# 捕获并打印异常,不中断后续代码执行
print(f"修改辅助邮箱的时候找不到元素: {e}")
save_log(random_number, f"修改辅助邮箱的时候找不到元素: {e}")
update_excel_status(file_path, sheet_name, row_index, "请求错误,请重试")
return (
email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host,
proxy_port, proxy_username, proxy_password, row_index)
else:
print("辅助邮箱账号已经被更改过")
save_log(random_number, "辅助邮箱账号已经被更改过")
recovery_input.clear()
input_recovery_code(tab, new_recovery_email)
print("输入辅助邮箱运行完毕")
save_log(random_number, "输入辅助邮箱运行完毕")
if password_change and auxiliary_email_account_change:
update_excel_status(file_path, sheet_name, row_index, '已更改')
logutGoogle(tab)
browser.quit()
return
tab.handle_alert(accept=True)
# 点击继续按钮
continue_div1 = tab.ele('@text()=Continue with smart features', timeout=15)
if continue_div1:
try:
click_continue_button(tab, continue_div1)
except ElementNotFoundError as e:
print(f"找不到继续按钮的元素:{e}")
save_log(random_number, f"找不到继续按钮的元素:{e}")
update_excel_status(file_path, sheet_name, row_index, "请求错误,请重试")
return (
email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host,
proxy_port,
proxy_username, proxy_password, row_index)
try:
# 点击头像进入邮箱的安全设置
tab.handle_alert(accept=True)
tab.get("https://ogs.google.com/u/0/widget/account?cn=account")
tab.handle_alert(accept=True)
tab.get("https://myaccount.google.com/security?gar=WzEyMF0")
tab.handle_alert(accept=True)
except ElementNotFoundError as e:
print(f"找不到头像的元素:{e}")
save_log(random_number, f"找不到继续按钮的元素:{e}")
update_excel_status(file_path,sheet_name, row_index, "请求错误,请重试")
return (
email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port,
proxy_username, proxy_password, row_index)
tab.wait(3)
try:
# 如果密码没被修改
if not password_change:
# 找到修改密码的按钮
find_and_visit_href2(tab)
# 修改密码
input_password_new(tab, new_password) # 使用传入的新密码
# 点击确认修改
click_span_with_class(tab)
except ElementNotFoundError as e:
print(f"找不到修改密码的元素:{e}")
save_log(random_number, f"找不到修改密码的元素:{e}")
update_excel_status(file_path,sheet_name, row_index, "请求错误,请重试")
return (
email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port,
proxy_username, proxy_password, row_index)
tab.wait(1)
try:
if auxiliary_email_account_change:
update_excel_status(file_path, sheet_name, row_index, '已更改')
return
# 修改辅助邮箱2
flag = modify_the_secondary_email2(tab, new_recovery_email, new_password, hwnd)
save_log(random_number, f"辅助邮箱是否被修改:{flag}")
if flag:
browser.quit()
update_excel_status(file_path, sheet_name, row_index, '已更改')
return
else:
sent_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(sent_time)
save_log(random_number, sent_time)
# 清空.env文件内容
with open(".env", "w") as f:
pass
count = 0
while True:
print("检查新邮件...")
save_log(random_number, "检查新邮件...")
check_emails()
code = get_verification_codes(email_account, sent_time)
if code:
verification_input = tab.eles("@type=text", timeout=15)
for char in code:
if len(verification_input) > 1:
verification_input[1].input(char)
tab.wait(random.uniform(0.1, 0.3)) # 随机延迟 0.1 到 0.3 秒
verify_button = tab.eles("@text()=Verify", timeout=15)
print(verify_button)
verify_button[1].click(by_js=True)
tab.wait(5)
is_visible = tab.run_js(js_code)
if is_visible:
sent_code_button = tab.ele('@text():Send a new code.', timeout=15)
print(f"重新发送邮件的按钮为:{sent_code_button}")
save_log(random_number, f"重新发送邮件的按钮为:{sent_code_button}")
sent_code_button.click(by_js=True)
continue
else:
print("辅助邮箱账号已经更改完毕")
save_log(random_number, "辅助邮箱账号已经更改完毕")
update_excel_status(file_path, sheet_name, row_index, '已更改')
logutGoogle(tab)
return
count += 1
print(f"已运行 {count}")
save_log(random_number, f"已运行 {count}")
if count == 24:
sent_code_button = tab.ele('@text():Send a new code.', timeout=15)
print(f"重新发送邮件的按钮为:{sent_code_button}")
save_log(random_number, f"重新发送邮件的按钮为:{sent_code_button}")
sent_code_button.click(by_js=True)
elif count == 48:
sent_code_button = tab.ele('@text():Send a new code.', timeout=15)
print(f"重新发送邮件的按钮为:{sent_code_button}")
save_log(random_number, f"重新发送邮件的按钮为:{sent_code_button}")
sent_code_button.click(by_js=True)
elif count == 60:
print("验证码超过四分钟没接收到,自动退出")
save_log(random_number, "验证码超过四分钟没接收到,自动退出")
update_excel_status(file_path, sheet_name, row_index, '未更改(验证码没收到)')
return
tab.wait(5) # 每5秒循环一次
except ElementNotFoundError as e:
print(f"更改辅助邮箱账号的时候找不到元素:{e}")
save_log(random_number, f"更改辅助邮箱账号的时候找不到元素:{e}")
update_excel_status(file_path, sheet_name, row_index, "请求错误,请重试")
return (
email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host, proxy_port,
proxy_username, proxy_password, row_index)
except Exception as e:
print(f"出现未知错误:{e}")
save_log(random_number, f"出现未知错误:{e}")
update_excel_status(file_path, sheet_name, row_index, f'出现未知错误:{e}请重试')
return (
email_account, email_password, old_recovery_email, new_recovery_email, new_password, proxy_host,
proxy_port, proxy_username, proxy_password, row_index)
finally:
browser.quit()
time.sleep(5)
delete_folder(plugin_path)
delete_folder(user_dir)
def run_gui():
stop_flag = threading.Event() # 用于控制任务中止的标志位
exec_thread = None # 用于存储后台线程的引用
def handle_file_select():
file_path = filedialog.askopenfilename(
title="选择 Excel 文件",
filetypes=[("Excel 文件", "*.xlsx"), ("所有文件", "*.*")]
)
if file_path:
entry_file_path.delete(0, tk.END)
entry_file_path.insert(0, file_path)
def start_processing():
file_path = entry_file_path.get()
sheet_name = entry_sheet_name.get()
max_concurrency = entry_concurrency.get()
max_retries = entry_retries.get()
# 检查文件路径和 Sheet 名称是否填写完整
if not file_path or not sheet_name or not max_concurrency or not max_retries:
messagebox.showwarning("警告", "请填写完整的文件路径、表格名称、并发数和重试次数!")
return
# 检查文件路径是否存在
if not os.path.exists(file_path):
if not file_path.endswith(".xlsx"):
messagebox.showerror("错误", "文件的格式必须是.xlsx")
else:
messagebox.showerror("错误", "没有找到该路径的文件,请检查文件是否存在!")
return
# 检查文件是否正在被打开
try:
with open(file_path, 'r+b') as file:
pass # 成功打开说明未被占用
except PermissionError:
messagebox.showerror("错误", "文件正在被占用,请先保存并关闭 Excel 程序!")
return
# 验证 Sheet 名称是否存在
try:
wb = load_workbook(file_path, read_only=True)
if sheet_name not in wb.sheetnames:
messagebox.showerror("错误", f"{sheet_name}”表格名字不存在,请打开 Excel 文件检查!")
return
except Exception as e:
messagebox.showerror("错误", f"无法打开文件:{e}")
return
# 验证并发数
try:
max_concurrency = int(max_concurrency)
if max_concurrency <= 0:
raise ValueError("并发数必须是大于0的整数")
except ValueError as e:
messagebox.showerror("错误", f"无效的并发数:{e}")
return
# 验证重试次数
try:
max_retries = int(max_retries)
if max_retries < 0:
raise ValueError("重试次数必须是大于或等于0的整数")
except ValueError as e:
messagebox.showerror("错误", f"无效的重试次数:{e}")
return
# 初始化 Excel 数据和进度条状态
all_rows = read_excel_data_for_main(file_path, sheet_name) # 读取 Excel 数据
total_tasks = len(all_rows)
completed_count = 0
failed_count = 0
# 设置进度条和状态显示
progress_bar['maximum'] = total_tasks
progress_bar['value'] = completed_count # 初始化为0
lbl_progress_status.config(
text=f"完成:{completed_count}/{total_tasks},失败:{failed_count}"
)
stop_flag.clear() # 清除停止标志
exec_thread = threading.Thread(target=parallel_execution,
args=(file_path, sheet_name, max_concurrency, max_retries))
exec_thread.daemon = True # 设置为守护线程
exec_thread.start()
def stop_processing():
stop_flag.set() # 设置标志位为 True通知停止
messagebox.showinfo("提示", "等待当前任务完成,程序即将停止!")
def on_closing():
stop_flag.set() # 设置标志位为 True通知停止
if exec_thread and exec_thread.is_alive():
exec_thread.join(timeout=5) # 等待线程结束
root.destroy() # 销毁主窗口
sys.exit(0) # 强制退出程序
def parallel_execution(file_path, sheet_name, max_concurrency, max_retries):
progress_queue = Queue()
all_rows = read_excel_data_for_main(file_path, sheet_name)
failed_rows = []
total_rows = len(all_rows)
completed_count = 0
failed_count = 0
# 更新总条目数
progress_bar['maximum'] = total_rows
with Pool(processes=max_concurrency) as pool:
retry_count = 0
while all_rows or failed_rows:
if stop_flag.is_set():
print("停止信号已接收,中止任务!")
break
rows_to_process = all_rows + failed_rows
results = []
for i, row in enumerate(rows_to_process):
if stop_flag.is_set():
print("停止信号已接收,中止任务!")
break
# 传递锁给子进程
result = pool.apply_async(
main, args=(*row, file_path, sheet_name),
callback=lambda result: progress_queue.put(result)
)
results.append(result)
all_rows = []
failed_rows = []
for result in results:
try:
retry_row = result.get()
print(f"main函数返回的值: {retry_row}")
if retry_row: # 如果返回需要重试的数据
failed_rows.append(retry_row)
failed_count += 1
else:
completed_count += 1
progress_bar['value'] = completed_count # 更新进度条
lbl_progress_status.config(
text=f"完成:{completed_count}/{total_rows},失败:{failed_count}"
)
except Exception as e:
print(f"任务执行时发生错误: {e}")
failed_count += 1
lbl_progress_status.config(
text=f"完成:{completed_count}/{total_rows},失败:{failed_count}"
)
messagebox.showwarning("任务执行错误", f"任务执行时发生错误: {e}")
if stop_flag.is_set():
print("停止信号已接收,中止任务!")
break
if len(failed_rows) > 0 and retry_count < max_retries:
retry_count += 1
all_rows = failed_rows
failed_rows = []
elif retry_count >= max_retries:
print("达到最大重试次数,停止重试。")
break
else:
print("所有任务已完成。")
messagebox.showinfo('所有任务已经完成')
def read_excel_data_for_main(file_path, sheet_name):
df = pd.read_excel(file_path, sheet_name=sheet_name)
skip_keywords = {"已更改", "接码", "被盗"}
data = []
global total_tasks, completed_tasks, failed_tasks # 全局变量以更新状态
completed_tasks = sum(
df["是否更改完成"].isin(skip_keywords)
) # 统计已完成的
failed_tasks = len(df) - completed_tasks - df["是否更改完成"].isna().sum()
total_tasks = len(df) - completed_tasks
for i in range(len(df)):
if pd.isna(df.iloc[i, 0]):
break
if pd.notna(df.iloc[i, 6]) and df.iloc[i, 5] not in skip_keywords and ':' in df.iloc[i, 6]:
proxy_parts = df.iloc[i, 6].split(':')
if len(proxy_parts) == 4:
proxy_host, proxy_port, proxy_user, proxy_pass = proxy_parts
data.append((
df.iloc[i, 0], df.iloc[i, 1], df.iloc[i, 2],
df.iloc[i, 4], df.iloc[i, 3], proxy_host,
proxy_port, proxy_user, proxy_pass, i + 2
))
return data
root = tk.Tk()
root.title("程序控制界面")
tk.Label(root, text="Excel 文件路径:").grid(row=0, column=0, padx=10, pady=10)
entry_file_path = tk.Entry(root, width=50)
entry_file_path.grid(row=0, column=1, padx=10, pady=10)
btn_select_file = tk.Button(root, text="选择文件", command=handle_file_select)
btn_select_file.grid(row=0, column=2, padx=10, pady=10)
tk.Label(root, text="Sheet 名称:").grid(row=1, column=0, padx=10, pady=10)
entry_sheet_name = tk.Entry(root, width=50)
entry_sheet_name.grid(row=1, column=1, padx=10, pady=10)
entry_sheet_name.insert(0, "修改邮箱密码和辅助邮箱")
tk.Label(root, text="并发数:").grid(row=2, column=0, padx=10, pady=10)
entry_concurrency = tk.Entry(root, width=50)
entry_concurrency.grid(row=2, column=1, padx=10, pady=10)
entry_concurrency.insert(0, str(multiprocessing.cpu_count()))
tk.Label(root, text="重试次数:").grid(row=3, column=0, padx=10, pady=10)
entry_retries = tk.Entry(root, width=50)
entry_retries.grid(row=3, column=1, padx=10, pady=10)
entry_retries.insert(0, "3")
# 进度条
progress_bar = ttk.Progressbar(root, orient="horizontal", length=400, mode="determinate")
progress_bar.grid(row=4, column=0, columnspan=2, padx=10, pady=10)
# 进度状态显示
lbl_progress_status = tk.Label(root, text="完成0/0失败0")
lbl_progress_status.grid(row=5, column=0, columnspan=2, padx=10, pady=10)
btn_start = tk.Button(root, text="开始处理", command=start_processing)
btn_start.grid(row=6, column=0, columnspan=2, padx=20, pady=20, sticky="ew")
btn_stop = tk.Button(root, text="停止处理", command=stop_processing)
btn_stop.grid(row=7, column=0, columnspan=2, padx=20, pady=20, sticky="ew")
root.protocol("WM_DELETE_WINDOW", on_closing) # 绑定窗口关闭事件
root.mainloop()
# 示例运行方式
if __name__ == "__main__":
logs_dir = "logs"
# 确保 logs 文件夹存在,如果不存在则创建
if not os.path.exists(logs_dir):
os.makedirs(logs_dir)
# 清空 logs 文件夹中的所有文件
for filename in os.listdir("logs"):
file_path = os.path.join("logs", filename)
if os.path.isfile(file_path):
os.remove(file_path)
freeze_support()
run_gui()