Gmail/mail.py

453 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import imaplib
import email
import re
from email.header import decode_header
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
import time
from datetime import datetime, timedelta
def to_utf7_imap(text):
    """Encode *text* with IMAP "modified UTF-7" (RFC 3501, section 5.1.3).

    Printable US-ASCII characters (0x20-0x7E) represent themselves, except
    ``&`` which is written as ``&-``.  Every run of other characters is
    encoded as UTF-16BE, converted to modified BASE64 (``,`` instead of
    ``/``, no ``=`` padding) and wrapped in ``&`` ... ``-``.

    Args:
        text (str): Text to encode.  ``None`` and ``""`` are accepted.

    Returns:
        str: The encoded, pure-ASCII text; ``""`` for empty input.
    """
    import base64  # local import: only this function needs it

    if not text:
        return ""

    def encode_run(match):
        run = match.group(0)
        if run == "&":
            # A literal ampersand is the shift character and must be escaped.
            return "&-"
        b64 = base64.b64encode(run.encode("utf-16-be")).decode("ascii")
        return "&" + b64.rstrip("=").replace("/", ",") + "-"

    # Match either a lone "&" or a whole run of non-printable/non-ASCII
    # characters, so that each run is encoded as a single BASE64 section.
    return re.sub(r"&|[^\x20-\x7E]+", encode_run, text)
# IMAP dates always use English month abbreviations, independent of the
# process locale (strftime("%b") is locale dependent).
_IMAP_MONTHS = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
                "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")


def _imap_date(moment):
    """Format a date/datetime as an IMAP date such as ``02-Nov-2024``."""
    return f"{moment.day:02d}-{_IMAP_MONTHS[moment.month - 1]}-{moment.year:04d}"


def _imap_quoted(text):
    """Return *text* as an IMAP quoted string (backslashes and double quotes escaped)."""
    escaped = to_utf7_imap(text).replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def build_search_criteria(from_email=None, subject=None, body_keyword=None, since=None, before=None, unseen=False):
    """Build an IMAP SEARCH criteria string from simple filters.

    All supplied filters are combined with a space, which IMAP treats as a
    logical AND.

    Args:
        from_email (str): Sender address to match (``FROM``).
        subject (str): Text the subject must contain (``SUBJECT``).
        body_keyword (str): Text the body must contain (``BODY``).
        since (datetime): Only messages on or after this date (``SINCE``).
        before (datetime): Only messages before this date (``BEFORE``).
        unseen (bool): If true, only unread messages (``UNSEEN``).

    Returns:
        str: The criteria string, or ``"ALL"`` when no filter was given.
    """
    criteria = []
    if unseen:
        criteria.append("UNSEEN")
    # Text filters are emitted as quoted strings; quoting is escaped so a
    # value containing '"' or '\' cannot terminate the string early.
    for keyword, value in (("FROM", from_email), ("SUBJECT", subject), ("BODY", body_keyword)):
        if value:
            criteria.append(f"{keyword} {_imap_quoted(value)}")
    if since:
        criteria.append(f"SINCE {_imap_date(since)}")
    if before:
        criteria.append(f"BEFORE {_imap_date(before)}")
    return " ".join(criteria) if criteria else "ALL"
def parse_email_date(date_str, default_tz=timezone.utc):
    """Parse an e-mail ``Date`` header value without raising.

    Args:
        date_str (str): Raw header value; may be empty or ``None``.
        default_tz (tzinfo): Time zone attached when the parsed value carries
            no zone information.

    Returns:
        datetime | None: The parsed, timezone-aware datetime, or ``None`` when
        the input is empty or cannot be parsed (a warning is printed in the
        latter case).
    """
    if not date_str:
        return None
    try:
        parsed = parsedate_to_datetime(date_str)
        lacks_zone = bool(parsed) and parsed.tzinfo is None
        return parsed.replace(tzinfo=default_tz) if lacks_zone else parsed
    except Exception as e:
        print(f"Warning: Failed to parse date '{date_str}': {e}")
        return None
