feat: init AI email summarization bot

- Multi-account IMAP email polling with UID tracking
- DeepSeek API integration with JSON Mode structured output
- Telegram notification with formatted MarkdownV2 message
- YAML config with dataclass-based type validation
- Graceful shutdown on SIGINT/SIGTERM
- 60s default polling interval
This commit is contained in:
2026-07-02 19:45:34 +08:00
commit e2826a3e3b
11 changed files with 364 additions and 0 deletions

4
.gitignore vendored Normal file
View File

@@ -0,0 +1,4 @@
.venv/
__pycache__/
*.pyc
config.yaml

2
activate.bat Normal file
View File

@@ -0,0 +1,2 @@
@echo off
start "Python Venv" /D "%~dp0" cmd /k "%~dp0.venv\Scripts\activate.bat"

1
activate.ps1 Normal file
View File

@@ -0,0 +1 @@
Start-Process -WindowStyle Normal -FilePath "powershell" -ArgumentList "-NoExit", "-Command", "& '.\.venv\Scripts\Activate.ps1'"

22
config.yaml.example Normal file
View File

@@ -0,0 +1,22 @@
email_accounts:
- imap_server: "imap.gmail.com"
imap_port: 993
username: "your_email@gmail.com"
password: "your_app_password"
- imap_server: "imap.qq.com"
imap_port: 993
username: "your_email@qq.com"
password: "your_authorization_code"
ai:
api_key: "sk-your_deepseek_api_key"
model: "deepseek-chat"
base_url: "https://api.deepseek.com"
telegram:
bot_token: "1234567890:ABCdefGHIjklmNOPqrstUVwxyz"
chat_id: "123456789"
polling:
interval_seconds: 60

49
main.py Normal file
View File

@@ -0,0 +1,49 @@
import logging
import signal
import sys
import time
from src.config import load_config
from src.summarizer import process_all
# Configure the root logger once at import time: INFO level, timestamped records
# that carry the level and the logger name.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
# Logger used by the entry-point code in this module.
logger = logging.getLogger("main")
# Loop flag: main() keeps polling while this is True; _signal_handler clears it.
_running = True
def _signal_handler(signum, frame):
    """Ask the polling loop to stop after the current step.

    Registered by ``main`` for SIGINT and SIGTERM. Both parameters belong to the
    handler signature expected by ``signal.signal`` and are not used here.
    """
    global _running
    logger.info("收到退出信号,正在停止...")
    # main() checks this flag between polls and once per second while sleeping.
    _running = False
def main():
    """Run the polling loop until a stop signal clears ``_running``.

    The config path is taken from the first command-line argument and falls
    back to ``config.yaml``. Each round calls ``process_all`` and then waits
    ``polling.interval_seconds`` seconds before the next one.
    """
    global _running
    for stop_signal in (signal.SIGINT, signal.SIGTERM):
        signal.signal(stop_signal, _signal_handler)

    if len(sys.argv) > 1:
        cfg_path = sys.argv[1]
    else:
        cfg_path = "config.yaml"
    cfg = load_config(cfg_path)
    logger.info(f"AI邮件摘要机器人已启动轮询间隔: {cfg.polling.interval_seconds}s")

    while _running:
        try:
            process_all(cfg)
        except Exception as e:
            # A failed round must not end the process; log it and poll again later.
            logger.error(f"轮询出错: {e}", exc_info=True)
        if not _running:
            break
        # Sleep in one-second slices so a stop request is noticed quickly.
        for _ in range(cfg.polling.interval_seconds):
            if not _running:
                break
            time.sleep(1)
    logger.info("机器人已停止")


if __name__ == "__main__":
    main()

0
src/__init__.py Normal file
View File

48
src/ai_client.py Normal file
View File

