From 3144ff492abd879baa8b1fdb41184df5cc681d42 Mon Sep 17 00:00:00 2001 From: Zichao Lin Date: Sun, 20 Jul 2025 23:05:16 +0800 Subject: [PATCH] refactor: add comments and logs --- main.py | 498 ++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 319 insertions(+), 179 deletions(-) diff --git a/main.py b/main.py index 913f722..42f9bcb 100644 --- a/main.py +++ b/main.py @@ -7,9 +7,10 @@ from dataclasses import dataclass import mexc_spot_v3 +# 配置日志格式 logging.basicConfig( - level=logging.DEBUG, - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s", handlers=[ logging.FileHandler("output/mexc_spot_grid_bot.log", encoding="utf-8"), logging.StreamHandler(), @@ -20,60 +21,70 @@ logger = logging.getLogger(__name__) @dataclass class Order: - order_id: str - price: float - quantity: float - side: str - status: str - filled_time: Optional[float] = None + """订单数据结构""" + + order_id: str # 订单ID + price: float # 订单价格 + quantity: float # 订单数量 + side: str # 买卖方向: BUY/SELL + status: str # 订单状态: NEW/FILLED/CANCELED + filled_time: Optional[float] = None # 成交时间戳 class GridTradingBot: + """网格交易机器人核心类""" + def __init__(self, conf: Dict): + """初始化网格交易机器人""" self.config = conf - self.symbol = conf["symbol"] - self.csv_symbol = conf["csv_symbol"] - self.csv_file = conf["csv_file"] - self.grid_percentage = conf["grid_percentage"] - self.grid_count = conf["grid_count"] - self.order_amount = conf["order_amount"] - self.min_order_value = conf["min_order_value"] - self.reserve_base = conf["reserve_base"] - self.reserve_quote = conf["reserve_quote"] - self.active_orders: Dict[str, Order] = {} - self.current_buy_levels = 0 - self.current_sell_levels = 0 - self.api_trade = mexc_spot_v3.mexc_trade() - self.api_market = mexc_spot_v3.mexc_market() - self.running = False + self.symbol = conf["symbol"] # 交易对符号 + self.csv_symbol = conf["csv_symbol"] # CSV记录使用的交易对符号 + self.csv_file = conf["csv_file"] # 交易记录文件路径 + self.grid_percentage = conf["grid_percentage"] # 网格间距百分比 + self.grid_count = conf["grid_count"] # 单边网格数量 + self.order_amount = conf["order_amount"] # 每单交易数量 + self.min_order_value = conf["min_order_value"] # 最小订单价值 + self.reserve_base = conf["reserve_base"] # 保留的基础货币数量 + self.reserve_quote = conf["reserve_quote"] # 保留的报价货币数量 + + # 运行时状态 + self.active_orders: Dict[str, Order] = {} # 活跃订单字典 + self.current_buy_levels = 0 # 当前买单网格层级 + self.current_sell_levels = 0 # 当前卖单网格层级 + self.api_trade = mexc_spot_v3.mexc_trade() # 交易API + self.api_market = mexc_spot_v3.mexc_market() # 市场API + self.running = False # 运行状态标志 + + logger.info("网格交易机器人初始化完成,交易对: %s", self.symbol) def record_transaction(self, order_response: Dict[str, Any]) -> bool: + """记录交易到CSV文件""" try: - csv_symbol = self.csv_symbol - order_id = order_response["orderId"] - executed_qty = order_response["executedQty"] - cummulative_quote_qty = order_response["cummulativeQuoteQty"] - side = order_response["side"] - trade_type = "买入" if side == "BUY" else "卖出" + trade_type = "买入" if order_response["side"] == "BUY" else "卖出" timestamp = datetime.fromtimestamp( order_response["updateTime"] / 1000 ).strftime("%Y-%m-%dT%H:%M") + + # 构建CSV行数据 row = [ timestamp, trade_type, - csv_symbol, - executed_qty, - cummulative_quote_qty, + self.csv_symbol, + order_response["executedQty"], + order_response["cummulativeQuoteQty"], "资金账户", "CEX", - f"MEXC API - Order ID: {order_id}", + f"MEXC API - Order ID: {order_response['orderId']}", ] - file_exists = False + + # 检查文件是否存在 try: with open(self.csv_file, "r", encoding="utf-8") as f: file_exists = True except FileNotFoundError: - pass + file_exists = False + + # 写入CSV文件 with open(self.csv_file, "a", newline="", encoding="utf-8") as f: writer = csv.writer(f) if not file_exists: @@ -90,23 +101,35 @@ class GridTradingBot: ] ) writer.writerow(row) + + logger.info( + "交易记录已保存: %s %s %s", + trade_type, + self.csv_symbol, + order_response["orderId"], + ) return True except Exception as e: + logger.error("交易记录保存失败: %s", str(e)) return False def api_get_price(self) -> float: + """获取当前市场价格""" try: ticker = self.api_market.get_price({"symbol": self.symbol}) price = float(ticker["price"]) + logger.debug("当前市场价格: %f", price) return price except Exception as e: + logger.warning("获取价格失败,重试中... 错误: %s", str(e)) return self.api_get_price() def api_get_balances(self) -> Tuple[float, float]: - base_currency = self.symbol[:-4] - quote_currency = self.symbol[-4:] - base_available = 0.0 - quote_available = 0.0 + """获取基础货币和报价货币可用余额""" + base_currency = self.symbol[:-4] # 提取基础货币 + quote_currency = self.symbol[-4:] # 提取报价货币 + base_available, quote_available = 0.0, 0.0 + try: balances = self.api_trade.get_account_info() for balance in balances.get("balances", []): @@ -114,32 +137,46 @@ class GridTradingBot: base_available = float(balance["free"]) elif balance["asset"] == quote_currency: quote_available = float(balance["free"]) + + logger.debug( + "当前余额 - %s: %f, %s: %f", + base_currency, + base_available, + quote_currency, + quote_available, + ) return base_available, quote_available except Exception as e: + logger.warning("获取余额失败,重试中... 错误: %s", str(e)) return self.api_get_balances() def calculate_order_value(self, price: float) -> float: - value = price * self.order_amount - return value + """计算订单价值(价格*数量)""" + return price * self.order_amount def is_order_value_valid(self, price: float) -> bool: - value = self.calculate_order_value(price) - is_valid = value >= self.min_order_value - return is_valid + """检查订单价值是否满足最小要求""" + return self.calculate_order_value(price) >= self.min_order_value def calculate_grid_price(self, base_price: float, level: int, side: str) -> float: + """计算网格价格""" if side == "BUY": - price = base_price / (1 + self.grid_percentage) ** level - return price + return base_price / (1 + self.grid_percentage) ** level elif side == "SELL": - price = base_price * (1 + self.grid_percentage) ** level - return price + return base_price * (1 + self.grid_percentage) ** level else: - raise ValueError(f"Invalid side: {side}") + raise ValueError(f"无效的交易方向: {side}") def api_place_order(self, price: float, side: str) -> Optional[str]: + """下订单""" if not self.is_order_value_valid(price): + logger.warning( + "订单价值 %f 低于最小值 %f", + price * self.order_amount, + self.min_order_value, + ) return None + try: order_detail = self.api_trade.post_order( { @@ -150,286 +187,389 @@ class GridTradingBot: "quantity": self.order_amount, } ) - order_id = order_detail.get("orderId") - if order_id: + + if order_id := order_detail.get("orderId"): + logger.info( + "成功下单 %s - 价格: %f 数量: %f 订单ID: %s", + side, + price, + self.order_amount, + order_id, + ) return order_id else: + logger.error("下单失败: %r", order_detail) return None except Exception as e: + logger.warning("下单异常,重试中... 错误: %s", str(e)) return self.api_place_order(price, side) def api_cancel_order(self, order_id: str): + """取消指定订单""" try: self.api_trade.delete_order({"symbol": self.symbol, "orderId": order_id}) + logger.info("订单已取消: %s", order_id) except Exception as e: + logger.warning( + "取消订单失败,重试中... 订单ID: %s 错误: %s", order_id, str(e) + ) self.api_cancel_order(order_id) def api_cancel_all_orders(self): + """取消所有活跃订单""" try: self.api_trade.delete_openorders({"symbol": self.symbol}) self.active_orders.clear() self.current_buy_levels = 0 self.current_sell_levels = 0 + logger.info("已取消所有订单") except Exception as e: + logger.warning("取消所有订单失败,重试中... 错误: %s", str(e)) self.api_cancel_all_orders() def api_update_order_statuses(self): + """更新所有活跃订单状态""" for side in ["BUY", "SELL"]: - orders = self.get_active_orders_by_side(side) - if side == "BUY": - orders.sort(key=lambda x: float(x.price), reverse=True) - else: - orders.sort(key=lambda x: float(x.price)) + # 按价格排序订单(买单从高到低,卖单从低到高) + orders = sorted( + [o for o in self.active_orders.values() if o.side == side], + key=lambda x: x.price, + reverse=(side == "BUY"), + ) + for order in orders: - order_id = order.order_id try: order_info = self.api_trade.get_order( - {"symbol": self.symbol, "orderId": order_id} + {"symbol": self.symbol, "orderId": order.order_id} ) - old_status = self.active_orders[order_id].status - new_status = order_info["status"] - self.active_orders[order_id].status = new_status - if new_status == "FILLED": - self.active_orders[order_id].filled_time = time.time() + + # 更新订单状态 + old_status = order.status + order.status = order_info["status"] + + if old_status != order.status: + logger.info( + "订单状态更新: %s %s -> %s", + order.order_id, + old_status, + order.status, + ) + + # 处理已成交订单 + if order.status == "FILLED": + order.filled_time = time.time() self.record_transaction(order_info) - if new_status in ["CANCELED"]: - self.active_orders.pop(order_id) - if new_status == "NEW": - break + logger.info( + "订单已成交: %s 价格: %f 数量: %f", + order.order_id, + order.price, + order.quantity, + ) + + # 移除已取消订单 + if order.status == "CANCELED": + self.active_orders.pop(order.order_id, None) except Exception as e: - self.api_update_order_statuses() + logger.warning( + "获取订单状态失败: %s 错误: %s", order.order_id, str(e) + ) def get_active_orders_by_side(self, side: str) -> List[Order]: - orders = [order for order in self.active_orders.values() if order.side == side] - return orders + """获取指定方向的所有活跃订单""" + return [o for o in self.active_orders.values() if o.side == side] def get_extreme_prices(self) -> Tuple[Optional[float], Optional[float]]: + """获取当前最极端的买单和卖单价格""" buy_orders = self.get_active_orders_by_side("BUY") sell_orders = self.get_active_orders_by_side("SELL") - lowest_buy = min([order.price for order in buy_orders]) if buy_orders else None - highest_sell = ( - max([order.price for order in sell_orders]) if sell_orders else None - ) + lowest_buy = min(o.price for o in buy_orders) if buy_orders else None + highest_sell = max(o.price for o in sell_orders) if sell_orders else None return lowest_buy, highest_sell def initialize_grid(self): + """初始化网格""" try: market_price = self.api_get_price() + logger.info("开始初始化网格,当前价格: %f", market_price) + + # 计算初始买卖价格 buy_price = self.calculate_grid_price(market_price, 1, "BUY") sell_price = self.calculate_grid_price(market_price, 1, "SELL") + + # 获取当前余额 base_balance, quote_balance = self.api_get_balances() - if quote_balance > self.reserve_quote: - if self.is_order_value_valid(buy_price): - buy_order_id = self.api_place_order(buy_price, "BUY") - if buy_order_id: - self.active_orders[buy_order_id] = Order( - order_id=buy_order_id, - price=buy_price, - quantity=self.order_amount, - side="BUY", - status="NEW", - ) - self.current_buy_levels = 1 - if base_balance > self.reserve_base: - if self.is_order_value_valid(sell_price): - sell_order_id = self.api_place_order(sell_price, "SELL") - if sell_order_id: - self.active_orders[sell_order_id] = Order( - order_id=sell_order_id, - price=sell_price, - quantity=self.order_amount, - side="SELL", - status="NEW", - ) - self.current_sell_levels = 1 + + # 下单买1 + if ( + quote_balance - self.order_amount * buy_price > self.reserve_quote + and self.is_order_value_valid(buy_price) + ): + if order_id := self.api_place_order(buy_price, "BUY"): + self.active_orders[order_id] = Order( + order_id=order_id, + price=buy_price, + quantity=self.order_amount, + side="BUY", + status="NEW", + ) + self.current_buy_levels = 1 + + # 下单卖1 + if ( + base_balance - self.order_amount > self.reserve_base + and self.is_order_value_valid(sell_price) + ): + if order_id := self.api_place_order(sell_price, "SELL"): + self.active_orders[order_id] = Order( + order_id=order_id, + price=sell_price, + quantity=self.order_amount, + side="SELL", + status="NEW", + ) + self.current_sell_levels = 1 + + # 扩展网格 self.extend_grid() except Exception as e: + logger.error("网格初始化失败: %s", str(e)) self.initialize_grid() def extend_grid(self): + """扩展网格到指定层级""" try: lowest_buy, highest_sell = self.get_extreme_prices() + # 波动过大导致两侧无挂单,则重置机器人 if lowest_buy is None and highest_sell is None: self.stop() self.run() + # 扩展买单网格(向下) while self.current_buy_levels < self.grid_count: - lowest_buy, highest_sell = self.get_extreme_prices() - base_balance, quote_balance = self.api_get_balances() + lowest_buy, _ = self.get_extreme_prices() + _, quote_balance = self.api_get_balances() if lowest_buy is None: lowest_buy = self.calculate_grid_price( highest_sell, self.grid_count, "BUY" ) new_buy_price = self.calculate_grid_price(lowest_buy, 1, "BUY") - if ( - quote_balance - self.order_amount * new_buy_price - > self.reserve_quote - ): + required_quote = new_buy_price * self.order_amount + + if quote_balance - required_quote > self.reserve_quote: if self.is_order_value_valid(new_buy_price): - buy_order_id = self.api_place_order(new_buy_price, "BUY") - if buy_order_id: - self.active_orders[buy_order_id] = Order( - order_id=buy_order_id, + if order_id := self.api_place_order(new_buy_price, "BUY"): + self.active_orders[order_id] = Order( + order_id=order_id, price=new_buy_price, quantity=self.order_amount, side="BUY", status="NEW", ) self.current_buy_levels += 1 + logger.info( + "扩展买单网格到层级 %d 价格: %f", + self.current_buy_levels, + new_buy_price, + ) else: + logger.warning( + "买单价值 %f 低于最小值 %f", + new_buy_price * self.order_amount, + self.min_order_value, + ) break else: + logger.debug("报价货币余额不足,无法扩展买单网格") break + + # 扩展卖单网格(向上) while self.current_sell_levels < self.grid_count: - lowest_buy, highest_sell = self.get_extreme_prices() - base_balance, quote_balance = self.api_get_balances() + _, highest_sell = self.get_extreme_prices() + base_balance, _ = self.api_get_balances() + if highest_sell is None: highest_sell = self.calculate_grid_price( lowest_buy, self.grid_count, "SELL" ) new_sell_price = self.calculate_grid_price(highest_sell, 1, "SELL") - if base_balance > self.reserve_base: + if base_balance - self.order_amount > self.reserve_base: if self.is_order_value_valid(new_sell_price): - sell_order_id = self.api_place_order(new_sell_price, "SELL") - if sell_order_id: - self.active_orders[sell_order_id] = Order( - order_id=sell_order_id, + if order_id := self.api_place_order(new_sell_price, "SELL"): + self.active_orders[order_id] = Order( + order_id=order_id, price=new_sell_price, quantity=self.order_amount, side="SELL", status="NEW", ) self.current_sell_levels += 1 + logger.info( + "扩展卖单网格到层级 %d 价格: %f", + self.current_sell_levels, + new_sell_price, + ) else: + logger.warning( + "卖单价值 %f 低于最小值 %f", + new_sell_price * self.order_amount, + self.min_order_value, + ) break else: + logger.debug("基础货币余额不足,无法扩展卖单网格") break except Exception as e: + logger.error("扩展网格失败: %s", str(e)) self.extend_grid() def adjust_grid_for_filled(self): + """根据成交订单调整网格""" try: - current_order_ids = set(self.active_orders.keys()) - newly_filled_orders = [ - order - for order in self.active_orders.values() - if order.order_id in current_order_ids and order.status == "FILLED" + # 找出新成交的订单 + filled_orders = [ + o for o in self.active_orders.values() if o.status == "FILLED" ] - if newly_filled_orders: - for filled_order in newly_filled_orders: - filled_side = filled_order.side - filled_price = filled_order.price - filled_id = filled_order.order_id - self.active_orders.pop(filled_id, None) - active_buys = [ - o - for o in self.active_orders.values() - if o.side == "BUY" and o.status == "NEW" - ] - active_sells = [ - o - for o in self.active_orders.values() - if o.side == "SELL" and o.status == "NEW" - ] - if filled_side == "BUY": - self.current_buy_levels -= 1 - elif filled_side == "SELL": - self.current_sell_levels -= 1 - if filled_side == "BUY" and active_sells: - highest_sell = max(active_sells, key=lambda x: x.price) + + for order in filled_orders: + logger.info( + "处理成交订单: %s %s %f", order.order_id, order.side, order.price + ) + + # 从活跃订单中移除 + self.active_orders.pop(order.order_id, None) + + if order.side == "BUY": + self.current_buy_levels -= 1 + # 买单成交时取消最高价卖单并下新卖单 + if sell_orders := self.get_active_orders_by_side("SELL"): + highest_sell = max(sell_orders, key=lambda x: x.price) self.api_cancel_order(highest_sell.order_id) self.active_orders.pop(highest_sell.order_id, None) + + # 下新卖单(卖0) new_sell_price = self.calculate_grid_price( - filled_price, 1, "SELL" + order.price, 1, "SELL" ) - base_balance, _ = self.api_get_balances() - if ( - base_balance > self.reserve_base - and self.is_order_value_valid(new_sell_price) - ): - sell_order_id = self.api_place_order(new_sell_price, "SELL") - if sell_order_id: - self.active_orders[sell_order_id] = Order( - order_id=sell_order_id, + if self.is_order_value_valid(new_sell_price): + if order_id := self.api_place_order(new_sell_price, "SELL"): + self.active_orders[order_id] = Order( + order_id=order_id, price=new_sell_price, quantity=self.order_amount, side="SELL", status="NEW", ) - elif filled_side == "SELL" and active_buys: - lowest_buy = min(active_buys, key=lambda x: x.price) + + elif order.side == "SELL": + self.current_sell_levels -= 1 + # 卖单成交时取消最低价买单并下新买单 + if buy_orders := self.get_active_orders_by_side("BUY"): + lowest_buy = min(buy_orders, key=lambda x: x.price) self.api_cancel_order(lowest_buy.order_id) self.active_orders.pop(lowest_buy.order_id, None) - new_buy_price = self.calculate_grid_price( - filled_price, 1, "BUY" - ) - _, quote_balance = self.api_get_balances() - if ( - quote_balance > self.reserve_quote - and self.is_order_value_valid(new_buy_price) - ): - buy_order_id = self.api_place_order(new_buy_price, "BUY") - if buy_order_id: - self.active_orders[buy_order_id] = Order( - order_id=buy_order_id, + + # 下新买单(买0) + new_buy_price = self.calculate_grid_price(order.price, 1, "BUY") + if self.is_order_value_valid(new_buy_price): + if order_id := self.api_place_order(new_buy_price, "BUY"): + self.active_orders[order_id] = Order( + order_id=order_id, price=new_buy_price, quantity=self.order_amount, side="BUY", status="NEW", ) except Exception as e: + logger.error("调整网格失败: %s", str(e)) self.adjust_grid_for_filled() def adjust_grid_for_violation(self): + """处理价格超出网格范围的情况""" try: market_price = self.api_get_price() lowest_buy, highest_sell = self.get_extreme_prices() - if lowest_buy is None and market_price < self.calculate_grid_price( - highest_sell, self.grid_count + 1, "BUY" - ): - self.api_cancel_all_orders() - self.initialize_grid() - elif highest_sell is None and market_price > self.calculate_grid_price( - lowest_buy, self.grid_count + 1, "SELL" - ): - self.api_cancel_all_orders() - self.initialize_grid() + + # 价格跌破最低买单网格 + if lowest_buy is None and highest_sell is not None: + threshold = self.calculate_grid_price( + highest_sell, self.grid_count + 1, "BUY" + ) + if market_price < threshold: + logger.warning( + "价格跌破网格下限(%f < %f),重置网格", + market_price, + threshold, + ) + self.api_cancel_all_orders() + self.initialize_grid() + + # 价格突破最高卖单网格 + elif highest_sell is None and lowest_buy is not None: + threshold = self.calculate_grid_price( + lowest_buy, self.grid_count + 1, "SELL" + ) + if market_price > threshold: + logger.warning( + "价格突破网格上限(%f > %f),重置网格", + market_price, + threshold, + ) + self.api_cancel_all_orders() + self.initialize_grid() except Exception as e: + logger.error("网格边界检查失败: %s", str(e)) self.adjust_grid_for_violation() def run(self): + """运行网格交易机器人主循环""" self.running = True + logger.info("启动网格交易机器人,交易对: %s", self.symbol) + try: self.initialize_grid() + while self.running: try: + # 更新订单状态 self.api_update_order_statuses() + + # 调整网格 self.adjust_grid_for_filled() self.adjust_grid_for_violation() + + # 扩展网格 self.extend_grid() + + # 控制循环频率 time.sleep(1) except Exception as e: + logger.error("主循环异常: %s,3秒后重试...", str(e)) time.sleep(3) except KeyboardInterrupt: - pass + logger.info("接收到键盘中断信号,停止机器人...") finally: self.stop() def stop(self): + """停止机器人运行""" self.running = False self.api_cancel_all_orders() + logger.info("网格交易机器人已停止") if __name__ == "__main__": + # 默认配置 config = { "symbol": "BTCUSDC", "csv_symbol": "BTCUSDT", "csv_file": "output/mexc_spot_grid_trades.csv", - "grid_percentage": 0.001, - "grid_count": 3, + "grid_percentage": 0.0002, + "grid_count": 4, "order_amount": 0.00001, "min_order_value": 1, "reserve_base": 0, "reserve_quote": 0, } + + logger.info("创建网格交易机器人实例") bot = GridTradingBot(config) bot.run()