class EmailClient:
    """Small IMAP client: connect, search the inbox, fetch and filter messages."""

    def __init__(self, host, username, password):
        """Store the connection settings; no network traffic happens here."""
        self.host = host
        self.username = username
        self.password = password
        # Live imaplib session, or None while disconnected.
        self.connection = None

    def _drop_connection(self):
        """Forget the current session and close its socket, ignoring errors."""
        stale, self.connection = self.connection, None
        if stale is None:
            return
        try:
            # shutdown() just closes the socket; no LOGOUT round-trip that
            # could fail or block on an already broken connection.
            stale.shutdown()
        except Exception:
            pass  # best-effort cleanup of a session we are discarding anyway

    def connect(self):
        """Open a fresh session, log in and select the inbox.

        Any previous session is closed first.  ``self.connection`` is only
        set once login and mailbox selection succeeded, so a failed attempt
        never leaves a half-initialised session behind.

        Raises:
            imaplib.IMAP4.error: If login or selecting the inbox fails.
            OSError: If the server cannot be reached.
        """
        self._drop_connection()
        # NOTE(review): this is an unencrypted IMAP session; consider
        # imaplib.IMAP4_SSL if the server supports it.
        connection = imaplib.IMAP4(self.host)
        try:
            connection.login(self.username, self.password)
            status, _ = connection.select("inbox")
            if status != "OK":
                # select() reports a "NO" reply via its return value.
                raise imaplib.IMAP4.error(f"Failed to select inbox: {status}")
        except Exception:
            try:
                connection.shutdown()
            except Exception:
                pass  # the original error is the one worth reporting
            raise
        self.connection = connection

    def ensure_connection(self):
        """Make sure a usable session exists, reconnecting when necessary."""
        if self.connection is None:
            print("Connection is not established. Reconnecting...")
            self.connect()
            return
        try:
            self.connection.noop()  # cheap liveness probe
        except (imaplib.IMAP4.abort, imaplib.IMAP4.error, OSError):
            # OSError covers socket-level failures imaplib does not wrap.
            print("Connection lost. Reconnecting...")
            self.connect()

    def disconnect(self):
        """Log out (if connected) and forget the session."""
        if self.connection is not None:
            try:
                self.connection.logout()
            finally:
                # Never keep a handle to a closed session around.
                self.connection = None

    def search_emails(self, search_criteria):
        """Run an IMAP SEARCH with a caller-built criteria string.

        Args:
            search_criteria (str): IMAP search criteria.

        Returns:
            tuple: ``(result, data)``.  ``result`` is the status string
            (``"OK"``).  ``data`` is ``[]`` when nothing matched; otherwise it
            is the raw list returned by ``imaplib`` whose first element holds
            the space-separated message IDs as bytes.

        Raises:
            Exception: If the server does not answer ``OK``.
        """
        self.ensure_connection()
        result, data = self.connection.search(None, search_criteria)
        if result != "OK":
            raise Exception(f"Failed to search emails with criteria: {search_criteria}")
        # data[0] can be None or empty when the server returns no IDs.
        if not data or not data[0] or not data[0].split():
            print("Debug: No matching emails found.")
            return result, []
        return result, data

    def fetch_recent_emails(self, from_email=None, subject=None, body_keyword=None, since=None, before=None, unseen=False, max_count=100):
        """Search the inbox and return the most recent matching message IDs.

        Args:
            from_email (str): Sender address filter.
            subject (str): Subject keyword filter.
            body_keyword (str): Body keyword filter.
            since (datetime): Lower date bound.
            before (datetime): Upper date bound.
            unseen (bool): Only unread messages.
            max_count (int | None): Maximum number of IDs to return; ``None``
                means no limit, zero or a negative value yields ``[]``.

        Returns:
            list: Message IDs (bytes), highest ID first.
        """
        if max_count is not None and max_count <= 0:
            # Guard explicitly: ids[-0:] would return *every* ID.
            return []
        search_criteria = build_search_criteria(
            from_email=from_email,
            subject=subject,
            body_keyword=body_keyword,
            since=since,
            before=before,
            unseen=unseen,
        )
        result, data = self.search_emails(search_criteria)
        if not data or not data[0]:
            print("Debug: No emails found matching the criteria.")
            return []
        email_ids = data[0].split()
        if max_count is not None:
            email_ids = email_ids[-max_count:]
        return list(reversed(email_ids))

    @staticmethod
    def _decode_bytes(raw, charset):
        """Decode *raw* bytes leniently, falling back to UTF-8 for unknown charsets."""
        try:
            return raw.decode(charset or "utf-8", errors="replace")
        except LookupError:
            return raw.decode("utf-8", errors="replace")

    @classmethod
    def _decode_header_value(cls, raw_header):
        """Decode an RFC 2047 header into text, joining *all* fragments."""
        if raw_header is None:
            return ""
        return "".join(
            cls._decode_bytes(fragment, charset) if isinstance(fragment, bytes) else fragment
            for fragment, charset in decode_header(raw_header)
        )

    @classmethod
    def _extract_text(cls, msg):
        """Collect the text of *msg*: all text/plain parts, or HTML if no text came first."""
        content = ""
        for part in msg.walk():
            if part.is_multipart():
                continue  # containers carry no payload of their own
            content_type = part.get_content_type()
            try:
                payload = part.get_payload(decode=True)
                if not isinstance(payload, bytes):
                    print(f"Skipping unexpected payload type: {type(payload)}, content_type: {content_type}")
                    continue
                wants_html = content_type == "text/html" and not content
                if content_type == "text/plain" or wants_html:
                    content += cls._decode_bytes(payload, part.get_content_charset())
            except Exception as e:
                print(f"Error processing MIME part: {e}")
        return content

    def fetch_all_matching_emails(self, email_ids, sender_pattern=None, keyword_pattern=None, subject_pattern=None, start_time=None, max_results=None):
        """Download the given messages and keep those passing every filter.

        Args:
            email_ids (iterable): Message IDs to fetch, in the order to check.
            sender_pattern (str): Regex searched in the sender address.
            keyword_pattern (str): Regex searched in the text content.
            subject_pattern (str): Regex searched in the decoded subject.
            start_time (datetime): Drop messages dated before this moment.
                A naive value is interpreted as local time.
            max_results (int | None): Stop after this many matches.

        Returns:
            list[dict]: One dict per match with the keys ``subject``,
            ``from``, ``content`` and ``date``.

        Raises:
            ValueError: If ``max_results`` is zero or negative.
        """
        if max_results is not None and max_results <= 0:
            raise ValueError("max_results must be a positive integer or None")
        sender_regex = re.compile(sender_pattern) if sender_pattern else None
        keyword_regex = re.compile(keyword_pattern) if keyword_pattern else None
        subject_regex = re.compile(subject_pattern) if subject_pattern else None
        if start_time is not None and start_time.tzinfo is None:
            # Parsed message dates are timezone-aware; comparing them with a
            # naive datetime would raise for every single message.
            start_time = start_time.astimezone()
        all_matched_emails = []
        self.ensure_connection()
        for email_id in email_ids:
            try:
                result, data = self.connection.fetch(email_id, "(RFC822)")
                if result != "OK" or not data or not isinstance(data[0], tuple):
                    print(f"Warning: Failed to fetch email with ID {email_id}")
                    continue
                msg = email.message_from_bytes(data[0][1])

                from_email = email.utils.parseaddr(msg["From"])[1] if msg["From"] else ""
                if sender_regex and not sender_regex.search(from_email):
                    continue

                subject = self._decode_header_value(msg["Subject"])
                if subject_regex and not subject_regex.search(subject):
                    continue

                content = self._extract_text(msg)
                if keyword_regex and not keyword_regex.search(content):
                    continue

                date_str = msg.get("Date")
                email_date = parse_email_date(date_str) if date_str else None
                if start_time and email_date and email_date < start_time:
                    continue

                all_matched_emails.append({
                    "subject": subject,
                    "from": from_email,
                    "content": content,
                    "date": email_date,
                })
                if max_results and len(all_matched_emails) >= max_results:
                    break
            except Exception as e:
                # One bad message must not abort the whole scan.
                print(f"Error: Failed to process email ID {email_id}: {e}")
        return all_matched_emails

    def filter_emails_by_sender_and_keyword(self, email_ids, sender_pattern=None, keyword_pattern=None, subject_pattern=None, start_time=None):
        """Return the first message in *email_ids* passing all filters, or ``None``."""
        matched_emails = self.fetch_all_matching_emails(
            email_ids=email_ids,
            sender_pattern=sender_pattern,
            keyword_pattern=keyword_pattern,
            subject_pattern=subject_pattern,
            start_time=start_time,
            max_results=1,
        )
        return matched_emails[0] if matched_emails else None