@@ -0,0 +1,48 @@
import json
from typing import Any
import requests
from src.config import AIConfig
# System prompt sent with every summarization request. It tells the model to
# answer with a single JSON object containing the keys subject, sender, summary,
# priority, action_required, action_items and key_points.
# This is runtime text: any edit changes what the model is asked to produce.
SYSTEM_PROMPT = """你是一个邮件摘要助手。请分析邮件内容并以 JSON 格式返回结构化摘要。
返回格式必须严格遵循以下 JSON schema
{
"subject": "邮件主题",
"sender": "发件人",
"summary": "50字以内的核心摘要",
"priority": "high/medium/low",
"action_required": true/false,
"action_items": ["待办事项1", "待办事项2"],
"key_points": ["关键要点1", "关键要点2"]
}
只返回 JSON不要包含任何其他文字。"""
def summarize_email(ai_cfg: AIConfig, subject: str, sender: str, body: str) -> dict[str, Any]:
    """Request a structured JSON summary of one email from the chat-completions API.

    Args:
        ai_cfg: API key, model name and base URL of the service.
        subject: Subject line of the email.
        sender: Sender of the email.
        body: Text body; only the first 4000 characters are sent.

    Returns:
        The JSON object produced by the model. ``subject`` and ``sender`` are
        filled in from the arguments when the model left them out.

    Raises:
        requests.RequestException: On network failure or an HTTP error status.
        ValueError: If the response has an unexpected shape, the message
            content is empty, is not valid JSON, or is not a JSON object.
    """
    content = f"发件人: {sender}\n主题: {subject}\n正文:\n{body[:4000]}"
    payload = {
        "model": ai_cfg.model,
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": content},
        ],
        "response_format": {"type": "json_object"},
    }
    headers = {
        "Authorization": f"Bearer {ai_cfg.api_key}",
        "Content-Type": "application/json",
    }
    resp = requests.post(
        f"{ai_cfg.base_url.rstrip('/')}/chat/completions",
        headers=headers,
        json=payload,
        timeout=60,
    )
    resp.raise_for_status()
    result = resp.json()
    try:
        raw = result["choices"][0]["message"]["content"]
    except (KeyError, IndexError, TypeError) as exc:
        raise ValueError(f"unexpected chat-completions response: {result!r}") from exc
    # Guard explicitly: an empty or null content would otherwise surface as an
    # opaque JSON/Type error far from its cause.
    if not raw or not raw.strip():
        raise ValueError("model returned empty content")
    summary = json.loads(raw)
    if not isinstance(summary, dict):
        raise ValueError(f"model returned {type(summary).__name__}, expected a JSON object")
    # The real header values are known here; use them if the model omitted them.
    summary.setdefault("subject", subject)
    summary.setdefault("sender", sender)
    return summary

54
src/config.py Normal file
View File

@@ -0,0 +1,54 @@
from dataclasses import dataclass, field
from typing import List
import yaml
@dataclass
class EmailAccount:
    """Connection settings for one IMAP mailbox."""

    imap_server: str
    imap_port: int
    username: str
    # Kept out of the generated repr so the secret cannot leak through logs or
    # tracebacks that print the config object. Still a required init argument.
    password: str = field(repr=False)
@dataclass
class AIConfig:
    """Settings for the chat-completions service used to summarize emails."""

    # Kept out of the generated repr so the key cannot leak through logs or
    # tracebacks that print the config object. Still a required init argument.
    api_key: str = field(repr=False)
    model: str
    base_url: str
@dataclass
class TelegramConfig:
    """Bot credentials and the chat that receives the summaries."""

    # Kept out of the generated repr so the token cannot leak through logs or
    # tracebacks that print the config object. Still a required init argument.
    bot_token: str = field(repr=False)
    chat_id: str
@dataclass
class PollingConfig:
    """How often the mailboxes are polled."""

    # Whole seconds to wait between two polling rounds; always >= 1 after init.
    interval_seconds: int

    def __post_init__(self):
        """Coerce the interval to ``int`` and reject values below one second.

        Raises:
            ValueError: If the value is not an integer or is smaller than 1.
        """
        try:
            seconds = int(self.interval_seconds)
        except (TypeError, ValueError) as exc:
            raise ValueError(
                f"polling.interval_seconds must be an integer, got {self.interval_seconds!r}"
            ) from exc
        # Zero or negative would make the poll loop spin without any pause.
        if seconds < 1:
            raise ValueError(f"polling.interval_seconds must be >= 1, got {seconds}")
        self.interval_seconds = seconds
@dataclass
class Config:
    """Top-level application configuration, assembled by ``load_config``."""

    # One entry per item under the ``email_accounts`` key of the YAML file.
    email_accounts: List[EmailAccount]
    # Built from the ``ai`` section.
    ai: AIConfig
    # Built from the ``telegram`` section.
    telegram: TelegramConfig
    # Built from the ``polling`` section.
    polling: PollingConfig
def load_config(path: str) -> Config:
    """Load the YAML file at *path* and build the typed ``Config`` object.

    Args:
        path: Location of the YAML configuration file (read as UTF-8).

    Returns:
        A ``Config`` holding the email accounts and the ai, telegram and
        polling settings.

    Raises:
        OSError: If the file cannot be opened.
        TypeError: If the document is empty or its top level is not a mapping,
            or if a section contains unknown or missing fields.
        KeyError: If a required top-level section is absent.
    """
    with open(path, "r", encoding="utf-8") as f:
        raw = yaml.safe_load(f)
    # An empty file parses to None; fail with a message that names the problem.
    if not isinstance(raw, dict):
        raise TypeError(
            f"{path}: top level of the config must be a mapping, got {type(raw).__name__}"
        )
    required = ("email_accounts", "ai", "telegram", "polling")
    missing = [name for name in required if name not in raw]
    if missing:
        raise KeyError(f"{path}: missing required config section(s): {', '.join(missing)}")
    return Config(
        email_accounts=[EmailAccount(**a) for a in raw["email_accounts"]],
        ai=AIConfig(**raw["ai"]),
        telegram=TelegramConfig(**raw["telegram"]),
        polling=PollingConfig(**raw["polling"]),
    )

86
src/email_client.py Normal file
View File

@@ -0,0 +1,86 @@
import imaplib
import email
from email.header import decode_header
from email.utils import parsedate_to_datetime
from typing import Optional
from src.config import EmailAccount
class Email:
    """One fetched message, reduced to the fields the rest of the bot reads."""

    def __init__(self, uid: bytes, subject: str, sender: str, body: str, date: str):
        """Store the given values unchanged as attributes of the same names."""
        self.uid, self.subject, self.sender = uid, subject, sender
        self.body, self.date = body, date
def _decode_str(s: str) -> str:
    """Turn a possibly RFC 2047-encoded header value into plain text.

    Each chunk reported by ``decode_header`` is decoded with its own charset
    (UTF-8 when none is given or the charset name is unknown); undecodable
    bytes are replaced rather than raising.
    """

    def to_text(chunk, charset):
        # decode_header may hand back str chunks; those are already text.
        if not isinstance(chunk, bytes):
            return chunk
        try:
            return chunk.decode(charset or "utf-8", errors="replace")
        except LookupError:
            # Unknown charset label: fall back to UTF-8.
            return chunk.decode("utf-8", errors="replace")

    return "".join(to_text(chunk, charset) for chunk, charset in decode_header(s))
def _decode_payload(part) -> str:
    """Decode the body of one non-multipart part using its declared charset."""
    payload = part.get_payload(decode=True)
    if not payload:
        return ""
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="replace")
    except LookupError:
        # Unknown charset label: fall back to UTF-8.
        return payload.decode("utf-8", errors="replace")


def _get_text_from_msg(msg) -> str:
    """Extract the readable body text of a parsed email message.

    For a multipart message the first non-empty ``text/plain`` part wins; if
    there is none, the first non-empty ``text/html`` part is returned as-is.
    Parts marked as attachments are ignored. A non-multipart message is decoded
    directly, whatever its content type.

    Args:
        msg: An ``email.message.Message`` instance.

    Returns:
        The decoded text, or an empty string when no text body is found.
    """
    if not msg.is_multipart():
        return _decode_payload(msg)

    html_fallback = ""
    for part in msg.walk():
        # Containers have no payload of their own; attached .txt/.html files
        # are not the message body.
        if part.is_multipart() or part.get_content_disposition() == "attachment":
            continue
        content_type = part.get_content_type()
        if content_type == "text/plain":
            text = _decode_payload(part)
            if text:
                return text
        elif content_type == "text/html" and not html_fallback:
            html_fallback = _decode_payload(part)
    return html_fallback
