From e2826a3e3b297b27b18bbe98b7a3d94afd2f3fef Mon Sep 17 00:00:00 2001 From: Zichao Lin Date: Thu, 2 Jul 2026 19:45:34 +0800 Subject: [PATCH] feat: init AI email summarization bot - Multi-account IMAP email polling with UID tracking - DeepSeek API integration with JSON Mode structured output - Telegram notification with formatted MarkdownV2 message - YAML config with dataclass-based type validation - Graceful shutdown on SIGINT/SIGTERM - 60s default polling interval --- .gitignore | 4 +++ activate.bat | 2 ++ activate.ps1 | 1 + config.yaml.example | 22 ++++++++++++ main.py | 49 ++++++++++++++++++++++++++ src/__init__.py | 0 src/ai_client.py | 48 +++++++++++++++++++++++++ src/config.py | 54 ++++++++++++++++++++++++++++ src/email_client.py | 86 +++++++++++++++++++++++++++++++++++++++++++++ src/summarizer.py | 42 ++++++++++++++++++++++ src/tg_bot.py | 56 +++++++++++++++++++++++++++++ 11 files changed, 364 insertions(+) create mode 100644 .gitignore create mode 100644 activate.bat create mode 100644 activate.ps1 create mode 100644 config.yaml.example create mode 100644 main.py create mode 100644 src/__init__.py create mode 100644 src/ai_client.py create mode 100644 src/config.py create mode 100644 src/email_client.py create mode 100644 src/summarizer.py create mode 100644 src/tg_bot.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d1daf71 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.venv/ +__pycache__/ +*.pyc +config.yaml diff --git a/activate.bat b/activate.bat new file mode 100644 index 0000000..c39246f --- /dev/null +++ b/activate.bat @@ -0,0 +1,2 @@ +@echo off +start "Python Venv" /D "%~dp0" cmd /k "%~dp0.venv\Scripts\activate.bat" \ No newline at end of file diff --git a/activate.ps1 b/activate.ps1 new file mode 100644 index 0000000..bc769e7 --- /dev/null +++ b/activate.ps1 @@ -0,0 +1 @@ +Start-Process -WindowStyle Normal -FilePath "powershell" -ArgumentList "-NoExit", "-Command", "& '.\.venv\Scripts\Activate.ps1'" \ No newline at end 
"""Entry point of the AI email summarization bot.

Loads the YAML configuration, then polls every configured mailbox in a loop
until SIGINT or SIGTERM is received.

Usage::

    python main.py [path/to/config.yaml]    # defaults to ./config.yaml

Configuration layout (contents of ``config.yaml.example``)::

    email_accounts:
      - imap_server: "imap.gmail.com"
        imap_port: 993
        username: "your_email@gmail.com"
        password: "your_app_password"

      - imap_server: "imap.qq.com"
        imap_port: 993
        username: "your_email@qq.com"
        password: "your_authorization_code"

    ai:
      api_key: "sk-your_deepseek_api_key"
      model: "deepseek-chat"
      base_url: "https://api.deepseek.com"

    telegram:
      bot_token: "1234567890:ABCdefGHIjklmNOPqrstUVwxyz"
      chat_id: "123456789"

    polling:
      interval_seconds: 60
"""
import logging
import signal
import sys
import time

from src.config import load_config
from src.summarizer import process_all

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("main")

# Cleared by the signal handler; the polling loop re-checks it between steps.
_running = True


def _signal_handler(signum, frame):
    """Request a graceful stop; the main loop exits at its next check."""
    global _running
    logger.info("收到退出信号,正在停止...")
    _running = False


def _sleep_interruptibly(seconds: int) -> None:
    """Sleep for up to ``seconds`` seconds, waking early once a stop is requested.

    Sleeping in one-second slices bounds the shutdown latency to about one
    second regardless of the configured polling interval.
    """
    for _ in range(seconds):
        if not _running:
            break
        time.sleep(1)


