import imaplib import email import re from email.header import decode_header from datetime import datetime, timezone from email.utils import parsedate_to_datetime import time from datetime import datetime, timedelta def to_utf7_imap(text): """ 将文本转换为 IMAP UTF-7 编码格式。 参数: text (str): 要转换的文本。 返回: str: 转换后的 UTF-7 编码文本。 """ if not text: return "" # 匹配所有非 ASCII 字符 def encode_match(match): return "&" + match.group(0).encode("utf-16be").hex().upper() + "-" # 替换非 ASCII 字符为 UTF-7 格式 return re.sub(r"[^\x20-\x7E]", encode_match, text) def build_search_criteria(from_email=None, subject=None, body_keyword=None, since=None, before=None, unseen=False): """ 构建 IMAP 搜索条件字符串,支持简单查询逻辑。 参数: from_email (str): 发件人邮箱地址。 subject (str): 邮件标题的关键字。 body_keyword (str): 邮件正文的关键字。 since (datetime): 起始时间,筛选此时间之后的邮件。 before (datetime): 截止时间,筛选此时间之前的邮件。 unseen (bool): 是否仅筛选未读邮件。 返回: str: 构建的 IMAP 搜索条件字符串。 """ criteria = [] if unseen: criteria.append("UNSEEN") if from_email: criteria.append(f'FROM "{to_utf7_imap(from_email)}"') if subject: criteria.append(f'SUBJECT "{to_utf7_imap(subject)}"') if body_keyword: criteria.append(f'BODY "{to_utf7_imap(body_keyword)}"') if since: criteria.append(f'SINCE {since.strftime("%d-%b-%Y")}') if before: criteria.append(f'BEFORE {before.strftime("%d-%b-%Y")}') # 用空格拼接所有条件(IMAP 默认 AND 逻辑) return " ".join(criteria) if criteria else "ALL" def parse_email_date(date_str, default_tz=timezone.utc): """ 安全解析邮件日期。 """ if not date_str: return None try: email_date = parsedate_to_datetime(date_str) if email_date and email_date.tzinfo is None: email_date = email_date.replace(tzinfo=default_tz) return email_date except Exception as e: print(f"Warning: Failed to parse date '{date_str}': {e}") return None class EmailClient: def __init__(self, host, username, password): self.host = host self.username = username self.password = password self.connection = None def connect(self): self.connection = imaplib.IMAP4(self.host) self.connection.login(self.username, self.password) self.connection.select("inbox") def disconnect(self): if self.connection: self.connection.logout() def search_emails(self, search_criteria): """ 使用自由构建的搜索条件执行 IMAP 搜索。 参数: search_criteria (str): 用于 IMAP 搜索的条件字符串。 返回: tuple: 搜索结果 (result, data),其中: - result (str): 搜索状态 ("OK" 表示成功)。 - data (list): 搜索到的邮件 ID 列表。 Raises: Exception: 如果搜索失败。 """ result, data = self.connection.search(None, search_criteria) if result != "OK": raise Exception(f"Failed to search emails with criteria: {search_criteria}") email_ids = data[0].split() if not email_ids: print("Debug: No matching emails found.") return result, [] # 返回空列表以便后续处理 return result, data def fetch_recent_emails(self, from_email=None, subject=None, body_keyword=None, since=None, before=None, unseen=False, max_count=100): """ 使用构建的搜索条件查询最近的邮件。 参数: from_email (str): 发件人邮箱地址。 subject (str): 邮件标题关键字。 body_keyword (str): 邮件正文关键字。 since (datetime): 起始时间。 before (datetime): 截止时间。 unseen (bool): 是否只查询未读邮件。 max_count (int): 返回的邮件数量上限。 返回: list: 符合条件的邮件 ID 列表,按接收时间倒序排列。 """ search_criteria = build_search_criteria( from_email=from_email, subject=subject, body_keyword=body_keyword, since=since, before=before, unseen=unseen ) result, data = self.search_emails(search_criteria) # 检查 `data` 是否有效 if not data or not data[0]: print("Debug: No emails found matching the criteria.") return [] # 返回空列表 # 正常处理邮件 ID email_ids = data[0].split()[-max_count:] return list(reversed(email_ids)) def fetch_all_matching_emails(self, email_ids, sender_pattern=None, keyword_pattern=None, subject_pattern=None, start_time=None, max_results=None): if max_results is not None and max_results <= 0: raise ValueError("max_results must be a positive integer or None") sender_regex = re.compile(sender_pattern) if sender_pattern else None keyword_regex = re.compile(keyword_pattern) if keyword_pattern else None subject_regex = re.compile(subject_pattern) if subject_pattern else None all_matched_emails = [] for email_id in email_ids: try: result, data = self.connection.fetch(email_id, "(RFC822)") if result != "OK": print(f"Warning: Failed to fetch email with ID {email_id}") continue msg = email.message_from_bytes(data[0][1]) # 检查发件人地址是否匹配 from_email = email.utils.parseaddr(msg["From"])[1] if msg["From"] else "" if sender_regex and not sender_regex.search(from_email): continue # 检查标题是否匹配 subject, encoding = decode_header(msg["Subject"])[0] subject = subject.decode(encoding or "utf-8") if isinstance(subject, bytes) else subject if subject_regex and not subject_regex.search(subject): continue # 处理邮件内容 content = "" for part in msg.walk(): content_type = part.get_content_type() try: payload = part.get_payload(decode=True) # 增加对 payload 类型的判断 if isinstance(payload, bytes): if content_type == "text/plain": content += payload.decode(part.get_content_charset() or "utf-8") elif content_type == "text/html" and not content: content += payload.decode(part.get_content_charset() or "utf-8") else: print(f"Skipping unexpected payload type: {type(payload)}, content_type: {content_type}") except Exception as e: print(f"Error processing MIME part: {e}") # 检查内容关键字是否匹配 if keyword_regex and not keyword_regex.search(content): continue # 检查邮件日期是否在范围内 date_str = msg.get("Date") email_date = parse_email_date(date_str) if date_str else None if start_time and email_date and email_date < start_time: continue # 保存匹配的邮件 matched_email = { "subject": subject, "from": from_email, "content": content, "date": email_date } all_matched_emails.append(matched_email) # 如果达到了最大匹配数,停止处理 if max_results and len(all_matched_emails) >= max_results: break except Exception as e: print(f"Error: Failed to process email ID {email_id}: {e}") return all_matched_emails def filter_emails_by_sender_and_keyword(self, email_ids, sender_pattern=None, keyword_pattern=None, subject_pattern=None, start_time=None): matched_emails = self.fetch_all_matching_emails( email_ids=email_ids, sender_pattern=sender_pattern, keyword_pattern=keyword_pattern, subject_pattern=subject_pattern, start_time=start_time, max_results=1 ) return matched_emails[0] if matched_emails else None class GoogleCodeReceiver: def __init__(self, email_client): self.email_client = email_client self.last_timestamp = datetime.now(timezone.utc) def _update_last_timestamp(self): self.last_timestamp = datetime.now(timezone.utc) def wait_code(self, username, timeout=60, interval=3, start_time=None): """ 等待 Google 验证码邮件。 参数: username (str): 用户名,用于在正文中检索匹配邮件。 timeout (int): 最大等待时间,单位为秒。 interval (int): 轮询间隔,单位为秒。 start_time (datetime): 开始检索邮件的时间。默认值为当前时间。 返回: str: 提取的验证码。 """ # 如果未指定 `start_time`,使用当前时间 if start_time is None: start_time = datetime.now(timezone.utc) # 默认使用当前 UTC 时间 elif start_time.tzinfo is None: # 如果 `start_time` 没有时区信息,假定为本地时间,并转为 UTC local_time = start_time.astimezone() start_time = local_time.astimezone(timezone.utc) else: # 如果已经有时区信息,统一转为 UTC start_time = start_time.astimezone(timezone.utc) # 确定 `since` 时间,取 `start_time` 往前推 2 天 since = start_time - timedelta(days=2) # 更新 `last_timestamp` self._update_last_timestamp() end_time = datetime.now(timezone.utc) + timedelta(seconds=timeout) subject_pattern = r"(?:Email verification code|电子邮件验证码)[::]?\s*(\d{6})" # sender_pattern = r"noreply@google\.com" # keyword_pattern = re.escape(username) while datetime.now(timezone.utc) < end_time: try: # Fetch recent emails with the adjusted `since` email_ids = self.email_client.fetch_recent_emails( max_count=10, from_email="noreply@google.com", body_keyword=username, since=since # 从 `start_time` 往前推 2 天开始检索 ) matched_email = self.email_client.filter_emails_by_sender_and_keyword( email_ids=email_ids, # sender_pattern=sender_pattern, # keyword_pattern=keyword_pattern, subject_pattern=subject_pattern, start_time=start_time # 确保只检索 `start_time` 之后的邮件 ) if matched_email: match = re.search(subject_pattern, matched_email["subject"]) if match: return match.group(1) except Exception as e: print(f"Warning: Error occurred while fetching code: {e}") time.sleep(interval) raise TimeoutError("Timeout waiting for verification code.") # 老的通用邮箱测试 def mailTest(): server = "server-10474.cuiqiu.vip" # 替换为你的 IMAP 服务器地址 username = "gmailvinted@mailezu.com" password = "g1l2o0hld84" client = EmailClient(server, username, password) client.connect() start_time = datetime(2024, 11, 22) sender_pattern = r".*google.*" # 使用正则表达式匹配发件人邮箱 keyword_pattern = r".*" # 替换为你想要匹配的关键字或正则表达式 try: email_ids = client.fetch_recent_emails( max_count=10, from_email="noreply@google.com", # subject='Email verification code',#中文邮件叫 '电子邮件验证码‘ body_keyword='RibeAchour875@gmail.com', since=start_time, ) # 获取时间上最新的匹配项,应用起始时间过滤器 latest_matched_email = client.filter_emails_by_sender_and_keyword(email_ids, sender_pattern, keyword_pattern) if latest_matched_email: print("\n时间上最新的匹配邮件:") print("主题:", latest_matched_email["subject"]) print("发件人:", latest_matched_email["from"]) print("内容:", latest_matched_email["content"]) print("时间:",latest_matched_email["date"]) else: print("没有符合条件的时间上最新的匹配邮件") # print(f"ids:{email_ids}") # 获取所有匹配的邮件,应用起始时间过滤器 all_matched_emails = client.fetch_all_matching_emails(email_ids, sender_pattern, keyword_pattern) if all_matched_emails: print("\n所有匹配的邮件:") for idx, email in enumerate(all_matched_emails): print(f"邮件 {idx+1}:") print("主题:", email["subject"]) print("发件人:", email["from"]) print("内容:", email["content"], "\n") # 显示内容 print("时间:",email["date"]) else: print("没有符合条件的所有匹配邮件") finally: client.disconnect() def codeTest(): server = "server-10474.cuiqiu.vip" # 替换为你的 IMAP 服务器地址 username = "gmailvinted@mailezu.com" password = "g1l2o0hld84" client = EmailClient(server, username, password) client.connect() code_receiver = GoogleCodeReceiver(client) code = '' try: code = code_receiver.wait_code( username="RibeAchour875@gmail.com", timeout=2, interval=1, start_time=datetime(2024, 11, 10)) print(f"收到谷歌验证码: {code}") except Exception as e: print(f"An unexpected error occurred: {e}") if not code: print("并没有收到验证码") def test3(): server = "server-10474.cuiqiu.vip" # 替换为你的 IMAP 服务器地址 username = "gmailvinted@mailezu.com" password = "g1l2o0hld84" # server = "imap.qq.com" # username = "bigemon@foxmail.com" # password = "ejudkkdfiuemcaaj" client = EmailClient(server, username, password) client.connect() # mailTest() ok,email_ids = client.search_emails('FROM "noreply@google.com" BODY "RibeAchour875@gmail.com" SINCE 11-Nov-2024') if email_ids: print(email_ids) sender_pattern = r".*google.*" # 使用正则表达式匹配发件人邮箱 keyword_pattern = r".*" # 替换为你想要匹配的关键字或正则表达式 all_matched_emails = client.fetch_all_matching_emails(email_ids) if all_matched_emails: print("\n所有匹配的邮件:") for idx, email in enumerate(all_matched_emails): print(f"邮件 {idx+1}:") print("主题:", email["subject"]) print("发件人:", email["from"]) print("内容:", email["content"], "\n") # 显示内容 print("时间:",email["date"]) else: print("没有符合条件的所有匹配邮件") else: print("查不到") # 使用示例 if __name__ == "__main__": # test3() codeTest()