From a786dca4703010b51b802f40c1305f7ca2e83c3c Mon Sep 17 00:00:00 2001 From: dghc2023 <3053714263@qq.com> Date: Wed, 27 Nov 2024 05:06:31 +0000 Subject: [PATCH] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E6=96=87=E4=BB=B6=E8=87=B3?= =?UTF-8?q?=20/?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 最新的,并发一次能运行,但是没有邮箱没有断开重连 --- mail.py | 120 ++++++++++++++++++++++++++++---------------------------- main.py | 23 +++++------ 2 files changed, 72 insertions(+), 71 deletions(-) diff --git a/mail.py b/mail.py index 851a050..1a89dcf 100644 --- a/mail.py +++ b/mail.py @@ -20,7 +20,6 @@ def to_utf7_imap(text): """ if not text: return "" - # 匹配所有非 ASCII 字符 def encode_match(match): return "&" + match.group(0).encode("utf-16be").hex().upper() + "-" @@ -28,7 +27,6 @@ def to_utf7_imap(text): # 替换非 ASCII 字符为 UTF-7 格式 return re.sub(r"[^\x20-\x7E]", encode_match, text) - def build_search_criteria(from_email=None, subject=None, body_keyword=None, since=None, before=None, unseen=False): """ 构建 IMAP 搜索条件字符串,支持简单查询逻辑。 @@ -67,7 +65,6 @@ def build_search_criteria(from_email=None, subject=None, body_keyword=None, sinc # 用空格拼接所有条件(IMAP 默认 AND 逻辑) return " ".join(criteria) if criteria else "ALL" - def parse_email_date(date_str, default_tz=timezone.utc): """ 安全解析邮件日期。 @@ -90,12 +87,12 @@ class EmailClient: self.username = username self.password = password self.connection = None - + def connect(self): self.connection = imaplib.IMAP4(self.host) self.connection.login(self.username, self.password) self.connection.select("inbox") - + def disconnect(self): if self.connection: self.connection.logout() @@ -118,16 +115,15 @@ class EmailClient: result, data = self.connection.search(None, search_criteria) if result != "OK": raise Exception(f"Failed to search emails with criteria: {search_criteria}") - + email_ids = data[0].split() if not email_ids: print("Debug: No matching emails found.") return result, [] # 返回空列表以便后续处理 - + return result, data - - def fetch_recent_emails(self, from_email=None, subject=None, body_keyword=None, since=None, before=None, - unseen=False, max_count=100): + + def fetch_recent_emails(self, from_email=None, subject=None, body_keyword=None, since=None, before=None, unseen=False, max_count=100): """ 使用构建的搜索条件查询最近的邮件。 @@ -162,8 +158,9 @@ class EmailClient: email_ids = data[0].split()[-max_count:] return list(reversed(email_ids)) - def fetch_all_matching_emails(self, email_ids, sender_pattern=None, keyword_pattern=None, subject_pattern=None, - start_time=None, max_results=None): + + + def fetch_all_matching_emails(self, email_ids, sender_pattern=None, keyword_pattern=None, subject_pattern=None, start_time=None, max_results=None): if max_results is not None and max_results <= 0: raise ValueError("max_results must be a positive integer or None") @@ -181,42 +178,47 @@ class EmailClient: msg = email.message_from_bytes(data[0][1]) + # 检查发件人地址是否匹配 from_email = email.utils.parseaddr(msg["From"])[1] if msg["From"] else "" if sender_regex and not sender_regex.search(from_email): continue + # 检查标题是否匹配 subject, encoding = decode_header(msg["Subject"])[0] subject = subject.decode(encoding or "utf-8") if isinstance(subject, bytes) else subject if subject_regex and not subject_regex.search(subject): continue - print("1没报错") - + # 处理邮件内容 content = "" for part in msg.walk(): content_type = part.get_content_type() - print("2没报错") try: - if content_type == "text/plain": - content += part.get_payload(decode=True).decode(part.get_content_charset() or "utf-8") - print("3没报错") - elif content_type == "text/html" and not content: - content += part.get_payload(decode=True).decode(part.get_content_charset() or "utf-8") - print("4没报错") - except Exception as e: - print(f"Warning: Failed to decode {content_type} content: {e}") + payload = part.get_payload(decode=True) + # 增加对 payload 类型的判断 + if isinstance(payload, bytes): + if content_type == "text/plain": + content += payload.decode(part.get_content_charset() or "utf-8") + elif content_type == "text/html" and not content: + content += payload.decode(part.get_content_charset() or "utf-8") + else: + print(f"Skipping unexpected payload type: {type(payload)}, content_type: {content_type}") + + except Exception as e: + print(f"Error processing MIME part: {e}") + + # 检查内容关键字是否匹配 if keyword_regex and not keyword_regex.search(content): continue - + # 检查邮件日期是否在范围内 date_str = msg.get("Date") email_date = parse_email_date(date_str) if date_str else None - # print('email_date:',email_date) - # print('start_time:',start_time) if start_time and email_date and email_date < start_time: continue + # 保存匹配的邮件 matched_email = { "subject": subject, "from": from_email, @@ -225,6 +227,7 @@ class EmailClient: } all_matched_emails.append(matched_email) + # 如果达到了最大匹配数,停止处理 if max_results and len(all_matched_emails) >= max_results: break @@ -233,8 +236,10 @@ class EmailClient: return all_matched_emails - def filter_emails_by_sender_and_keyword(self, email_ids, sender_pattern=None, keyword_pattern=None, - subject_pattern=None, start_time=None): + + + + def filter_emails_by_sender_and_keyword(self, email_ids, sender_pattern=None, keyword_pattern=None, subject_pattern=None, start_time=None): matched_emails = self.fetch_all_matching_emails( email_ids=email_ids, sender_pattern=sender_pattern, @@ -298,7 +303,6 @@ class GoogleCodeReceiver: body_keyword=username, since=since # 从 `start_time` 往前推 2 天开始检索 ) - matched_email = self.email_client.filter_emails_by_sender_and_keyword( email_ids=email_ids, # sender_pattern=sender_pattern, @@ -306,7 +310,7 @@ class GoogleCodeReceiver: subject_pattern=subject_pattern, start_time=start_time # 确保只检索 `start_time` 之后的邮件 ) - + if matched_email: match = re.search(subject_pattern, matched_email["subject"]) if match: @@ -320,14 +324,16 @@ class GoogleCodeReceiver: raise TimeoutError("Timeout waiting for verification code.") + + # 老的通用邮箱测试 def mailTest(): - server = "server-10474.cuiqiu.vip" # 替换为你的 IMAP 服务器地址 + server = "server-10474.cuiqiu.vip" # 替换为你的 IMAP 服务器地址 username = "gmailvinted@mailezu.com" - password = "g1l2o0hld84" + password = "g1l2o0hld84" client = EmailClient(server, username, password) client.connect() - start_time = datetime(2024, 11, 22) + start_time = datetime(2024, 11, 22) sender_pattern = r".*google.*" # 使用正则表达式匹配发件人邮箱 keyword_pattern = r".*" # 替换为你想要匹配的关键字或正则表达式 try: @@ -338,7 +344,7 @@ def mailTest(): # subject='Email verification code',#中文邮件叫 '电子邮件验证码‘ body_keyword='RibeAchour875@gmail.com', since=start_time, - ) + ) # 获取时间上最新的匹配项,应用起始时间过滤器 latest_matched_email = client.filter_emails_by_sender_and_keyword(email_ids, sender_pattern, keyword_pattern) @@ -347,7 +353,7 @@ def mailTest(): print("主题:", latest_matched_email["subject"]) print("发件人:", latest_matched_email["from"]) print("内容:", latest_matched_email["content"]) - print("时间:", latest_matched_email["date"]) + print("时间:",latest_matched_email["date"]) else: print("没有符合条件的时间上最新的匹配邮件") @@ -357,57 +363,53 @@ def mailTest(): if all_matched_emails: print("\n所有匹配的邮件:") for idx, email in enumerate(all_matched_emails): - print(f"邮件 {idx + 1}:") + print(f"邮件 {idx+1}:") print("主题:", email["subject"]) print("发件人:", email["from"]) print("内容:", email["content"], "\n") # 显示内容 - print("时间:", email["date"]) + print("时间:",email["date"]) else: print("没有符合条件的所有匹配邮件") finally: client.disconnect() - def codeTest(): - server = "server-10474.cuiqiu.vip" # 替换为你的 IMAP 服务器地址 + server = "server-10474.cuiqiu.vip" # 替换为你的 IMAP 服务器地址 username = "gmailvinted@mailezu.com" - password = "g1l2o0hld84" + password = "g1l2o0hld84" client = EmailClient(server, username, password) client.connect() - try: - code_receiver = GoogleCodeReceiver(client) - # 这里改成要捕获的目标邮件地址 - code = code_receiver.wait_code( - username="RibeAchour875@gmail.com", timeout=300, interval=5, - start_time=datetime(2024, 11, 10)) - # code = code_receiver.wait_code(username="RibeAchour875@gmail.com", timeout=300, interval=5) + code_receiver = GoogleCodeReceiver(client) + code = '' + try: + code = code_receiver.wait_code( + username="RibeAchour875@gmail.com", timeout=2, interval=1, + start_time=datetime(2024, 11, 10)) print(f"收到谷歌验证码: {code}") - except TimeoutError as e: - print(e) except Exception as e: print(f"An unexpected error occurred: {e}") - finally: - client.disconnect() - + + if not code: + print("并没有收到验证码") def test3(): - server = "server-10474.cuiqiu.vip" # 替换为你的 IMAP 服务器地址 + server = "server-10474.cuiqiu.vip" # 替换为你的 IMAP 服务器地址 username = "gmailvinted@mailezu.com" - password = "g1l2o0hld84" + password = "g1l2o0hld84" # server = "imap.qq.com" # username = "bigemon@foxmail.com" - # password = "ejudkkdfiuemcaaj" + # password = "ejudkkdfiuemcaaj" client = EmailClient(server, username, password) client.connect() # mailTest() - ok, email_ids = client.search_emails('FROM "noreply@google.com" BODY "RibeAchour875@gmail.com" SINCE 11-Nov-2024') + ok,email_ids = client.search_emails('FROM "noreply@google.com" BODY "RibeAchour875@gmail.com" SINCE 11-Nov-2024') if email_ids: print(email_ids) sender_pattern = r".*google.*" # 使用正则表达式匹配发件人邮箱 @@ -416,17 +418,15 @@ def test3(): if all_matched_emails: print("\n所有匹配的邮件:") for idx, email in enumerate(all_matched_emails): - print(f"邮件 {idx + 1}:") + print(f"邮件 {idx+1}:") print("主题:", email["subject"]) print("发件人:", email["from"]) print("内容:", email["content"], "\n") # 显示内容 - print("时间:", email["date"]) + print("时间:",email["date"]) else: print("没有符合条件的所有匹配邮件") else: print("查不到") - - # 使用示例 if __name__ == "__main__": # test3() diff --git a/main.py b/main.py index c8c4a57..84a3567 100644 --- a/main.py +++ b/main.py @@ -583,7 +583,7 @@ def save_log(random_number, log_content): f.write("\n") -def retry_with_recovery(tab, hwnd, email_account, MAX_RETRIES=4, wait_time=15): +def retry_with_recovery(tab, hwnd, email_account, MAX_RETRIES=4, wait_time=5): """ 封装重试逻辑,最多尝试 `MAX_RETRIES` 次,检查是否存在重试按钮,并重新输入邮箱地址。 @@ -650,10 +650,7 @@ def logutGoogle(tab) -> bool: def main(email_account, email_password, old_recovery_email, new_password, new_recovery_email, proxy_host, proxy_port, proxy_username, proxy_password,region, row_index): - # 初始化 - client = EmailClient(server, username, password) - client.connect() - code_receiver = GoogleCodeReceiver(client) + print("邮箱账户:", email_account) # 邮箱账户 print("邮箱密码:", email_password) # 邮箱密码 @@ -832,7 +829,7 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re print("输入邮箱账号运行完毕") save_log(random_number, "输入邮箱账号运行完毕") - wrong = tab.ele('@text()=出了点问题', timeout=15) + wrong = tab.ele('@text()=出了点问题', timeout=5) if wrong: next_span = tab.ele("@@tag()=span@@text()=下一步", timeout=15) next_span.click(by_js=True) @@ -858,7 +855,7 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re save_log(random_number, "检查是否出现了wrong运行完毕") # 看下是否出现了手机号 - telephone = tab.ele("@text()=请输入电话号码,以便通过短信接收验证码。", timeout=15) + telephone = tab.ele("@text()=请输入电话号码,以便通过短信接收验证码。", timeout=5) if telephone: update_status_in_db(email_account, '接码') browser.quit() @@ -926,7 +923,7 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re # 看下是否出现了手机号 - telephone = tab.ele("@text()=请输入电话号码,以便通过短信接收验证码。", timeout=15) + telephone = tab.ele("@text()=请输入电话号码,以便通过短信接收验证码。", timeout=5) if telephone: update_status_in_db(email_account, '接码') browser.quit() @@ -1091,17 +1088,23 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re update_status_in_db(email_account, '已更改') return email_account else: + # 初始化 + client = EmailClient(server, username, password) + client.connect() + code_receiver = GoogleCodeReceiver(client) + # 最多尝试3次循环 for attempt in range(3): print(f"第 {attempt + 1} 次尝试检查验证码...") # 获取当前程序运行的时间 current_time = datetime.now(timezone.utc) + print(f"当前时间为{current_time}") # 检查验证码,超时时间为300秒,使用当前时间作为 start_time try: code = code_receiver.wait_code(username=f"{email_account}", timeout=300, interval=5, - start_time=current_time) + start_time=datetime(2024, 11, 27)) print(f"检查的验证码是{code}") save_log(random_number, f"检查的验证码是{code}") if code: @@ -1150,8 +1153,6 @@ def main(email_account, email_password, old_recovery_email, new_password, new_re sent_code_button = tab.ele('@text():Send a new code.', timeout=15) if sent_code_button: sent_code_button.click(by_js=True) - - except ElementNotFoundError as e: print(f"更改辅助邮箱账号的时候找不到元素:{e}") save_log(random_number, f"更改辅助邮箱账号的时候找不到元素:{e}")