def main():
    """Run the polling loop until a termination signal arrives.

    The configuration path is taken from the first command-line argument and
    defaults to ``config.yaml``. Errors raised by a polling round are logged
    and do not stop the loop; errors while loading the configuration
    propagate, because the bot cannot run without it.
    """
    signal.signal(signal.SIGINT, _signal_handler)
    signal.signal(signal.SIGTERM, _signal_handler)

    cfg_path = sys.argv[1] if len(sys.argv) > 1 else "config.yaml"
    cfg = load_config(cfg_path)

    # Normalise once, before the loop: a float would make range() raise after
    # the first poll, and 0 or a negative value would skip the sleep entirely
    # and hammer the mail/AI servers in a tight loop.
    interval = max(1, int(cfg.polling.interval_seconds))
    logger.info(f"AI邮件摘要机器人已启动,轮询间隔: {interval}s")

    while _running:
        try:
            process_all(cfg)
        except Exception as e:
            # Top-level boundary: keep the bot alive across unexpected errors.
            logger.error(f"轮询出错: {e}", exc_info=True)

        _sleep_interruptibly(interval)

    logger.info("机器人已停止")


if __name__ == "__main__":
    main()
"""Mail-to-Telegram summarization pipeline.

The sections below correspond to the modules of the ``src`` package:
``config``, ``ai_client``, ``email_client``, ``tg_bot`` and ``summarizer``.
Third-party packages (``yaml``, ``requests``) are imported inside the
functions that need them so the pure helpers stay importable without them.
"""
import email
import imaplib
import json
import logging
from dataclasses import dataclass, field
from email.header import decode_header
from email.utils import parsedate_to_datetime
from html.parser import HTMLParser
from typing import Any, List, Optional

logger = logging.getLogger(__name__)


# --------------------------------------------------------------------------
# src/config.py
# --------------------------------------------------------------------------

@dataclass
class EmailAccount:
    """Connection settings for one IMAP mailbox."""

    imap_server: str
    imap_port: int
    username: str
    # Excluded from repr() so the credential cannot leak through logs.
    password: str = field(repr=False)

    def __post_init__(self) -> None:
        # A quoted port in YAML arrives as a string; normalise it to int.
        self.imap_port = int(self.imap_port)


@dataclass
class AIConfig:
    """Settings of the chat-completions endpoint used for summaries."""

    # Excluded from repr() so the credential cannot leak through logs.
    api_key: str = field(repr=False)
    model: str
    base_url: str


@dataclass
class TelegramConfig:
    """Telegram bot credentials and destination chat."""

    # Excluded from repr() so the credential cannot leak through logs.
    bot_token: str = field(repr=False)
    chat_id: str


@dataclass
class PollingConfig:
    """How often the mailboxes are polled."""

    interval_seconds: int = 60

    def __post_init__(self) -> None:
        self.interval_seconds = int(self.interval_seconds)
        if self.interval_seconds < 1:
            # 0 or less would turn the polling loop into a busy loop.
            raise ValueError("polling.interval_seconds must be at least 1")


@dataclass
class Config:
    """Complete application configuration."""

    email_accounts: List[EmailAccount]
    ai: AIConfig
    telegram: TelegramConfig
    polling: PollingConfig = field(default_factory=PollingConfig)


def load_config(path: str) -> Config:
    """Read the YAML file at ``path`` and build a :class:`Config`.

    The ``polling`` section is optional and defaults to a 60 second interval.

    Raises:
        OSError: the file cannot be opened.
        ValueError: the file is empty or not a mapping, or the polling
            interval is invalid.
        KeyError: a required section (``email_accounts``, ``ai``,
            ``telegram``) is missing.
        TypeError: a section has missing or unknown fields.
    """
    import yaml  # PyYAML; only this function needs it

    with open(path, "r", encoding="utf-8") as f:
        raw = yaml.safe_load(f)

    # An empty file loads as None; fail with a message that names the file.
    if not isinstance(raw, dict):
        raise ValueError(f"{path}: expected a YAML mapping at the top level")

    return Config(
        email_accounts=[EmailAccount(**a) for a in (raw["email_accounts"] or [])],
        ai=AIConfig(**raw["ai"]),
        telegram=TelegramConfig(**raw["telegram"]),
        polling=PollingConfig(**(raw.get("polling") or {})),
    )


