#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ MEXC 交易机器人 功能: 1. 从JSON配置文件读取交易指令 2. 支持按日期执行多个交易(*表示每天执行) 3. 支持多种订单类型 (LIMIT, MARKET, LIMIT_MAKER, IMMEDIATE_OR_CANCEL, FILL_OR_KILL) 4. 当无price时自动获取实时价格作为限价 5. LIMIT订单支持只提供quoteOrderQty自动计算quantity 6. 证券代码映射功能 7. 完整的交易记录和日志 类说明: - MexcSpotMarket: 处理市场数据查询 - MexcSpotTrade: 处理现货交易逻辑 - TradingConfig: 管理交易配置 使用示例: python main.py """ import csv import logging import os import time import json from datetime import datetime, date from typing import Dict, Any, Optional, Tuple, List import mexc_spot_v3 os.makedirs("output", exist_ok=True) # 配置日志 logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=[ logging.FileHandler("output/mexc_trading_bot.log", encoding="utf-8"), logging.StreamHandler(), ], ) logger = logging.getLogger(__name__) class MexcSpotMarket: """MEXC 市场数据查询类 提供获取交易对价格等功能 方法: - get_price(symbol): 获取指定交易对的当前价格 """ def __init__(self): """初始化市场数据查询接口""" self.market = mexc_spot_v3.mexc_market() def get_price(self, symbol: str) -> Optional[float]: """ 获取指定交易对的当前价格 Args: symbol: 交易对,如 "BTCUSDT" Returns: 当前价格(浮点数)或None(如果失败) Raises: Exception: 当API调用失败时抛出异常 """ params = {"symbol": symbol} try: logger.info("查询交易对价格: %s", symbol) price_data = self.market.get_price(params) if not price_data or "price" not in price_data: logger.error("获取价格数据失败: %s", price_data) return None price_str = price_data["price"] price = float(price_str) logger.info("获取价格成功: %s = %f", symbol, price) return price except Exception as e: logger.error("查询价格失败: %s", str(e)) return None class MexcSpotTrade: """MEXC 现货交易类 处理所有现货交易逻辑,包括订单创建、状态查询和记录 属性: - SYMBOL_MAPPING: 证券代码映射字典 - ORDER_TYPE_REQUIREMENTS: 各订单类型必需参数 方法: - trade(): 执行现货交易 - _api_get_order(): 查询订单状态 - _tool_map_symbol(): 映射证券代码 - _tool_validate_order_params(): 验证订单参数 - _tool_record_transaction(): 记录交易到CSV """ # 订单类型与必需参数 ORDER_TYPE_REQUIREMENTS = { "LIMIT": [ "quantity", "price", "quoteOrderQty", ], # quantity和price或quoteOrderQty和price "MARKET": ["quantity", "quoteOrderQty"], # quantity或quoteOrderQty "LIMIT_MAKER": ["quantity", "price"], "IMMEDIATE_OR_CANCEL": ["quantity", "price"], "FILL_OR_KILL": ["quantity", "price"], } def __init__(self): """初始化交易机器人""" self.trader = mexc_spot_v3.mexc_trade() self.market = MexcSpotMarket() self.csv_file = "output/mexc_spot_trade.csv" self.symbol_mapping = TradingConfig().config_data.get("symbol_mapping", {}) def _api_get_order(self, symbol: str, order_id: str) -> Optional[Dict[str, Any]]: """ 查询订单状态 Args: symbol: 交易对 order_id: 订单ID Returns: 订单详情字典或None(如果失败) """ params = { "symbol": symbol, "orderId": order_id, } try: logger.info("查询订单状态, 订单ID: %s", order_id) order = self.trader.get_order(params) logger.info("订单状态: %s", order.get("status")) return order except Exception as e: logger.error("查询订单失败: %s", str(e)) return None def _tool_map_symbol(self, symbol: str) -> str: """映射证券代码用于记录""" return self.symbol_mapping.get(symbol, symbol) def _tool_validate_order_params( self, order_type: str, params: Dict[str, Any] ) -> Tuple[bool, str]: """ 验证订单参数是否符合要求 Args: order_type: 订单类型 params: 订单参数 Returns: 元组(是否有效, 错误信息) """ required_params = self.ORDER_TYPE_REQUIREMENTS.get(order_type, []) if not required_params: return False, f"未知的订单类型: {order_type}" # 特殊处理LIMIT订单 if order_type == "LIMIT": # 需要price和(quantity或quoteOrderQty) if "price" not in params: return False, "LIMIT订单需要price参数" if "quantity" not in params and "quoteOrderQty" not in params: return False, "LIMIT订单需要quantity或quoteOrderQty参数" return True, "" # 特殊处理MARKET订单 if order_type == "MARKET": if "quantity" not in params and "quoteOrderQty" not in params: return False, "MARKET订单需要quantity或quoteOrderQty参数" return True, "" # 检查其他订单类型的必需参数 missing_params = [p for p in required_params if p not in params] if missing_params: return False, f"{order_type}订单缺少必需参数: {', '.join(missing_params)}" return True, "" def _tool_record_transaction(self, order_data: Dict[str, Any]) -> bool: """ 记录交易到CSV文件 Args: order_data: 订单数据字典 Returns: 是否成功记录 """ try: original_symbol = order_data["symbol"] mapped_symbol = self._tool_map_symbol(original_symbol) order_id = order_data["orderId"] executed_qty = order_data["executedQty"] cummulative_quote_qty = order_data["cummulativeQuoteQty"] side = order_data["side"] # 确定交易类型显示 trade_type = "买入" if side == "BUY" else "卖出" timestamp = datetime.fromtimestamp(order_data["time"] / 1000).strftime( "%Y-%m-%dT%H:%M" ) row = [ timestamp, trade_type, mapped_symbol, executed_qty, cummulative_quote_qty, "资金账户", "CEX", f"MEXC API - Order ID: {order_id}", ] # 检查文件是否存在 file_exists = False try: with open(self.csv_file, "r", encoding="utf-8") as f: file_exists = True except FileNotFoundError: pass # 写入CSV with open(self.csv_file, "a", newline="", encoding="utf-8") as f: writer = csv.writer(f) if not file_exists: writer.writerow( [ "日期", "类型", "证券代码", "份额", "净额", "现金账户", "目标账户", "备注", ] ) writer.writerow(row) logger.info("交易记录成功, 订单ID: %s", order_id) return True except Exception as e: logger.error("记录交易失败: %s", str(e)) return False def trade( self, symbol: str, order_type: str, side: str, **kwargs ) -> Optional[Dict[str, Any]]: """ 执行现货交易 Args: symbol: 交易对 (如 BTCUSDC) order_type: 订单类型 (LIMIT, MARKET等) side: 交易方向 (BUY, SELL) **kwargs: 其他订单参数 Returns: 订单信息字典或None(如果失败) Raises: ValueError: 当参数验证失败时 Exception: 当API调用失败时 """ # 基本参数验证 if side not in ["BUY", "SELL"]: logger.error("无效的交易方向: %s", side) return None order_type = order_type.upper() processed_kwargs = kwargs.copy() # 处理无price的情况,获取实时价格 if order_type in ["LIMIT", "LIMIT_MAKER"] and "price" not in processed_kwargs: current_price = self.market.get_price(symbol) if current_price is None: logger.error("无法获取实时价格,交易取消") return None processed_kwargs["price"] = current_price logger.info("使用实时价格作为限价: %f", current_price) # 处理LIMIT订单只有quoteOrderQty没有quantity的情况 if ( order_type in ["LIMIT", "LIMIT_MAKER"] and "quoteOrderQty" in processed_kwargs and "quantity" not in processed_kwargs ): try: quote_amount = float(processed_kwargs["quoteOrderQty"]) price = float(processed_kwargs["price"]) quantity = quote_amount / price processed_kwargs["quantity"] = str(quantity) logger.info("根据quoteOrderQty计算quantity: %f", quantity) except (ValueError, KeyError) as e: logger.error("计算quantity失败: %s", str(e)) return None # 准备订单参数 base_params = { "symbol": symbol, "side": side, "type": order_type, **processed_kwargs, } # 验证参数 is_valid, error_msg = self._tool_validate_order_params(order_type, base_params) if not is_valid: logger.error("订单参数验证失败: %s", error_msg) return None try: logger.info("准备下单 %s %s 订单, 交易对: %s", side, order_type, symbol) logger.debug("订单参数: %s", base_params) # 测试订单 test_result = self.trader.post_order_test(base_params.copy()) if test_result != {}: logger.error("订单测试失败,参数有误: %s", test_result) return None logger.info("订单参数测试通过,准备正式下单") # 正式下单 order = self.trader.post_order(base_params.copy()) order_id = order.get("orderId") if not order_id: logger.error("下单失败: 未获取到订单ID") logger.error(base_params.copy()) logger.error(order) return None logger.info("订单创建成功, 订单ID: %s", order_id) # 查询订单详情 logger.info("等待1秒后查询订单状态...") time.sleep(1) order_detail = self._api_get_order(symbol, order_id) if not order_detail: logger.error("获取订单详情失败") return None # 如果不是FILLED则重复查询最多10次 retry_count = 0 while order_detail.get("status") != "FILLED" and retry_count < 10: retry_count += 1 logger.info( "订单未完成(状态: %s),等待1秒后第%s次重试查询...", order_detail.get("status", "UNKNOWN"), retry_count, ) time.sleep(1) order_detail = self._api_get_order(symbol, order_id) if not order_detail: logger.error("获取订单详情失败") return None # 记录交易 if order_detail.get("status") == "FILLED": if not self._tool_record_transaction(order_detail): logger.error("交易记录失败") else: logger.warning( "订单未完成(状态: %s),未被记录到CSV。订单ID: %s", order_detail.get("status", "UNKNOWN"), order_id, ) return order_detail except Exception as e: logger.error("交易执行失败: %s", str(e)) return None class TradingConfig: """交易配置管理类 负责加载和管理交易配置 方法: - get_today_trades(): 获取今天需要执行的交易列表 - _load_config(): 加载JSON配置文件 """ def __init__(self, config_file: str = "public/trading_config.json"): """ 初始化交易配置 Args: config_file: 配置文件路径 """ self.config_file = config_file self.config_data = self._load_config() def _load_config(self) -> Dict[str, Any]: """加载JSON配置文件""" try: with open(self.config_file, "r", encoding="utf-8") as f: config = json.load(f) logger.info("成功加载配置文件: %s", self.config_file) return config except FileNotFoundError: logger.error("配置文件不存在: %s", self.config_file) return {} except json.JSONDecodeError: logger.error("配置文件格式错误,不是有效的JSON") return {} except Exception as e: logger.error("加载配置文件时出错: %s", str(e)) return {} def get_today_trades(self) -> List[Dict[str, Any]]: """ 获取今天需要执行的交易列表 Returns: 今天需要执行的交易配置列表 Example: >>> config = TradingConfig() >>> today_trades = config.get_today_trades() >>> print(len(today_trades)) """ if not self.config_data or "trades" not in self.config_data: return [] today = date.today().isoformat() today_trades = [] for trade in self.config_data["trades"]: # 检查交易是否包含执行日期 if "execute_dates" not in trade: continue # 检查是否设置了每天执行(*) if "*" in trade["execute_dates"]: today_trades.append(trade) continue # 检查今天是否在执行日期列表中 if today in trade["execute_dates"]: today_trades.append(trade) return today_trades def main(): """主函数""" # 初始化交易配置 config = TradingConfig() # 获取今天需要执行的交易 today_trades = config.get_today_trades() if not today_trades: logger.info("今天没有需要执行的交易") return logger.info("今天有 %d 个交易需要执行", len(today_trades)) # 初始化交易类 spot_trader = MexcSpotTrade() # 执行每个交易 for trade_config in today_trades: try: # 提取交易参数 symbol = trade_config.get("symbol") order_type = trade_config.get("order_type") side = trade_config.get("side") params = trade_config.get("params", {}) if not all([symbol, order_type, side]): logger.error("交易配置缺少必要参数: %s", trade_config) continue logger.info("执行交易: %s %s %s", symbol, order_type, side) logger.debug("交易参数: %s", params) # 执行交易 result = spot_trader.trade( symbol=symbol, order_type=order_type, side=side, **params ) if result: logger.info("交易执行成功: %s", result) else: logger.error("交易执行失败") except Exception as e: logger.error("执行交易时出错: %s", str(e)) continue logger.info("执行完毕") if __name__ == "__main__": main()