def fetch_unseen_emails(account: EmailAccount) -> list[Email]:
    """Download every unseen message from the account's INBOX.

    Messages are fetched without changing their flags, so they stay unseen on
    the server until ``mark_as_seen`` is called for them.

    Args:
        account: IMAP server, port and credentials of the mailbox.

    Returns:
        One ``Email`` per unseen message, in the order the server listed them.

    Raises:
        imaplib.IMAP4.error: On login or protocol failure.
        OSError: On connection failure.
    """
    emails = []
    # The context manager logs out even when a command in the middle fails.
    with imaplib.IMAP4_SSL(account.imap_server, account.imap_port) as conn:
        conn.login(account.username, account.password)
        conn.select("INBOX")
        _, data = conn.uid("SEARCH", None, "UNSEEN")
        uids = data[0].split() if data and data[0] else []
        for uid in uids:
            # BODY.PEEK[] returns the whole message like RFC822 does, but does
            # not implicitly set the \Seen flag.
            _, msg_data = conn.uid("FETCH", uid, "(BODY.PEEK[])")
            # A usable response starts with an (envelope, raw bytes) tuple; skip
            # anything else (e.g. the message vanished in the meantime).
            if not msg_data or not isinstance(msg_data[0], tuple):
                continue
            raw_email = msg_data[0][1]
            msg = email.message_from_bytes(raw_email)
            subject = _decode_str(msg.get("Subject", ""))
            sender = _decode_str(msg.get("From", ""))
            date_str = msg.get("Date", "")
            body = _get_text_from_msg(msg)
            emails.append(Email(uid=uid, subject=subject, sender=sender, body=body, date=date_str))
    return emails
def mark_as_seen(account: EmailAccount, uids: list[bytes]):
    """Set the Seen flag on the given message UIDs in the account's INBOX.

    Does nothing (and opens no connection) when *uids* is empty.

    Args:
        account: IMAP server, port and credentials of the mailbox.
        uids: Message UIDs as returned by the server.

    Raises:
        imaplib.IMAP4.error: On login or protocol failure.
        OSError: On connection failure.
    """
    if not uids:
        return
    # The context manager logs out even when a command in the middle fails.
    with imaplib.IMAP4_SSL(account.imap_server, account.imap_port) as conn:
        conn.login(account.username, account.password)
        conn.select("INBOX")
        for uid in uids:
            conn.uid("STORE", uid, "+FLAGS", "\\Seen")

42
src/summarizer.py Normal file
View File

@@ -0,0 +1,42 @@
import logging
from src.config import Config
from src.email_client import fetch_unseen_emails, mark_as_seen
from src.ai_client import summarize_email
from src.tg_bot import send_message, format_summary
logger = logging.getLogger(__name__)
def process_all(cfg: Config):
    """Run one polling round over every configured mailbox.

    An exception raised while handling one mailbox is logged with its
    traceback and does not prevent the remaining mailboxes from being handled.
    """
    for account in cfg.email_accounts:
        try:
            _process_account(cfg, account)
        except Exception as exc:
            logger.error(f"处理邮箱 {account.username} 时出错: {exc}", exc_info=True)
def _process_account(cfg: Config, acct):
    """Summarize each unseen email of one mailbox and send it to Telegram.

    Every fetched email is marked as seen afterwards, including those whose
    summarization or delivery failed. Because only unseen messages are
    fetched, such a message is not picked up again on the next round.
    """
    logger.info(f"检查邮箱: {acct.username}")
    emails = fetch_unseen_emails(acct)
    if not emails:
        logger.info(" 没有新邮件")
        return
    logger.info(f" 发现 {len(emails)} 封新邮件")

    handled_uids = []
    for mail in emails:
        try:
            logger.info(f" 正在摘要: {mail.subject}")
            summary = summarize_email(cfg.ai, mail.subject, mail.sender, mail.body)
            send_message(cfg.telegram, format_summary(summary))
        except Exception as exc:
            logger.error(f" 处理邮件 '{mail.subject}' 失败: {exc}", exc_info=True)
        else:
            logger.info(" 已发送到 Telegram")
        # Recorded on success and on failure alike.
        handled_uids.append(mail.uid)

    if handled_uids:
        mark_as_seen(acct, handled_uids)