class GoogleCodeReceiver:
    """Poll a mailbox for Google "Email verification code" messages."""

    def __init__(self, email_client):
        """Wrap *email_client*, which must provide ``fetch_recent_emails`` and
        ``filter_emails_by_sender_and_keyword``."""
        self.email_client = email_client
        # UTC time of construction, later of the most recent wait_code() call.
        self.last_timestamp = datetime.now(timezone.utc)

    def _update_last_timestamp(self):
        """Record the current UTC time in ``last_timestamp``."""
        self.last_timestamp = datetime.now(timezone.utc)

    def wait_code(self, username, timeout=60, interval=3, start_time=None):
        """Wait for a Google verification e-mail and return its code.

        Args:
            username (str): Account name searched for in the message body.
            timeout (int | float): Maximum time to wait, in seconds.
            interval (int | float): Pause between polls, in seconds.
            start_time (datetime): Ignore messages dated before this moment.
                Defaults to "now"; a naive value is taken as local time.

        Returns:
            str: The six-digit verification code.

        Raises:
            TimeoutError: If no matching message arrives within *timeout*.
        """
        if start_time is None:
            start_time = datetime.now(timezone.utc)
        else:
            # astimezone() treats a naive datetime as local time and converts
            # an aware one, so both cases end up in UTC.
            start_time = start_time.astimezone(timezone.utc)
        # Search on the server from two days earlier; the exact cut-off is
        # applied client-side via start_time when the messages are filtered.
        since = start_time - timedelta(days=2)
        self._update_last_timestamp()

        # Accept both the ASCII colon and the full-width colon that Chinese
        # subjects commonly use.
        subject_pattern = r"(?:Email verification code|电子邮件验证码)[:：]?\s*(\d{6})"
        # A monotonic clock keeps the timeout immune to wall-clock changes.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                email_ids = self.email_client.fetch_recent_emails(
                    max_count=10,
                    from_email="noreply@google.com",
                    body_keyword=username,
                    since=since,
                )
                matched_email = self.email_client.filter_emails_by_sender_and_keyword(
                    email_ids=email_ids,
                    subject_pattern=subject_pattern,
                    start_time=start_time,
                )
                if matched_email:
                    match = re.search(subject_pattern, matched_email["subject"])
                    if match:
                        return match.group(1)
            except Exception as e:
                # Polling is best-effort: report the problem and try again.
                print(f"Warning: Error occurred while fetching code: {e}")
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Never sleep past the deadline.
            time.sleep(min(interval, remaining))
        raise TimeoutError("Timeout waiting for verification code.")