# --------------------------------------------------------------------------
# src/ai_client.py
# --------------------------------------------------------------------------

SYSTEM_PROMPT = """你是一个邮件摘要助手。请分析邮件内容并以 JSON 格式返回结构化摘要。

返回格式必须严格遵循以下 JSON schema:
{
  "subject": "邮件主题",
  "sender": "发件人",
  "summary": "50字以内的核心摘要",
  "priority": "high/medium/low",
  "action_required": true/false,
  "action_items": ["待办事项1", "待办事项2"],
  "key_points": ["关键要点1", "关键要点2"]
}

只返回 JSON,不要包含任何其他文字。"""

# Only the first part of a mail body is sent to the model.
_MAX_BODY_CHARS = 4000
_AI_TIMEOUT_SECONDS = 60


def summarize_email(ai_cfg: AIConfig, subject: str, sender: str, body: str) -> dict[str, Any]:
    """Ask the chat-completions endpoint for a structured summary of one mail.

    Args:
        ai_cfg: endpoint, model and API key.
        subject: decoded subject line.
        sender: decoded ``From`` header.
        body: mail text; only the first ``_MAX_BODY_CHARS`` characters are sent.

    Returns:
        The JSON object produced by the model. ``subject`` and ``sender`` are
        filled in from the arguments when the model omitted them.

    Raises:
        requests.RequestException: transport failure or non-2xx response.
        ValueError: the reply is not valid JSON or not a JSON object.
        KeyError, IndexError: the response lacks the expected fields.
    """
    import requests  # third-party; imported where it is used

    content = f"发件人: {sender}\n主题: {subject}\n正文:\n{(body or '')[:_MAX_BODY_CHARS]}"

    payload = {
        "model": ai_cfg.model,
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": content},
        ],
        "response_format": {"type": "json_object"},
    }
    headers = {
        "Authorization": f"Bearer {ai_cfg.api_key}",
        "Content-Type": "application/json",
    }

    resp = requests.post(
        f"{ai_cfg.base_url.rstrip('/')}/chat/completions",
        headers=headers,
        json=payload,
        timeout=_AI_TIMEOUT_SECONDS,
    )
    resp.raise_for_status()

    raw = resp.json()["choices"][0]["message"]["content"]
    summary = json.loads(raw)
    if not isinstance(summary, dict):
        # The formatter calls .get() on the result, so anything else is unusable.
        raise ValueError(
            f"AI returned a JSON {type(summary).__name__}, expected an object"
        )

    # Prefer the real header values over placeholders when the model left
    # these fields out.
    summary.setdefault("subject", subject)
    summary.setdefault("sender", sender)
    return summary


# --------------------------------------------------------------------------
# src/email_client.py
# --------------------------------------------------------------------------

# Without a timeout a stalled server would block the polling loop forever.
_IMAP_TIMEOUT_SECONDS = 30
# UIDs per STORE command; keeps the command line well below server limits.
_STORE_BATCH_SIZE = 100


# eq=False keeps identity-based equality and hashing, as a plain class has.
@dataclass(eq=False)
class Email:
    """One fetched message, reduced to the fields the summarizer needs."""

    uid: bytes
    subject: str
    sender: str
    body: str = field(repr=False)  # potentially large; keep repr() short
    date: str = ""


def _decode_bytes(data: bytes, charset: Optional[str]) -> str:
    """Decode ``data`` with ``charset``, falling back to UTF-8 if it is unknown."""
    try:
        return data.decode(charset or "utf-8", errors="replace")
    except LookupError:
        return data.decode("utf-8", errors="replace")


def _decode_str(s: str) -> str:
    """Decode an RFC 2047 encoded header value into plain text."""
    if not s:
        return ""
    decoded = []
    for part, charset in decode_header(s):
        if isinstance(part, bytes):
            decoded.append(_decode_bytes(part, charset))
        else:
            decoded.append(part)
    return "".join(decoded)


class _HTMLTextExtractor(HTMLParser):
    """Collect the visible text of an HTML document."""

    _SKIPPED_TAGS = frozenset({"script", "style"})
    _BLOCK_TAGS = frozenset(
        {"br", "p", "div", "tr", "li", "h1", "h2", "h3", "h4", "h5", "h6"}
    )

    def __init__(self) -> None:
        super().__init__(convert_charrefs=True)
        self._chunks: list[str] = []
        self._skip_depth = 0

    def handle_starttag(self, tag, attrs):
        if tag in self._SKIPPED_TAGS:
            self._skip_depth += 1
        elif tag in self._BLOCK_TAGS:
            self._chunks.append("\n")

    def handle_endtag(self, tag):
        if tag in self._SKIPPED_TAGS:
            if self._skip_depth:
                self._skip_depth -= 1
        elif tag in self._BLOCK_TAGS:
            self._chunks.append("\n")

    def handle_data(self, data):
        if not self._skip_depth:
            self._chunks.append(data)

    def text(self) -> str:
        """Return the collected text with whitespace collapsed per line."""
        lines = (" ".join(line.split()) for line in "".join(self._chunks).splitlines())
        return "\n".join(line for line in lines if line)


def _html_to_text(markup: str) -> str:
    """Strip tags, scripts and styles from ``markup``."""
    parser = _HTMLTextExtractor()
    parser.feed(markup)
    parser.close()
    return parser.text()


def _get_text_from_msg(msg) -> str:
    """Return the readable body of ``msg``.

    For multipart mail the first non-attachment ``text/plain`` part wins; an
    HTML part is used only when no plain part exists. HTML is reduced to its
    text. Every part is decoded with its declared charset (UTF-8 when none is
    declared or the name is unknown). Returns ``""`` when nothing suitable is
    found.
    """
    if not msg.is_multipart():
        payload = msg.get_payload(decode=True)
        if not payload:
            return ""
        text = _decode_bytes(payload, msg.get_content_charset())
        return _html_to_text(text) if msg.get_content_type() == "text/html" else text

    html_fallback = ""
    for part in msg.walk():
        # Containers have no payload of their own; attachments are not the body.
        if part.is_multipart() or part.get_content_disposition() == "attachment":
            continue
        content_type = part.get_content_type()
        if content_type not in ("text/plain", "text/html"):
            continue
        payload = part.get_payload(decode=True)
        if not payload:
            continue
        text = _decode_bytes(payload, part.get_content_charset())
        if content_type == "text/plain":
            return text
        if not html_fallback:
            html_fallback = text

    return _html_to_text(html_fallback) if html_fallback else ""


