上传文件至 /

最新的,并发一次能运行,但是没有邮箱没有断开重连
main
dghc2023 2024-11-27 05:06:31 +00:00
parent b4b6835010
commit a786dca470
2 changed files with 72 additions and 71 deletions

86
mail.py
View File

@ -20,7 +20,6 @@ def to_utf7_imap(text):
""" """
if not text: if not text:
return "" return ""
# 匹配所有非 ASCII 字符 # 匹配所有非 ASCII 字符
def encode_match(match): def encode_match(match):
return "&" + match.group(0).encode("utf-16be").hex().upper() + "-" return "&" + match.group(0).encode("utf-16be").hex().upper() + "-"
@ -28,7 +27,6 @@ def to_utf7_imap(text):
# 替换非 ASCII 字符为 UTF-7 格式 # 替换非 ASCII 字符为 UTF-7 格式
return re.sub(r"[^\x20-\x7E]", encode_match, text) return re.sub(r"[^\x20-\x7E]", encode_match, text)
def build_search_criteria(from_email=None, subject=None, body_keyword=None, since=None, before=None, unseen=False): def build_search_criteria(from_email=None, subject=None, body_keyword=None, since=None, before=None, unseen=False):
""" """
构建 IMAP 搜索条件字符串支持简单查询逻辑 构建 IMAP 搜索条件字符串支持简单查询逻辑
@ -67,7 +65,6 @@ def build_search_criteria(from_email=None, subject=None, body_keyword=None, sinc
# 用空格拼接所有条件IMAP 默认 AND 逻辑) # 用空格拼接所有条件IMAP 默认 AND 逻辑)
return " ".join(criteria) if criteria else "ALL" return " ".join(criteria) if criteria else "ALL"
def parse_email_date(date_str, default_tz=timezone.utc): def parse_email_date(date_str, default_tz=timezone.utc):
""" """
安全解析邮件日期 安全解析邮件日期
@ -126,8 +123,7 @@ class EmailClient:
return result, data return result, data
def fetch_recent_emails(self, from_email=None, subject=None, body_keyword=None, since=None, before=None, def fetch_recent_emails(self, from_email=None, subject=None, body_keyword=None, since=None, before=None, unseen=False, max_count=100):
unseen=False, max_count=100):
""" """
使用构建的搜索条件查询最近的邮件 使用构建的搜索条件查询最近的邮件
@ -162,8 +158,9 @@ class EmailClient:
email_ids = data[0].split()[-max_count:] email_ids = data[0].split()[-max_count:]
return list(reversed(email_ids)) return list(reversed(email_ids))
def fetch_all_matching_emails(self, email_ids, sender_pattern=None, keyword_pattern=None, subject_pattern=None,
start_time=None, max_results=None):
def fetch_all_matching_emails(self, email_ids, sender_pattern=None, keyword_pattern=None, subject_pattern=None, start_time=None, max_results=None):
if max_results is not None and max_results <= 0: if max_results is not None and max_results <= 0:
raise ValueError("max_results must be a positive integer or None") raise ValueError("max_results must be a positive integer or None")
@ -181,42 +178,47 @@ class EmailClient:
msg = email.message_from_bytes(data[0][1]) msg = email.message_from_bytes(data[0][1])
# 检查发件人地址是否匹配
from_email = email.utils.parseaddr(msg["From"])[1] if msg["From"] else "" from_email = email.utils.parseaddr(msg["From"])[1] if msg["From"] else ""
if sender_regex and not sender_regex.search(from_email): if sender_regex and not sender_regex.search(from_email):
continue continue
# 检查标题是否匹配
subject, encoding = decode_header(msg["Subject"])[0] subject, encoding = decode_header(msg["Subject"])[0]
subject = subject.decode(encoding or "utf-8") if isinstance(subject, bytes) else subject subject = subject.decode(encoding or "utf-8") if isinstance(subject, bytes) else subject
if subject_regex and not subject_regex.search(subject): if subject_regex and not subject_regex.search(subject):
continue continue
print("1没报错") # 处理邮件内容
content = "" content = ""
for part in msg.walk(): for part in msg.walk():
content_type = part.get_content_type() content_type = part.get_content_type()
print("2没报错")
try: try:
if content_type == "text/plain": payload = part.get_payload(decode=True)
content += part.get_payload(decode=True).decode(part.get_content_charset() or "utf-8")
print("3没报错")
elif content_type == "text/html" and not content:
content += part.get_payload(decode=True).decode(part.get_content_charset() or "utf-8")
print("4没报错")
except Exception as e:
print(f"Warning: Failed to decode {content_type} content: {e}")
# 增加对 payload 类型的判断
if isinstance(payload, bytes):
if content_type == "text/plain":
content += payload.decode(part.get_content_charset() or "utf-8")
elif content_type == "text/html" and not content:
content += payload.decode(part.get_content_charset() or "utf-8")
else:
print(f"Skipping unexpected payload type: {type(payload)}, content_type: {content_type}")
except Exception as e:
print(f"Error processing MIME part: {e}")
# 检查内容关键字是否匹配
if keyword_regex and not keyword_regex.search(content): if keyword_regex and not keyword_regex.search(content):
continue continue
# 检查邮件日期是否在范围内
date_str = msg.get("Date") date_str = msg.get("Date")
email_date = parse_email_date(date_str) if date_str else None email_date = parse_email_date(date_str) if date_str else None
# print('email_date:',email_date)
# print('start_time:',start_time)
if start_time and email_date and email_date < start_time: if start_time and email_date and email_date < start_time:
continue continue
# 保存匹配的邮件
matched_email = { matched_email = {
"subject": subject, "subject": subject,
"from": from_email, "from": from_email,
@ -225,6 +227,7 @@ class EmailClient:
} }
all_matched_emails.append(matched_email) all_matched_emails.append(matched_email)
# 如果达到了最大匹配数,停止处理
if max_results and len(all_matched_emails) >= max_results: if max_results and len(all_matched_emails) >= max_results:
break break
@ -233,8 +236,10 @@ class EmailClient:
return all_matched_emails return all_matched_emails
def filter_emails_by_sender_and_keyword(self, email_ids, sender_pattern=None, keyword_pattern=None,
subject_pattern=None, start_time=None):
def filter_emails_by_sender_and_keyword(self, email_ids, sender_pattern=None, keyword_pattern=None, subject_pattern=None, start_time=None):
matched_emails = self.fetch_all_matching_emails( matched_emails = self.fetch_all_matching_emails(
email_ids=email_ids, email_ids=email_ids,
sender_pattern=sender_pattern, sender_pattern=sender_pattern,
@ -298,7 +303,6 @@ class GoogleCodeReceiver:
body_keyword=username, body_keyword=username,
since=since # 从 `start_time` 往前推 2 天开始检索 since=since # 从 `start_time` 往前推 2 天开始检索
) )
matched_email = self.email_client.filter_emails_by_sender_and_keyword( matched_email = self.email_client.filter_emails_by_sender_and_keyword(
email_ids=email_ids, email_ids=email_ids,
# sender_pattern=sender_pattern, # sender_pattern=sender_pattern,
@ -320,6 +324,8 @@ class GoogleCodeReceiver:
raise TimeoutError("Timeout waiting for verification code.") raise TimeoutError("Timeout waiting for verification code.")
# 老的通用邮箱测试 # 老的通用邮箱测试
def mailTest(): def mailTest():
server = "server-10474.cuiqiu.vip" # 替换为你的 IMAP 服务器地址 server = "server-10474.cuiqiu.vip" # 替换为你的 IMAP 服务器地址
@ -347,7 +353,7 @@ def mailTest():
print("主题:", latest_matched_email["subject"]) print("主题:", latest_matched_email["subject"])
print("发件人:", latest_matched_email["from"]) print("发件人:", latest_matched_email["from"])
print("内容:", latest_matched_email["content"]) print("内容:", latest_matched_email["content"])
print("时间:", latest_matched_email["date"]) print("时间:",latest_matched_email["date"])
else: else:
print("没有符合条件的时间上最新的匹配邮件") print("没有符合条件的时间上最新的匹配邮件")
@ -357,18 +363,17 @@ def mailTest():
if all_matched_emails: if all_matched_emails:
print("\n所有匹配的邮件:") print("\n所有匹配的邮件:")
for idx, email in enumerate(all_matched_emails): for idx, email in enumerate(all_matched_emails):
print(f"邮件 {idx + 1}:") print(f"邮件 {idx+1}:")
print("主题:", email["subject"]) print("主题:", email["subject"])
print("发件人:", email["from"]) print("发件人:", email["from"])
print("内容:", email["content"], "\n") # 显示内容 print("内容:", email["content"], "\n") # 显示内容
print("时间:", email["date"]) print("时间:",email["date"])
else: else:
print("没有符合条件的所有匹配邮件") print("没有符合条件的所有匹配邮件")
finally: finally:
client.disconnect() client.disconnect()
def codeTest(): def codeTest():
server = "server-10474.cuiqiu.vip" # 替换为你的 IMAP 服务器地址 server = "server-10474.cuiqiu.vip" # 替换为你的 IMAP 服务器地址
username = "gmailvinted@mailezu.com" username = "gmailvinted@mailezu.com"
@ -376,22 +381,19 @@ def codeTest():
client = EmailClient(server, username, password) client = EmailClient(server, username, password)
client.connect() client.connect()
try:
code_receiver = GoogleCodeReceiver(client) code_receiver = GoogleCodeReceiver(client)
# 这里改成要捕获的目标邮件地址
code = code_receiver.wait_code(
username="RibeAchour875@gmail.com", timeout=300, interval=5,
start_time=datetime(2024, 11, 10))
# code = code_receiver.wait_code(username="RibeAchour875@gmail.com", timeout=300, interval=5)
code = ''
try:
code = code_receiver.wait_code(
username="RibeAchour875@gmail.com", timeout=2, interval=1,
start_time=datetime(2024, 11, 10))
print(f"收到谷歌验证码: {code}") print(f"收到谷歌验证码: {code}")
except TimeoutError as e:
print(e)
except Exception as e: except Exception as e:
print(f"An unexpected error occurred: {e}") print(f"An unexpected error occurred: {e}")
finally:
client.disconnect()
if not code:
print("并没有收到验证码")
def test3(): def test3():
server = "server-10474.cuiqiu.vip" # 替换为你的 IMAP 服务器地址 server = "server-10474.cuiqiu.vip" # 替换为你的 IMAP 服务器地址
@ -407,7 +409,7 @@ def test3():
# mailTest() # mailTest()
ok, email_ids = client.search_emails('FROM "noreply@google.com" BODY "RibeAchour875@gmail.com" SINCE 11-Nov-2024') ok,email_ids = client.search_emails('FROM "noreply@google.com" BODY "RibeAchour875@gmail.com" SINCE 11-Nov-2024')
if email_ids: if email_ids:
print(email_ids) print(email_ids)
sender_pattern = r".*google.*" # 使用正则表达式匹配发件人邮箱 sender_pattern = r".*google.*" # 使用正则表达式匹配发件人邮箱
@ -416,17 +418,15 @@ def test3():
if all_matched_emails: if all_matched_emails:
print("\n所有匹配的邮件:") print("\n所有匹配的邮件:")
for idx, email in enumerate(all_matched_emails): for idx, email in enumerate(all_matched_emails):
print(f"邮件 {idx + 1}:") print(f"邮件 {idx+1}:")
print("主题:", email["subject"]) print("主题:", email["subject"])
print("发件人:", email["from"]) print("发件人:", email["from"])
print("内容:", email["content"], "\n") # 显示内容 print("内容:", email["content"], "\n") # 显示内容
print("时间:", email["date"]) print("时间:",email["date"])
else: else:
print("没有符合条件的所有匹配邮件") print("没有符合条件的所有匹配邮件")
else: else:
print("查不到") print("查不到")
# 使用示例 # 使用示例
if __name__ == "__main__": if __name__ == "__main__":
# test3() # test3()

23
main.py
View File

@ -583,7 +583,7 @@ def save_log(random_number, log_content):
f.write("\n") f.write("\n")
def retry_with_recovery(tab, hwnd, email_account, MAX_RETRIES=4, wait_time=15): def retry_with_recovery(tab, hwnd, email_account, MAX_RETRIES=4, wait_time=5):
""" """
封装重试逻辑最多尝试 `MAX_RETRIES` 检查是否存在重试按钮并重新输入邮箱地址 封装重试逻辑最多尝试 `MAX_RETRIES` 检查是否存在重试按钮并重新输入邮箱地址
@ -650,10 +650,7 @@ def logutGoogle(tab) -> bool:
def main(email_account, email_password, old_recovery_email, new_password, new_recovery_email, proxy_host, proxy_port, proxy_username, proxy_password,region, row_index): def main(email_account, email_password, old_recovery_email, new_password, new_recovery_email, proxy_host, proxy_port, proxy_username, proxy_password,region, row_index):
# 初始化
client = EmailClient(server, username, password)
client.connect()
code_receiver = GoogleCodeReceiver(client)
print("邮箱账户:", email_account) # 邮箱账户 print("邮箱账户:", email_account) # 邮箱账户
print("邮箱密码:", email_password) # 邮箱密码 print("邮箱密码:", email_password) # 邮箱密码
@ -832,7 +829,7 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re
print("输入邮箱账号运行完毕") print("输入邮箱账号运行完毕")
save_log(random_number, "输入邮箱账号运行完毕") save_log(random_number, "输入邮箱账号运行完毕")
wrong = tab.ele('@text()=出了点问题', timeout=15) wrong = tab.ele('@text()=出了点问题', timeout=5)
if wrong: if wrong:
next_span = tab.ele("@@tag()=span@@text()=下一步", timeout=15) next_span = tab.ele("@@tag()=span@@text()=下一步", timeout=15)
next_span.click(by_js=True) next_span.click(by_js=True)
@ -858,7 +855,7 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re
save_log(random_number, "检查是否出现了wrong运行完毕") save_log(random_number, "检查是否出现了wrong运行完毕")
# 看下是否出现了手机号 # 看下是否出现了手机号
telephone = tab.ele("@text()=请输入电话号码,以便通过短信接收验证码。", timeout=15) telephone = tab.ele("@text()=请输入电话号码,以便通过短信接收验证码。", timeout=5)
if telephone: if telephone:
update_status_in_db(email_account, '接码') update_status_in_db(email_account, '接码')
browser.quit() browser.quit()
@ -926,7 +923,7 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re
# 看下是否出现了手机号 # 看下是否出现了手机号
telephone = tab.ele("@text()=请输入电话号码,以便通过短信接收验证码。", timeout=15) telephone = tab.ele("@text()=请输入电话号码,以便通过短信接收验证码。", timeout=5)
if telephone: if telephone:
update_status_in_db(email_account, '接码') update_status_in_db(email_account, '接码')
browser.quit() browser.quit()
@ -1091,17 +1088,23 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re
update_status_in_db(email_account, '已更改') update_status_in_db(email_account, '已更改')
return email_account return email_account
else: else:
# 初始化
client = EmailClient(server, username, password)
client.connect()
code_receiver = GoogleCodeReceiver(client)
# 最多尝试3次循环 # 最多尝试3次循环
for attempt in range(3): for attempt in range(3):
print(f"{attempt + 1} 次尝试检查验证码...") print(f"{attempt + 1} 次尝试检查验证码...")
# 获取当前程序运行的时间 # 获取当前程序运行的时间
current_time = datetime.now(timezone.utc) current_time = datetime.now(timezone.utc)
print(f"当前时间为{current_time}")
# 检查验证码超时时间为300秒使用当前时间作为 start_time # 检查验证码超时时间为300秒使用当前时间作为 start_time
try: try:
code = code_receiver.wait_code(username=f"{email_account}", timeout=300, interval=5, code = code_receiver.wait_code(username=f"{email_account}", timeout=300, interval=5,
start_time=current_time) start_time=datetime(2024, 11, 27))
print(f"检查的验证码是{code}") print(f"检查的验证码是{code}")
save_log(random_number, f"检查的验证码是{code}") save_log(random_number, f"检查的验证码是{code}")
if code: if code:
@ -1150,8 +1153,6 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re
sent_code_button = tab.ele('@text():Send a new code.', timeout=15) sent_code_button = tab.ele('@text():Send a new code.', timeout=15)
if sent_code_button: if sent_code_button:
sent_code_button.click(by_js=True) sent_code_button.click(by_js=True)
except ElementNotFoundError as e: except ElementNotFoundError as e:
print(f"更改辅助邮箱账号的时候找不到元素:{e}") print(f"更改辅助邮箱账号的时候找不到元素:{e}")
save_log(random_number, f"更改辅助邮箱账号的时候找不到元素:{e}") save_log(random_number, f"更改辅助邮箱账号的时候找不到元素:{e}")