# Legacy smoke test against a generic mailbox.
def mailTest():
    """Manual smoke test: search recent Google mails and print the hits.

    Connects to the configured IMAP server, looks up the latest messages from
    ``noreply@google.com`` that mention a given address, prints the first
    match and then every match.  The session is always closed at the end.
    """
    # NOTE(review): credentials are hard-coded in source; consider moving
    # them to configuration or environment variables.
    imap_host = "server-10474.cuiqiu.vip"  # replace with your IMAP server address
    login = "gmailvinted@mailezu.com"
    secret = "g1l2o0hld84"
    mailbox = EmailClient(imap_host, login, secret)
    mailbox.connect()
    search_from = datetime(2024, 11, 22)
    sender_pattern = r".*google.*"  # regex matched against the sender address
    keyword_pattern = r".*"  # regex matched against the message text
    try:
        candidate_ids = mailbox.fetch_recent_emails(
            from_email="noreply@google.com",
            # subject='Email verification code'  (Chinese mails use '电子邮件验证码')
            body_keyword='RibeAchour875@gmail.com',
            since=search_from,
            max_count=10,
        )

        # First match among the candidates (IDs arrive newest first).
        newest = mailbox.filter_emails_by_sender_and_keyword(candidate_ids, sender_pattern, keyword_pattern)
        if not newest:
            print("没有符合条件的时间上最新的匹配邮件")
        else:
            print("\n时间上最新的匹配邮件:")
            print("主题:", newest["subject"])
            print("发件人:", newest["from"])
            print("内容:", newest["content"])
            print("时间:", newest["date"])

        # Every match among the candidates.
        every_hit = mailbox.fetch_all_matching_emails(candidate_ids, sender_pattern, keyword_pattern)
        if not every_hit:
            print("没有符合条件的所有匹配邮件")
        else:
            print("\n所有匹配的邮件:")
            for idx, message in enumerate(every_hit):
                print(f"邮件 {idx+1}:")
                print("主题:", message["subject"])
                print("发件人:", message["from"])
                print("内容:", message["content"], "\n")
                print("时间:", message["date"])
    finally:
        mailbox.disconnect()