def fetch_unseen_emails(account: EmailAccount) -> list[Email]:
    """Fetch every message in INBOX that does not carry the ``\\Seen`` flag.

    The mailbox is opened read-only and messages are fetched with
    ``BODY.PEEK[]`` so that fetching does not mark them as seen; that is left
    to :func:`mark_as_seen` once a message has been handled.

    Raises:
        imaplib.IMAP4.error: login or a command was rejected.
        OSError: connection failure or timeout.
    """
    emails = []
    # The context manager logs out even when a command below raises.
    with imaplib.IMAP4_SSL(
        account.imap_server, account.imap_port, timeout=_IMAP_TIMEOUT_SECONDS
    ) as conn:
        conn.login(account.username, account.password)
        conn.select("INBOX", readonly=True)

        _, data = conn.uid("SEARCH", None, "UNSEEN")
        uids = data[0].split() if data and data[0] else []

        for uid in uids:
            _, msg_data = conn.uid("FETCH", uid, "(BODY.PEEK[])")
            # The message literal is the second element of the first tuple in
            # the response; other items are plain bytes (or None when the
            # message has disappeared in the meantime).
            raw_email = next(
                (
                    item[1]
                    for item in (msg_data or [])
                    if isinstance(item, tuple) and len(item) > 1
                ),
                None,
            )
            if raw_email is None:
                continue

            msg = email.message_from_bytes(raw_email)
            emails.append(
                Email(
                    uid=uid,
                    subject=_decode_str(msg.get("Subject", "")),
                    sender=_decode_str(msg.get("From", "")),
                    body=_get_text_from_msg(msg),
                    date=msg.get("Date", ""),
                )
            )

    return emails


def mark_as_seen(account: EmailAccount, uids: list[bytes]):
    """Set the ``\\Seen`` flag on the given UIDs in INBOX.

    Does nothing (and opens no connection) when ``uids`` is empty.

    Raises:
        imaplib.IMAP4.error: login or a command was rejected.
        OSError: connection failure or timeout.
    """
    if not uids:
        return
    with imaplib.IMAP4_SSL(
        account.imap_server, account.imap_port, timeout=_IMAP_TIMEOUT_SECONDS
    ) as conn:
        conn.login(account.username, account.password)
        conn.select("INBOX")
        # One STORE per batch instead of one round trip per message.
        for start in range(0, len(uids), _STORE_BATCH_SIZE):
            uid_set = b",".join(uids[start:start + _STORE_BATCH_SIZE])
            conn.uid("STORE", uid_set, "+FLAGS", "(\\Seen)")


# --------------------------------------------------------------------------
# src/tg_bot.py
# --------------------------------------------------------------------------

_TELEGRAM_TIMEOUT_SECONDS = 30


def send_message(tg_cfg: TelegramConfig, text: str):
    """Send ``text`` (MarkdownV2) to the configured Telegram chat.

    Raises:
        requests.RequestException: transport failure, or (as its subclass
            ``requests.HTTPError``) a non-2xx answer from Telegram. The
            messages never contain the bot token.
    """
    import requests  # third-party; imported where it is used

    url = f"https://api.telegram.org/bot{tg_cfg.bot_token}/sendMessage"
    payload = {
        "chat_id": tg_cfg.chat_id,
        "text": text,
        "parse_mode": "MarkdownV2",
    }

    try:
        resp = requests.post(url, json=payload, timeout=_TELEGRAM_TIMEOUT_SECONDS)
    except requests.RequestException as exc:
        # requests puts the request URL - and with it the bot token - into
        # its error messages; callers log these, so redact before re-raising.
        message = str(exc)
        if tg_cfg.bot_token:
            message = message.replace(tg_cfg.bot_token, "<bot_token>")
        raise requests.RequestException(f"{type(exc).__name__}: {message}") from None

    if not resp.ok:
        # raise_for_status() would also embed the token-bearing URL, so build
        # the error from Telegram's own description instead.
        try:
            detail = resp.json().get("description", "")
        except (ValueError, AttributeError):
            detail = resp.text[:200]
        raise requests.HTTPError(
            f"Telegram sendMessage failed with HTTP {resp.status_code}: {detail}",
            response=resp,
        )


_priority_icon = {"high": "🔴", "medium": "🟡", "low": "🟢"}

# Characters that must be backslash-escaped in MarkdownV2 text. The backslash
# itself is included, otherwise a literal "\" would swallow the escape that
# follows it.
_MD2_ESCAPES = str.maketrans({ch: f"\\{ch}" for ch in "\\_*[]()~`>#+-=|{}.!"})