56
src/tg_bot.py Normal file
View File

@@ -0,0 +1,56 @@
import requests
from src.config import TelegramConfig
def send_message(tg_cfg: TelegramConfig, text: str):
    """Send one MarkdownV2-formatted message to the configured Telegram chat.

    Args:
        tg_cfg: Bot token and target chat id.
        text: Message text, already escaped for MarkdownV2.

    Raises:
        requests.RequestException: On network failure (30 s timeout).
        requests.HTTPError: If Telegram answers with an error status; the
            message carries Telegram's response body. No raised message
            contains the bot token.
    """
    url = f"https://api.telegram.org/bot{tg_cfg.bot_token}/sendMessage"
    payload = {
        "chat_id": tg_cfg.chat_id,
        "text": text,
        "parse_mode": "MarkdownV2",
    }
    try:
        resp = requests.post(url, json=payload, timeout=30)
    except requests.RequestException as exc:
        # requests puts the request URL, and with it the bot token, into its
        # error text, which callers log. Re-raise the same type, token masked.
        detail = str(exc)
        if tg_cfg.bot_token:
            detail = detail.replace(tg_cfg.bot_token, "<bot-token>")
        raise type(exc)(detail) from None
    if not resp.ok:
        # Built by hand instead of raise_for_status(): that message would contain
        # the token-bearing URL and omit Telegram's description of the failure.
        raise requests.HTTPError(
            f"Telegram sendMessage failed with HTTP {resp.status_code}: {resp.text[:500]}",
            response=resp,
        )
# Emoji shown next to each known priority value.
_priority_icon = {"high": "🔴", "medium": "🟡", "low": "🟢"}

# Single-pass translation table for Telegram MarkdownV2. The backslash itself is
# included so literal backslashes in the text are not read as escapes.
_MDV2_ESCAPES = str.maketrans({ch: f"\\{ch}" for ch in "\\_*[]()~`>#+-=|{}.!"})


def _as_items(value) -> list:
    """Normalize a list-like summary field: null becomes [], a scalar one item."""
    if value is None:
        return []
    values = list(value) if isinstance(value, (list, tuple)) else [value]
    return [v for v in values if v not in (None, "")]


def format_summary(data: dict) -> str:
    """Render a summary dict as a Telegram MarkdownV2 message.

    Missing or null fields fall back to defaults instead of raising, and every
    value taken from *data* is escaped before it is inserted.

    Args:
        data: Summary with the optional keys ``sender``, ``subject``,
            ``priority``, ``summary``, ``action_required``, ``action_items``
            and ``key_points``.

    Returns:
        The message text, lines joined with newlines.
    """
    priority = str(data.get("priority") or "medium")
    icon = _priority_icon.get(priority.lower(), "")
    lines = [
        "*📧 新邮件摘要*",
        "━━━━━━━━━━━━━━━━━━",
        f"*发件人:* {_escape(data.get('sender') or '未知')}",
        f"*主题:* {_escape(data.get('subject') or '无主题')}",
        f"*优先级:* {icon} {_escape(priority.upper())}",
        "",
        _escape(data.get("summary")),
        "",
    ]
    if data.get("action_required"):
        lines.append("*📌 需要处理:* 是")
        items = _as_items(data.get("action_items"))
        if items:
            lines.append("*待办事项:*")
            lines.extend(_escape(item) for item in items)
        lines.append("")
    points = _as_items(data.get("key_points"))
    if points:
        lines.append("*关键要点:*")
        lines.extend(_escape(point) for point in points)
    return "\n".join(lines)


def _escape(text: str) -> str:
    """Escape *text* for use inside a Telegram MarkdownV2 message.

    ``None`` yields an empty string and other non-string values are converted
    with ``str`` first.
    """
    if text is None:
        return ""
    return str(text).translate(_MDV2_ESCAPES)