Compare commits

..

1 Commits

Author SHA1 Message Date
bc41498e96 fix(trade): LIMIT calculated quantity algorithm
Closes #6
2025-08-10 02:39:29 +08:00
4 changed files with 923 additions and 222 deletions

1
config1 Submodule

Submodule config1 added at fab56fe709

607
main.py
View File

@@ -2,17 +2,22 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" """
定投交易机器人 MEXC 交易机器人
功能: 功能:
1. 从JSON配置文件读取交易指令 1. 从JSON配置文件读取交易指令
2. 支持按日期执行多个交易(*表示每天执行) 2. 支持按日期执行多个交易(*表示每天执行)
3. 支持多种订单类型 (limit, market) 3. 支持多种订单类型 (LIMIT, MARKET, LIMIT_MAKER, IMMEDIATE_OR_CANCEL, FILL_OR_KILL)
4. 当无price时自动获取实时价格作为限价 4. 当无price时自动获取实时价格作为限价
5. limit订单支持只提供quoteOrderQty自动计算quantity 5. LIMIT订单支持只提供quoteOrderQty自动计算quantity
6. 证券代码映射功能 6. 证券代码映射功能
7. 完整的交易记录和日志 7. 完整的交易记录和日志
类说明:
- MexcSpotMarket: 处理市场数据查询
- MexcSpotTrade: 处理现货交易逻辑
- TradingConfig: 管理交易配置
使用示例: 使用示例:
python main.py python main.py
""" """
@@ -26,7 +31,8 @@ from pathlib import Path
from datetime import datetime, date from datetime import datetime, date
from typing import Dict, Any, Optional, Tuple, List from typing import Dict, Any, Optional, Tuple, List
import git import git
import ccxt
import mexc_spot_v3
os.makedirs("output", exist_ok=True) os.makedirs("output", exist_ok=True)
@@ -35,150 +41,223 @@ logging.basicConfig(
level=logging.INFO, level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[ handlers=[
logging.FileHandler("output/trading_bot.log", encoding="utf-8"), logging.FileHandler("output/mexc_trading_bot.log", encoding="utf-8"),
logging.StreamHandler(), logging.StreamHandler(),
], ],
) )
logger = logging.getLogger(__name__)
class BotSpotMarket: class MexcSpotMarket:
"""市场数据查询类""" """MEXC 市场数据查询类
def __init__(self, exchange: ccxt.Exchange): 提供获取交易对价格等功能
self.exchange = exchange
self.logger = logging.getLogger(self.__class__.__module__ + '.' + self.__class__.__name__) 方法:
try: - get_exchange_info(symbol): 获取交易对信息
self.exchange.load_markets() - get_price(symbol): 获取指定交易对的当前价格
self.logger.info("加载市场信息成功") """
except Exception as e:
self.logger.error("加载市场信息失败: %s", str(e)) def __init__(self, config):
"""初始化市场数据查询接口"""
self.market = mexc_spot_v3.mexc_market(config)
def get_exchange_info(self, symbol: str) -> Optional[Dict[str, Any]]: def get_exchange_info(self, symbol: str) -> Optional[Dict[str, Any]]:
"""获取交易对信息,返回格式与原始代码兼容""" """
获取交易对信息
Args:
symbol: 交易对,如 "BTCUSDT"
Returns:
交易对信息字典或None(如果失败)
"""
params = {"symbol": symbol}
try: try:
self.logger.info("查询交易对信息: %s", symbol) logger.info("查询交易对信息: %s", symbol)
if symbol not in self.exchange.markets: exchange_info = self.market.get_exchangeInfo(params)
self.logger.warning("市场信息中未找到 %s,尝试重新加载", symbol)
self.exchange.load_markets() if not exchange_info or "symbols" not in exchange_info:
market = self.exchange.markets.get(symbol) logger.error("获取交易对信息失败: %s", exchange_info)
if not market:
self.logger.error("交易对 %s 不存在", symbol)
return None return None
exchange_info = { logger.info("获取交易对信息成功")
"symbols": [
{
"baseAssetPrecision": market["precision"]["amount"],
"quoteAmountPrecision": str(market["limits"]["cost"]["min"] or 0),
}
]
}
self.logger.info("获取交易对信息成功: %s", symbol)
return exchange_info return exchange_info
except Exception as e: except Exception as e:
self.logger.error("查询交易所信息失败: %s", str(e)) logger.error("查询交易所信息失败: %s", str(e))
return None return None
def get_price(self, symbol: str) -> Optional[float]: def get_price(self, symbol: str) -> Optional[float]:
"""获取指定交易对的当前价格""" """
获取指定交易对的当前价格
Args:
symbol: 交易对,如 "BTCUSDT"
Returns:
当前价格(浮点数)或None(如果失败)
Raises:
Exception: 当API调用失败时抛出异常
"""
params = {"symbol": symbol}
try: try:
self.logger.info("查询交易对价格: %s", symbol) logger.info("查询交易对价格: %s", symbol)
ticker = self.exchange.fetch_ticker(symbol) price_data = self.market.get_price(params)
if not ticker or "last" not in ticker:
self.logger.error("获取价格数据失败: %s", ticker) if not price_data or "price" not in price_data:
logger.error("获取价格数据失败: %s", price_data)
return None return None
price = float(ticker["last"])
self.logger.info("获取价格成功: %s = %f", symbol, price) price_str = price_data["price"]
price = float(price_str)
logger.info("获取价格成功: %s = %f", symbol, price)
return price return price
except Exception as e: except Exception as e:
self.logger.error("查询价格失败: %s", str(e)) logger.error("查询价格失败: %s", str(e))
return None return None
class BotSpotTrade: class MexcSpotTrade:
"""现货交易类""" """MEXC 现货交易类
处理所有现货交易逻辑,包括订单创建、状态查询和记录
属性:
- SYMBOL_MAPPING: 证券代码映射字典
- ORDER_TYPE_REQUIREMENTS: 各订单类型必需参数
方法:
- trade(): 执行现货交易
- _api_get_order(): 查询订单状态
- _tool_map_symbol(): 映射证券代码
- _tool_validate_order_params(): 验证订单参数
- _tool_record_transaction(): 记录交易到CSV
"""
# 订单类型与必需参数
ORDER_TYPE_REQUIREMENTS = { ORDER_TYPE_REQUIREMENTS = {
"limit": ["quantity", "price", "quoteOrderQty"], "LIMIT": [
"market": ["quantity", "quoteOrderQty"] "quantity",
"price",
"quoteOrderQty",
], # quantity和price或quoteOrderQty和price
"MARKET": ["quantity", "quoteOrderQty"], # quantity或quoteOrderQty
"LIMIT_MAKER": ["quantity", "price"],
"IMMEDIATE_OR_CANCEL": ["quantity", "price"],
"FILL_OR_KILL": ["quantity", "price"],
} }
def __init__(self, exchange: ccxt.Exchange, symbol_mapping: Dict[str, str], config_file_name: str): def __init__(self, config, symbol_mapping, config_file_name):
self.exchange = exchange """初始化交易机器人"""
self.market = BotSpotMarket(exchange) self.trader = mexc_spot_v3.mexc_trade(config)
self.market = MexcSpotMarket(config)
self.csv_file = f"output/{config_file_name}.csv" self.csv_file = f"output/{config_file_name}.csv"
self.symbol_mapping = symbol_mapping self.symbol_mapping = symbol_mapping
self.logger = logging.getLogger(self.__class__.__module__ + '.' + self.__class__.__name__)
def _api_get_balance(self) -> str:
try:
self.logger.info("查询账户余额")
balance = self.exchange.fetch_balance()
free = balance.get("free", {})
balances = ""
for asset, amount in free.items():
if amount and amount > 0:
balances += f"{amount} {asset} "
self.logger.info("获取账户余额成功")
return balances
except Exception as e:
self.logger.error("查询账户信息失败: %s", str(e))
return f"ERROR: {str(e)}"
def _api_get_order(self, symbol: str, order_id: str) -> Optional[Dict[str, Any]]: def _api_get_order(self, symbol: str, order_id: str) -> Optional[Dict[str, Any]]:
"""
查询订单状态
Args:
symbol: 交易对
order_id: 订单ID
Returns:
订单详情字典或None(如果失败)
"""
params = {
"symbol": symbol,
"orderId": order_id,
}
try: try:
self.logger.info("查询订单状态, 订单ID: %s", order_id) logger.info("查询订单状态, 订单ID: %s", order_id)
order = self.exchange.fetch_order(order_id, symbol) order = self.trader.get_order(params)
self.logger.info("订单状态: %s", order.get("status")) logger.info("订单状态: %s", order.get("status"))
return order return order
except Exception as e: except Exception as e:
self.logger.error("查询订单失败: %s", str(e)) logger.error("查询订单失败: %s", str(e))
return None return None
def _tool_map_symbol(self, symbol: str) -> str: def _tool_map_symbol(self, symbol: str) -> str:
"""映射证券代码用于记录"""
return self.symbol_mapping.get(symbol, symbol) return self.symbol_mapping.get(symbol, symbol)
def _tool_validate_order_params(self, order_type: str, params: Dict[str, Any]) -> Tuple[bool, str]: def _tool_validate_order_params(
self, order_type: str, params: Dict[str, Any]
) -> Tuple[bool, str]:
"""
验证订单参数是否符合要求
Args:
order_type: 订单类型
params: 订单参数
Returns:
元组(是否有效, 错误信息)
"""
required_params = self.ORDER_TYPE_REQUIREMENTS.get(order_type, []) required_params = self.ORDER_TYPE_REQUIREMENTS.get(order_type, [])
if not required_params: if not required_params:
return False, f"未知的订单类型: {order_type}" return False, f"未知的订单类型: {order_type}"
if order_type == "limit": # 特殊处理LIMIT订单
if order_type == "LIMIT":
# 需要price和(quantity或quoteOrderQty)
if "price" not in params: if "price" not in params:
return False, "limit订单需要price参数" return False, "LIMIT订单需要price参数"
if "quantity" not in params and "quoteOrderQty" not in params: if "quantity" not in params and "quoteOrderQty" not in params:
return False, "limit订单需要quantity或quoteOrderQty参数" return False, "LIMIT订单需要quantity或quoteOrderQty参数"
return True, "" return True, ""
if order_type == "market": # 特殊处理MARKET订单
if order_type == "MARKET":
if "quantity" not in params and "quoteOrderQty" not in params: if "quantity" not in params and "quoteOrderQty" not in params:
return False, "market订单需要quantity或quoteOrderQty参数" return False, "MARKET订单需要quantity或quoteOrderQty参数"
return True, "" return True, ""
# 检查其他订单类型的必需参数
missing_params = [p for p in required_params if p not in params] missing_params = [p for p in required_params if p not in params]
if missing_params: if missing_params:
return False, f"{order_type}订单缺少必需参数: {', '.join(missing_params)}" return False, f"{order_type}订单缺少必需参数: {', '.join(missing_params)}"
return True, "" return True, ""
def _tool_sci_to_decimal(self, num, precision = 30):
if isinstance(num, str):
num = float(num)
result = f"{num:.{precision}f}"
result = result.rstrip('0').rstrip('.')
return result
def _tool_record_transaction(self, order_data: Dict[str, Any]) -> bool: def _tool_record_transaction(self, order_data: Dict[str, Any]) -> bool:
"""
记录交易到CSV文件
Args:
order_data: 订单数据字典
Returns:
是否成功记录
"""
try: try:
original_symbol = order_data["symbol"] original_symbol = order_data["symbol"]
mapped_symbol = self._tool_map_symbol(original_symbol) mapped_symbol = self._tool_map_symbol(original_symbol)
order_id = order_data["id"] order_id = order_data["orderId"]
executed_qty = self._tool_sci_to_decimal(order_data.get("filled", 0.0)) executed_qty = order_data["executedQty"]
cummulative_quote_qty = self._tool_sci_to_decimal(order_data.get("cost", 0.0)) cummulative_quote_qty = order_data["cummulativeQuoteQty"]
side = order_data["side"] side = order_data["side"]
balances = self._api_get_balance()
trade_type = "买入" if side == "buy" else "卖出" # 确定交易类型显示
timestamp = datetime.fromtimestamp(order_data["timestamp"] / 1000).strftime("%Y-%m-%dT%H:%M") trade_type = "买入" if side == "BUY" else "卖出"
timestamp = datetime.fromtimestamp(order_data["time"] / 1000).strftime(
"%Y-%m-%dT%H:%M"
)
row = [ row = [
timestamp, timestamp,
@@ -188,10 +267,10 @@ class BotSpotTrade:
cummulative_quote_qty, cummulative_quote_qty,
"资金账户", "资金账户",
"CEX", "CEX",
f"DCA Order ID: {order_id}", f"MEXC API - Order ID: {order_id}",
balances,
] ]
# 检查文件是否存在
file_exists = False file_exists = False
try: try:
with open(self.csv_file, "r", encoding="utf-8") as f: with open(self.csv_file, "r", encoding="utf-8") as f:
@@ -199,118 +278,191 @@ class BotSpotTrade:
except FileNotFoundError: except FileNotFoundError:
pass pass
# 写入CSV
with open(self.csv_file, "a", newline="", encoding="utf-8") as f: with open(self.csv_file, "a", newline="", encoding="utf-8") as f:
writer = csv.writer(f) writer = csv.writer(f)
if not file_exists: if not file_exists:
writer.writerow([ writer.writerow(
"日期", "类型", "证券代码", "份额", "净额", [
"现金账户", "目标账户", "备注", "balances", "日期",
]) "类型",
"证券代码",
"份额",
"净额",
"现金账户",
"目标账户",
"备注",
]
)
writer.writerow(row) writer.writerow(row)
self.logger.info("交易记录成功, 订单ID: %s", order_id) logger.info("交易记录成功, 订单ID: %s", order_id)
return True return True
except Exception as e: except Exception as e:
self.logger.error("记录交易失败: %s", str(e)) logger.error("记录交易失败: %s", str(e))
return False return False
def trade(self, symbol: str, order_type: str, side: str, **kwargs) -> Optional[Dict[str, Any]]: def _tool_calculate_quantity(
if side not in ["buy", "sell"]: self,
self.logger.error("无效的交易方向: %s", side) quantity: float,
price: float,
base_asset_precision: int,
quote_amount_precision: float,
) -> float:
"""
调整下单数量以满足最小成交额要求。
策略说明:
- 计算出的quantity如果乘以price后小于交易对的最小成交额(quoteAmountPrecision)
则将quantity增加一个最小单位(10^(-base_asset_precision)),确保下单金额满足交易所要求。
Args:
quantity: 初步计算出的下单数量
price: 当前价格
base_asset_precision: 交易对基础资产的小数精度
quote_amount_precision: 交易对最小成交额
Returns:
满足最小成交额要求的下单数量
"""
processed_quantity = round(quantity, base_asset_precision)
if processed_quantity * price < quote_amount_precision:
logger.info(
"计算的quantity小于最低要求%f * %f = %f < %f,进行调整",
processed_quantity,
price,
processed_quantity * price,
quote_amount_precision,
)
processed_quantity = round(
quantity + 10 ** (-base_asset_precision), base_asset_precision
)
logger.info("调整后的quantity: %f", processed_quantity)
return processed_quantity
def trade(
self, symbol: str, order_type: str, side: str, **kwargs
) -> Optional[Dict[str, Any]]:
"""
执行现货交易
Args:
symbol: 交易对 (如 BTCUSDC)
order_type: 订单类型 (LIMIT, MARKET等)
side: 交易方向 (BUY, SELL)
**kwargs: 其他订单参数
Returns:
订单信息字典或None(如果失败)
Raises:
ValueError: 当参数验证失败时
Exception: 当API调用失败时
"""
# 基本参数验证
if side not in ["BUY", "SELL"]:
logger.error("无效的交易方向: %s", side)
return None return None
order_type = order_type.upper()
processed_kwargs = kwargs.copy() processed_kwargs = kwargs.copy()
# 记录未经过偏移的价格以供LIMIT订单只有quoteOrderQty没有quantity的情况使用
# 参数有price时直接使用否则就为实时价格
clean_price = processed_kwargs.get("price") clean_price = processed_kwargs.get("price")
if order_type == "limit" and "price" not in processed_kwargs: # 处理无price的情况获取实时价格
if order_type in ["LIMIT", "LIMIT_MAKER"] and "price" not in processed_kwargs:
current_price = self.market.get_price(symbol) current_price = self.market.get_price(symbol)
if current_price is None: if current_price is None:
self.logger.error("无法获取实时价格,交易取消") logger.error("无法获取实时价格,交易取消")
return None return None
clean_price = current_price clean_price = current_price
# 防止挂单不成交 # 防止挂单不成交
if side == "buy": if side == "BUY":
processed_kwargs["price"] = current_price * 1.001 # 买入加价0.1% processed_kwargs["price"] = current_price * 1.01 # 买入加价1%
elif side == "sell": elif side == "SELL":
processed_kwargs["price"] = current_price * 0.999 # 卖出减价0.1% processed_kwargs["price"] = current_price * 0.99 # 卖出减价1%
self.logger.info("使用调整0.1%%后价格作为限价: %f", processed_kwargs["price"]) logger.info("使用调整1%%后价格作为限价: %f", processed_kwargs["price"])
# 处理LIMIT订单只有quoteOrderQty没有quantity的情况
if ( if (
order_type == "limit" order_type in ["LIMIT", "LIMIT_MAKER"]
and "quoteOrderQty" in processed_kwargs and "quoteOrderQty" in processed_kwargs
and "quantity" not in processed_kwargs and "quantity" not in processed_kwargs
): ):
try: try:
exchange_info = self.market.get_exchange_info(symbol) exchange_info = self.market.get_exchange_info(symbol)
if not exchange_info:
return None
quote_amount = float(processed_kwargs["quoteOrderQty"]) quote_amount = float(processed_kwargs["quoteOrderQty"])
if clean_price is None: quantity = quote_amount / clean_price
self.logger.error("无法获取价格来计算数量") base_asset_precision = int(
return None exchange_info["symbols"][0]["baseAssetPrecision"]
price_for_calc = float(clean_price) )
quantity = quote_amount / price_for_calc quote_amount_precision = float(
self.logger.info("根据quoteOrderQty计算quantity: %f", quantity) exchange_info["symbols"][0]["quoteAmountPrecision"]
processed_kwargs["quantity"] = str(quantity) )
processed_quantity = self._tool_calculate_quantity(
quantity, clean_price, base_asset_precision, quote_amount_precision
)
logger.info("根据quoteOrderQty计算quantity: %f", processed_quantity)
processed_kwargs["quantity"] = str(processed_quantity)
processed_kwargs.pop("quoteOrderQty") processed_kwargs.pop("quoteOrderQty")
except (ValueError, KeyError, TypeError) as e: except (ValueError, KeyError) as e:
self.logger.error("计算quantity失败: %s", str(e)) logger.error("计算quantity失败: %s", str(e))
return None return None
amount = processed_kwargs.get("quantity") # 准备订单参数
price = processed_kwargs.get("price") base_params = {
if amount is not None: "symbol": symbol,
amount = float(amount) "side": side,
if price is not None: "type": order_type,
price = float(price) **processed_kwargs,
}
params = {} # 验证参数
if order_type == "market" and "quoteOrderQty" in processed_kwargs: is_valid, error_msg = self._tool_validate_order_params(order_type, base_params)
current_price = self.market.get_price(symbol)
if current_price is None:
self.logger.error("无法获取实时价格,交易取消")
return None
clean_price = current_price
amount = float(processed_kwargs["quoteOrderQty"]) / clean_price
is_valid, error_msg = self._tool_validate_order_params(order_type, processed_kwargs)
if not is_valid: if not is_valid:
self.logger.error("订单参数验证失败: %s", error_msg) logger.error("订单参数验证失败: %s", error_msg)
return None return None
try: try:
self.logger.info("准备下单 %s %s 订单, 交易对: %s", side, order_type, symbol) logger.info("准备下单 %s %s 订单, 交易对: %s", side, order_type, symbol)
self.logger.info("订单参数: symbol=%s, type=%s, side=%s, amount=%s, price=%s, params=%s", logger.debug("订单参数: %s", base_params)
symbol, order_type, side, amount, price, params)
order = self.exchange.create_order( # 测试订单
symbol=symbol, test_result = self.trader.post_order_test(base_params.copy())
type=order_type, if test_result != {}:
side=side, logger.error("订单测试失败,参数有误: %s", test_result)
amount=amount,
price=price,
params=params,
)
order_id = order.get("id")
if not order_id:
self.logger.error("下单失败: 未获取到订单ID")
self.logger.error(order)
return None return None
self.logger.info("订单创建成功, 订单ID: %s", order_id) logger.info("订单参数测试通过,准备正式下单")
self.logger.info("等待1秒后查询订单状态...")
# 正式下单
order = self.trader.post_order(base_params.copy())
order_id = order.get("orderId")
if not order_id:
logger.error("下单失败: 未获取到订单ID")
logger.error(base_params.copy())
logger.error(order)
return None
logger.info("订单创建成功, 订单ID: %s", order_id)
# 查询订单详情
logger.info("等待1秒后查询订单状态...")
time.sleep(1) time.sleep(1)
order_detail = self._api_get_order(symbol, order_id) order_detail = self._api_get_order(symbol, order_id)
if not order_detail: if not order_detail:
self.logger.error("获取订单详情失败") logger.error("获取订单详情失败")
return None return None
# 如果不是FILLED则重复查询最多10次
retry_count = 0 retry_count = 0
filled_statuses = ("closed", "filled") while order_detail.get("status") != "FILLED" and retry_count < 10:
while order_detail.get("status", "").lower() not in filled_statuses and retry_count < 10:
retry_count += 1 retry_count += 1
self.logger.info( logger.info(
"订单未完成(状态: %s)等待1秒后第%s次重试查询...", "订单未完成(状态: %s)等待1秒后第%s次重试查询...",
order_detail.get("status", "UNKNOWN"), order_detail.get("status", "UNKNOWN"),
retry_count, retry_count,
@@ -318,14 +470,15 @@ class BotSpotTrade:
time.sleep(1) time.sleep(1)
order_detail = self._api_get_order(symbol, order_id) order_detail = self._api_get_order(symbol, order_id)
if not order_detail: if not order_detail:
self.logger.error("获取订单详情失败") logger.error("获取订单详情失败")
return None return None
if order_detail.get("status", "").lower() in filled_statuses: # 记录交易
if order_detail.get("status") == "FILLED":
if not self._tool_record_transaction(order_detail): if not self._tool_record_transaction(order_detail):
self.logger.error("交易记录失败") logger.error("交易记录失败")
else: else:
self.logger.warning( logger.warning(
"订单未完成(状态: %s)未被记录到CSV。订单ID: %s", "订单未完成(状态: %s)未被记录到CSV。订单ID: %s",
order_detail.get("status", "UNKNOWN"), order_detail.get("status", "UNKNOWN"),
order_id, order_id,
@@ -334,99 +487,114 @@ class BotSpotTrade:
return order_detail return order_detail
except Exception as e: except Exception as e:
self.logger.error("交易执行失败: %s", str(e)) logger.error("交易执行失败: %s", str(e))
return None return None
class TradingConfig: class TradingConfig:
"""交易配置管理类""" """交易配置管理类
负责加载和管理交易配置
方法:
- get_today_trades(): 获取今天需要执行的交易列表
- _load_config(): 加载JSON配置文件
"""
def __init__(self, config_file: str = "config/trading_config.json"): def __init__(self, config_file: str = "config/trading_config.json"):
"""
初始化交易配置
Args:
config_file: 配置文件路径
"""
self.config_file = config_file self.config_file = config_file
self.logger = logging.getLogger(self.__class__.__module__ + '.' + self.__class__.__name__)
self.config_data = self._load_config() self.config_data = self._load_config()
def _load_config(self) -> Dict[str, Any]: def _load_config(self) -> Dict[str, Any]:
"""加载JSON配置文件"""
try: try:
with open(self.config_file, "r", encoding="utf-8") as f: with open(self.config_file, "r", encoding="utf-8") as f:
config = json.load(f) config = json.load(f)
self.logger.info("成功加载配置文件: %s", self.config_file) logger.info("成功加载配置文件: %s", self.config_file)
return config return config
except FileNotFoundError: except FileNotFoundError:
self.logger.error("配置文件不存在: %s", self.config_file) logger.error("配置文件不存在: %s", self.config_file)
return {} return {}
except json.JSONDecodeError: except json.JSONDecodeError:
self.logger.error("配置文件格式错误不是有效的JSON") logger.error("配置文件格式错误不是有效的JSON")
return {} return {}
except Exception as e: except Exception as e:
self.logger.error("加载配置文件时出错: %s", str(e)) logger.error("加载配置文件时出错: %s", str(e))
return {} return {}
def get_today_trades(self) -> List[Dict[str, Any]]: def get_today_trades(self) -> List[Dict[str, Any]]:
"""
获取今天需要执行的交易列表
Returns:
今天需要执行的交易配置列表
Example:
>>> config = TradingConfig()
>>> today_trades = config.get_today_trades()
>>> print(len(today_trades))
"""
if not self.config_data or "trades" not in self.config_data: if not self.config_data or "trades" not in self.config_data:
return [] return []
today = date.today().isoformat() today = date.today().isoformat()
today_trades = [] today_trades = []
for trade in self.config_data["trades"]: for trade in self.config_data["trades"]:
# 检查交易是否包含执行日期
if "execute_dates" not in trade: if "execute_dates" not in trade:
continue continue
# 检查是否设置了每天执行(*)
if "*" in trade["execute_dates"]: if "*" in trade["execute_dates"]:
today_trades.append(trade) today_trades.append(trade)
continue continue
# 检查今天是否在执行日期列表中
if today in trade["execute_dates"]: if today in trade["execute_dates"]:
today_trades.append(trade) today_trades.append(trade)
return today_trades return today_trades
def build_ccxt_exchange(api_config: Dict[str, Any], exchange_id: str) -> ccxt.Exchange: def git_commit(repo_path: str = ".") -> str:
""" """获取Git仓库版本"""
根据配置文件和交易所ID构建 ccxt 交易所实例
:param api_config: 包含 api_key, api_secret 等字段的字典
:param exchange_id: ccxt 交易所标识符(小写),如 "mexc", "binance", "bybit"
"""
api_key = api_config.get("api_key")
secret = api_config.get("api_secret")
exchange_class = getattr(ccxt, exchange_id, None)
if exchange_class is None:
raise ValueError(f"不支持的交易所ID: {exchange_id},请检查配置文件中的 'exchange' 字段")
exchange_params = {
"apiKey": api_key,
"secret": secret,
"enableRatelimit": True,
}
if "password" in api_config:
exchange_params["password"] = api_config["password"]
return exchange_class(exchange_params)
def git_commit(repo_path: str = ".") -> Optional[str]:
try: try:
repo = git.Repo(repo_path) repo = git.Repo(repo_path)
return repo.head.commit.hexsha return repo.head.commit.hexsha
except Exception: except Exception as _:
return None return None
def main(): def main():
logger = logging.getLogger(f"{__name__}.main") """主函数"""
logger.info("=" * 40) logger.info("=" * 40)
# 获取主程序Git仓库版本
app_commit = git_commit(".")[:10]
app_commit = git_commit(".") # 确保config目录存在
app_commit_short = app_commit[:10] if app_commit else "unknown"
if not os.path.exists("config"): if not os.path.exists("config"):
logger.error("配置目录 config 不存在") logger.error("配置目录 config 不存在")
return return
config_commit = git_commit("config") # 获取config, output仓库版本
config_commit_short = config_commit[:10] if config_commit else "unknown" config_commit = git_commit("config")[:10]
output_commit = git_commit("output") output_commit = git_commit("output")[:10]
output_commit_short = output_commit[:10] if output_commit else "unknown" logger.info("主程序 %s, 配置 %s, 输出 %s", app_commit, config_commit, output_commit)
logger.info("主程序 %s, 配置 %s, 输出 %s", app_commit_short, config_commit_short, output_commit_short)
# 获取config目录下所有json文件
config_files = list(Path("config").glob("*.json")) config_files = list(Path("config").glob("*.json"))
if not config_files: if not config_files:
logger.info("配置目录中没有找到任何JSON文件") logger.info("配置目录中没有找到任何JSON文件")
return return
@@ -435,22 +603,16 @@ def main():
for config_file in config_files: for config_file in config_files:
try: try:
# 提取交易参数
logger.info("处理配置文件: %s", config_file) logger.info("处理配置文件: %s", config_file)
config = TradingConfig(str(config_file)) config = TradingConfig(str(config_file))
spot_trader = MexcSpotTrade(
exchange_id = config.config_data.get("exchange") config.config_data.get("api", {}),
logger.info("使用交易所: %s", exchange_id) config.config_data.get("symbol_mapping", {}),
os.path.basename(config_file).replace(".json", ""),
api_conf = config.config_data.get("api", {})
exchange = build_ccxt_exchange(api_conf, exchange_id)
spot_trader = BotSpotTrade(
exchange=exchange,
symbol_mapping=config.config_data.get("symbol_mapping", {}),
config_file_name=os.path.basename(config_file).replace(".json", ""),
) )
today_trades = config.get_today_trades() today_trades = config.get_today_trades()
if not today_trades: if not today_trades:
logger.info("%s - 今天没有需要执行的交易", config_file) logger.info("%s - 今天没有需要执行的交易", config_file)
continue continue
@@ -465,16 +627,21 @@ def main():
params = trade_config.get("params", {}) params = trade_config.get("params", {})
if not all([symbol, order_type, side]): if not all([symbol, order_type, side]):
logger.error("%s - 交易配置缺少必要参数: %s", config_file, trade_config) logger.error(
"%s - 交易配置缺少必要参数: %s", config_file, trade_config
)
continue continue
logger.info("%s - 执行交易: %s %s %s", config_file, symbol, order_type, side) logger.info(
result = spot_trader.trade( "%s - 执行交易: %s %s %s", config_file, symbol, order_type, side
symbol=symbol,
order_type=order_type,
side=side,
**params,
) )
logger.debug("%s - 交易参数: %s", config_file, params)
# 执行交易
result = spot_trader.trade(
symbol=symbol, order_type=order_type, side=side, **params
)
if result: if result:
logger.info("%s - 交易执行成功: %s", config_file, result) logger.info("%s - 交易执行成功: %s", config_file, result)
else: else:

534
mexc_spot_v3.py Normal file
View File

@@ -0,0 +1,534 @@
import requests
import hmac
import hashlib
from urllib.parse import urlencode, quote
# ServerTime、Signature
class TOOL(object):
def _get_server_time(self):
return requests.request('get', 'https://api.mexc.com/api/v3/time').json()['serverTime']
def _sign_v3(self, req_time, sign_params=None):
if sign_params:
sign_params = urlencode(sign_params, quote_via=quote)
to_sign = "{}&timestamp={}".format(sign_params, req_time)
else:
to_sign = "timestamp={}".format(req_time)
sign = hmac.new(self.mexc_secret.encode('utf-8'), to_sign.encode('utf-8'), hashlib.sha256).hexdigest()
return sign
def public_request(self, method, url, params=None):
url = '{}{}'.format(self.hosts, url)
return requests.request(method, url, params=params)
def sign_request(self, method, url, params=None):
url = '{}{}'.format(self.hosts, url)
req_time = self._get_server_time()
if params:
params['signature'] = self._sign_v3(req_time=req_time, sign_params=params)
else:
params = {}
params['signature'] = self._sign_v3(req_time=req_time)
params['timestamp'] = req_time
headers = {
'x-mexc-apikey': self.mexc_key,
'Content-Type': 'application/json',
}
return requests.request(method, url, params=params, headers=headers)
# Market Data
class mexc_market(TOOL):
def __init__(self, config):
self.api = '/api/v3'
self.hosts = config["mexc_host"]
self.method = 'GET'
def get_ping(self):
"""test connectivity"""
url = '{}{}'.format(self.api, '/ping')
response = self.public_request(self.method, url)
return response.json()
def get_timestamp(self):
"""get sever time"""
url = '{}{}'.format(self.api, '/time')
response = self.public_request(self.method, url)
return response.json()
def get_defaultSymbols(self):
"""get defaultSymbols"""
url = '{}{}'.format(self.api, '/defaultSymbols')
response = self.public_request(self.method, url)
return response.json()
def get_exchangeInfo(self, params=None):
"""get exchangeInfo"""
url = '{}{}'.format(self.api, '/exchangeInfo')
response = self.public_request(self.method, url, params=params)
return response.json()
def get_depth(self, params):
"""get symbol depth"""
url = '{}{}'.format(self.api, '/depth')
response = self.public_request(self.method, url, params=params)
return response.json()
def get_deals(self, params):
"""get current trade deals list"""
url = '{}{}'.format(self.api, '/trades')
response = self.public_request(self.method, url, params=params)
return response.json()
def get_aggtrades(self, params):
"""get aggregate trades list"""
url = '{}{}'.format(self.api, '/aggTrades')
response = self.public_request(self.method, url, params=params)
return response.json()
def get_kline(self, params):
"""get k-line data"""
url = '{}{}'.format(self.api, '/klines')
response = self.public_request(self.method, url, params=params)
return response.json()
def get_avgprice(self, params):
"""get current average prcie(default : 5m)"""
url = '{}{}'.format(self.api, '/avgPrice')
response = self.public_request(self.method, url, params=params)
return response.json()
def get_24hr_ticker(self, params=None):
"""get 24hr prcie ticker change statistics"""
url = '{}{}'.format(self.api, '/ticker/24hr')
response = self.public_request(self.method, url, params=params)
return response.json()
def get_price(self, params=None):
"""get symbol price ticker"""
url = '{}{}'.format(self.api, '/ticker/price')
response = self.public_request(self.method, url, params=params)
return response.json()
def get_bookticker(self, params=None):
"""get symbol order book ticker"""
url = '{}{}'.format(self.api, '/ticker/bookTicker')
response = self.public_request(self.method, url, params=params)
return response.json()
def get_ETF_info(self, params=None):
"""get ETF information"""
url = '{}{}'.format(self.api, '/etf/info')
response = self.public_request(self.method, url, params=params)
return response.json()
# Spot Trade
class mexc_trade(TOOL):
def __init__(self, config):
self.api = '/api/v3'
self.hosts = config["mexc_host"]
self.mexc_key = config["api_key"]
self.mexc_secret = config["secret_key"]
def get_selfSymbols(self):
"""get currency information"""
method = 'GET'
url = '{}{}'.format(self.api, '/selfSymbols')
response = self.sign_request(method, url)
return response.json()
def post_order_test(self, params):
"""test new order"""
method = 'POST'
url = '{}{}'.format(self.api, '/order/test')
response = self.sign_request(method, url, params=params)
return response.json()
def post_order(self, params):
"""place order"""
method = 'POST'
url = '{}{}'.format(self.api, '/order')
response = self.sign_request(method, url, params=params)
return response.json()
def post_batchorders(self, params):
"""place batch orders(same symbol)"""
method = 'POST'
url = '{}{}'.format(self.api, '/batchOrders')
params = {"batchOrders": str(params)}
response = self.sign_request(method, url, params=params)
print(response.url)
return response.json()
def delete_order(self, params):
"""
cancel order
'origClientOrderId' or 'orderId' must be sent
"""
method = 'DELETE'
url = '{}{}'.format(self.api, '/order')
response = self.sign_request(method, url, params=params)
return response.json()
def delete_openorders(self, params):
"""
cancel all order for a single symbol
"""
method = 'DELETE'
url = '{}{}'.format(self.api, '/openOrders')
response = self.sign_request(method, url, params=params)
return response.json()
def get_order(self, params):
"""
get order
'origClientOrderId' or 'orderId' must be sent
"""
method = 'GET'
url = '{}{}'.format(self.api, '/order')
response = self.sign_request(method, url, params=params)
return response.json()
def get_openorders(self, params):
"""get current pending order """
method = 'GET'
url = '{}{}'.format(self.api, '/openOrders')
response = self.sign_request(method, url, params=params)
return response.json()
def get_allorders(self, params):
"""
get current all order
startTime and endTime need to use at the same time
"""
method = 'GET'
url = '{}{}'.format(self.api, '/allOrders')
response = self.sign_request(method, url, params=params)
return response.json()
def get_mytrades(self, params):
"""
get current all order
orderId need to use with symbol at the same time
"""
method = 'GET'
url = '{}{}'.format(self.api, '/myTrades')
response = self.sign_request(method, url, params=params)
return response.json()
def post_mxDeDuct(self, params):
"""Enable MX DeDuct"""
method = 'POST'
url = '{}{}'.format(self.api, '/mxDeduct/enable')
response = self.sign_request(method, url, params=params)
return response.json()
def get_mxDeDuct(self):
"""MX DeDuct status"""
method = 'GET'
url = '{}{}'.format(self.api, '/mxDeduct/enable')
response = self.sign_request(method, url)
return response.json()
def get_account_info(self):
"""get account information"""
method = 'GET'
url = '{}{}'.format(self.api, '/account')
response = self.sign_request(method, url)
return response.json()
# Wallet
class mexc_wallet(TOOL):
def __init__(self, config):
self.api = '/api/v3/capital'
self.hosts = config["mexc_host"]
self.mexc_key = config["api_key"]
self.mexc_secret = config["secret_key"]
def get_coinlist(self):
"""get currency information"""
method = 'GET'
url = '{}{}'.format(self.api, '/config/getall')
response = self.sign_request(method, url)
return response.json()
def post_withdraw(self, params):
"""withdraw"""
method = 'POST'
url = '{}{}'.format(self.api, '/withdraw/apply')
response = self.sign_request(method, url, params=params)
return response.json()
def cancel_withdraw(self, params):
"""withdraw"""
method = 'DELETE'
url = '{}{}'.format(self.api, '/withdraw')
response = self.sign_request(method, url, params=params)
return response.json()
def get_deposit_list(self, params):
"""deposit history list"""
method = 'GET'
url = '{}{}'.format(self.api, '/deposit/hisrec')
response = self.sign_request(method, url, params=params)
return response.json()
def get_withdraw_list(self, params):
"""withdraw history list"""
method = 'GET'
url = '{}{}'.format(self.api, '/withdraw/history')
response = self.sign_request(method, url, params=params)
return response.json()
def post_deposit_address(self, params):
"""generate deposit address"""
method = 'POST'
url = '{}{}'.format(self.api, '/deposit/address')
response = self.sign_request(method, url, params=params)
return response.json()
def get_deposit_address(self, params):
"""get deposit address"""
method = 'GET'
url = '{}{}'.format(self.api, '/deposit/address')
response = self.sign_request(method, url, params=params)
return response.json()
def get_withdraw_address(self, params):
"""get deposit address"""
method = 'GET'
url = '{}{}'.format(self.api, '/withdraw/address')
response = self.sign_request(method, url, params=params)
return response.json()
def post_transfer(self, params):
"""universal transfer"""
method = 'POST'
url = '{}{}'.format(self.api, '/transfer')
response = self.sign_request(method, url, params=params)
return response.json()
def get_transfer_list(self, params):
"""universal transfer history"""
method = 'GET'
url = '{}{}'.format(self.api, '/transfer')
response = self.sign_request(method, url, params=params)
return response.json()
def get_transfer_list_byId(self, params):
"""universal transfer history (by tranId)"""
method = 'GET'
url = '{}{}'.format(self.api, '/transfer/tranId')
response = self.sign_request(method, url, params=params)
return response.json()
def post_transfer_internal(self, params):
"""universal transfer"""
method = 'POST'
url = '{}{}'.format(self.api, '/transfer/internal')
response = self.sign_request(method, url, params=params)
return response.json()
def get_transfer_internal_list(self, params=None):
"""universal transfer"""
method = 'GET'
url = '{}{}'.format(self.api, '/transfer/internal')
response = self.sign_request(method, url, params=params)
return response.json()
def get_smallAssets_list(self):
"""small Assets convertible list"""
method = 'GET'
url = '{}{}'.format(self.api, '/convert/list')
response = self.sign_request(method, url)
return response.json()
def post_smallAssets_convert(self, params):
"""small Assets convert"""
method = 'POST'
url = '{}{}'.format(self.api, '/convert')
response = self.sign_request(method, url, params=params)
return response.json()
def get_smallAssets_history(self, params=None):
"""small Assets convertible history"""
method = 'GET'
url = '{}{}'.format(self.api, '/convert')
response = self.sign_request(method, url, params=params)
return response.json()
# Sub-Account
class mexc_subaccount(TOOL):
def __init__(self, config):
self.api = '/api/v3'
self.hosts = config["mexc_host"]
self.mexc_key = config["api_key"]
self.mexc_secret = config["secret_key"]
def post_virtualSubAccount(self, params):
"""create a sub-account"""
method = 'POST'
url = '{}{}'.format(self.api, '/sub-account/virtualSubAccount')
response = self.sign_request(method, url, params=params)
return response.json()
def get_SubAccountList(self, params=None):
"""get sub-account list"""
method = 'GET'
url = '{}{}'.format(self.api, '/sub-account/list')
response = self.sign_request(method, url, params=params)
return response.json()
def post_virtualApiKey(self, params):
"""create sub-account's apikey"""
method = 'POST'
url = '{}{}'.format(self.api, '/sub-account/apiKey')
response = self.sign_request(method, url, params=params)
return response.json()
def get_virtualApiKey(self, params):
"""get sub-account's apikey"""
method = 'GET'
url = '{}{}'.format(self.api, '/sub-account/apiKey')
response = self.sign_request(method, url, params=params)
return response.json()
def delete_virtualApiKey(self, params):
"""delete sub-account's apikey"""
method = 'DELETE'
url = '{}{}'.format(self.api, '/sub-account/apiKey')
response = self.sign_request(method, url, params=params)
return response.json()
def post_universalTransfer(self, params):
"""universal transfer between accounts"""
method = 'POST'
url = '{}{}'.format(self.api, '/capital/sub-account/universalTransfer')
response = self.sign_request(method, url, params=params)
return response.json()
def get_universalTransfer(self, params):
"""universal transfer history between accounts"""
method = 'GET'
url = '{}{}'.format(self.api, '/capital/sub-account/universalTransfer')
response = self.sign_request(method, url, params=params)
return response.json()
# Rebate
class mexc_rebate(TOOL):
def __init__(self, config):
self.api = '/api/v3/rebate'
self.hosts = config["mexc_host"]
self.mexc_key = config["api_key"]
self.mexc_secret = config["secret_key"]
def get_taxQuery(self, params=None):
"""get the rebate commission record"""
method = 'GET'
url = '{}{}'.format(self.api, '/taxQuery')
response = self.sign_request(method, url, params=params)
return response.json()
def get_rebate_detail(self, params=None):
"""get rebate record details"""
method = 'GET'
url = '{}{}'.format(self.api, '/detail')
response = self.sign_request(method, url, params=params)
return response.json()
def get_kickback_detail(self, params=None):
"""get self-return record details"""
method = 'GET'
url = '{}{}'.format(self.api, '/detail/kickback')
response = self.sign_request(method, url, params=params)
return response.json()
def get_inviter(self, params=None):
"""get self-return record details"""
method = 'GET'
url = '{}{}'.format(self.api, '/referCode')
response = self.sign_request(method, url, params=params)
return response.json()
def get_affiliate_commission(self, params=None):
"""get affiliate commission history"""
method = 'GET'
url = '{}{}'.format(self.api, '/affiliate/commission')
response = self.sign_request(method, url, params=params)
return response.json()
def get_affiliate_withdraw(self, params=None):
"""get affiliate withdraw history"""
method = 'GET'
url = '{}{}'.format(self.api, '/affiliate/withdraw')
response = self.sign_request(method, url, params=params)
return response.json()
def get_affiliate_commission_detail(self, params=None):
"""get affiliate commission details"""
method = 'GET'
url = '{}{}'.format(self.api, '/affiliate/commission/detail')
response = self.sign_request(method, url, params=params)
return response.json()
def get_affiliate_referral(self, params=None):
"""get affiliate referral"""
method = 'GET'
url = '{}{}'.format(self.api, '/affiliate/referral')
response = self.sign_request(method, url, params=params)
return response.json()
def get_affiliate_subaffiliates(self, params=None):
"""get affiliate subaffiliates"""
method = 'GET'
url = '{}{}'.format(self.api, '/affiliate/subaffiliates')
response = self.sign_request(method, url, params=params)
return response.json()
# WebSocket ListenKey
class mexc_listenkey(TOOL):
def __init__(self, config):
self.api = '/api/v3'
self.hosts = config["mexc_host"]
self.mexc_key = config["api_key"]
self.mexc_secret = config["secret_key"]
def post_listenKey(self):
""" generate ListenKey """
method = 'POST'
url = '{}{}'.format(self.api, '/userDataStream')
response = self.sign_request(method, url)
return response.json()
def get_listenKey(self):
""" get valid ListenKey """
method = 'GET'
url = '{}{}'.format(self.api, '/userDataStream')
response = self.sign_request(method, url)
return response.json()
def put_listenKey(self, params):
""" extend ListenKey validity """
method = 'PUT'
url = '{}{}'.format(self.api, '/userDataStream')
response = self.sign_request(method, url, params=params)
return response.json()
def delete_listenKey(self, params):
""" delete ListenKey """
method = 'DELETE'
url = '{}{}'.format(self.api, '/userDataStream')
response = self.sign_request(method, url, params=params)
return response.json()

View File

@@ -10,4 +10,3 @@ six
smmap smmap
tzdata tzdata
urllib3 urllib3
ccxt