Files
ai-mail-bot/main.py
Zichao Lin d334b6f3eb feat: enhance logging detail across all modules
- email_client: log IMAP connection, login, UNSEEN count, each email
  details (subject/sender/body size), mark-as-seen progress
- ai_client: log AI request params, timing, token usage, response size
- smtp_client: log SMTP connect, login, send details
- tg_bot: log all callback actions with subject context, message states
- main: periodic queue depth report (email_queue/tg_queue/pending_uids)
2026-07-02 21:00:40 +08:00

139 lines
4.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
import logging.handlers
import queue as queue_module
import signal
import sys
import threading
import time
from src.config import load_config
from src.summarizer import poll_accounts, ai_process, tg_send_and_mark
from src.tg_bot import poll_telegram
# Thread-safe logging: all log records go through a single QueueListener thread.
# The root logger only enqueues records (QueueHandler); the listener thread
# dequeues them and hands them to the console handler for formatting/output.
_log_queue = queue_module.Queue(-1)  # maxsize <= 0 makes the queue unbounded
_queue_handler = logging.handlers.QueueHandler(_log_queue)
_console_handler = logging.StreamHandler()  # no stream passed, so sys.stderr is used
_console_handler.setFormatter(logging.Formatter(
    "%(asctime)s [%(levelname)s] [%(threadName)s] %(name)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
))
_log_listener = logging.handlers.QueueListener(_log_queue, _console_handler)
root = logging.getLogger()
root.addHandler(_queue_handler)
root.setLevel(logging.INFO)
# The listener thread is started as an import-time side effect; main() stops it
# during shutdown.
_log_listener.start()
logger = logging.getLogger("main")
# Global run flag: set to False by _signal_handler()/main() and polled by every
# worker loop to know when to exit.
_running = True
# Guards _pending_uids, which is touched by both the poller and the AI thread.
_pending_lock = threading.Lock()
# UIDs of emails that have been enqueued and not yet finished AI processing;
# the poller skips any email whose UID is already in this set.
_pending_uids: set[bytes] = set()
# Queue 1: raw emails → AI processor (items are (acct_idx, mail) tuples)
_email_queue: queue_module.Queue = queue_module.Queue()
# Queue 2: AI summaries → TG sender
_tg_queue: queue_module.Queue = queue_module.Queue()
def _signal_handler(signum, frame):
    """Signal callback that requests a graceful shutdown.

    Logs that an exit signal was received and clears the module-level
    ``_running`` flag; the worker loops and the main loop check that flag and
    exit on their own. Both arguments are supplied by the ``signal`` module
    and are not used here.
    """
    global _running
    logger.info("收到退出信号,正在停止...")
    _running = False
def _email_poller(cfg):
    """Poll the configured mail accounts and feed new emails to the AI queue.

    Runs until the module-level ``_running`` flag is cleared. Each cycle
    iterates ``poll_accounts(cfg)`` and enqueues every ``(acct_idx, mail)``
    pair whose ``mail.uid`` is not already in ``_pending_uids``, then waits
    ``cfg.polling.interval_seconds`` seconds before polling again.

    A failure raised while polling is logged and the loop carries on with the
    next cycle, matching the error handling of the other worker threads.

    Args:
        cfg: Loaded application config; must expose
            ``polling.interval_seconds`` (used with ``range``, so an int).
    """
    logger.info("邮件轮询线程已启动")
    while _running:
        try:
            for acct_idx, mail in poll_accounts(cfg):
                if not _running:
                    return
                # NOTE(review): the pending set is keyed by mail.uid alone; if
                # UIDs are only unique per account, two accounts could collide
                # here -- confirm against poll_accounts.
                with _pending_lock:
                    if mail.uid not in _pending_uids:
                        _pending_uids.add(mail.uid)
                        _email_queue.put((acct_idx, mail))
        except Exception as e:
            # An uncaught exception would end this thread for good while the
            # rest of the process keeps running, silently stopping all mail
            # polling. Log it and retry on the next cycle instead.
            logger.error(f"邮件轮询错误: {e}", exc_info=True)
        # Sleep in one-second slices so a shutdown request is noticed quickly.
        for _ in range(cfg.polling.interval_seconds):
            if not _running:
                return
            time.sleep(1)
