Files
mexc-spot-dca-bot/main.py

563 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MEXC 交易机器人
功能:
1. 从JSON配置文件读取交易指令
2. 支持按日期执行多个交易(*表示每天执行)
3. 支持多种订单类型 (LIMIT, MARKET, LIMIT_MAKER, IMMEDIATE_OR_CANCEL, FILL_OR_KILL)
4. 当无price时自动获取实时价格作为限价
5. LIMIT订单支持只提供quoteOrderQty自动计算quantity
6. 证券代码映射功能
7. 完整的交易记录和日志
类说明:
- MexcSpotMarket: 处理市场数据查询
- MexcSpotTrade: 处理现货交易逻辑
- TradingConfig: 管理交易配置
使用示例:
python main.py
"""
import csv
import logging
import os
import time
import json
from pathlib import Path
from datetime import datetime, date
from typing import Dict, Any, Optional, Tuple, List
import mexc_spot_v3
os.makedirs("output", exist_ok=True)
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler("output/mexc_trading_bot.log", encoding="utf-8"),
logging.StreamHandler(),
],
)
logger = logging.getLogger(__name__)
class MexcSpotMarket:
"""MEXC 市场数据查询类
提供获取交易对价格等功能
方法:
- get_price(symbol): 获取指定交易对的当前价格
"""
def __init__(self, config):
"""初始化市场数据查询接口"""
self.market = mexc_spot_v3.mexc_market(config)
def get_price(self, symbol: str) -> Optional[float]:
"""
获取指定交易对的当前价格
Args:
symbol: 交易对,如 "BTCUSDT"
Returns:
当前价格(浮点数)或None(如果失败)
Raises:
Exception: 当API调用失败时抛出异常
"""
params = {"symbol": symbol}
try:
logger.info("查询交易对价格: %s", symbol)
price_data = self.market.get_price(params)
if not price_data or "price" not in price_data:
logger.error("获取价格数据失败: %s", price_data)
return None
price_str = price_data["price"]
price = float(price_str)
logger.info("获取价格成功: %s = %f", symbol, price)
return price
except Exception as e:
logger.error("查询价格失败: %s", str(e))
return None
class MexcSpotTrade:
"""MEXC 现货交易类
处理所有现货交易逻辑,包括订单创建、状态查询和记录
属性:
- SYMBOL_MAPPING: 证券代码映射字典
- ORDER_TYPE_REQUIREMENTS: 各订单类型必需参数
方法:
- trade(): 执行现货交易
- _api_get_order(): 查询订单状态
- _tool_map_symbol(): 映射证券代码
- _tool_validate_order_params(): 验证订单参数
- _tool_record_transaction(): 记录交易到CSV
"""
# 订单类型与必需参数
ORDER_TYPE_REQUIREMENTS = {
"LIMIT": [
"quantity",
"price",
"quoteOrderQty",
], # quantity和price或quoteOrderQty和price
"MARKET": ["quantity", "quoteOrderQty"], # quantity或quoteOrderQty
"LIMIT_MAKER": ["quantity", "price"],
"IMMEDIATE_OR_CANCEL": ["quantity", "price"],
"FILL_OR_KILL": ["quantity", "price"],
}
def __init__(self, config, symbol_mapping, config_file_name):
"""初始化交易机器人"""
self.trader = mexc_spot_v3.mexc_trade(config)
self.market = MexcSpotMarket(config)
self.csv_file = f"output/{config_file_name}.csv"
self.symbol_mapping = symbol_mapping
def _api_get_order(self, symbol: str, order_id: str) -> Optional[Dict[str, Any]]:
"""
查询订单状态
Args:
symbol: 交易对
order_id: 订单ID
Returns:
订单详情字典或None(如果失败)
"""
params = {
"symbol": symbol,
"orderId": order_id,
}
try:
logger.info("查询订单状态, 订单ID: %s", order_id)
order = self.trader.get_order(params)
logger.info("订单状态: %s", order.get("status"))
return order
except Exception as e:
logger.error("查询订单失败: %s", str(e))
return None
def _tool_map_symbol(self, symbol: str) -> str:
"""映射证券代码用于记录"""
return self.symbol_mapping.get(symbol, symbol)
def _tool_validate_order_params(
self, order_type: str, params: Dict[str, Any]
) -> Tuple[bool, str]:
"""
验证订单参数是否符合要求
Args:
order_type: 订单类型
params: 订单参数
Returns:
元组(是否有效, 错误信息)
"""
required_params = self.ORDER_TYPE_REQUIREMENTS.get(order_type, [])
if not required_params:
return False, f"未知的订单类型: {order_type}"
# 特殊处理LIMIT订单
if order_type == "LIMIT":
# 需要price和(quantity或quoteOrderQty)
if "price" not in params:
return False, "LIMIT订单需要price参数"
if "quantity" not in params and "quoteOrderQty" not in params:
return False, "LIMIT订单需要quantity或quoteOrderQty参数"
return True, ""
# 特殊处理MARKET订单
if order_type == "MARKET":
if "quantity" not in params and "quoteOrderQty" not in params:
return False, "MARKET订单需要quantity或quoteOrderQty参数"
return True, ""
# 检查其他订单类型的必需参数
missing_params = [p for p in required_params if p not in params]
if missing_params:
return False, f"{order_type}订单缺少必需参数: {', '.join(missing_params)}"
return True, ""
def _tool_record_transaction(self, order_data: Dict[str, Any]) -> bool:
"""
记录交易到CSV文件
Args:
order_data: 订单数据字典
Returns:
是否成功记录
"""
try:
original_symbol = order_data["symbol"]
mapped_symbol = self._tool_map_symbol(original_symbol)
order_id = order_data["orderId"]
executed_qty = order_data["executedQty"]
cummulative_quote_qty = order_data["cummulativeQuoteQty"]
side = order_data["side"]
# 确定交易类型显示
trade_type = "买入" if side == "BUY" else "卖出"
timestamp = datetime.fromtimestamp(order_data["time"] / 1000).strftime(
"%Y-%m-%dT%H:%M"
)
row = [
timestamp,
trade_type,
mapped_symbol,
executed_qty,
cummulative_quote_qty,
"资金账户",
"CEX",
f"MEXC API - Order ID: {order_id}",
]
# 检查文件是否存在
file_exists = False
try:
with open(self.csv_file, "r", encoding="utf-8") as f:
file_exists = True
except FileNotFoundError:
pass
# 写入CSV
with open(self.csv_file, "a", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
if not file_exists:
writer.writerow(
[
"日期",
"类型",
"证券代码",
"份额",
"净额",
"现金账户",
"目标账户",
"备注",
]
)
writer.writerow(row)
logger.info("交易记录成功, 订单ID: %s", order_id)
return True
except Exception as e:
logger.error("记录交易失败: %s", str(e))
return False
def trade(
self, symbol: str, order_type: str, side: str, **kwargs
) -> Optional[Dict[str, Any]]:
"""
执行现货交易
Args:
symbol: 交易对 (如 BTCUSDC)
order_type: 订单类型 (LIMIT, MARKET等)
side: 交易方向 (BUY, SELL)
**kwargs: 其他订单参数
Returns:
订单信息字典或None(如果失败)
Raises:
ValueError: 当参数验证失败时
Exception: 当API调用失败时
"""
# 基本参数验证
if side not in ["BUY", "SELL"]:
logger.error("无效的交易方向: %s", side)
return None
order_type = order_type.upper()
processed_kwargs = kwargs.copy()
# 处理无price的情况获取实时价格
if order_type in ["LIMIT", "LIMIT_MAKER"] and "price" not in processed_kwargs:
current_price = self.market.get_price(symbol)
if current_price is None:
logger.error("无法获取实时价格,交易取消")
return None
# 防止挂单不成交
if side == "BUY":
processed_kwargs["price"] = current_price * 1.01 # 买入加价0.5%
elif side == "SELL":
processed_kwargs["price"] = current_price * 0.91 # 卖出减价0.5%
logger.info("使用调整0.5%%后价格作为限价: %f", processed_kwargs["price"])
# 处理LIMIT订单只有quoteOrderQty没有quantity的情况
if (
order_type in ["LIMIT", "LIMIT_MAKER"]
and "quoteOrderQty" in processed_kwargs
and "quantity" not in processed_kwargs
):
try:
quote_amount = float(processed_kwargs["quoteOrderQty"])
price = float(processed_kwargs["price"])
quantity = quote_amount / price
processed_kwargs["quantity"] = str(quantity)
processed_kwargs.pop("quoteOrderQty")
logger.info("根据quoteOrderQty计算quantity: %f", quantity)
except (ValueError, KeyError) as e:
logger.error("计算quantity失败: %s", str(e))
return None
# 准备订单参数
base_params = {
"symbol": symbol,
"side": side,
"type": order_type,
**processed_kwargs,
}
# 验证参数
is_valid, error_msg = self._tool_validate_order_params(order_type, base_params)
if not is_valid:
logger.error("订单参数验证失败: %s", error_msg)
return None
try:
logger.info("准备下单 %s %s 订单, 交易对: %s", side, order_type, symbol)
logger.debug("订单参数: %s", base_params)
# 测试订单
test_result = self.trader.post_order_test(base_params.copy())
if test_result != {}:
logger.error("订单测试失败,参数有误: %s", test_result)
return None
logger.info("订单参数测试通过,准备正式下单")
# 正式下单
order = self.trader.post_order(base_params.copy())
order_id = order.get("orderId")
if not order_id:
logger.error("下单失败: 未获取到订单ID")
logger.error(base_params.copy())
logger.error(order)
return None
logger.info("订单创建成功, 订单ID: %s", order_id)
# 查询订单详情
logger.info("等待1秒后查询订单状态...")
time.sleep(1)
order_detail = self._api_get_order(symbol, order_id)
if not order_detail:
logger.error("获取订单详情失败")
return None
# 如果不是FILLED则重复查询最多10次
retry_count = 0
while order_detail.get("status") != "FILLED" and retry_count < 10:
retry_count += 1
logger.info(
"订单未完成(状态: %s)等待1秒后第%s次重试查询...",
order_detail.get("status", "UNKNOWN"),
retry_count,
)
time.sleep(1)
order_detail = self._api_get_order(symbol, order_id)
if not order_detail:
logger.error("获取订单详情失败")
return None
# 记录交易
if order_detail.get("status") == "FILLED":
if not self._tool_record_transaction(order_detail):
logger.error("交易记录失败")
else:
logger.warning(
"订单未完成(状态: %s)未被记录到CSV。订单ID: %s",
order_detail.get("status", "UNKNOWN"),
order_id,
)
return order_detail
except Exception as e:
logger.error("交易执行失败: %s", str(e))
return None
class TradingConfig:
"""交易配置管理类
负责加载和管理交易配置
方法:
- get_today_trades(): 获取今天需要执行的交易列表
- _load_config(): 加载JSON配置文件
"""
def __init__(self, config_file: str = "config/trading_config.json"):
"""
初始化交易配置
Args:
config_file: 配置文件路径
"""
self.config_file = config_file
self.config_data = self._load_config()
def _load_config(self) -> Dict[str, Any]:
"""加载JSON配置文件"""
try:
with open(self.config_file, "r", encoding="utf-8") as f:
config = json.load(f)
logger.info("成功加载配置文件: %s", self.config_file)
return config
except FileNotFoundError:
logger.error("配置文件不存在: %s", self.config_file)
return {}
except json.JSONDecodeError:
logger.error("配置文件格式错误不是有效的JSON")
return {}
except Exception as e:
logger.error("加载配置文件时出错: %s", str(e))
return {}
def get_today_trades(self) -> List[Dict[str, Any]]:
"""
获取今天需要执行的交易列表
Returns:
今天需要执行的交易配置列表
Example:
>>> config = TradingConfig()
>>> today_trades = config.get_today_trades()
>>> print(len(today_trades))
"""
if not self.config_data or "trades" not in self.config_data:
return []
today = date.today().isoformat()
today_trades = []
for trade in self.config_data["trades"]:
# 检查交易是否包含执行日期
if "execute_dates" not in trade:
continue
# 检查是否设置了每天执行(*)
if "*" in trade["execute_dates"]:
today_trades.append(trade)
continue
# 检查今天是否在执行日期列表中
if today in trade["execute_dates"]:
today_trades.append(trade)
return today_trades
def main():
"""主函数"""
# 确保config目录存在
if not os.path.exists("config"):
logger.error("配置目录 config 不存在")
return
# 获取config目录下所有json文件
config_files = list(Path("config").glob("*.json"))
if not config_files:
logger.info("配置目录中没有找到任何JSON文件")
return
logger.info("找到 %d 个配置文件需要处理", len(config_files))
for config_file in config_files:
try:
# 提取交易参数
logger.info("处理配置文件: %s", config_file)
config = TradingConfig(str(config_file))
spot_trader = MexcSpotTrade(
config.config_data.get("api", {}),
config.config_data.get("symbol_mapping", {}),
os.path.basename(config_file).replace('.json', '')
)
today_trades = config.get_today_trades()
if not today_trades:
logger.info("%s - 今天没有需要执行的交易", config_file)
continue
logger.info("%s - 今天有 %d 个交易需要执行", config_file, len(today_trades))
for trade_config in today_trades:
try:
symbol = trade_config.get("symbol")
order_type = trade_config.get("order_type")
side = trade_config.get("side")
params = trade_config.get("params", {})
if not all([symbol, order_type, side]):
logger.error(
"%s - 交易配置缺少必要参数: %s", config_file, trade_config
)
continue
logger.info(
"%s - 执行交易: %s %s %s", config_file, symbol, order_type, side
)
logger.debug("%s - 交易参数: %s", config_file, params)
# 执行交易
result = spot_trader.trade(
symbol=symbol, order_type=order_type, side=side, **params
)
if result:
logger.info("%s - 交易执行成功: %s", config_file, result)
else:
logger.error("%s - 交易执行失败", config_file)
except Exception as e:
logger.error("%s - 执行交易时出错: %s", config_file, str(e))
continue
except Exception as e:
logger.error("处理配置文件 %s 时出错: %s", config_file, str(e))
continue
logger.info("所有配置文件处理完毕")
if __name__ == "__main__":
main()