"""运行时数据库模块""" import json import os class Database: """数据库类""" def __init__(self, config_name): self.db_file = f'output/{config_name}_db.json' self.load() def load(self): """加载数据库""" if not os.path.exists(self.db_file): with open(self.db_file, 'w', encoding='utf-8') as f: json.dump({}, f) with open(self.db_file, 'r', encoding='utf-8') as f: self.data = json.load(f) def save(self): """保存数据库""" with open(self.db_file, 'w', encoding='utf-8') as f: json.dump(self.data, f, indent=4) def sync_trades(self, trades): """ 同步数据库中的trade_id,确保所有trade_id都存在,且没有多余的trade_id trades: 当前config中有效的trade_id列表 """ self.load() schema = { "order_id": { "open": [], "filled": [], "canceled": [] }, "counter": { "open": 0, "filled": 0, "canceled": 0 } } for db_trade_id in list(self.data.keys()): if db_trade_id not in trades: del self.data[db_trade_id] for trade_id in trades: if trade_id not in self.data: self.data[trade_id] = schema for trade_id in trades: self.data[trade_id]["counter"]["open"] = len(self.data[trade_id]["order_id"]["open"]) self.data[trade_id]["counter"]["filled"] = len(self.data[trade_id]["order_id"]["filled"]) self.data[trade_id]["counter"]["canceled"] = len(self.data[trade_id]["order_id"]["canceled"]) self.save()