refactor: separate email poll, email process, TG poll into threads

- main.py spawns 3 daemon threads: email poller, email processor, TG poller
- Each thread runs independently so retries don't block other operations
- _pending_uids set + lock prevents duplicate queueing across poll cycles
- summarizer.py split into poll_accounts() (generator) and process_email()
- Graceful shutdown via shared _running flag
This commit is contained in:
2026-07-02 20:48:05 +08:00
parent 09d11a6c03
commit 1be3aadb08
2 changed files with 83 additions and 43 deletions

77
main.py
View File

@@ -1,9 +1,11 @@
import logging
import queue
import signal
import sys
import threading
import time
from src.config import load_config
from src.summarizer import process_all
from src.summarizer import poll_accounts, process_email
from src.tg_bot import poll_telegram
logging.basicConfig(
@@ -14,6 +16,8 @@ logging.basicConfig(
logger = logging.getLogger("main")
_running = True
_pending_lock = threading.Lock()
_pending_uids: set[bytes] = set()
def _signal_handler(signum, frame):
@@ -22,6 +26,51 @@ def _signal_handler(signum, frame):
_running = False
def _email_poller(cfg, processing_queue):
    """Poll the mail accounts in a loop and queue messages for processing.

    Every ``(account index, mail)`` pair yielded by ``poll_accounts(cfg)`` is
    put on ``processing_queue`` unless its ``mail.uid`` is already present in
    ``_pending_uids``. After each full pass the thread waits
    ``cfg.polling.interval_seconds`` seconds in one-second steps, so a cleared
    ``_running`` flag is noticed within about a second.

    Args:
        cfg: Loaded configuration; ``cfg.polling.interval_seconds`` is read here
            and ``cfg`` is passed through to ``poll_accounts``.
        processing_queue: Queue receiving ``(acct_idx, mail)`` tuples.
    """
    logger.info("邮件轮询线程已启动")
    while _running:
        for account_index, message in poll_accounts(cfg):
            if not _running:
                return
            # Check-and-add happens under the lock so a uid is queued at most
            # once while it is still recorded as pending.
            # NOTE(review): the pending set is keyed on mail.uid alone; if uids
            # are only unique within one account, two accounts could shadow
            # each other until the first one is processed -- confirm.
            with _pending_lock:
                if message.uid in _pending_uids:
                    continue
                _pending_uids.add(message.uid)
                processing_queue.put((account_index, message))
        if _running:
            # Sleep in one-second slices instead of one long sleep so that
            # shutdown is not delayed by the whole polling interval.
            for _tick in range(cfg.polling.interval_seconds):
                if not _running:
                    return
                time.sleep(1)
def _email_processor(cfg, processing_queue):
    """Take queued messages one at a time and run ``process_email`` on them.

    The queue is read with a one-second timeout so the ``_running`` flag is
    re-checked regularly. Whether processing succeeds or raises, the
    message's uid is removed from ``_pending_uids`` afterwards.

    Args:
        cfg: Loaded configuration, forwarded to ``process_email``.
        processing_queue: Queue of ``(acct_idx, mail)`` tuples.
    """
    logger.info("邮件处理线程已启动")
    while _running:
        try:
            job = processing_queue.get(timeout=1)
        except queue.Empty:
            # Nothing queued within the timeout; loop to re-check _running.
            continue
        account_index, message = job
        try:
            process_email(cfg, account_index, message)
        except Exception as exc:
            # A single failing message is logged and must not end the thread.
            logger.error(f"处理邮件失败: {exc}", exc_info=True)
        finally:
            # Always release the uid, on success and on failure alike.
            with _pending_lock:
                _pending_uids.discard(message.uid)
def _tg_poller(cfg):
    """Call ``poll_telegram`` repeatedly until ``_running`` is cleared.

    The value returned by each call is passed back in as the last argument of
    the next call, starting from 0. Exceptions from a call are logged and the
    loop carries on with the previous value. The thread sleeps one second
    between iterations while ``_running`` is still set.

    Args:
        cfg: Loaded configuration; ``cfg.telegram`` and ``cfg`` itself are both
            handed to ``poll_telegram``.
    """
    logger.info("TG 轮询线程已启动")
    latest_update_id = 0
    while _running:
        try:
            latest_update_id = poll_telegram(cfg.telegram, cfg, latest_update_id)
        except Exception as exc:
            # Log and keep polling; the previous id is retained on failure.
            logger.error(f"TG 轮询错误: {exc}", exc_info=True)
        if _running:
            time.sleep(1)
def main():
global _running
signal.signal(signal.SIGINT, _signal_handler)
@@ -31,23 +80,25 @@ def main():
cfg = load_config(cfg_path)
logger.info(f"AI邮件摘要机器人已启动轮询间隔: {cfg.polling.interval_seconds}s")
last_check = 0.0
last_update_id = 0
processing_queue: queue.Queue = queue.Queue()
while _running:
try:
now = time.time()
if now - last_check >= cfg.polling.interval_seconds:
process_all(cfg)
last_check = now
threads = [
threading.Thread(target=_email_poller, args=(cfg, processing_queue), daemon=True),
threading.Thread(target=_email_processor, args=(cfg, processing_queue), daemon=True),
threading.Thread(target=_tg_poller, args=(cfg,), daemon=True),
]
last_update_id = poll_telegram(cfg.telegram, cfg, last_update_id)
except Exception as e:
logger.error(f"主循环出错: {e}", exc_info=True)
for t in threads:
t.start()
if _running:
try:
while _running:
time.sleep(1)
except KeyboardInterrupt:
_running = False
for t in threads:
t.join(timeout=5)
logger.info("机器人已停止")

View File

@@ -1,43 +1,32 @@
import logging
from typing import Generator
from src.config import Config
from src.email_client import fetch_unseen_emails, mark_as_seen
from src.email_client import fetch_unseen_emails, mark_as_seen, Email
from src.ai_client import summarize_email
from src.tg_bot import send_summary, format_summary
logger = logging.getLogger(__name__)
def process_all(cfg: Config):
def poll_accounts(cfg: Config) -> Generator[tuple[int, Email], None, None]:
for idx, acct in enumerate(cfg.email_accounts):
try:
_process_account(cfg, acct, idx)
logger.info(f"检查邮箱: {acct.username}")
emails = fetch_unseen_emails(acct)
if emails:
logger.info(f" 发现 {len(emails)} 封新邮件")
for mail in emails:
yield idx, mail
except Exception as e:
logger.error(f"处理邮箱 {acct.username} 时出错: {e}", exc_info=True)
logger.error(f"轮询 {acct.username} 失败: {e}", exc_info=True)
def _process_account(cfg: Config, acct, acct_idx: int):
    """Fetch one account's unseen mail, summarise and send each message.

    Each message is summarised with ``summarize_email``, formatted with
    ``format_summary`` and sent with ``send_summary``. A message's uid is
    collected whether or not that pipeline raised, and all collected uids are
    passed to ``mark_as_seen`` in a single call at the end.

    Args:
        cfg: Loaded configuration; ``cfg.ai`` and ``cfg.telegram`` are read.
        acct: The account to check; ``acct.username`` is used for logging and
            as the summary's ``"recipient"``.
        acct_idx: Index of the account, forwarded to ``send_summary``.
    """
    logger.info(f"检查邮箱: {acct.username}")
    emails = fetch_unseen_emails(acct)
    if not emails:
        logger.info(f" 没有新邮件")
        return

    logger.info(f" 发现 {len(emails)} 封新邮件")
    handled_uids = []
    for message in emails:
        try:
            logger.info(f" 正在摘要: {message.subject}")
            digest = summarize_email(cfg.ai, acct.username, message.subject, message.sender, message.body)
            digest["recipient"] = acct.username
            rendered = format_summary(digest)
            send_summary(cfg.telegram, cfg.telegram.chat_id, rendered, digest, message.body, acct_idx)
            handled_uids.append(message.uid)
            logger.info(f" 已发送到 Telegram")
        except Exception as exc:
            logger.error(f" 处理邮件 '{message.subject}' 失败: {exc}", exc_info=True)
            # Failed messages are recorded too, so they are marked as seen
            # along with the successful ones.
            handled_uids.append(message.uid)

    if handled_uids:
        mark_as_seen(acct, handled_uids)
def process_email(cfg: Config, acct_idx: int, mail: Email):
    """Summarise a single message, send the summary, then mark it as seen.

    Nothing is caught here: an exception from any step propagates to the
    caller, and in that case ``mark_as_seen`` is not reached.

    Args:
        cfg: Loaded configuration; ``cfg.email_accounts``, ``cfg.ai`` and
            ``cfg.telegram`` are read.
        acct_idx: Index into ``cfg.email_accounts`` of the owning account.
        mail: The message to process; its ``subject``, ``sender``, ``body``
            and ``uid`` attributes are used.
    """
    account = cfg.email_accounts[acct_idx]
    logger.info(f" 正在摘要: {mail.subject}")

    digest = summarize_email(cfg.ai, account.username, mail.subject, mail.sender, mail.body)
    digest["recipient"] = account.username
    rendered = format_summary(digest)
    send_summary(cfg.telegram, cfg.telegram.chat_id, rendered, digest, mail.body, acct_idx)

    # Marked as seen only after the summary has been sent.
    mark_as_seen(account, [mail.uid])
    logger.info(f" 已处理: {mail.subject}")