# Gmail/account.py
#
# 237 lines, 8.8 KiB, Python
#
# NOTE: the code-hosting page flagged this file as containing ambiguous Unicode
# characters, i.e. characters that may be confused with others in the viewer's
# current locale. If their use is intentional and legitimate, the warning can
# be safely ignored.

import sqlite3
import pandas as pd
from contextlib import contextmanager
from dataclasses import dataclass
from typing import List, Optional
@dataclass
class AccountData:
    """A single account record.

    Fields are declared in the same order as the columns of the ``accounts``
    table, so a row fetched from that table can be unpacked positionally.
    """

    email: str  # Account e-mail address
    original_password: str  # Password before the change
    original_aux_email: str  # Auxiliary e-mail before the change
    new_password: str  # Password after the change
    new_aux_email: str  # Auxiliary e-mail after the change
    change_status: str  # Whether the change has been completed
    proxy: str  # Proxy associated with the account
    region: Optional[str] = None  # Region (optional)
class AccountManagerSQLite:
    """Store account records in a SQLite database, with Excel import/export.

    Every operation opens its own connection to ``db_path`` and closes it
    when done; no connection is kept on the instance.
    """

    # Column order of the ``accounts`` table. It mirrors the field order of
    # ``AccountData`` so fetched rows can be unpacked positionally.
    _COLUMNS = (
        "email", "original_password", "original_aux_email",
        "new_password", "new_aux_email", "change_status", "proxy", "region",
    )
    # Spreadsheet headers, listed in the same order as ``_COLUMNS``.
    _EXCEL_REQUIRED_HEADERS = (
        "邮箱", "原密码", "原辅助邮箱", "新密码", "新辅助邮箱", "是否更改完成", "代理",
    )
    _EXCEL_OPTIONAL_HEADERS = ("区域",)

    def __init__(self, db_path="accounts.db", debug=False):
        """Remember the settings and make sure the ``accounts`` table exists.

        Args:
            db_path: Path of the SQLite database file.
            debug: When true, progress and error messages are printed.
        """
        self.db_path = db_path
        self.debug = debug
        self._initialize_db()

    def debug_print(self, *args):
        """Print ``args`` only when debug mode is enabled."""
        if self.debug:
            print(*args)

    def _initialize_db(self):
        """Create the ``accounts`` table, or verify its structure.

        WARNING: when a table named ``accounts`` exists but its column list
        differs from the expected one, the table is dropped and recreated
        empty, discarding its rows.
        """
        required_columns = list(self._COLUMNS)
        with self._get_connection() as conn:
            # Enable WAL journaling if it is not active yet.
            current_mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
            if current_mode != "wal":
                self.debug_print("切换到 WAL 模式...")
                conn.execute("PRAGMA journal_mode=WAL")

            # table_info yields one row per column (name at index 1) and no
            # rows at all when the table does not exist.
            cursor = conn.execute("PRAGMA table_info(accounts)")
            columns = [row[1] for row in cursor.fetchall()]

            if not columns:  # Table does not exist.
                self.debug_print("表不存在,正在创建表...")
            elif columns != required_columns:  # Table exists, wrong layout.
                self.debug_print(f"表结构不一致,当前列: {columns}, 期望列: {required_columns}")
                self.debug_print("正在重建表...")
                conn.execute("DROP TABLE IF EXISTS accounts")
            else:  # Table exists with the expected layout.
                self.debug_print("表结构检查通过,无需更改。")
                return

            conn.execute("""
                CREATE TABLE accounts (
                    email TEXT PRIMARY KEY,
                    original_password TEXT,
                    original_aux_email TEXT,
                    new_password TEXT,
                    new_aux_email TEXT,
                    change_status TEXT,
                    proxy TEXT,
                    region TEXT
                )
            """)
            self.debug_print("表已创建。")

    @contextmanager
    def _get_connection(self):
        """Yield a new SQLite connection and close it on exit."""
        conn = sqlite3.connect(self.db_path)
        try:
            yield conn
        finally:
            conn.close()

    @contextmanager
    def _transaction(self, failure_prefix):
        """Yield a connection; commit on success, roll back on ``sqlite3.Error``.

        On failure the error is reported through ``debug_print`` (prefixed
        with ``failure_prefix``) and re-raised unchanged.
        """
        with self._get_connection() as conn:
            try:
                yield conn
                conn.commit()
            except sqlite3.Error as e:
                conn.rollback()
                self.debug_print(f"{failure_prefix}{e}")
                raise

    @staticmethod
    def _validate_column_names(names):
        """Raise ``ValueError`` for any name that is not a plain identifier.

        Column names are spliced into SQL text (they cannot be bound as
        parameters), so anything containing spaces, quotes, operators, etc.
        must be rejected up front. Well-formed but unknown names are left for
        SQLite to reject.
        """
        for name in names:
            if not name.isidentifier():
                raise ValueError(f"Invalid column name: {name!r}")

    def _select(self, where="", params=()):
        """Fetch rows (optionally filtered by ``where``) as AccountData objects."""
        sql = f"SELECT {', '.join(self._COLUMNS)} FROM accounts"
        if where:
            sql += f" WHERE {where}"
        with self._get_connection() as conn:
            rows = conn.execute(sql, tuple(params)).fetchall()
        return [AccountData(*row) for row in rows]

    def clear(self):
        """Delete every row from the ``accounts`` table.

        Raises:
            sqlite3.Error: If the deletion fails (the transaction is rolled back).
        """
        with self._transaction("清空数据库失败:") as conn:
            conn.execute("DELETE FROM accounts")
        self.debug_print("数据库已清空。")

    def import_data(self, account_list: List["AccountData"], clear_old: bool = False):
        """Insert the given accounts, replacing rows with the same e-mail.

        Args:
            account_list: Records to store. Any iterable is accepted; it is
                consumed once before the database is touched.
            clear_old: When true, all existing rows are deleted first, in the
                same transaction as the inserts, so a failed import leaves
                the previous data intact.

        Raises:
            sqlite3.Error: If the write fails (the transaction is rolled back).
        """
        rows = [
            (
                account.email, account.original_password, account.original_aux_email,
                account.new_password, account.new_aux_email, account.change_status,
                account.proxy, account.region,
            )
            for account in account_list
        ]
        with self._transaction("Error importing data: ") as conn:
            if clear_old:
                conn.execute("DELETE FROM accounts")
            conn.executemany("""
                INSERT OR REPLACE INTO accounts (
                    email, original_password, original_aux_email,
                    new_password, new_aux_email, change_status, proxy, region
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, rows)
        if clear_old:
            self.debug_print("数据库已清空。")
        self.debug_print(f"成功导入 {len(rows)} 条数据!")

    def export_data(self) -> List["AccountData"]:
        """Return every stored account."""
        return self._select()

    def query(self, **kwargs) -> List["AccountData"]:
        """Return the accounts whose columns equal all the given values.

        Each keyword is a column name and its value the required content; a
        value of ``None`` matches rows where the column is NULL. Called
        without keywords, every account is returned.

        Raises:
            ValueError: If a keyword is not a plain identifier.
            sqlite3.Error: If SQLite rejects the query (e.g. unknown column).
        """
        self._validate_column_names(kwargs)
        conditions = []
        params = []
        for column, value in kwargs.items():
            if value is None:
                # "column = NULL" is never true in SQL; IS NULL is required.
                conditions.append(f"{column} IS NULL")
            else:
                conditions.append(f"{column} = ?")
                params.append(value)
        return self._select(" AND ".join(conditions), params)

    def update_record(self, email: str, **kwargs):
        """Update the given columns of the account identified by ``email``.

        No error is raised when no row matches ``email``.

        Raises:
            ValueError: If no column is given, or a keyword is not a plain
                identifier.
            sqlite3.Error: If the update fails (the transaction is rolled back).
        """
        if not kwargs:
            raise ValueError("没有指定任何更新的字段")
        self._validate_column_names(kwargs)

        set_clause = ", ".join(f"{key} = ?" for key in kwargs)
        # The e-mail goes last because its placeholder is in the WHERE clause.
        values = [*kwargs.values(), email]
        query = f"""
            UPDATE accounts
            SET {set_clause}
            WHERE email = ?
        """
        with self._transaction("更新记录失败:") as conn:
            conn.execute(query, values)
        self.debug_print(f"成功更新记录: {email}")

    def delete_account(self, email: str):
        """Delete the account identified by ``email``.

        Raises:
            sqlite3.Error: If the deletion fails (the transaction is rolled back).
        """
        with self._transaction("Error deleting account: ") as conn:
            conn.execute("DELETE FROM accounts WHERE email = ?", (email,))
        self.debug_print(f"成功删除账户: {email}")

    def import_from_excel(self, excel_path: str, clear_old: bool = False):
        """Import accounts from the first sheet of an Excel workbook.

        The workbook is read and validated before the database is modified,
        so an unreadable or malformed file never destroys existing data.

        Args:
            excel_path: Path of the workbook to read.
            clear_old: When true, existing rows are replaced by the imported
                ones (atomically, see ``import_data``).

        Raises:
            ValueError: If a required column is missing from the sheet.
            Exception: Any error raised while reading the file or writing to
                the database is reported through ``debug_print`` and re-raised.
        """
        try:
            df = pd.read_excel(excel_path, sheet_name=0)

            missing_columns = [
                col for col in self._EXCEL_REQUIRED_HEADERS if col not in df.columns
            ]
            if missing_columns:
                raise ValueError(f"表格缺少必要的列:{missing_columns}")

            # Optional columns absent from the sheet default to empty.
            for col in self._EXCEL_OPTIONAL_HEADERS:
                if col not in df.columns:
                    df[col] = None

            headers = list(self._EXCEL_REQUIRED_HEADERS + self._EXCEL_OPTIONAL_HEADERS)
            account_list = [
                AccountData(**{
                    # Empty cells are read as NaN; store them as None.
                    field: (None if pd.isna(cell) else cell)
                    for field, cell in zip(self._COLUMNS, row)
                })
                for row in df[headers].itertuples(index=False, name=None)
            ]
            self.import_data(account_list, clear_old=clear_old)
        except Exception as e:
            self.debug_print(f"导入失败:{e}")
            raise

    def export_to_excel(self, excel_path: str):
        """Write every stored account to an Excel file (sheet ``Accounts``).

        The header row is always written, even when there are no accounts,
        so the produced file can be fed back to ``import_from_excel``.

        Raises:
            Exception: Any error raised while reading the database or writing
                the file is reported through ``debug_print`` and re-raised.
        """
        try:
            headers = list(self._EXCEL_REQUIRED_HEADERS + self._EXCEL_OPTIONAL_HEADERS)
            records = [
                {
                    header: getattr(account, field)
                    for header, field in zip(headers, self._COLUMNS)
                }
                for account in self.export_data()
            ]
            df = pd.DataFrame(records, columns=headers)
            df.to_excel(excel_path, index=False, sheet_name="Accounts")
            self.debug_print(f"成功导出数据到 {excel_path}")
        except Exception as e:
            self.debug_print(f"导出失败:{e}")
            raise