563 lines
18 KiB
Python
563 lines
18 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
"""
|
||
MEXC 交易机器人
|
||
|
||
功能:
|
||
1. 从JSON配置文件读取交易指令
|
||
2. 支持按日期执行多个交易(*表示每天执行)
|
||
3. 支持多种订单类型 (LIMIT, MARKET, LIMIT_MAKER, IMMEDIATE_OR_CANCEL, FILL_OR_KILL)
|
||
4. 当无price时自动获取实时价格作为限价
|
||
5. LIMIT订单支持只提供quoteOrderQty自动计算quantity
|
||
6. 证券代码映射功能
|
||
7. 完整的交易记录和日志
|
||
|
||
类说明:
|
||
- MexcSpotMarket: 处理市场数据查询
|
||
- MexcSpotTrade: 处理现货交易逻辑
|
||
- TradingConfig: 管理交易配置
|
||
|
||
使用示例:
|
||
python main.py
|
||
"""
|
||
|
||
import csv
|
||
import logging
|
||
import os
|
||
import time
|
||
import json
|
||
from pathlib import Path
|
||
from datetime import datetime, date
|
||
from typing import Dict, Any, Optional, Tuple, List
|
||
|
||
import mexc_spot_v3
|
||
|
||
os.makedirs("output", exist_ok=True)
|
||
|
||
# 配置日志
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
|
||
handlers=[
|
||
logging.FileHandler("output/mexc_trading_bot.log", encoding="utf-8"),
|
||
logging.StreamHandler(),
|
||
],
|
||
)
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class MexcSpotMarket:
|
||
"""MEXC 市场数据查询类
|
||
|
||
提供获取交易对价格等功能
|
||
|
||
方法:
|
||
- get_price(symbol): 获取指定交易对的当前价格
|
||
"""
|
||
|
||
def __init__(self, config):
|
||
"""初始化市场数据查询接口"""
|
||
self.market = mexc_spot_v3.mexc_market(config)
|
||
|
||
def get_price(self, symbol: str) -> Optional[float]:
|
||
"""
|
||
获取指定交易对的当前价格
|
||
|
||
Args:
|
||
symbol: 交易对,如 "BTCUSDT"
|
||
|
||
Returns:
|
||
当前价格(浮点数)或None(如果失败)
|
||
|
||
Raises:
|
||
Exception: 当API调用失败时抛出异常
|
||
"""
|
||
|
||
params = {"symbol": symbol}
|
||
|
||
try:
|
||
logger.info("查询交易对价格: %s", symbol)
|
||
price_data = self.market.get_price(params)
|
||
|
||
if not price_data or "price" not in price_data:
|
||
logger.error("获取价格数据失败: %s", price_data)
|
||
return None
|
||
|
||
price_str = price_data["price"]
|
||
price = float(price_str)
|
||
logger.info("获取价格成功: %s = %f", symbol, price)
|
||
return price
|
||
|
||
except Exception as e:
|
||
logger.error("查询价格失败: %s", str(e))
|
||
return None
|
||
|
||
|
||
class MexcSpotTrade:
|
||
"""MEXC 现货交易类
|
||
|
||
处理所有现货交易逻辑,包括订单创建、状态查询和记录
|
||
|
||
属性:
|
||
- SYMBOL_MAPPING: 证券代码映射字典
|
||
- ORDER_TYPE_REQUIREMENTS: 各订单类型必需参数
|
||
|
||
方法:
|
||
- trade(): 执行现货交易
|
||
- _api_get_order(): 查询订单状态
|
||
- _tool_map_symbol(): 映射证券代码
|
||
- _tool_validate_order_params(): 验证订单参数
|
||
- _tool_record_transaction(): 记录交易到CSV
|
||
"""
|
||
|
||
# 订单类型与必需参数
|
||
ORDER_TYPE_REQUIREMENTS = {
|
||
"LIMIT": [
|
||
"quantity",
|
||
"price",
|
||
"quoteOrderQty",
|
||
], # quantity和price或quoteOrderQty和price
|
||
"MARKET": ["quantity", "quoteOrderQty"], # quantity或quoteOrderQty
|
||
"LIMIT_MAKER": ["quantity", "price"],
|
||
"IMMEDIATE_OR_CANCEL": ["quantity", "price"],
|
||
"FILL_OR_KILL": ["quantity", "price"],
|
||
}
|
||
|
||
def __init__(self, config, symbol_mapping, config_file_name):
|
||
"""初始化交易机器人"""
|
||
self.trader = mexc_spot_v3.mexc_trade(config)
|
||
self.market = MexcSpotMarket(config)
|
||
self.csv_file = f"output/{config_file_name}.csv"
|
||
self.symbol_mapping = symbol_mapping
|
||
|
||
def _api_get_order(self, symbol: str, order_id: str) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
查询订单状态
|
||
|
||
Args:
|
||
symbol: 交易对
|
||
order_id: 订单ID
|
||
|
||
Returns:
|
||
订单详情字典或None(如果失败)
|
||
"""
|
||
|
||
params = {
|
||
"symbol": symbol,
|
||
"orderId": order_id,
|
||
}
|
||
|
||
try:
|
||
logger.info("查询订单状态, 订单ID: %s", order_id)
|
||
order = self.trader.get_order(params)
|
||
logger.info("订单状态: %s", order.get("status"))
|
||
return order
|
||
except Exception as e:
|
||
logger.error("查询订单失败: %s", str(e))
|
||
return None
|
||
|
||
def _tool_map_symbol(self, symbol: str) -> str:
|
||
"""映射证券代码用于记录"""
|
||
return self.symbol_mapping.get(symbol, symbol)
|
||
|
||
def _tool_validate_order_params(
|
||
self, order_type: str, params: Dict[str, Any]
|
||
) -> Tuple[bool, str]:
|
||
"""
|
||
验证订单参数是否符合要求
|
||
|
||
Args:
|
||
order_type: 订单类型
|
||
params: 订单参数
|
||
|
||
Returns:
|
||
元组(是否有效, 错误信息)
|
||
"""
|
||
|
||
required_params = self.ORDER_TYPE_REQUIREMENTS.get(order_type, [])
|
||
|
||
if not required_params:
|
||
return False, f"未知的订单类型: {order_type}"
|
||
|
||
# 特殊处理LIMIT订单
|
||
if order_type == "LIMIT":
|
||
# 需要price和(quantity或quoteOrderQty)
|
||
if "price" not in params:
|
||
return False, "LIMIT订单需要price参数"
|
||
|
||
if "quantity" not in params and "quoteOrderQty" not in params:
|
||
return False, "LIMIT订单需要quantity或quoteOrderQty参数"
|
||
|
||
return True, ""
|
||
|
||
# 特殊处理MARKET订单
|
||
if order_type == "MARKET":
|
||
if "quantity" not in params and "quoteOrderQty" not in params:
|
||
return False, "MARKET订单需要quantity或quoteOrderQty参数"
|
||
return True, ""
|
||
|
||
# 检查其他订单类型的必需参数
|
||
missing_params = [p for p in required_params if p not in params]
|
||
if missing_params:
|
||
return False, f"{order_type}订单缺少必需参数: {', '.join(missing_params)}"
|
||
|
||
return True, ""
|
||
|
||
def _tool_record_transaction(self, order_data: Dict[str, Any]) -> bool:
|
||
"""
|
||
记录交易到CSV文件
|
||
|
||
Args:
|
||
order_data: 订单数据字典
|
||
|
||
Returns:
|
||
是否成功记录
|
||
"""
|
||
|
||
try:
|
||
original_symbol = order_data["symbol"]
|
||
mapped_symbol = self._tool_map_symbol(original_symbol)
|
||
order_id = order_data["orderId"]
|
||
executed_qty = order_data["executedQty"]
|
||
cummulative_quote_qty = order_data["cummulativeQuoteQty"]
|
||
side = order_data["side"]
|
||
|
||
# 确定交易类型显示
|
||
trade_type = "买入" if side == "BUY" else "卖出"
|
||
|
||
timestamp = datetime.fromtimestamp(order_data["time"] / 1000).strftime(
|
||
"%Y-%m-%dT%H:%M"
|
||
)
|
||
|
||
row = [
|
||
timestamp,
|
||
trade_type,
|
||
mapped_symbol,
|
||
executed_qty,
|
||
cummulative_quote_qty,
|
||
"资金账户",
|
||
"CEX",
|
||
f"MEXC API - Order ID: {order_id}",
|
||
]
|
||
|
||
# 检查文件是否存在
|
||
file_exists = False
|
||
try:
|
||
with open(self.csv_file, "r", encoding="utf-8") as f:
|
||
file_exists = True
|
||
except FileNotFoundError:
|
||
pass
|
||
|
||
# 写入CSV
|
||
with open(self.csv_file, "a", newline="", encoding="utf-8") as f:
|
||
writer = csv.writer(f)
|
||
if not file_exists:
|
||
writer.writerow(
|
||
[
|
||
"日期",
|
||
"类型",
|
||
"证券代码",
|
||
"份额",
|
||
"净额",
|
||
"现金账户",
|
||
"目标账户",
|
||
"备注",
|
||
]
|
||
)
|
||
writer.writerow(row)
|
||
|
||
logger.info("交易记录成功, 订单ID: %s", order_id)
|
||
return True
|
||
except Exception as e:
|
||
logger.error("记录交易失败: %s", str(e))
|
||
return False
|
||
|
||
def trade(
|
||
self, symbol: str, order_type: str, side: str, **kwargs
|
||
) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
执行现货交易
|
||
|
||
Args:
|
||
symbol: 交易对 (如 BTCUSDC)
|
||
order_type: 订单类型 (LIMIT, MARKET等)
|
||
side: 交易方向 (BUY, SELL)
|
||
**kwargs: 其他订单参数
|
||
|
||
Returns:
|
||
订单信息字典或None(如果失败)
|
||
|
||
Raises:
|
||
ValueError: 当参数验证失败时
|
||
Exception: 当API调用失败时
|
||
"""
|
||
|
||
# 基本参数验证
|
||
if side not in ["BUY", "SELL"]:
|
||
logger.error("无效的交易方向: %s", side)
|
||
return None
|
||
|
||
order_type = order_type.upper()
|
||
processed_kwargs = kwargs.copy()
|
||
|
||
# 处理无price的情况,获取实时价格
|
||
if order_type in ["LIMIT", "LIMIT_MAKER"] and "price" not in processed_kwargs:
|
||
current_price = self.market.get_price(symbol)
|
||
if current_price is None:
|
||
logger.error("无法获取实时价格,交易取消")
|
||
return None
|
||
# 防止挂单不成交
|
||
if side == "BUY":
|
||
processed_kwargs["price"] = current_price * 1.01 # 买入加价0.5%
|
||
elif side == "SELL":
|
||
processed_kwargs["price"] = current_price * 0.91 # 卖出减价0.5%
|
||
logger.info("使用调整0.5%%后价格作为限价: %f", processed_kwargs["price"])
|
||
|
||
# 处理LIMIT订单只有quoteOrderQty没有quantity的情况
|
||
if (
|
||
order_type in ["LIMIT", "LIMIT_MAKER"]
|
||
and "quoteOrderQty" in processed_kwargs
|
||
and "quantity" not in processed_kwargs
|
||
):
|
||
try:
|
||
quote_amount = float(processed_kwargs["quoteOrderQty"])
|
||
price = float(processed_kwargs["price"])
|
||
quantity = quote_amount / price
|
||
processed_kwargs["quantity"] = str(quantity)
|
||
processed_kwargs.pop("quoteOrderQty")
|
||
logger.info("根据quoteOrderQty计算quantity: %f", quantity)
|
||
except (ValueError, KeyError) as e:
|
||
logger.error("计算quantity失败: %s", str(e))
|
||
return None
|
||
|
||
# 准备订单参数
|
||
base_params = {
|
||
"symbol": symbol,
|
||
"side": side,
|
||
"type": order_type,
|
||
**processed_kwargs,
|
||
}
|
||
|
||
# 验证参数
|
||
is_valid, error_msg = self._tool_validate_order_params(order_type, base_params)
|
||
if not is_valid:
|
||
logger.error("订单参数验证失败: %s", error_msg)
|
||
return None
|
||
|
||
try:
|
||
logger.info("准备下单 %s %s 订单, 交易对: %s", side, order_type, symbol)
|
||
logger.debug("订单参数: %s", base_params)
|
||
|
||
# 测试订单
|
||
test_result = self.trader.post_order_test(base_params.copy())
|
||
if test_result != {}:
|
||
logger.error("订单测试失败,参数有误: %s", test_result)
|
||
return None
|
||
|
||
logger.info("订单参数测试通过,准备正式下单")
|
||
|
||
# 正式下单
|
||
order = self.trader.post_order(base_params.copy())
|
||
order_id = order.get("orderId")
|
||
|
||
if not order_id:
|
||
logger.error("下单失败: 未获取到订单ID")
|
||
logger.error(base_params.copy())
|
||
logger.error(order)
|
||
return None
|
||
|
||
logger.info("订单创建成功, 订单ID: %s", order_id)
|
||
|
||
# 查询订单详情
|
||
logger.info("等待1秒后查询订单状态...")
|
||
time.sleep(1)
|
||
order_detail = self._api_get_order(symbol, order_id)
|
||
if not order_detail:
|
||
logger.error("获取订单详情失败")
|
||
return None
|
||
|
||
# 如果不是FILLED则重复查询最多10次
|
||
retry_count = 0
|
||
while order_detail.get("status") != "FILLED" and retry_count < 10:
|
||
retry_count += 1
|
||
logger.info(
|
||
"订单未完成(状态: %s),等待1秒后第%s次重试查询...",
|
||
order_detail.get("status", "UNKNOWN"),
|
||
retry_count,
|
||
)
|
||
time.sleep(1)
|
||
order_detail = self._api_get_order(symbol, order_id)
|
||
if not order_detail:
|
||
logger.error("获取订单详情失败")
|
||
return None
|
||
|
||
# 记录交易
|
||
if order_detail.get("status") == "FILLED":
|
||
if not self._tool_record_transaction(order_detail):
|
||
logger.error("交易记录失败")
|
||
else:
|
||
logger.warning(
|
||
"订单未完成(状态: %s),未被记录到CSV。订单ID: %s",
|
||
order_detail.get("status", "UNKNOWN"),
|
||
order_id,
|
||
)
|
||
|
||
return order_detail
|
||
|
||
except Exception as e:
|
||
logger.error("交易执行失败: %s", str(e))
|
||
return None
|
||
|
||
|
||
class TradingConfig:
|
||
"""交易配置管理类
|
||
|
||
负责加载和管理交易配置
|
||
|
||
方法:
|
||
- get_today_trades(): 获取今天需要执行的交易列表
|
||
- _load_config(): 加载JSON配置文件
|
||
"""
|
||
|
||
def __init__(self, config_file: str = "config/trading_config.json"):
|
||
"""
|
||
初始化交易配置
|
||
|
||
Args:
|
||
config_file: 配置文件路径
|
||
"""
|
||
|
||
self.config_file = config_file
|
||
self.config_data = self._load_config()
|
||
|
||
def _load_config(self) -> Dict[str, Any]:
|
||
"""加载JSON配置文件"""
|
||
try:
|
||
with open(self.config_file, "r", encoding="utf-8") as f:
|
||
config = json.load(f)
|
||
logger.info("成功加载配置文件: %s", self.config_file)
|
||
return config
|
||
except FileNotFoundError:
|
||
logger.error("配置文件不存在: %s", self.config_file)
|
||
return {}
|
||
except json.JSONDecodeError:
|
||
logger.error("配置文件格式错误,不是有效的JSON")
|
||
return {}
|
||
except Exception as e:
|
||
logger.error("加载配置文件时出错: %s", str(e))
|
||
return {}
|
||
|
||
def get_today_trades(self) -> List[Dict[str, Any]]:
|
||
"""
|
||
获取今天需要执行的交易列表
|
||
|
||
Returns:
|
||
今天需要执行的交易配置列表
|
||
|
||
Example:
|
||
>>> config = TradingConfig()
|
||
>>> today_trades = config.get_today_trades()
|
||
>>> print(len(today_trades))
|
||
"""
|
||
|
||
if not self.config_data or "trades" not in self.config_data:
|
||
return []
|
||
|
||
today = date.today().isoformat()
|
||
today_trades = []
|
||
|
||
for trade in self.config_data["trades"]:
|
||
# 检查交易是否包含执行日期
|
||
if "execute_dates" not in trade:
|
||
continue
|
||
|
||
# 检查是否设置了每天执行(*)
|
||
if "*" in trade["execute_dates"]:
|
||
today_trades.append(trade)
|
||
continue
|
||
|
||
# 检查今天是否在执行日期列表中
|
||
if today in trade["execute_dates"]:
|
||
today_trades.append(trade)
|
||
|
||
return today_trades
|
||
|
||
|
||
def main():
|
||
"""主函数"""
|
||
|
||
# 确保config目录存在
|
||
if not os.path.exists("config"):
|
||
logger.error("配置目录 config 不存在")
|
||
return
|
||
|
||
# 获取config目录下所有json文件
|
||
config_files = list(Path("config").glob("*.json"))
|
||
|
||
if not config_files:
|
||
logger.info("配置目录中没有找到任何JSON文件")
|
||
return
|
||
|
||
logger.info("找到 %d 个配置文件需要处理", len(config_files))
|
||
|
||
for config_file in config_files:
|
||
try:
|
||
# 提取交易参数
|
||
logger.info("处理配置文件: %s", config_file)
|
||
config = TradingConfig(str(config_file))
|
||
spot_trader = MexcSpotTrade(
|
||
config.config_data.get("api", {}),
|
||
config.config_data.get("symbol_mapping", {}),
|
||
os.path.basename(config_file).replace('.json', '')
|
||
)
|
||
today_trades = config.get_today_trades()
|
||
|
||
if not today_trades:
|
||
logger.info("%s - 今天没有需要执行的交易", config_file)
|
||
continue
|
||
|
||
logger.info("%s - 今天有 %d 个交易需要执行", config_file, len(today_trades))
|
||
|
||
for trade_config in today_trades:
|
||
try:
|
||
symbol = trade_config.get("symbol")
|
||
order_type = trade_config.get("order_type")
|
||
side = trade_config.get("side")
|
||
params = trade_config.get("params", {})
|
||
|
||
if not all([symbol, order_type, side]):
|
||
logger.error(
|
||
"%s - 交易配置缺少必要参数: %s", config_file, trade_config
|
||
)
|
||
continue
|
||
|
||
logger.info(
|
||
"%s - 执行交易: %s %s %s", config_file, symbol, order_type, side
|
||
)
|
||
logger.debug("%s - 交易参数: %s", config_file, params)
|
||
|
||
# 执行交易
|
||
result = spot_trader.trade(
|
||
symbol=symbol, order_type=order_type, side=side, **params
|
||
)
|
||
|
||
if result:
|
||
logger.info("%s - 交易执行成功: %s", config_file, result)
|
||
else:
|
||
logger.error("%s - 交易执行失败", config_file)
|
||
|
||
except Exception as e:
|
||
logger.error("%s - 执行交易时出错: %s", config_file, str(e))
|
||
continue
|
||
|
||
except Exception as e:
|
||
logger.error("处理配置文件 %s 时出错: %s", config_file, str(e))
|
||
continue
|
||
|
||
logger.info("所有配置文件处理完毕")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|