453 lines
17 KiB
Python
453 lines
17 KiB
Python
import imaplib
|
||
import email
|
||
import re
|
||
from email.header import decode_header
|
||
from datetime import datetime, timezone
|
||
from email.utils import parsedate_to_datetime
|
||
import time
|
||
from datetime import datetime, timedelta
|
||
|
||
|
||
def to_utf7_imap(text):
|
||
"""
|
||
将文本转换为 IMAP UTF-7 编码格式。
|
||
|
||
参数:
|
||
text (str): 要转换的文本。
|
||
|
||
返回:
|
||
str: 转换后的 UTF-7 编码文本。
|
||
"""
|
||
if not text:
|
||
return ""
|
||
# 匹配所有非 ASCII 字符
|
||
def encode_match(match):
|
||
return "&" + match.group(0).encode("utf-16be").hex().upper() + "-"
|
||
|
||
# 替换非 ASCII 字符为 UTF-7 格式
|
||
return re.sub(r"[^\x20-\x7E]", encode_match, text)
|
||
|
||
def build_search_criteria(from_email=None, subject=None, body_keyword=None, since=None, before=None, unseen=False):
|
||
"""
|
||
构建 IMAP 搜索条件字符串,支持简单查询逻辑。
|
||
|
||
参数:
|
||
from_email (str): 发件人邮箱地址。
|
||
subject (str): 邮件标题的关键字。
|
||
body_keyword (str): 邮件正文的关键字。
|
||
since (datetime): 起始时间,筛选此时间之后的邮件。
|
||
before (datetime): 截止时间,筛选此时间之前的邮件。
|
||
unseen (bool): 是否仅筛选未读邮件。
|
||
|
||
返回:
|
||
str: 构建的 IMAP 搜索条件字符串。
|
||
"""
|
||
criteria = []
|
||
|
||
if unseen:
|
||
criteria.append("UNSEEN")
|
||
|
||
if from_email:
|
||
criteria.append(f'FROM "{to_utf7_imap(from_email)}"')
|
||
|
||
if subject:
|
||
criteria.append(f'SUBJECT "{to_utf7_imap(subject)}"')
|
||
|
||
if body_keyword:
|
||
criteria.append(f'BODY "{to_utf7_imap(body_keyword)}"')
|
||
|
||
if since:
|
||
criteria.append(f'SINCE {since.strftime("%d-%b-%Y")}')
|
||
|
||
if before:
|
||
criteria.append(f'BEFORE {before.strftime("%d-%b-%Y")}')
|
||
|
||
# 用空格拼接所有条件(IMAP 默认 AND 逻辑)
|
||
return " ".join(criteria) if criteria else "ALL"
|
||
|
||
def parse_email_date(date_str, default_tz=timezone.utc):
|
||
"""
|
||
安全解析邮件日期。
|
||
"""
|
||
if not date_str:
|
||
return None
|
||
try:
|
||
email_date = parsedate_to_datetime(date_str)
|
||
if email_date and email_date.tzinfo is None:
|
||
email_date = email_date.replace(tzinfo=default_tz)
|
||
return email_date
|
||
except Exception as e:
|
||
print(f"Warning: Failed to parse date '{date_str}': {e}")
|
||
return None
|
||
|
||
class EmailClient:
|
||
def __init__(self, host, username, password):
|
||
self.host = host
|
||
self.username = username
|
||
self.password = password
|
||
self.connection = None
|
||
|
||
def connect(self):
|
||
self.connection = imaplib.IMAP4(self.host)
|
||
self.connection.login(self.username, self.password)
|
||
self.connection.select("inbox")
|
||
|
||
def ensure_connection(self):
|
||
"""
|
||
确保 IMAP 连接是活跃的,如果断开或未连接则重连。
|
||
"""
|
||
if self.connection is None:
|
||
print("Connection is not established. Reconnecting...")
|
||
self.connect() # 如果连接不存在,则进行连接
|
||
return
|
||
try:
|
||
self.connection.noop() # 发送 NOOP 命令以检查连接是否有效
|
||
except (imaplib.IMAP4.abort, imaplib.IMAP4.error):
|
||
print("Connection lost. Reconnecting...")
|
||
self.connect()
|
||
|
||
|
||
def disconnect(self):
|
||
if self.connection:
|
||
self.connection.logout()
|
||
|
||
def search_emails(self, search_criteria):
|
||
"""
|
||
使用自由构建的搜索条件执行 IMAP 搜索。
|
||
|
||
参数:
|
||
search_criteria (str): 用于 IMAP 搜索的条件字符串。
|
||
|
||
返回:
|
||
tuple: 搜索结果 (result, data),其中:
|
||
- result (str): 搜索状态 ("OK" 表示成功)。
|
||
- data (list): 搜索到的邮件 ID 列表。
|
||
|
||
Raises:
|
||
Exception: 如果搜索失败。
|
||
"""
|
||
self.ensure_connection() # 确保连接
|
||
result, data = self.connection.search(None, search_criteria)
|
||
if result != "OK":
|
||
raise Exception(f"Failed to search emails with criteria: {search_criteria}")
|
||
|
||
email_ids = data[0].split()
|
||
if not email_ids:
|
||
print("Debug: No matching emails found.")
|
||
return result, [] # 返回空列表以便后续处理
|
||
|
||
return result, data
|
||
|
||
def fetch_recent_emails(self, from_email=None, subject=None, body_keyword=None, since=None, before=None, unseen=False, max_count=100):
|
||
"""
|
||
使用构建的搜索条件查询最近的邮件。
|
||
|
||
参数:
|
||
from_email (str): 发件人邮箱地址。
|
||
subject (str): 邮件标题关键字。
|
||
body_keyword (str): 邮件正文关键字。
|
||
since (datetime): 起始时间。
|
||
before (datetime): 截止时间。
|
||
unseen (bool): 是否只查询未读邮件。
|
||
max_count (int): 返回的邮件数量上限。
|
||
|
||
返回:
|
||
list: 符合条件的邮件 ID 列表,按接收时间倒序排列。
|
||
"""
|
||
self.ensure_connection() # 确保连接
|
||
search_criteria = build_search_criteria(
|
||
from_email=from_email,
|
||
subject=subject,
|
||
body_keyword=body_keyword,
|
||
since=since,
|
||
before=before,
|
||
unseen=unseen
|
||
)
|
||
result, data = self.search_emails(search_criteria)
|
||
|
||
# 检查 `data` 是否有效
|
||
if not data or not data[0]:
|
||
print("Debug: No emails found matching the criteria.")
|
||
return [] # 返回空列表
|
||
|
||
# 正常处理邮件 ID
|
||
email_ids = data[0].split()[-max_count:]
|
||
return list(reversed(email_ids))
|
||
|
||
|
||
|
||
def fetch_all_matching_emails(self, email_ids, sender_pattern=None, keyword_pattern=None, subject_pattern=None, start_time=None, max_results=None):
|
||
if max_results is not None and max_results <= 0:
|
||
raise ValueError("max_results must be a positive integer or None")
|
||
|
||
sender_regex = re.compile(sender_pattern) if sender_pattern else None
|
||
keyword_regex = re.compile(keyword_pattern) if keyword_pattern else None
|
||
subject_regex = re.compile(subject_pattern) if subject_pattern else None
|
||
all_matched_emails = []
|
||
self.ensure_connection() # 确保连接
|
||
|
||
for email_id in email_ids:
|
||
try:
|
||
result, data = self.connection.fetch(email_id, "(RFC822)")
|
||
if result != "OK":
|
||
print(f"Warning: Failed to fetch email with ID {email_id}")
|
||
continue
|
||
|
||
msg = email.message_from_bytes(data[0][1])
|
||
|
||
# 检查发件人地址是否匹配
|
||
from_email = email.utils.parseaddr(msg["From"])[1] if msg["From"] else ""
|
||
if sender_regex and not sender_regex.search(from_email):
|
||
continue
|
||
|
||
# 检查标题是否匹配
|
||
subject, encoding = decode_header(msg["Subject"])[0]
|
||
subject = subject.decode(encoding or "utf-8") if isinstance(subject, bytes) else subject
|
||
if subject_regex and not subject_regex.search(subject):
|
||
continue
|
||
|
||
# 处理邮件内容
|
||
content = ""
|
||
for part in msg.walk():
|
||
content_type = part.get_content_type()
|
||
try:
|
||
payload = part.get_payload(decode=True)
|
||
|
||
# 增加对 payload 类型的判断
|
||
if isinstance(payload, bytes):
|
||
if content_type == "text/plain":
|
||
content += payload.decode(part.get_content_charset() or "utf-8")
|
||
elif content_type == "text/html" and not content:
|
||
content += payload.decode(part.get_content_charset() or "utf-8")
|
||
else:
|
||
print(f"Skipping unexpected payload type: {type(payload)}, content_type: {content_type}")
|
||
|
||
except Exception as e:
|
||
print(f"Error processing MIME part: {e}")
|
||
|
||
# 检查内容关键字是否匹配
|
||
if keyword_regex and not keyword_regex.search(content):
|
||
continue
|
||
|
||
# 检查邮件日期是否在范围内
|
||
date_str = msg.get("Date")
|
||
email_date = parse_email_date(date_str) if date_str else None
|
||
if start_time and email_date and email_date < start_time:
|
||
continue
|
||
|
||
# 保存匹配的邮件
|
||
matched_email = {
|
||
"subject": subject,
|
||
"from": from_email,
|
||
"content": content,
|
||
"date": email_date
|
||
}
|
||
all_matched_emails.append(matched_email)
|
||
|
||
# 如果达到了最大匹配数,停止处理
|
||
if max_results and len(all_matched_emails) >= max_results:
|
||
break
|
||
|
||
except Exception as e:
|
||
print(f"Error: Failed to process email ID {email_id}: {e}")
|
||
|
||
return all_matched_emails
|
||
|
||
|
||
|
||
|
||
def filter_emails_by_sender_and_keyword(self, email_ids, sender_pattern=None, keyword_pattern=None, subject_pattern=None, start_time=None):
|
||
matched_emails = self.fetch_all_matching_emails(
|
||
email_ids=email_ids,
|
||
sender_pattern=sender_pattern,
|
||
keyword_pattern=keyword_pattern,
|
||
subject_pattern=subject_pattern,
|
||
start_time=start_time,
|
||
max_results=1
|
||
)
|
||
return matched_emails[0] if matched_emails else None
|
||
|
||
|
||
class GoogleCodeReceiver:
|
||
def __init__(self, email_client):
|
||
self.email_client = email_client
|
||
self.last_timestamp = datetime.now(timezone.utc)
|
||
|
||
def _update_last_timestamp(self):
|
||
self.last_timestamp = datetime.now(timezone.utc)
|
||
|
||
def wait_code(self, username, timeout=60, interval=3, start_time=None):
|
||
"""
|
||
等待 Google 验证码邮件。
|
||
|
||
参数:
|
||
username (str): 用户名,用于在正文中检索匹配邮件。
|
||
timeout (int): 最大等待时间,单位为秒。
|
||
interval (int): 轮询间隔,单位为秒。
|
||
start_time (datetime): 开始检索邮件的时间。默认值为当前时间。
|
||
|
||
返回:
|
||
str: 提取的验证码。
|
||
"""
|
||
# 如果未指定 `start_time`,使用当前时间
|
||
if start_time is None:
|
||
start_time = datetime.now(timezone.utc) # 默认使用当前 UTC 时间
|
||
elif start_time.tzinfo is None:
|
||
# 如果 `start_time` 没有时区信息,假定为本地时间,并转为 UTC
|
||
local_time = start_time.astimezone()
|
||
start_time = local_time.astimezone(timezone.utc)
|
||
else:
|
||
# 如果已经有时区信息,统一转为 UTC
|
||
start_time = start_time.astimezone(timezone.utc)
|
||
|
||
# 确定 `since` 时间,取 `start_time` 往前推 2 天
|
||
since = start_time - timedelta(days=2)
|
||
|
||
# 更新 `last_timestamp`
|
||
self._update_last_timestamp()
|
||
|
||
end_time = datetime.now(timezone.utc) + timedelta(seconds=timeout)
|
||
subject_pattern = r"(?:Email verification code|电子邮件验证码)[::]?\s*(\d{6})"
|
||
# sender_pattern = r"noreply@google\.com"
|
||
# keyword_pattern = re.escape(username)
|
||
|
||
while datetime.now(timezone.utc) < end_time:
|
||
try:
|
||
# Fetch recent emails with the adjusted `since`
|
||
email_ids = self.email_client.fetch_recent_emails(
|
||
max_count=10,
|
||
from_email="noreply@google.com",
|
||
body_keyword=username,
|
||
since=since # 从 `start_time` 往前推 2 天开始检索
|
||
)
|
||
matched_email = self.email_client.filter_emails_by_sender_and_keyword(
|
||
email_ids=email_ids,
|
||
# sender_pattern=sender_pattern,
|
||
# keyword_pattern=keyword_pattern,
|
||
subject_pattern=subject_pattern,
|
||
start_time=start_time # 确保只检索 `start_time` 之后的邮件
|
||
)
|
||
|
||
if matched_email:
|
||
match = re.search(subject_pattern, matched_email["subject"])
|
||
if match:
|
||
return match.group(1)
|
||
|
||
except Exception as e:
|
||
print(f"Warning: Error occurred while fetching code: {e}")
|
||
|
||
time.sleep(interval)
|
||
|
||
raise TimeoutError("Timeout waiting for verification code.")
|
||
|
||
|
||
|
||
|
||
# 老的通用邮箱测试
|
||
def mailTest():
|
||
server = "server-10474.cuiqiu.vip" # 替换为你的 IMAP 服务器地址
|
||
username = "gmailvinted@mailezu.com"
|
||
password = "g1l2o0hld84"
|
||
client = EmailClient(server, username, password)
|
||
client.connect()
|
||
start_time = datetime(2024, 11, 22)
|
||
sender_pattern = r".*google.*" # 使用正则表达式匹配发件人邮箱
|
||
keyword_pattern = r".*" # 替换为你想要匹配的关键字或正则表达式
|
||
try:
|
||
|
||
email_ids = client.fetch_recent_emails(
|
||
max_count=10,
|
||
from_email="noreply@google.com",
|
||
# subject='Email verification code',#中文邮件叫 '电子邮件验证码‘
|
||
body_keyword='RibeAchour875@gmail.com',
|
||
since=start_time,
|
||
)
|
||
|
||
# 获取时间上最新的匹配项,应用起始时间过滤器
|
||
latest_matched_email = client.filter_emails_by_sender_and_keyword(email_ids, sender_pattern, keyword_pattern)
|
||
if latest_matched_email:
|
||
print("\n时间上最新的匹配邮件:")
|
||
print("主题:", latest_matched_email["subject"])
|
||
print("发件人:", latest_matched_email["from"])
|
||
print("内容:", latest_matched_email["content"])
|
||
print("时间:",latest_matched_email["date"])
|
||
else:
|
||
print("没有符合条件的时间上最新的匹配邮件")
|
||
|
||
# print(f"ids:{email_ids}")
|
||
# 获取所有匹配的邮件,应用起始时间过滤器
|
||
all_matched_emails = client.fetch_all_matching_emails(email_ids, sender_pattern, keyword_pattern)
|
||
if all_matched_emails:
|
||
print("\n所有匹配的邮件:")
|
||
for idx, email in enumerate(all_matched_emails):
|
||
print(f"邮件 {idx+1}:")
|
||
print("主题:", email["subject"])
|
||
print("发件人:", email["from"])
|
||
print("内容:", email["content"], "\n") # 显示内容
|
||
print("时间:",email["date"])
|
||
else:
|
||
print("没有符合条件的所有匹配邮件")
|
||
|
||
finally:
|
||
client.disconnect()
|
||
|
||
def codeTest():
|
||
server = "server-10474.cuiqiu.vip" # 替换为你的 IMAP 服务器地址
|
||
username = "gmailvinted@mailezu.com"
|
||
password = "g1l2o0hld84"
|
||
|
||
client = EmailClient(server, username, password)
|
||
# client.connect()
|
||
code_receiver = GoogleCodeReceiver(client)
|
||
|
||
|
||
code = ''
|
||
try:
|
||
code = code_receiver.wait_code(
|
||
username="elenagrosu265@gmail.com", timeout=2, interval=1,
|
||
start_time=datetime(2024, 11, 10))
|
||
print(f"收到谷歌验证码: {code}")
|
||
except Exception as e:
|
||
print(f"An unexpected error occurred: {e}")
|
||
|
||
if not code:
|
||
print("并没有收到验证码")
|
||
|
||
def test3():
|
||
server = "server-10474.cuiqiu.vip" # 替换为你的 IMAP 服务器地址
|
||
username = "gmailvinted@mailezu.com"
|
||
password = "g1l2o0hld84"
|
||
|
||
# server = "imap.qq.com"
|
||
# username = "bigemon@foxmail.com"
|
||
# password = "ejudkkdfiuemcaaj"
|
||
|
||
client = EmailClient(server, username, password)
|
||
client.connect()
|
||
|
||
# mailTest()
|
||
|
||
ok,email_ids = client.search_emails('FROM "noreply@google.com" BODY "RibeAchour875@gmail.com" SINCE 11-Nov-2024')
|
||
if email_ids:
|
||
print(email_ids)
|
||
sender_pattern = r".*google.*" # 使用正则表达式匹配发件人邮箱
|
||
keyword_pattern = r".*" # 替换为你想要匹配的关键字或正则表达式
|
||
all_matched_emails = client.fetch_all_matching_emails(email_ids)
|
||
if all_matched_emails:
|
||
print("\n所有匹配的邮件:")
|
||
for idx, email in enumerate(all_matched_emails):
|
||
print(f"邮件 {idx+1}:")
|
||
print("主题:", email["subject"])
|
||
print("发件人:", email["from"])
|
||
print("内容:", email["content"], "\n") # 显示内容
|
||
print("时间:",email["date"])
|
||
else:
|
||
print("没有符合条件的所有匹配邮件")
|
||
else:
|
||
print("查不到")
|
||
# 使用示例
|
||
if __name__ == "__main__":
|
||
# test3()
|
||
codeTest()
|
||
|