- email_client: log IMAP connection, login, UNSEEN count, each email details (subject/sender/body size), mark-as-seen progress - ai_client: log AI request params, timing, token usage, response size - smtp_client: log SMTP connect, login, send details - tg_bot: log all callback actions with subject context, message states - main: periodic queue depth report (email_queue/tg_queue/pending_uids)
139 lines
4.0 KiB
Python
139 lines
4.0 KiB
Python
import logging
|
||
import logging.handlers
|
||
import queue as queue_module
|
||
import signal
|
||
import sys
|
||
import threading
|
||
import time
|
||
from src.config import load_config
|
||
from src.summarizer import poll_accounts, ai_process, tg_send_and_mark
|
||
from src.tg_bot import poll_telegram
|
||
|
||
# Thread-safe logging: all log records go through a single QueueListener thread
|
||
_log_queue = queue_module.Queue(-1)
|
||
_queue_handler = logging.handlers.QueueHandler(_log_queue)
|
||
_console_handler = logging.StreamHandler()
|
||
_console_handler.setFormatter(logging.Formatter(
|
||
"%(asctime)s [%(levelname)s] [%(threadName)s] %(name)s: %(message)s",
|
||
datefmt="%Y-%m-%d %H:%M:%S",
|
||
))
|
||
_log_listener = logging.handlers.QueueListener(_log_queue, _console_handler)
|
||
|
||
root = logging.getLogger()
|
||
root.addHandler(_queue_handler)
|
||
root.setLevel(logging.INFO)
|
||
_log_listener.start()
|
||
|
||
logger = logging.getLogger("main")
|
||
|
||
_running = True
|
||
_pending_lock = threading.Lock()
|
||
_pending_uids: set[bytes] = set()
|
||
|
||
# Queue 1: raw emails → AI processor
|
||
_email_queue: queue_module.Queue = queue_module.Queue()
|
||
# Queue 2: AI summaries → TG sender
|
||
_tg_queue: queue_module.Queue = queue_module.Queue()
|
||
|
||
|
||
def _signal_handler(signum, frame):
|
||
global _running
|
||
logger.info("收到退出信号,正在停止...")
|
||
_running = False
|
||
|
||
|
||
def _email_poller(cfg):
|
||
logger.info("邮件轮询线程已启动")
|
||
while _running:
|
||
for acct_idx, mail in poll_accounts(cfg):
|
||
if not _running:
|
||
return
|
||
with _pending_lock:
|
||
if mail.uid not in _pending_uids:
|
||
_pending_uids.add(mail.uid)
|
||
_email_queue.put((acct_idx, mail))
|
||
if _running:
|
||
for _ in range(cfg.polling.interval_seconds):
|
||
if not _running:
|
||
return
|
||
time.sleep(1)
|
||
|
||
|
||
def _ai_processor(cfg):
|
||
logger.info("AI 处理线程已启动")
|
||
while _running:
|
||
try:
|
||
acct_idx, mail = _email_queue.get(timeout=1)
|
||
except queue_module.Empty:
|
||
continue
|
||
try:
|
||
info = ai_process(cfg, acct_idx, mail)
|
||
if info:
|
||
_tg_queue.put(info)
|
||
except Exception as e:
|
||
logger.error(f"AI 处理邮件失败: {e}", exc_info=True)
|
||
finally:
|
||
with _pending_lock:
|
||
_pending_uids.discard(mail.uid)
|
||
|
||
|
||
def _tg_worker(cfg):
|
||
logger.info("TG 线程已启动")
|
||
last_update_id = 0
|
||
while _running:
|
||
try:
|
||
last_update_id = poll_telegram(cfg.telegram, cfg, last_update_id)
|
||
except Exception as e:
|
||
logger.error(f"TG 轮询错误: {e}", exc_info=True)
|
||
|
||
try:
|
||
info = _tg_queue.get_nowait()
|
||
tg_send_and_mark(cfg, info)
|
||
except queue_module.Empty:
|
||
pass
|
||
except Exception as e:
|
||
logger.error(f"TG 发送失败: {e}", exc_info=True)
|
||
|
||
if _running:
|
||
time.sleep(1)
|
||
|
||
|
||
def main():
|
||
global _running
|
||
signal.signal(signal.SIGINT, _signal_handler)
|
||
signal.signal(signal.SIGTERM, _signal_handler)
|
||
|
||
cfg_path = sys.argv[1] if len(sys.argv) > 1 else "config.yaml"
|
||
cfg = load_config(cfg_path)
|
||
logger.info(f"AI邮件摘要机器人已启动,轮询间隔: {cfg.polling.interval_seconds}s")
|
||
|
||
threads = [
|
||
threading.Thread(target=_email_poller, args=(cfg,), daemon=True),
|
||
threading.Thread(target=_ai_processor, args=(cfg,), daemon=True),
|
||
threading.Thread(target=_tg_worker, args=(cfg,), daemon=True),
|
||
]
|
||
|
||
for t in threads:
|
||
t.start()
|
||
|
||
logger.info("所有线程已启动,进入主循环")
|
||
tick = 0
|
||
try:
|
||
while _running:
|
||
tick += 1
|
||
if tick % 30 == 0:
|
||
logger.info("队列状态: email_queue=%d tg_queue=%d pending_uids=%d",
|
||
_email_queue.qsize(), _tg_queue.qsize(), len(_pending_uids))
|
||
time.sleep(1)
|
||
except KeyboardInterrupt:
|
||
_running = False
|
||
|
||
for t in threads:
|
||
t.join(timeout=5)
|
||
_log_listener.stop()
|
||
logger.info("机器人已停止")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|