fix: thread-safe logging with QueueHandler + QueueListener

- Replace logging.basicConfig with QueueHandler/QueueListener
  so all log output writes from a single thread
- Add %(threadName)s to format for thread identification
This commit is contained in:
2026-07-02 20:57:33 +08:00
parent be412168bb
commit 934d6a7545

32
main.py
View File

@@ -1,5 +1,6 @@
import logging
import queue
import logging.handlers
import queue as queue_module
import signal
import sys
import threading
@@ -8,11 +9,21 @@ from src.config import load_config
from src.summarizer import poll_accounts, ai_process, tg_send_and_mark
from src.tg_bot import poll_telegram
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
# Thread-safe logging: all log records go through a single QueueListener thread
_log_queue = queue_module.Queue(-1)
_queue_handler = logging.handlers.QueueHandler(_log_queue)
_console_handler = logging.StreamHandler()
_console_handler.setFormatter(logging.Formatter(
"%(asctime)s [%(levelname)s] [%(threadName)s] %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
))
_log_listener = logging.handlers.QueueListener(_log_queue, _console_handler)
root = logging.getLogger()
root.addHandler(_queue_handler)
root.setLevel(logging.INFO)
_log_listener.start()
logger = logging.getLogger("main")
_running = True
@@ -20,9 +31,9 @@ _pending_lock = threading.Lock()
_pending_uids: set[bytes] = set()
# Queue 1: raw emails → AI processor
_email_queue: queue.Queue = queue.Queue()
# Queue 2: AI results → TG sender
_tg_queue: queue.Queue = queue.Queue()
_email_queue: queue_module.Queue = queue_module.Queue()
# Queue 2: AI summaries → TG sender
_tg_queue: queue_module.Queue = queue_module.Queue()
def _signal_handler(signum, frame):
@@ -53,7 +64,7 @@ def _ai_processor(cfg):
while _running:
try:
acct_idx, mail = _email_queue.get(timeout=1)
except queue.Empty:
except queue_module.Empty:
continue
try:
info = ai_process(cfg, acct_idx, mail)
@@ -78,7 +89,7 @@ def _tg_worker(cfg):
try:
info = _tg_queue.get_nowait()
tg_send_and_mark(cfg, info)
except queue.Empty:
except queue_module.Empty:
pass
except Exception as e:
logger.error(f"TG 发送失败: {e}", exc_info=True)
@@ -113,6 +124,7 @@ def main():
for t in threads:
t.join(timeout=5)
_log_listener.stop()
logger.info("机器人已停止")