refactor: split AI processor and TG worker into separate threads
- Three independent threads: email poller, AI processor, TG worker - Two queues: _email_queue (raw emails→AI), _tg_queue (AI results→TG) - summarizer.py split: ai_process() (AI only) + tg_send_and_mark() (TG only) - AI retries no longer affect TG polling or email polling
This commit is contained in:
44
main.py
44
main.py
@@ -5,7 +5,7 @@ import sys
|
||||
import threading
|
||||
import time
|
||||
from src.config import load_config
|
||||
from src.summarizer import poll_accounts, process_email
|
||||
from src.summarizer import poll_accounts, ai_process, tg_send_and_mark
|
||||
from src.tg_bot import poll_telegram
|
||||
|
||||
logging.basicConfig(
|
||||
@@ -19,6 +19,11 @@ _running = True
|
||||
_pending_lock = threading.Lock()
|
||||
_pending_uids: set[bytes] = set()
|
||||
|
||||
# Queue 1: raw emails → AI processor
|
||||
_email_queue: queue.Queue = queue.Queue()
|
||||
# Queue 2: AI results → TG sender
|
||||
_tg_queue: queue.Queue = queue.Queue()
|
||||
|
||||
|
||||
def _signal_handler(signum, frame):
|
||||
global _running
|
||||
@@ -26,7 +31,7 @@ def _signal_handler(signum, frame):
|
||||
_running = False
|
||||
|
||||
|
||||
def _email_poller(cfg, processing_queue):
|
||||
def _email_poller(cfg):
|
||||
logger.info("邮件轮询线程已启动")
|
||||
while _running:
|
||||
for acct_idx, mail in poll_accounts(cfg):
|
||||
@@ -35,7 +40,7 @@ def _email_poller(cfg, processing_queue):
|
||||
with _pending_lock:
|
||||
if mail.uid not in _pending_uids:
|
||||
_pending_uids.add(mail.uid)
|
||||
processing_queue.put((acct_idx, mail))
|
||||
_email_queue.put((acct_idx, mail))
|
||||
if _running:
|
||||
for _ in range(cfg.polling.interval_seconds):
|
||||
if not _running:
|
||||
@@ -43,30 +48,41 @@ def _email_poller(cfg, processing_queue):
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
def _email_processor(cfg, processing_queue):
|
||||
logger.info("邮件处理线程已启动")
|
||||
def _ai_processor(cfg):
|
||||
logger.info("AI 处理线程已启动")
|
||||
while _running:
|
||||
try:
|
||||
acct_idx, mail = processing_queue.get(timeout=1)
|
||||
acct_idx, mail = _email_queue.get(timeout=1)
|
||||
except queue.Empty:
|
||||
continue
|
||||
try:
|
||||
process_email(cfg, acct_idx, mail)
|
||||
info = ai_process(cfg, acct_idx, mail)
|
||||
if info:
|
||||
_tg_queue.put(info)
|
||||
except Exception as e:
|
||||
logger.error(f"处理邮件失败: {e}", exc_info=True)
|
||||
logger.error(f"AI 处理邮件失败: {e}", exc_info=True)
|
||||
finally:
|
||||
with _pending_lock:
|
||||
_pending_uids.discard(mail.uid)
|
||||
|
||||
|
||||
def _tg_poller(cfg):
|
||||
logger.info("TG 轮询线程已启动")
|
||||
def _tg_worker(cfg):
|
||||
logger.info("TG 线程已启动")
|
||||
last_update_id = 0
|
||||
while _running:
|
||||
try:
|
||||
last_update_id = poll_telegram(cfg.telegram, cfg, last_update_id)
|
||||
except Exception as e:
|
||||
logger.error(f"TG 轮询错误: {e}", exc_info=True)
|
||||
|
||||
try:
|
||||
info = _tg_queue.get_nowait()
|
||||
tg_send_and_mark(cfg, info)
|
||||
except queue.Empty:
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.error(f"TG 发送失败: {e}", exc_info=True)
|
||||
|
||||
if _running:
|
||||
time.sleep(1)
|
||||
|
||||
@@ -80,12 +96,10 @@ def main():
|
||||
cfg = load_config(cfg_path)
|
||||
logger.info(f"AI邮件摘要机器人已启动,轮询间隔: {cfg.polling.interval_seconds}s")
|
||||
|
||||
processing_queue: queue.Queue = queue.Queue()
|
||||
|
||||
threads = [
|
||||
threading.Thread(target=_email_poller, args=(cfg, processing_queue), daemon=True),
|
||||
threading.Thread(target=_email_processor, args=(cfg, processing_queue), daemon=True),
|
||||
threading.Thread(target=_tg_poller, args=(cfg,), daemon=True),
|
||||
threading.Thread(target=_email_poller, args=(cfg,), daemon=True),
|
||||
threading.Thread(target=_ai_processor, args=(cfg,), daemon=True),
|
||||
threading.Thread(target=_tg_worker, args=(cfg,), daemon=True),
|
||||
]
|
||||
|
||||
for t in threads:
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import logging
|
||||
from typing import Generator
|
||||
from typing import Generator, Optional
|
||||
from src.config import Config
|
||||
from src.email_client import fetch_unseen_emails, mark_as_seen, Email
|
||||
from src.ai_client import summarize_email
|
||||
@@ -21,12 +21,25 @@ def poll_accounts(cfg: Config) -> Generator[tuple[int, Email], None, None]:
|
||||
logger.error(f"轮询 {acct.username} 失败: {e}", exc_info=True)
|
||||
|
||||
|
||||
def process_email(cfg: Config, acct_idx: int, mail: Email):
|
||||
def ai_process(cfg: Config, acct_idx: int, mail: Email) -> Optional[dict]:
|
||||
acct = cfg.email_accounts[acct_idx]
|
||||
logger.info(f" 正在摘要: {mail.subject}")
|
||||
summary = summarize_email(cfg.ai, acct.username, mail.subject, mail.sender, mail.body)
|
||||
summary["recipient"] = acct.username
|
||||
text = format_summary(summary)
|
||||
send_summary(cfg.telegram, cfg.telegram.chat_id, text, summary, mail.body, acct_idx)
|
||||
mark_as_seen(acct, [mail.uid])
|
||||
logger.info(f" 已处理: {mail.subject}")
|
||||
return {
|
||||
"text": text,
|
||||
"data": summary,
|
||||
"original_body": mail.body,
|
||||
"acct_idx": acct_idx,
|
||||
"uid": mail.uid,
|
||||
}
|
||||
|
||||
|
||||
def tg_send_and_mark(cfg: Config, info: dict):
|
||||
acct = cfg.email_accounts[info["acct_idx"]]
|
||||
send_summary(cfg.telegram, cfg.telegram.chat_id,
|
||||
info["text"], info["data"],
|
||||
info["original_body"], info["acct_idx"])
|
||||
mark_as_seen(acct, [info["uid"]])
|
||||
logger.info(f" 已发送并标记已读")
|
||||
|
||||
Reference in New Issue
Block a user