Gmail/mail.py

435 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import imaplib
import email
import re
from email.header import decode_header
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
import time
from datetime import datetime, timedelta
def to_utf7_imap(text):
"""
将文本转换为 IMAP UTF-7 编码格式。
参数:
text (str): 要转换的文本。
返回:
str: 转换后的 UTF-7 编码文本。
"""
if not text:
return ""
# 匹配所有非 ASCII 字符
def encode_match(match):
return "&" + match.group(0).encode("utf-16be").hex().upper() + "-"
# 替换非 ASCII 字符为 UTF-7 格式
return re.sub(r"[^\x20-\x7E]", encode_match, text)
def build_search_criteria(from_email=None, subject=None, body_keyword=None, since=None, before=None, unseen=False):
"""
构建 IMAP 搜索条件字符串,支持简单查询逻辑。
参数:
from_email (str): 发件人邮箱地址。
subject (str): 邮件标题的关键字。
body_keyword (str): 邮件正文的关键字。
since (datetime): 起始时间,筛选此时间之后的邮件。
before (datetime): 截止时间,筛选此时间之前的邮件。
unseen (bool): 是否仅筛选未读邮件。
返回:
str: 构建的 IMAP 搜索条件字符串。
"""
criteria = []
if unseen:
criteria.append("UNSEEN")
if from_email:
criteria.append(f'FROM "{to_utf7_imap(from_email)}"')
if subject:
criteria.append(f'SUBJECT "{to_utf7_imap(subject)}"')
if body_keyword:
criteria.append(f'BODY "{to_utf7_imap(body_keyword)}"')
if since:
criteria.append(f'SINCE {since.strftime("%d-%b-%Y")}')
if before:
criteria.append(f'BEFORE {before.strftime("%d-%b-%Y")}')
# 用空格拼接所有条件IMAP 默认 AND 逻辑)
return " ".join(criteria) if criteria else "ALL"
def parse_email_date(date_str, default_tz=timezone.utc):
"""
安全解析邮件日期。
"""
if not date_str:
return None
try:
email_date = parsedate_to_datetime(date_str)
if email_date and email_date.tzinfo is None:
email_date = email_date.replace(tzinfo=default_tz)
return email_date
except Exception as e:
print(f"Warning: Failed to parse date '{date_str}': {e}")
return None
class EmailClient:
def __init__(self, host, username, password):
self.host = host
self.username = username
self.password = password
self.connection = None
def connect(self):
self.connection = imaplib.IMAP4(self.host)
self.connection.login(self.username, self.password)
self.connection.select("inbox")
def disconnect(self):
if self.connection:
self.connection.logout()
def search_emails(self, search_criteria):
"""
使用自由构建的搜索条件执行 IMAP 搜索。
参数:
search_criteria (str): 用于 IMAP 搜索的条件字符串。
返回:
tuple: 搜索结果 (result, data),其中:
- result (str): 搜索状态 ("OK" 表示成功)。
- data (list): 搜索到的邮件 ID 列表。
Raises:
Exception: 如果搜索失败。
"""
result, data = self.connection.search(None, search_criteria)
if result != "OK":
raise Exception(f"Failed to search emails with criteria: {search_criteria}")
email_ids = data[0].split()
if not email_ids:
print("Debug: No matching emails found.")
return result, [] # 返回空列表以便后续处理
return result, data
def fetch_recent_emails(self, from_email=None, subject=None, body_keyword=None, since=None, before=None,
unseen=False, max_count=100):
"""
使用构建的搜索条件查询最近的邮件。
参数:
from_email (str): 发件人邮箱地址。
subject (str): 邮件标题关键字。
body_keyword (str): 邮件正文关键字。
since (datetime): 起始时间。
before (datetime): 截止时间。
unseen (bool): 是否只查询未读邮件。
max_count (int): 返回的邮件数量上限。
返回:
list: 符合条件的邮件 ID 列表,按接收时间倒序排列。
"""
search_criteria = build_search_criteria(
from_email=from_email,
subject=subject,
body_keyword=body_keyword,
since=since,
before=before,
unseen=unseen
)
result, data = self.search_emails(search_criteria)
# 检查 `data` 是否有效
if not data or not data[0]:
print("Debug: No emails found matching the criteria.")
return [] # 返回空列表
# 正常处理邮件 ID
email_ids = data[0].split()[-max_count:]
return list(reversed(email_ids))
def fetch_all_matching_emails(self, email_ids, sender_pattern=None, keyword_pattern=None, subject_pattern=None,
start_time=None, max_results=None):
if max_results is not None and max_results <= 0:
raise ValueError("max_results must be a positive integer or None")
sender_regex = re.compile(sender_pattern) if sender_pattern else None
keyword_regex = re.compile(keyword_pattern) if keyword_pattern else None
subject_regex = re.compile(subject_pattern) if subject_pattern else None
all_matched_emails = []
for email_id in email_ids:
try:
result, data = self.connection.fetch(email_id, "(RFC822)")
if result != "OK":
print(f"Warning: Failed to fetch email with ID {email_id}")
continue
msg = email.message_from_bytes(data[0][1])
from_email = email.utils.parseaddr(msg["From"])[1] if msg["From"] else ""
if sender_regex and not sender_regex.search(from_email):
continue
subject, encoding = decode_header(msg["Subject"])[0]
subject = subject.decode(encoding or "utf-8") if isinstance(subject, bytes) else subject
if subject_regex and not subject_regex.search(subject):
continue
print("1没报错")
content = ""
for part in msg.walk():
content_type = part.get_content_type()
print("2没报错")
try:
if content_type == "text/plain":
content += part.get_payload(decode=True).decode(part.get_content_charset() or "utf-8")
print("3没报错")
elif content_type == "text/html" and not content:
content += part.get_payload(decode=True).decode(part.get_content_charset() or "utf-8")
print("4没报错")
except Exception as e:
print(f"Warning: Failed to decode {content_type} content: {e}")
if keyword_regex and not keyword_regex.search(content):
continue
date_str = msg.get("Date")
email_date = parse_email_date(date_str) if date_str else None
# print('email_date:',email_date)
# print('start_time:',start_time)
if start_time and email_date and email_date < start_time:
continue
matched_email = {
"subject": subject,
"from": from_email,
"content": content,
"date": email_date
}
all_matched_emails.append(matched_email)
if max_results and len(all_matched_emails) >= max_results:
break
except Exception as e:
print(f"Error: Failed to process email ID {email_id}: {e}")
return all_matched_emails
def filter_emails_by_sender_and_keyword(self, email_ids, sender_pattern=None, keyword_pattern=None,
subject_pattern=None, start_time=None):
matched_emails = self.fetch_all_matching_emails(
email_ids=email_ids,
sender_pattern=sender_pattern,
keyword_pattern=keyword_pattern,
subject_pattern=subject_pattern,
start_time=start_time,
max_results=1
)
return matched_emails[0] if matched_emails else None
class GoogleCodeReceiver:
def __init__(self, email_client):
self.email_client = email_client
self.last_timestamp = datetime.now(timezone.utc)
def _update_last_timestamp(self):
self.last_timestamp = datetime.now(timezone.utc)
def wait_code(self, username, timeout=60, interval=3, start_time=None):
"""
等待 Google 验证码邮件。
参数:
username (str): 用户名,用于在正文中检索匹配邮件。
timeout (int): 最大等待时间,单位为秒。
interval (int): 轮询间隔,单位为秒。
start_time (datetime): 开始检索邮件的时间。默认值为当前时间。
返回:
str: 提取的验证码。
"""
# 如果未指定 `start_time`,使用当前时间
if start_time is None:
start_time = datetime.now(timezone.utc) # 默认使用当前 UTC 时间
elif start_time.tzinfo is None:
# 如果 `start_time` 没有时区信息,假定为本地时间,并转为 UTC
local_time = start_time.astimezone()
start_time = local_time.astimezone(timezone.utc)
else:
# 如果已经有时区信息,统一转为 UTC
start_time = start_time.astimezone(timezone.utc)
# 确定 `since` 时间,取 `start_time` 往前推 2 天
since = start_time - timedelta(days=2)
# 更新 `last_timestamp`
self._update_last_timestamp()
end_time = datetime.now(timezone.utc) + timedelta(seconds=timeout)
subject_pattern = r"(?:Email verification code|电子邮件验证码)[:]?\s*(\d{6})"
# sender_pattern = r"noreply@google\.com"
# keyword_pattern = re.escape(username)
while datetime.now(timezone.utc) < end_time:
try:
# Fetch recent emails with the adjusted `since`
email_ids = self.email_client.fetch_recent_emails(
max_count=10,
from_email="noreply@google.com",
body_keyword=username,
since=since # 从 `start_time` 往前推 2 天开始检索
)
matched_email = self.email_client.filter_emails_by_sender_and_keyword(
email_ids=email_ids,
# sender_pattern=sender_pattern,
# keyword_pattern=keyword_pattern,
subject_pattern=subject_pattern,
start_time=start_time # 确保只检索 `start_time` 之后的邮件
)
if matched_email:
match = re.search(subject_pattern, matched_email["subject"])
if match:
return match.group(1)
except Exception as e:
print(f"Warning: Error occurred while fetching code: {e}")
time.sleep(interval)
raise TimeoutError("Timeout waiting for verification code.")
# 老的通用邮箱测试
def mailTest():
server = "server-10474.cuiqiu.vip" # 替换为你的 IMAP 服务器地址
username = "gmailvinted@mailezu.com"
password = "g1l2o0hld84"
client = EmailClient(server, username, password)
client.connect()
start_time = datetime(2024, 11, 22)
sender_pattern = r".*google.*" # 使用正则表达式匹配发件人邮箱
keyword_pattern = r".*" # 替换为你想要匹配的关键字或正则表达式
try:
email_ids = client.fetch_recent_emails(
max_count=10,
from_email="noreply@google.com",
# subject='Email verification code',#中文邮件叫 '电子邮件验证码‘
body_keyword='RibeAchour875@gmail.com',
since=start_time,
)
# 获取时间上最新的匹配项,应用起始时间过滤器
latest_matched_email = client.filter_emails_by_sender_and_keyword(email_ids, sender_pattern, keyword_pattern)
if latest_matched_email:
print("\n时间上最新的匹配邮件:")
print("主题:", latest_matched_email["subject"])
print("发件人:", latest_matched_email["from"])
print("内容:", latest_matched_email["content"])
print("时间:", latest_matched_email["date"])
else:
print("没有符合条件的时间上最新的匹配邮件")
# print(f"ids:{email_ids}")
# 获取所有匹配的邮件,应用起始时间过滤器
all_matched_emails = client.fetch_all_matching_emails(email_ids, sender_pattern, keyword_pattern)
if all_matched_emails:
print("\n所有匹配的邮件:")
for idx, email in enumerate(all_matched_emails):
print(f"邮件 {idx + 1}:")
print("主题:", email["subject"])
print("发件人:", email["from"])
print("内容:", email["content"], "\n") # 显示内容
print("时间:", email["date"])
else:
print("没有符合条件的所有匹配邮件")
finally:
client.disconnect()
def codeTest():
server = "server-10474.cuiqiu.vip" # 替换为你的 IMAP 服务器地址
username = "gmailvinted@mailezu.com"
password = "g1l2o0hld84"
client = EmailClient(server, username, password)
client.connect()
try:
code_receiver = GoogleCodeReceiver(client)
# 这里改成要捕获的目标邮件地址
code = code_receiver.wait_code(
username="RibeAchour875@gmail.com", timeout=300, interval=5,
start_time=datetime(2024, 11, 10))
# code = code_receiver.wait_code(username="RibeAchour875@gmail.com", timeout=300, interval=5)
print(f"收到谷歌验证码: {code}")
except TimeoutError as e:
print(e)
except Exception as e:
print(f"An unexpected error occurred: {e}")
finally:
client.disconnect()
def test3():
server = "server-10474.cuiqiu.vip" # 替换为你的 IMAP 服务器地址
username = "gmailvinted@mailezu.com"
password = "g1l2o0hld84"
# server = "imap.qq.com"
# username = "bigemon@foxmail.com"
# password = "ejudkkdfiuemcaaj"
client = EmailClient(server, username, password)
client.connect()
# mailTest()
ok, email_ids = client.search_emails('FROM "noreply@google.com" BODY "RibeAchour875@gmail.com" SINCE 11-Nov-2024')
if email_ids:
print(email_ids)
sender_pattern = r".*google.*" # 使用正则表达式匹配发件人邮箱
keyword_pattern = r".*" # 替换为你想要匹配的关键字或正则表达式
all_matched_emails = client.fetch_all_matching_emails(email_ids)
if all_matched_emails:
print("\n所有匹配的邮件:")
for idx, email in enumerate(all_matched_emails):
print(f"邮件 {idx + 1}:")
print("主题:", email["subject"])
print("发件人:", email["from"])
print("内容:", email["content"], "\n") # 显示内容
print("时间:", email["date"])
else:
print("没有符合条件的所有匹配邮件")
else:
print("查不到")
# 使用示例
if __name__ == "__main__":
# test3()
codeTest()