"""Entry point: runs the mail poller, AI processor and Telegram worker threads.

Pipeline: ``_email_poller`` -> ``_email_queue`` -> ``_ai_processor``
-> ``_tg_queue`` -> ``_tg_worker``.
"""
import logging
import logging.handlers
import queue as queue_module
import signal
import sys
import threading
import time

from src.config import load_config
from src.summarizer import poll_accounts, ai_process, tg_send_and_mark
from src.tg_bot import poll_telegram

# Thread-safe logging: every record is pushed onto a queue and written to the
# console by a single QueueListener thread.
_log_queue = queue_module.Queue(-1)
_queue_handler = logging.handlers.QueueHandler(_log_queue)
_console_handler = logging.StreamHandler()
_console_handler.setFormatter(logging.Formatter(
    "%(asctime)s [%(levelname)s] [%(threadName)s] %(name)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
))
_log_listener = logging.handlers.QueueListener(_log_queue, _console_handler)
root = logging.getLogger()
root.addHandler(_queue_handler)
root.setLevel(logging.INFO)
_log_listener.start()

logger = logging.getLogger("main")

# Global run flag: cleared by the signal handler, polled by every loop below.
_running = True

# UIDs of mails that are queued for (or currently in) AI processing, so the
# poller does not enqueue the same mail twice.
# NOTE(review): the set is keyed by uid only; if uids are only unique per
# account, two accounts could collide here -- confirm against poll_accounts.
_pending_lock = threading.Lock()
_pending_uids: set[bytes] = set()

# Queue 1: raw emails → AI processor
_email_queue: queue_module.Queue = queue_module.Queue()
# Queue 2: AI summaries → TG sender
_tg_queue: queue_module.Queue = queue_module.Queue()


def _signal_handler(signum, frame):
    """Signal callback (SIGINT/SIGTERM): ask every loop to stop."""
    global _running
    logger.info("收到退出信号,正在停止...")
    _running = False


def _email_poller(cfg):
    """Poll the mail accounts and enqueue not-yet-pending mails for the AI stage.

    Runs until ``_running`` is cleared. Between polls it sleeps
    ``cfg.polling.interval_seconds`` seconds in one-second steps so that a
    shutdown request is noticed quickly.
    """
    logger.info("邮件轮询线程已启动")
    while _running:
        try:
            for acct_idx, mail in poll_accounts(cfg):
                if not _running:
                    return
                with _pending_lock:
                    # Skip mails that are already queued / being processed.
                    if mail.uid not in _pending_uids:
                        _pending_uids.add(mail.uid)
                        _email_queue.put((acct_idx, mail))
        except Exception as e:
            # Like the other workers: log and keep the thread alive, so one
            # failed poll does not stop mail polling for good.
            logger.error("邮件轮询错误: %s", e, exc_info=True)

        for _ in range(cfg.polling.interval_seconds):
            if not _running:
                return
            time.sleep(1)


def _ai_processor(cfg):
    """Take mails from ``_email_queue``, run ``ai_process`` and queue the result.

    A truthy result is put on ``_tg_queue``. The mail's uid is removed from
    ``_pending_uids`` whether processing succeeded or failed.
    """
    logger.info("AI 处理线程已启动")
    while _running:
        try:
            # Short timeout so the loop re-checks _running about once a second.
            acct_idx, mail = _email_queue.get(timeout=1)
        except queue_module.Empty:
            continue

        try:
            info = ai_process(cfg, acct_idx, mail)
            if info:
                _tg_queue.put(info)
        except Exception as e:
            logger.error("AI 处理邮件失败: %s", e, exc_info=True)
        finally:
            with _pending_lock:
                _pending_uids.discard(mail.uid)


def _tg_worker(cfg):
    """Alternate between polling Telegram and sending one queued summary.

    Each iteration calls ``poll_telegram`` (carrying ``last_update_id``
    forward), then passes at most one item from ``_tg_queue`` to
    ``tg_send_and_mark``, then sleeps one second.
    """
    logger.info("TG 线程已启动")
    last_update_id = 0
    while _running:
        try:
            last_update_id = poll_telegram(cfg.telegram, cfg, last_update_id)
        except Exception as e:
            logger.error("TG 轮询错误: %s", e, exc_info=True)

        try:
            info = _tg_queue.get_nowait()
            tg_send_and_mark(cfg, info)
        except queue_module.Empty:
            pass
        except Exception as e:
            logger.error("TG 发送失败: %s", e, exc_info=True)

        if _running:
            time.sleep(1)


def main():
    """Load the config, start the worker threads and wait for shutdown.

    The config path is ``sys.argv[1]`` when given, else ``"config.yaml"``.
    While running, queue sizes are logged every 30 one-second ticks. On
    shutdown each thread is joined for up to five seconds.
    """
    global _running
    signal.signal(signal.SIGINT, _signal_handler)
    signal.signal(signal.SIGTERM, _signal_handler)

    cfg_path = sys.argv[1] if len(sys.argv) > 1 else "config.yaml"
    cfg = load_config(cfg_path)

    logger.info("AI邮件摘要机器人已启动,轮询间隔: %ss", cfg.polling.interval_seconds)

    threads = [
        threading.Thread(target=_email_poller, args=(cfg,), daemon=True),
        threading.Thread(target=_ai_processor, args=(cfg,), daemon=True),
        threading.Thread(target=_tg_worker, args=(cfg,), daemon=True),
    ]
    for t in threads:
        t.start()

    logger.info("所有线程已启动,进入主循环")
    tick = 0
    try:
        while _running:
            tick += 1
            if tick % 30 == 0:
                logger.info(
                    "队列状态: email_queue=%d tg_queue=%d pending_uids=%d",
                    _email_queue.qsize(), _tg_queue.qsize(), len(_pending_uids),
                )
            time.sleep(1)
    except KeyboardInterrupt:
        _running = False

    for t in threads:
        t.join(timeout=5)

    # Log BEFORE stopping the listener: records queued after stop() are never
    # written, because the listener thread has already exited.
    logger.info("机器人已停止")
    _log_listener.stop()


if __name__ == "__main__":
    main()