def _ai_processor(cfg):
    """Consume raw emails from ``_email_queue`` and run them through the AI step.

    Loops until the module-level ``_running`` flag is cleared. Each dequeued
    ``(acct_idx, mail)`` pair is passed to ``ai_process``; a truthy result is
    forwarded to ``_tg_queue``. Any exception from that step is logged with
    its traceback and the loop continues with the next email.

    Whatever the outcome, the email's UID is removed from ``_pending_uids``
    afterwards.

    NOTE(review): the UID is released as soon as the AI step ends, before the
    summary has been sent; whether the poller can pick the same email up again
    in that window depends on poll_accounts -- confirm.

    Args:
        cfg: Loaded application config, passed through to ``ai_process``.
    """
    logger.info("AI 处理线程已启动")
    while _running:
        # Wait at most one second so the run flag is re-checked regularly.
        try:
            item = _email_queue.get(timeout=1)
        except queue_module.Empty:
            continue
        acct_idx, mail = item

        try:
            summary = ai_process(cfg, acct_idx, mail)
            if summary:
                _tg_queue.put(summary)
        except Exception as e:
            logger.error(f"AI 处理邮件失败: {e}", exc_info=True)
        finally:
            # Always release the UID, on success and on failure alike.
            with _pending_lock:
                _pending_uids.discard(mail.uid)
def _tg_worker(cfg):
    """Alternate between polling Telegram and sending queued summaries.

    Loops until the module-level ``_running`` flag is cleared. Every
    iteration:

    1. calls ``poll_telegram(cfg.telegram, cfg, <last id>)`` and keeps its
       return value as the id to pass on the next call (presumably the
       Telegram update offset -- confirm against ``poll_telegram``);
    2. takes at most one item from ``_tg_queue`` without blocking and hands it
       to ``tg_send_and_mark``;
    3. sleeps one second if still running.

    Errors in either step are logged with a traceback and do not stop the
    loop; an empty queue is not an error.

    Args:
        cfg: Loaded application config; ``cfg.telegram`` is passed to
            ``poll_telegram``.
    """
    logger.info("TG 线程已启动")
    update_cursor = 0
    while _running:
        # Step 1: poll Telegram; on failure keep the previous cursor.
        try:
            update_cursor = poll_telegram(cfg.telegram, cfg, update_cursor)
        except Exception as e:
            logger.error(f"TG 轮询错误: {e}", exc_info=True)

        # Step 2: deliver at most one pending summary per iteration.
        try:
            tg_send_and_mark(cfg, _tg_queue.get_nowait())
        except queue_module.Empty:
            pass
        except Exception as e:
            logger.error(f"TG 发送失败: {e}", exc_info=True)

        # Step 3: pace the loop unless a shutdown was requested meanwhile.
        if not _running:
            break
        time.sleep(1)
def main():
    """Entry point: load the config, start the worker threads, wait for shutdown.

    Installs ``_signal_handler`` for SIGINT and SIGTERM, loads the config from
    the path in ``sys.argv[1]`` (default ``"config.yaml"``), and starts three
    daemon threads: the email poller, the AI processor and the Telegram
    worker. The main thread then idles in one-second steps, logging the queue
    depths every 30 ticks, until the module-level ``_running`` flag is
    cleared.

    On shutdown each thread is joined for up to 5 seconds, the final
    "stopped" message is logged, and only then is the log listener stopped so
    that the message (and anything else still queued) is actually written.
    """
    global _running
    signal.signal(signal.SIGINT, _signal_handler)
    signal.signal(signal.SIGTERM, _signal_handler)
    try:
        cfg_path = sys.argv[1] if len(sys.argv) > 1 else "config.yaml"
        cfg = load_config(cfg_path)
        logger.info(f"AI邮件摘要机器人已启动轮询间隔: {cfg.polling.interval_seconds}s")
        threads = [
            threading.Thread(target=_email_poller, args=(cfg,), daemon=True),
            threading.Thread(target=_ai_processor, args=(cfg,), daemon=True),
            threading.Thread(target=_tg_worker, args=(cfg,), daemon=True),
        ]
        for t in threads:
            t.start()
        logger.info("所有线程已启动,进入主循环")
        tick = 0
        try:
            while _running:
                tick += 1
                # Periodic queue-depth report (every 30 one-second ticks).
                if tick % 30 == 0:
                    logger.info("队列状态: email_queue=%d tg_queue=%d pending_uids=%d",
                                _email_queue.qsize(), _tg_queue.qsize(), len(_pending_uids))
                time.sleep(1)
        except KeyboardInterrupt:
            _running = False
        # Workers are daemon threads, so a bounded join is enough: anything
        # still stuck after the timeout dies with the process.
        for t in threads:
            t.join(timeout=5)
        # Log BEFORE stopping the listener: records enqueued after stop() are
        # never consumed, so the message would otherwise be lost.
        logger.info("机器人已停止")
    finally:
        # stop() processes everything already queued, then joins the listener
        # thread; doing it in ``finally`` also flushes logs on an error exit.
        _log_listener.stop()
if __name__ == "__main__":
main()