237 lines
8.8 KiB
Python
237 lines
8.8 KiB
Python
import sqlite3
|
||
import pandas as pd
|
||
from contextlib import contextmanager
|
||
from dataclasses import dataclass
|
||
from typing import List, Optional
|
||
|
||
@dataclass
|
||
class AccountData:
|
||
email: str # 邮箱
|
||
original_password: str # 原密码
|
||
original_aux_email: str # 原辅助邮箱
|
||
new_password: str # 新密码
|
||
new_aux_email: str # 新辅助邮箱
|
||
change_status: str # 是否更改完成
|
||
proxy: str # 代理
|
||
region: Optional[str] = None # 区域(可选)
|
||
|
||
class AccountManagerSQLite:
|
||
def __init__(self, db_path="accounts.db", debug=False):
|
||
self.db_path = db_path
|
||
self.debug = debug
|
||
self._initialize_db()
|
||
|
||
def debug_print(self, *args):
|
||
"""仅在 debug 模式下输出调试信息"""
|
||
if self.debug:
|
||
print(*args)
|
||
|
||
def _initialize_db(self):
|
||
"""初始化或检查数据库结构"""
|
||
with self._get_connection() as conn:
|
||
# 启用 WAL 模式
|
||
current_mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
|
||
if current_mode != "wal":
|
||
self.debug_print("切换到 WAL 模式...")
|
||
conn.execute("PRAGMA journal_mode=WAL")
|
||
|
||
# 检查表是否存在
|
||
cursor = conn.execute("PRAGMA table_info(accounts)")
|
||
columns = [row[1] for row in cursor.fetchall()]
|
||
|
||
# 定义表结构
|
||
required_columns = [
|
||
"email", "original_password", "original_aux_email",
|
||
"new_password", "new_aux_email", "change_status", "proxy", "region"
|
||
]
|
||
|
||
if not columns: # 表不存在
|
||
self.debug_print("表不存在,正在创建表...")
|
||
elif columns != required_columns: # 表存在但结构不一致
|
||
self.debug_print(f"表结构不一致,当前列: {columns}, 期望列: {required_columns}")
|
||
self.debug_print("正在重建表...")
|
||
conn.execute("DROP TABLE IF EXISTS accounts")
|
||
else: # 表存在且结构一致
|
||
self.debug_print("表结构检查通过,无需更改。")
|
||
return
|
||
|
||
# 创建表
|
||
conn.execute("""
|
||
CREATE TABLE accounts (
|
||
email TEXT PRIMARY KEY,
|
||
original_password TEXT,
|
||
original_aux_email TEXT,
|
||
new_password TEXT,
|
||
new_aux_email TEXT,
|
||
change_status TEXT,
|
||
proxy TEXT,
|
||
region TEXT
|
||
)
|
||
""")
|
||
self.debug_print("表已创建。")
|
||
|
||
@contextmanager
|
||
def _get_connection(self):
|
||
"""获取 SQLite 数据库连接"""
|
||
conn = sqlite3.connect(self.db_path)
|
||
try:
|
||
yield conn
|
||
finally:
|
||
conn.close()
|
||
|
||
def clear(self):
|
||
"""清空数据库中的所有数据"""
|
||
with self._get_connection() as conn:
|
||
try:
|
||
conn.execute("DELETE FROM accounts")
|
||
conn.commit()
|
||
self.debug_print("数据库已清空。")
|
||
except sqlite3.Error as e:
|
||
conn.rollback()
|
||
self.debug_print(f"清空数据库失败:{e}")
|
||
raise
|
||
|
||
def import_data(self, account_list: List[AccountData]):
|
||
"""批量导入数据"""
|
||
with self._get_connection() as conn:
|
||
try:
|
||
conn.executemany("""
|
||
INSERT OR REPLACE INTO accounts (
|
||
email, original_password, original_aux_email,
|
||
new_password, new_aux_email, change_status, proxy, region
|
||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||
""", [
|
||
(
|
||
account.email, account.original_password, account.original_aux_email,
|
||
account.new_password, account.new_aux_email, account.change_status,
|
||
account.proxy, account.region
|
||
) for account in account_list
|
||
])
|
||
conn.commit()
|
||
self.debug_print(f"成功导入 {len(account_list)} 条数据!")
|
||
except sqlite3.Error as e:
|
||
conn.rollback()
|
||
self.debug_print(f"Error importing data: {e}")
|
||
raise
|
||
|
||
def export_data(self) -> List[AccountData]:
|
||
"""导出所有数据"""
|
||
with self._get_connection() as conn:
|
||
cursor = conn.execute("SELECT * FROM accounts")
|
||
rows = cursor.fetchall()
|
||
return [AccountData(*row) for row in rows]
|
||
|
||
def query(self, **kwargs) -> List[AccountData]:
|
||
"""查询数据"""
|
||
query = "SELECT * FROM accounts WHERE " + " AND ".join([f"{key} = ?" for key in kwargs])
|
||
values = tuple(kwargs.values())
|
||
|
||
with self._get_connection() as conn:
|
||
cursor = conn.execute(query, values)
|
||
rows = cursor.fetchall()
|
||
return [AccountData(*row) for row in rows]
|
||
|
||
def update_record(self, email: str, **kwargs):
|
||
"""更新记录的指定字段"""
|
||
if not kwargs:
|
||
raise ValueError("没有指定任何更新的字段")
|
||
|
||
# 动态生成 SQL 的 SET 子句
|
||
set_clause = ", ".join([f"{key} = ?" for key in kwargs.keys()])
|
||
values = list(kwargs.values())
|
||
values.append(email) # 将 email 添加到参数列表的最后
|
||
|
||
query = f"""
|
||
UPDATE accounts
|
||
SET {set_clause}
|
||
WHERE email = ?
|
||
"""
|
||
|
||
with self._get_connection() as conn:
|
||
try:
|
||
conn.execute(query, values)
|
||
conn.commit()
|
||
self.debug_print(f"成功更新记录: {email}")
|
||
except sqlite3.Error as e:
|
||
conn.rollback()
|
||
self.debug_print(f"更新记录失败:{e}")
|
||
raise
|
||
|
||
def delete_account(self, email: str):
|
||
"""删除某个账户"""
|
||
with self._get_connection() as conn:
|
||
try:
|
||
conn.execute("DELETE FROM accounts WHERE email = ?", (email,))
|
||
conn.commit()
|
||
self.debug_print(f"成功删除账户: {email}")
|
||
except sqlite3.Error as e:
|
||
conn.rollback()
|
||
self.debug_print(f"Error deleting account: {e}")
|
||
raise
|
||
|
||
def import_from_excel(self, excel_path: str, clear_old: bool = False):
|
||
"""从 Excel 文件导入数据"""
|
||
try:
|
||
if clear_old:
|
||
self.clear()
|
||
|
||
# 读取 Excel 文件
|
||
df = pd.read_excel(excel_path, sheet_name=0)
|
||
|
||
# 校验表格格式
|
||
required_columns = [
|
||
"邮箱", "原密码", "原辅助邮箱", "新密码", "新辅助邮箱", "是否更改完成", "代理"
|
||
]
|
||
optional_columns = ["区域"]
|
||
all_columns = required_columns + optional_columns
|
||
|
||
missing_columns = [col for col in required_columns if col not in df.columns]
|
||
if missing_columns:
|
||
raise ValueError(f"表格缺少必要的列:{missing_columns}")
|
||
|
||
# 添加缺失的可选列并填充默认值
|
||
for col in optional_columns:
|
||
if col not in df.columns:
|
||
df[col] = None
|
||
|
||
# 转换数据为 AccountData 对象
|
||
account_list = [
|
||
AccountData(
|
||
email=row["邮箱"],
|
||
original_password=row["原密码"],
|
||
original_aux_email=row["原辅助邮箱"],
|
||
new_password=row["新密码"],
|
||
new_aux_email=row["新辅助邮箱"],
|
||
change_status=row["是否更改完成"],
|
||
proxy=row["代理"],
|
||
region=row.get("区域", None)
|
||
)
|
||
for _, row in df.iterrows()
|
||
]
|
||
|
||
self.import_data(account_list)
|
||
except Exception as e:
|
||
self.debug_print(f"导入失败:{e}")
|
||
raise
|
||
|
||
def export_to_excel(self, excel_path: str):
|
||
"""导出数据到 Excel 文件"""
|
||
try:
|
||
accounts = self.export_data()
|
||
df = pd.DataFrame([{
|
||
"邮箱": account.email,
|
||
"原密码": account.original_password,
|
||
"原辅助邮箱": account.original_aux_email,
|
||
"新密码": account.new_password,
|
||
"新辅助邮箱": account.new_aux_email,
|
||
"是否更改完成": account.change_status,
|
||
"代理": account.proxy,
|
||
"区域": account.region
|
||
} for account in accounts])
|
||
|
||
df.to_excel(excel_path, index=False, sheet_name="Accounts")
|
||
self.debug_print(f"成功导出数据到 {excel_path}!")
|
||
except Exception as e:
|
||
self.debug_print(f"导出失败:{e}")
|
||
raise
|