def _as_list(value) -> list:
    """Normalise a model-supplied field to a list of items."""
    if value is None:
        return []
    if isinstance(value, (list, tuple)):
        return list(value)
    # A bare string (or number) is a single item, not a sequence of characters.
    return [value]


def format_summary(data: dict) -> str:
    """Render a summary dict as a MarkdownV2 Telegram message.

    Missing or null fields fall back to placeholders; every value taken from
    ``data`` is converted to text and escaped, so unexpected model output
    cannot break the message markup.
    """
    priority = str(data.get("priority") or "medium")
    icon = _priority_icon.get(priority.strip().lower(), "⚪")

    lines = [
        "*📧 新邮件摘要*",
        "━━━━━━━━━━━━━━━━━━",
        f"*发件人:* {_escape(data.get('sender') or '未知')}",
        f"*主题:* {_escape(data.get('subject') or '无主题')}",
        f"*优先级:* {icon} {_escape(priority.upper())}",
        "",
        _escape(data.get("summary") or ""),
        "",
    ]

    if data.get("action_required"):
        lines.append("*📌 需要处理:* 是")
        items = _as_list(data.get("action_items"))
        if items:
            lines.append("*待办事项:*")
            lines.extend(f" • {_escape(item)}" for item in items)
        lines.append("")

    points = _as_list(data.get("key_points"))
    if points:
        lines.append("*关键要点:*")
        lines.extend(f" • {_escape(p)}" for p in points)

    return "\n".join(lines)


def _escape(text: str) -> str:
    """Backslash-escape every MarkdownV2 special character in ``text``."""
    return str(text).translate(_MD2_ESCAPES)


# --------------------------------------------------------------------------
# src/summarizer.py
# --------------------------------------------------------------------------

# A mail that keeps failing is retried on this many polls before it is
# marked as seen and given up on.
_MAX_ATTEMPTS = 3
# (username, uid) -> number of failed attempts so far (in memory only).
_failed_attempts: dict = {}


def process_all(cfg: Config):
    """Process every configured mailbox; one failing account does not stop the rest."""
    for acct in cfg.email_accounts:
        try:
            _process_account(cfg, acct)
        except Exception as e:
            logger.error(f"处理邮箱 {acct.username} 时出错: {e}", exc_info=True)


def _process_account(cfg: Config, acct):
    """Summarize and forward the unseen mail of one account.

    A mail is marked as seen once it was delivered to Telegram. A mail that
    failed stays unseen so the next poll retries it; after ``_MAX_ATTEMPTS``
    failures it is marked as seen so it cannot block the bot forever.
    """
    logger.info(f"检查邮箱: {acct.username}")
    emails = fetch_unseen_emails(acct)

    if not emails:
        logger.info(f" 没有新邮件")
        return

    logger.info(f" 发现 {len(emails)} 封新邮件")
    seen_uids = []

    for mail in emails:
        key = (acct.username, mail.uid)
        try:
            logger.info(f" 正在摘要: {mail.subject}")
            summary = summarize_email(cfg.ai, mail.subject, mail.sender, mail.body)
            text = format_summary(summary)
            send_message(cfg.telegram, text)
        except Exception as e:
            logger.error(f" 处理邮件 '{mail.subject}' 失败: {e}", exc_info=True)
            attempts = _failed_attempts.get(key, 0) + 1
            if attempts >= _MAX_ATTEMPTS:
                logger.warning(f" 邮件 '{mail.subject}' 已失败 {attempts} 次,放弃并标记为已读")
                _failed_attempts.pop(key, None)
                seen_uids.append(mail.uid)
            else:
                _failed_attempts[key] = attempts
        else:
            _failed_attempts.pop(key, None)
            seen_uids.append(mail.uid)
            logger.info(f" 已发送到 Telegram")

    if seen_uids:
        mark_as_seen(acct, seen_uids)