def codeTest():
    """Manual smoke test: wait briefly for a Google verification code and print it."""
    # NOTE(review): credentials are hard-coded in source; consider moving
    # them to configuration or environment variables.
    server = "server-10474.cuiqiu.vip"  # replace with your IMAP server address
    username = "gmailvinted@mailezu.com"
    password = "g1l2o0hld84"
    # No explicit connect(): the client connects lazily on first use.
    client = EmailClient(server, username, password)
    code_receiver = GoogleCodeReceiver(client)
    code = ''
    try:
        code = code_receiver.wait_code(
            username="elenagrosu265@gmail.com", timeout=2, interval=1,
            start_time=datetime(2024, 11, 10))
        print(f"收到谷歌验证码: {code}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    finally:
        # Close the session that wait_code() opened behind the scenes so the
        # socket is not leaked.
        client.disconnect()
    if not code:
        print("并没有收到验证码")
def test3():
    """Manual smoke test: run a raw IMAP SEARCH, then fetch and print every hit."""
    # NOTE(review): credentials are hard-coded in source; consider moving
    # them to configuration or environment variables.
    server = "server-10474.cuiqiu.vip"  # replace with your IMAP server address
    username = "gmailvinted@mailezu.com"
    password = "g1l2o0hld84"
    client = EmailClient(server, username, password)
    client.connect()
    try:
        ok, data = client.search_emails('FROM "noreply@google.com" BODY "RibeAchour875@gmail.com" SINCE 11-Nov-2024')
        # search_emails() hands back imaplib's raw data list, whose first
        # element is one bytes blob of space-separated IDs; split it so each
        # message is fetched individually.
        email_ids = data[0].split() if data and data[0] else []
        if email_ids:
            print(email_ids)
            all_matched_emails = client.fetch_all_matching_emails(email_ids)
            if all_matched_emails:
                print("\n所有匹配的邮件:")
                for idx, message in enumerate(all_matched_emails):
                    print(f"邮件 {idx+1}:")
                    print("主题:", message["subject"])
                    print("发件人:", message["from"])
                    print("内容:", message["content"], "\n")
                    print("时间:", message["date"])
            else:
                print("没有符合条件的所有匹配邮件")
        else:
            print("查不到")
    finally:
        # Always release the IMAP session, even if the search or fetch fails.
        client.disconnect()
# Example usage: running this file directly executes the verification-code
# smoke test (test3 is kept as a commented-out alternative).
if __name__ == "__main__":
    # test3()
    codeTest()