feat: update to ccxt

This commit is contained in:
2026-04-25 22:36:50 +08:00
parent 9c124e2c56
commit 9a2d0a6c7a
2 changed files with 174 additions and 902 deletions

542
main.py
View File

@@ -2,22 +2,17 @@
# -*- coding: utf-8 -*-
"""
MEXC 交易机器人
定投交易机器人
功能:
1. 从JSON配置文件读取交易指令
2. 支持按日期执行多个交易(*表示每天执行)
3. 支持多种订单类型 (LIMIT, MARKET, LIMIT_MAKER, IMMEDIATE_OR_CANCEL, FILL_OR_KILL)
3. 支持多种订单类型 (limit, market)
4. 当无price时自动获取实时价格作为限价
5. LIMIT订单支持只提供quoteOrderQty自动计算quantity
5. limit订单支持只提供quoteOrderQty自动计算quantity
6. 证券代码映射功能
7. 完整的交易记录和日志
类说明:
- MexcSpotMarket: 处理市场数据查询
- MexcSpotTrade: 处理现货交易逻辑
- TradingConfig: 管理交易配置
使用示例:
python main.py
"""
@@ -31,8 +26,7 @@ from pathlib import Path
from datetime import datetime, date
from typing import Dict, Any, Optional, Tuple, List
import git
import mexc_spot_v3
import ccxt
os.makedirs("output", exist_ok=True)
@@ -41,140 +35,90 @@ logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler("output/mexc_trading_bot.log", encoding="utf-8"),
logging.FileHandler("output/trading_bot.log", encoding="utf-8"),
logging.StreamHandler(),
],
)
class MexcSpotMarket:
"""MEXC 市场数据查询类
class BotSpotMarket:
"""市场数据查询类"""
提供获取交易对价格等功能
方法:
- get_exchange_info(symbol): 获取交易对信息
- get_price(symbol): 获取指定交易对的当前价格
"""
def __init__(self, config):
"""初始化市场数据查询接口"""
self.market = mexc_spot_v3.mexc_market(config)
def __init__(self, exchange: ccxt.Exchange):
self.exchange = exchange
self.logger = logging.getLogger(self.__class__.__module__ + '.' + self.__class__.__name__)
try:
self.exchange.load_markets()
self.logger.info("加载市场信息成功")
except Exception as e:
self.logger.error("加载市场信息失败: %s", str(e))
def get_exchange_info(self, symbol: str) -> Optional[Dict[str, Any]]:
"""
获取交易对信息
Args:
symbol: 交易对,如 "BTCUSDT"
Returns:
交易对信息字典或None(如果失败)
"""
params = {"symbol": symbol}
"""获取交易对信息,返回格式与原始代码兼容"""
try:
self.logger.info("查询交易对信息: %s", symbol)
exchange_info = self.market.get_exchangeInfo(params)
if not exchange_info or "symbols" not in exchange_info:
self.logger.error("获取交易对信息失败: %s", exchange_info)
if symbol not in self.exchange.markets:
self.logger.warning("市场信息中未找到 %s,尝试重新加载", symbol)
self.exchange.load_markets()
market = self.exchange.markets.get(symbol)
if not market:
self.logger.error("交易对 %s 不存在", symbol)
return None
self.logger.info("获取交易对信息成功")
exchange_info = {
"symbols": [
{
"baseAssetPrecision": market["precision"]["amount"],
"quoteAmountPrecision": str(market["limits"]["cost"]["min"] or 0),
}
]
}
self.logger.info("获取交易对信息成功: %s", symbol)
return exchange_info
except Exception as e:
self.logger.error("查询交易所信息失败: %s", str(e))
return None
def get_price(self, symbol: str) -> Optional[float]:
"""
获取指定交易对的当前价格
Args:
symbol: 交易对,如 "BTCUSDT"
Returns:
当前价格(浮点数)或None(如果失败)
Raises:
Exception: 当API调用失败时抛出异常
"""
params = {"symbol": symbol}
"""获取指定交易对的当前价格"""
try:
self.logger.info("查询交易对价格: %s", symbol)
price_data = self.market.get_price(params)
if not price_data or "price" not in price_data:
self.logger.error("获取价格数据失败: %s", price_data)
ticker = self.exchange.fetch_ticker(symbol)
if not ticker or "last" not in ticker:
self.logger.error("获取价格数据失败: %s", ticker)
return None
price_str = price_data["price"]
price = float(price_str)
price = float(ticker["last"])
self.logger.info("获取价格成功: %s = %f", symbol, price)
return price
except Exception as e:
self.logger.error("查询价格失败: %s", str(e))
return None
class MexcSpotTrade:
"""MEXC 现货交易类
class BotSpotTrade:
"""现货交易类"""
处理所有现货交易逻辑,包括订单创建、状态查询和记录
属性:
- SYMBOL_MAPPING: 证券代码映射字典
- ORDER_TYPE_REQUIREMENTS: 各订单类型必需参数
方法:
- trade(): 执行现货交易
- _api_get_order(): 查询订单状态
- _tool_map_symbol(): 映射证券代码
- _tool_validate_order_params(): 验证订单参数
- _tool_record_transaction(): 记录交易到CSV
"""
# 订单类型与必需参数
ORDER_TYPE_REQUIREMENTS = {
"LIMIT": [
"quantity",
"price",
"quoteOrderQty",
], # quantity和price或quoteOrderQty和price
"MARKET": ["quantity", "quoteOrderQty"], # quantity或quoteOrderQty
"LIMIT_MAKER": ["quantity", "price"],
"IMMEDIATE_OR_CANCEL": ["quantity", "price"],
"FILL_OR_KILL": ["quantity", "price"],
"limit": ["quantity", "price", "quoteOrderQty"],
"market": ["quantity", "quoteOrderQty"]
}
def __init__(self, config, symbol_mapping, config_file_name):
"""初始化交易机器人"""
self.trader = mexc_spot_v3.mexc_trade(config)
self.market = MexcSpotMarket(config)
def __init__(self, exchange: ccxt.Exchange, symbol_mapping: Dict[str, str], config_file_name: str):
self.exchange = exchange
self.market = BotSpotMarket(exchange)
self.csv_file = f"output/{config_file_name}.csv"
self.symbol_mapping = symbol_mapping
self.logger = logging.getLogger(self.__class__.__module__ + '.' + self.__class__.__name__)
def _api_get_balance(self) -> str:
"""
获取账户余额
Returns:
账户余额字典或None(如果失败)
"""
try:
self.logger.info("查询账户余额")
account_info = self.trader.get_account_info()
account_info_balance = account_info.get("balances", [])
balance = self.exchange.fetch_balance()
free = balance.get("free", {})
balances = ""
for item in account_info_balance:
balances += f"{item['available']} {item['asset']} "
for asset, amount in free.items():
if amount and amount > 0:
balances += f"{amount} {asset} "
self.logger.info("获取账户余额成功")
return balances
except Exception as e:
@@ -182,25 +126,9 @@ class MexcSpotTrade:
return f"ERROR: {str(e)}"
def _api_get_order(self, symbol: str, order_id: str) -> Optional[Dict[str, Any]]:
"""
查询订单状态
Args:
symbol: 交易对
order_id: 订单ID
Returns:
订单详情字典或None(如果失败)
"""
params = {
"symbol": symbol,
"orderId": order_id,
}
try:
self.logger.info("查询订单状态, 订单ID: %s", order_id)
order = self.trader.get_order(params)
order = self.exchange.fetch_order(order_id, symbol)
self.logger.info("订单状态: %s", order.get("status"))
return order
except Exception as e:
@@ -208,78 +136,49 @@ class MexcSpotTrade:
return None
def _tool_map_symbol(self, symbol: str) -> str:
"""映射证券代码用于记录"""
return self.symbol_mapping.get(symbol, symbol)
def _tool_validate_order_params(
self, order_type: str, params: Dict[str, Any]
) -> Tuple[bool, str]:
"""
验证订单参数是否符合要求
Args:
order_type: 订单类型
params: 订单参数
Returns:
元组(是否有效, 错误信息)
"""
def _tool_validate_order_params(self, order_type: str, params: Dict[str, Any]) -> Tuple[bool, str]:
required_params = self.ORDER_TYPE_REQUIREMENTS.get(order_type, [])
if not required_params:
return False, f"未知的订单类型: {order_type}"
# 特殊处理LIMIT订单
if order_type == "LIMIT":
# 需要price和(quantity或quoteOrderQty)
if order_type == "limit":
if "price" not in params:
return False, "LIMIT订单需要price参数"
return False, "limit订单需要price参数"
if "quantity" not in params and "quoteOrderQty" not in params:
return False, "LIMIT订单需要quantity或quoteOrderQty参数"
return False, "limit订单需要quantity或quoteOrderQty参数"
return True, ""
# 特殊处理MARKET订单
if order_type == "MARKET":
if order_type == "market":
if "quantity" not in params and "quoteOrderQty" not in params:
return False, "MARKET订单需要quantity或quoteOrderQty参数"
return False, "market订单需要quantity或quoteOrderQty参数"
return True, ""
# 检查其他订单类型的必需参数
missing_params = [p for p in required_params if p not in params]
if missing_params:
return False, f"{order_type}订单缺少必需参数: {', '.join(missing_params)}"
return True, ""
def _tool_sci_to_decimal(self, num, precision = 30):
if isinstance(num, str):
num = float(num)
result = f"{num:.{precision}f}"
result = result.rstrip('0').rstrip('.')
return result
def _tool_record_transaction(self, order_data: Dict[str, Any]) -> bool:
"""
记录交易到CSV文件
Args:
order_data: 订单数据字典
Returns:
是否成功记录
"""
try:
original_symbol = order_data["symbol"]
mapped_symbol = self._tool_map_symbol(original_symbol)
order_id = order_data["orderId"]
executed_qty = order_data["executedQty"]
cummulative_quote_qty = order_data["cummulativeQuoteQty"]
order_id = order_data["id"]
executed_qty = self._tool_sci_to_decimal(order_data.get("filled", 0.0))
cummulative_quote_qty = self._tool_sci_to_decimal(order_data.get("cost", 0.0))
side = order_data["side"]
balances = self._api_get_balance()
# 确定交易类型显示
trade_type = "买入" if side == "BUY" else "卖出"
timestamp = datetime.fromtimestamp(order_data["time"] / 1000).strftime(
"%Y-%m-%dT%H:%M"
)
trade_type = "买入" if side == "buy" else "卖出"
timestamp = datetime.fromtimestamp(order_data["timestamp"] / 1000).strftime("%Y-%m-%dT%H:%M")
row = [
timestamp,
@@ -289,11 +188,10 @@ class MexcSpotTrade:
cummulative_quote_qty,
"资金账户",
"CEX",
f"MEXC API - Order ID: {order_id}",
balances
f"DCA Order ID: {order_id}",
balances,
]
# 检查文件是否存在
file_exists = False
try:
with open(self.csv_file, "r", encoding="utf-8") as f:
@@ -301,23 +199,13 @@ class MexcSpotTrade:
except FileNotFoundError:
pass
# 写入CSV
with open(self.csv_file, "a", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
if not file_exists:
writer.writerow(
[
"日期",
"类型",
"证券代码",
"份额",
"净额",
"现金账户",
"目标账户",
"备注",
"balances"
]
)
writer.writerow([
"日期", "类型", "证券代码", "份额", "净额",
"现金账户", "目标账户", "备注", "balances",
])
writer.writerow(row)
self.logger.info("交易记录成功, 订单ID: %s", order_id)
@@ -326,155 +214,91 @@ class MexcSpotTrade:
self.logger.error("记录交易失败: %s", str(e))
return False
def _tool_calculate_quantity(
self,
quantity: float,
price: float,
base_asset_precision: int,
quote_amount_precision: float,
) -> float:
"""
调整下单数量以满足最小成交额要求。
策略说明:
- 计算出的quantity如果乘以price后小于交易对的最小成交额(quoteAmountPrecision)
则将quantity增加一个最小单位(10^(-base_asset_precision)),确保下单金额满足交易所要求。
Args:
quantity: 初步计算出的下单数量
price: 当前价格
base_asset_precision: 交易对基础资产的小数精度
quote_amount_precision: 交易对最小成交额
Returns:
满足最小成交额要求的下单数量
"""
processed_quantity = round(quantity, base_asset_precision)
if processed_quantity * price < quote_amount_precision:
self.logger.info(
"计算的quantity小于最低要求%f * %f = %f < %f,进行调整",
processed_quantity,
price,
processed_quantity * price,
quote_amount_precision,
)
processed_quantity = round(
quantity + 10 ** (-base_asset_precision), base_asset_precision
)
self.logger.info("调整后的quantity: %f", processed_quantity)
return processed_quantity
def trade(
self, symbol: str, order_type: str, side: str, **kwargs
) -> Optional[Dict[str, Any]]:
"""
执行现货交易
Args:
symbol: 交易对 (如 BTCUSDC)
order_type: 订单类型 (LIMIT, MARKET等)
side: 交易方向 (BUY, SELL)
**kwargs: 其他订单参数
Returns:
订单信息字典或None(如果失败)
Raises:
ValueError: 当参数验证失败时
Exception: 当API调用失败时
"""
# 基本参数验证
if side not in ["BUY", "SELL"]:
def trade(self, symbol: str, order_type: str, side: str, **kwargs) -> Optional[Dict[str, Any]]:
if side not in ["buy", "sell"]:
self.logger.error("无效的交易方向: %s", side)
return None
order_type = order_type.upper()
processed_kwargs = kwargs.copy()
# 记录未经过偏移的价格以供LIMIT订单只有quoteOrderQty没有quantity的情况使用
# 参数有price时直接使用否则就为实时价格
clean_price = processed_kwargs.get("price")
# 处理无price的情况获取实时价格
if order_type in ["LIMIT", "LIMIT_MAKER"] and "price" not in processed_kwargs:
if order_type == "limit" and "price" not in processed_kwargs:
current_price = self.market.get_price(symbol)
if current_price is None:
self.logger.error("无法获取实时价格,交易取消")
return None
clean_price = current_price
# 防止挂单不成交
if side == "BUY":
processed_kwargs["price"] = current_price * 1.01 # 买入加价1%
elif side == "SELL":
processed_kwargs["price"] = current_price * 0.99 # 卖出减价1%
self.logger.info("使用调整1%%后价格作为限价: %f", processed_kwargs["price"])
if side == "buy":
processed_kwargs["price"] = current_price * 1.001 # 买入加价0.1%
elif side == "sell":
processed_kwargs["price"] = current_price * 0.999 # 卖出减价0.1%
self.logger.info("使用调整0.1%%后价格作为限价: %f", processed_kwargs["price"])
# 处理LIMIT订单只有quoteOrderQty没有quantity的情况
if (
order_type in ["LIMIT", "LIMIT_MAKER"]
order_type == "limit"
and "quoteOrderQty" in processed_kwargs
and "quantity" not in processed_kwargs
):
try:
exchange_info = self.market.get_exchange_info(symbol)
if not exchange_info:
return None
quote_amount = float(processed_kwargs["quoteOrderQty"])
quantity = quote_amount / clean_price
base_asset_precision = int(
exchange_info["symbols"][0]["baseAssetPrecision"]
)
quote_amount_precision = float(
exchange_info["symbols"][0]["quoteAmountPrecision"]
)
processed_quantity = self._tool_calculate_quantity(
quantity, clean_price, base_asset_precision, quote_amount_precision
)
self.logger.info("根据quoteOrderQty计算quantity: %f", processed_quantity)
processed_kwargs["quantity"] = str(processed_quantity)
if clean_price is None:
self.logger.error("无法获取价格来计算数量")
return None
price_for_calc = float(clean_price)
quantity = quote_amount / price_for_calc
self.logger.info("根据quoteOrderQty计算quantity: %f", quantity)
processed_kwargs["quantity"] = str(quantity)
processed_kwargs.pop("quoteOrderQty")
except (ValueError, KeyError) as e:
except (ValueError, KeyError, TypeError) as e:
self.logger.error("计算quantity失败: %s", str(e))
return None
# 准备订单参数
base_params = {
"symbol": symbol,
"side": side,
"type": order_type,
**processed_kwargs,
}
amount = processed_kwargs.get("quantity")
price = processed_kwargs.get("price")
if amount is not None:
amount = float(amount)
if price is not None:
price = float(price)
# 验证参数
is_valid, error_msg = self._tool_validate_order_params(order_type, base_params)
params = {}
if order_type == "market" and "quoteOrderQty" in processed_kwargs:
current_price = self.market.get_price(symbol)
if current_price is None:
self.logger.error("无法获取实时价格,交易取消")
return None
clean_price = current_price
amount = float(processed_kwargs["quoteOrderQty"]) / clean_price
is_valid, error_msg = self._tool_validate_order_params(order_type, processed_kwargs)
if not is_valid:
self.logger.error("订单参数验证失败: %s", error_msg)
return None
try:
self.logger.info("准备下单 %s %s 订单, 交易对: %s", side, order_type, symbol)
self.logger.debug("订单参数: %s", base_params)
self.logger.info("订单参数: symbol=%s, type=%s, side=%s, amount=%s, price=%s, params=%s",
symbol, order_type, side, amount, price, params)
# 测试订单
test_result = self.trader.post_order_test(base_params.copy())
if test_result != {}:
self.logger.error("订单测试失败,参数有误: %s", test_result)
return None
self.logger.info("订单参数测试通过,准备正式下单")
# 正式下单
order = self.trader.post_order(base_params.copy())
order_id = order.get("orderId")
order = self.exchange.create_order(
symbol=symbol,
type=order_type,
side=side,
amount=amount,
price=price,
params=params,
)
order_id = order.get("id")
if not order_id:
self.logger.error("下单失败: 未获取到订单ID")
self.logger.error(base_params.copy())
self.logger.error(order)
return None
self.logger.info("订单创建成功, 订单ID: %s", order_id)
# 查询订单详情
self.logger.info("等待1秒后查询订单状态...")
time.sleep(1)
order_detail = self._api_get_order(symbol, order_id)
@@ -482,9 +306,9 @@ class MexcSpotTrade:
self.logger.error("获取订单详情失败")
return None
# 如果不是FILLED则重复查询最多10次
retry_count = 0
while order_detail.get("status") != "FILLED" and retry_count < 10:
filled_statuses = ("closed", "filled")
while order_detail.get("status", "").lower() not in filled_statuses and retry_count < 10:
retry_count += 1
self.logger.info(
"订单未完成(状态: %s)等待1秒后第%s次重试查询...",
@@ -497,8 +321,7 @@ class MexcSpotTrade:
self.logger.error("获取订单详情失败")
return None
# 记录交易
if order_detail.get("status") == "FILLED":
if order_detail.get("status", "").lower() in filled_statuses:
if not self._tool_record_transaction(order_detail):
self.logger.error("交易记录失败")
else:
@@ -516,29 +339,13 @@ class MexcSpotTrade:
class TradingConfig:
"""交易配置管理类
负责加载和管理交易配置
方法:
- get_today_trades(): 获取今天需要执行的交易列表
- _load_config(): 加载JSON配置文件
"""
"""交易配置管理类"""
def __init__(self, config_file: str = "config/trading_config.json"):
"""
初始化交易配置
Args:
config_file: 配置文件路径
"""
self.config_file = config_file
self.logger = logging.getLogger(self.__class__.__module__ + '.' + self.__class__.__name__)
self.config_data = self._load_config()
def _load_config(self) -> Dict[str, Any]:
"""加载JSON配置文件"""
try:
with open(self.config_file, "r", encoding="utf-8") as f:
config = json.load(f)
@@ -555,73 +362,71 @@ class TradingConfig:
return {}
def get_today_trades(self) -> List[Dict[str, Any]]:
"""
获取今天需要执行的交易列表
Returns:
今天需要执行的交易配置列表
Example:
>>> config = TradingConfig()
>>> today_trades = config.get_today_trades()
>>> print(len(today_trades))
"""
if not self.config_data or "trades" not in self.config_data:
return []
today = date.today().isoformat()
today_trades = []
for trade in self.config_data["trades"]:
# 检查交易是否包含执行日期
if "execute_dates" not in trade:
continue
# 检查是否设置了每天执行(*)
if "*" in trade["execute_dates"]:
today_trades.append(trade)
continue
# 检查今天是否在执行日期列表中
if today in trade["execute_dates"]:
today_trades.append(trade)
return today_trades
def git_commit(repo_path: str = ".") -> str:
"""获取Git仓库版本"""
def build_ccxt_exchange(api_config: Dict[str, Any], exchange_id: str) -> ccxt.Exchange:
"""
根据配置文件和交易所ID构建 ccxt 交易所实例
:param api_config: 包含 api_key, api_secret 等字段的字典
:param exchange_id: ccxt 交易所标识符(小写),如 "mexc", "binance", "bybit"
"""
api_key = api_config.get("api_key")
secret = api_config.get("api_secret")
exchange_class = getattr(ccxt, exchange_id, None)
if exchange_class is None:
raise ValueError(f"不支持的交易所ID: {exchange_id},请检查配置文件中的 'exchange' 字段")
exchange_params = {
"apiKey": api_key,
"secret": secret,
"enableRatelimit": True,
}
if "password" in api_config:
exchange_params["password"] = api_config["password"]
return exchange_class(exchange_params)
def git_commit(repo_path: str = ".") -> Optional[str]:
try:
repo = git.Repo(repo_path)
return repo.head.commit.hexsha
except Exception as _:
except Exception:
return None
def main():
"""主函数"""
logger = logging.getLogger(f"{__name__}.main")
logger.info("=" * 40)
# 获取主程序Git仓库版本
app_commit = git_commit(".")[:10]
# 确保config目录存在
app_commit = git_commit(".")
app_commit_short = app_commit[:10] if app_commit else "unknown"
if not os.path.exists("config"):
logger.error("配置目录 config 不存在")
return
# 获取config, output仓库版本
config_commit = git_commit("config")[:10]
output_commit = git_commit("output")[:10]
logger.info("主程序 %s, 配置 %s, 输出 %s", app_commit, config_commit, output_commit)
config_commit = git_commit("config")
config_commit_short = config_commit[:10] if config_commit else "unknown"
output_commit = git_commit("output")
output_commit_short = output_commit[:10] if output_commit else "unknown"
logger.info("主程序 %s, 配置 %s, 输出 %s", app_commit_short, config_commit_short, output_commit_short)
# 获取config目录下所有json文件
config_files = list(Path("config").glob("*.json"))
if not config_files:
logger.info("配置目录中没有找到任何JSON文件")
return
@@ -630,16 +435,22 @@ def main():
for config_file in config_files:
try:
# 提取交易参数
logger.info("处理配置文件: %s", config_file)
config = TradingConfig(str(config_file))
spot_trader = MexcSpotTrade(
config.config_data.get("api", {}),
config.config_data.get("symbol_mapping", {}),
os.path.basename(config_file).replace(".json", ""),
)
today_trades = config.get_today_trades()
exchange_id = config.config_data.get("exchange")
logger.info("使用交易所: %s", exchange_id)
api_conf = config.config_data.get("api", {})
exchange = build_ccxt_exchange(api_conf, exchange_id)
spot_trader = BotSpotTrade(
exchange=exchange,
symbol_mapping=config.config_data.get("symbol_mapping", {}),
config_file_name=os.path.basename(config_file).replace(".json", ""),
)
today_trades = config.get_today_trades()
if not today_trades:
logger.info("%s - 今天没有需要执行的交易", config_file)
continue
@@ -654,21 +465,16 @@ def main():
params = trade_config.get("params", {})
if not all([symbol, order_type, side]):
logger.error(
"%s - 交易配置缺少必要参数: %s", config_file, trade_config
)
logger.error("%s - 交易配置缺少必要参数: %s", config_file, trade_config)
continue
logger.info(
"%s - 执行交易: %s %s %s", config_file, symbol, order_type, side
)
logger.debug("%s - 交易参数: %s", config_file, params)
# 执行交易
logger.info("%s - 执行交易: %s %s %s", config_file, symbol, order_type, side)
result = spot_trader.trade(
symbol=symbol, order_type=order_type, side=side, **params
symbol=symbol,
order_type=order_type,
side=side,
**params,
)
if result:
logger.info("%s - 交易执行成功: %s", config_file, result)
else: