Compare commits

..

1 Commits

Author SHA1 Message Date
bc41498e96 fix(trade): LIMIT calculated quantity algorithm
Closes #6
2025-08-10 02:39:29 +08:00
4 changed files with 923 additions and 222 deletions

1
config1 Submodule

Submodule config1 added at fab56fe709

607
main.py
View File

@@ -2,17 +2,22 @@
# -*- coding: utf-8 -*-
"""
定投交易机器人
MEXC 交易机器人
功能:
1. 从JSON配置文件读取交易指令
2. 支持按日期执行多个交易(*表示每天执行)
3. 支持多种订单类型 (limit, market)
3. 支持多种订单类型 (LIMIT, MARKET, LIMIT_MAKER, IMMEDIATE_OR_CANCEL, FILL_OR_KILL)
4. 当无price时自动获取实时价格作为限价
5. limit订单支持只提供quoteOrderQty自动计算quantity
5. LIMIT订单支持只提供quoteOrderQty自动计算quantity
6. 证券代码映射功能
7. 完整的交易记录和日志
类说明:
- MexcSpotMarket: 处理市场数据查询
- MexcSpotTrade: 处理现货交易逻辑
- TradingConfig: 管理交易配置
使用示例:
python main.py
"""
@@ -26,7 +31,8 @@ from pathlib import Path
from datetime import datetime, date
from typing import Dict, Any, Optional, Tuple, List
import git
import ccxt
import mexc_spot_v3
os.makedirs("output", exist_ok=True)
@@ -35,150 +41,223 @@ logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler("output/trading_bot.log", encoding="utf-8"),
logging.FileHandler("output/mexc_trading_bot.log", encoding="utf-8"),
logging.StreamHandler(),
],
)
logger = logging.getLogger(__name__)
class BotSpotMarket:
"""市场数据查询类"""
class MexcSpotMarket:
"""MEXC 市场数据查询类
def __init__(self, exchange: ccxt.Exchange):
self.exchange = exchange
self.logger = logging.getLogger(self.__class__.__module__ + '.' + self.__class__.__name__)
try:
self.exchange.load_markets()
self.logger.info("加载市场信息成功")
except Exception as e:
self.logger.error("加载市场信息失败: %s", str(e))
提供获取交易对价格等功能
方法:
- get_exchange_info(symbol): 获取交易对信息
- get_price(symbol): 获取指定交易对的当前价格
"""
def __init__(self, config):
"""初始化市场数据查询接口"""
self.market = mexc_spot_v3.mexc_market(config)
def get_exchange_info(self, symbol: str) -> Optional[Dict[str, Any]]:
"""获取交易对信息,返回格式与原始代码兼容"""
"""
获取交易对信息
Args:
symbol: 交易对,如 "BTCUSDT"
Returns:
交易对信息字典或None(如果失败)
"""
params = {"symbol": symbol}
try:
self.logger.info("查询交易对信息: %s", symbol)
if symbol not in self.exchange.markets:
self.logger.warning("市场信息中未找到 %s,尝试重新加载", symbol)
self.exchange.load_markets()
market = self.exchange.markets.get(symbol)
if not market:
self.logger.error("交易对 %s 不存在", symbol)
logger.info("查询交易对信息: %s", symbol)
exchange_info = self.market.get_exchangeInfo(params)
if not exchange_info or "symbols" not in exchange_info:
logger.error("获取交易对信息失败: %s", exchange_info)
return None
exchange_info = {
"symbols": [
{
"baseAssetPrecision": market["precision"]["amount"],
"quoteAmountPrecision": str(market["limits"]["cost"]["min"] or 0),
}
]
}
self.logger.info("获取交易对信息成功: %s", symbol)
logger.info("获取交易对信息成功")
return exchange_info
except Exception as e:
self.logger.error("查询交易所信息失败: %s", str(e))
logger.error("查询交易所信息失败: %s", str(e))
return None
def get_price(self, symbol: str) -> Optional[float]:
"""获取指定交易对的当前价格"""
"""
获取指定交易对的当前价格
Args:
symbol: 交易对,如 "BTCUSDT"
Returns:
当前价格(浮点数)或None(如果失败)
Raises:
Exception: 当API调用失败时抛出异常
"""
params = {"symbol": symbol}
try:
self.logger.info("查询交易对价格: %s", symbol)
ticker = self.exchange.fetch_ticker(symbol)
if not ticker or "last" not in ticker:
self.logger.error("获取价格数据失败: %s", ticker)
logger.info("查询交易对价格: %s", symbol)
price_data = self.market.get_price(params)
if not price_data or "price" not in price_data:
logger.error("获取价格数据失败: %s", price_data)
return None
price = float(ticker["last"])
self.logger.info("获取价格成功: %s = %f", symbol, price)
price_str = price_data["price"]
price = float(price_str)
logger.info("获取价格成功: %s = %f", symbol, price)
return price
except Exception as e:
self.logger.error("查询价格失败: %s", str(e))
logger.error("查询价格失败: %s", str(e))
return None
class BotSpotTrade:
"""现货交易类"""
class MexcSpotTrade:
"""MEXC 现货交易类
处理所有现货交易逻辑,包括订单创建、状态查询和记录
属性:
- SYMBOL_MAPPING: 证券代码映射字典
- ORDER_TYPE_REQUIREMENTS: 各订单类型必需参数
方法:
- trade(): 执行现货交易
- _api_get_order(): 查询订单状态
- _tool_map_symbol(): 映射证券代码
- _tool_validate_order_params(): 验证订单参数
- _tool_record_transaction(): 记录交易到CSV
"""
# 订单类型与必需参数
ORDER_TYPE_REQUIREMENTS = {
"limit": ["quantity", "price", "quoteOrderQty"],
"market": ["quantity", "quoteOrderQty"]
"LIMIT": [
"quantity",
"price",
"quoteOrderQty",
], # quantity和price或quoteOrderQty和price
"MARKET": ["quantity", "quoteOrderQty"], # quantity或quoteOrderQty
"LIMIT_MAKER": ["quantity", "price"],
"IMMEDIATE_OR_CANCEL": ["quantity", "price"],
"FILL_OR_KILL": ["quantity", "price"],
}
def __init__(self, exchange: ccxt.Exchange, symbol_mapping: Dict[str, str], config_file_name: str):
self.exchange = exchange
self.market = BotSpotMarket(exchange)
def __init__(self, config, symbol_mapping, config_file_name):
"""初始化交易机器人"""
self.trader = mexc_spot_v3.mexc_trade(config)
self.market = MexcSpotMarket(config)
self.csv_file = f"output/{config_file_name}.csv"
self.symbol_mapping = symbol_mapping
self.logger = logging.getLogger(self.__class__.__module__ + '.' + self.__class__.__name__)
def _api_get_balance(self) -> str:
try:
self.logger.info("查询账户余额")
balance = self.exchange.fetch_balance()
free = balance.get("free", {})
balances = ""
for asset, amount in free.items():
if amount and amount > 0:
balances += f"{amount} {asset} "
self.logger.info("获取账户余额成功")
return balances
except Exception as e:
self.logger.error("查询账户信息失败: %s", str(e))
return f"ERROR: {str(e)}"
def _api_get_order(self, symbol: str, order_id: str) -> Optional[Dict[str, Any]]:
"""
查询订单状态
Args:
symbol: 交易对
order_id: 订单ID
Returns:
订单详情字典或None(如果失败)
"""
params = {
"symbol": symbol,
"orderId": order_id,
}
try:
self.logger.info("查询订单状态, 订单ID: %s", order_id)
order = self.exchange.fetch_order(order_id, symbol)
self.logger.info("订单状态: %s", order.get("status"))
logger.info("查询订单状态, 订单ID: %s", order_id)
order = self.trader.get_order(params)
logger.info("订单状态: %s", order.get("status"))
return order
except Exception as e:
self.logger.error("查询订单失败: %s", str(e))
logger.error("查询订单失败: %s", str(e))
return None
def _tool_map_symbol(self, symbol: str) -> str:
"""映射证券代码用于记录"""
return self.symbol_mapping.get(symbol, symbol)
def _tool_validate_order_params(self, order_type: str, params: Dict[str, Any]) -> Tuple[bool, str]:
def _tool_validate_order_params(
self, order_type: str, params: Dict[str, Any]
) -> Tuple[bool, str]:
"""
验证订单参数是否符合要求
Args:
order_type: 订单类型
params: 订单参数
Returns:
元组(是否有效, 错误信息)
"""
required_params = self.ORDER_TYPE_REQUIREMENTS.get(order_type, [])
if not required_params:
return False, f"未知的订单类型: {order_type}"
if order_type == "limit":
# 特殊处理LIMIT订单
if order_type == "LIMIT":
# 需要price和(quantity或quoteOrderQty)
if "price" not in params:
return False, "limit订单需要price参数"
return False, "LIMIT订单需要price参数"
if "quantity" not in params and "quoteOrderQty" not in params:
return False, "limit订单需要quantity或quoteOrderQty参数"
return False, "LIMIT订单需要quantity或quoteOrderQty参数"
return True, ""
if order_type == "market":
# 特殊处理MARKET订单
if order_type == "MARKET":
if "quantity" not in params and "quoteOrderQty" not in params:
return False, "market订单需要quantity或quoteOrderQty参数"
return False, "MARKET订单需要quantity或quoteOrderQty参数"
return True, ""
# 检查其他订单类型的必需参数
missing_params = [p for p in required_params if p not in params]
if missing_params:
return False, f"{order_type}订单缺少必需参数: {', '.join(missing_params)}"
return True, ""
def _tool_sci_to_decimal(self, num, precision = 30):
if isinstance(num, str):
num = float(num)
result = f"{num:.{precision}f}"
result = result.rstrip('0').rstrip('.')
return result
def _tool_record_transaction(self, order_data: Dict[str, Any]) -> bool:
"""
记录交易到CSV文件
Args:
order_data: 订单数据字典
Returns:
是否成功记录
"""
try:
original_symbol = order_data["symbol"]
mapped_symbol = self._tool_map_symbol(original_symbol)
order_id = order_data["id"]
executed_qty = self._tool_sci_to_decimal(order_data.get("filled", 0.0))
cummulative_quote_qty = self._tool_sci_to_decimal(order_data.get("cost", 0.0))
order_id = order_data["orderId"]
executed_qty = order_data["executedQty"]
cummulative_quote_qty = order_data["cummulativeQuoteQty"]
side = order_data["side"]
balances = self._api_get_balance()
trade_type = "买入" if side == "buy" else "卖出"
timestamp = datetime.fromtimestamp(order_data["timestamp"] / 1000).strftime("%Y-%m-%dT%H:%M")
# 确定交易类型显示
trade_type = "买入" if side == "BUY" else "卖出"
timestamp = datetime.fromtimestamp(order_data["time"] / 1000).strftime(
"%Y-%m-%dT%H:%M"
)
row = [
timestamp,
@@ -188,10 +267,10 @@ class BotSpotTrade:
cummulative_quote_qty,
"资金账户",
"CEX",
f"DCA Order ID: {order_id}",
balances,
f"MEXC API - Order ID: {order_id}",
]
# 检查文件是否存在
file_exists = False
try:
with open(self.csv_file, "r", encoding="utf-8") as f:
@@ -199,118 +278,191 @@ class BotSpotTrade:
except FileNotFoundError:
pass
# 写入CSV
with open(self.csv_file, "a", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
if not file_exists:
writer.writerow([
"日期", "类型", "证券代码", "份额", "净额",
"现金账户", "目标账户", "备注", "balances",
])
writer.writerow(
[
"日期",
"类型",
"证券代码",
"份额",
"净额",
"现金账户",
"目标账户",
"备注",
]
)
writer.writerow(row)
self.logger.info("交易记录成功, 订单ID: %s", order_id)
logger.info("交易记录成功, 订单ID: %s", order_id)
return True
except Exception as e:
self.logger.error("记录交易失败: %s", str(e))
logger.error("记录交易失败: %s", str(e))
return False
def trade(self, symbol: str, order_type: str, side: str, **kwargs) -> Optional[Dict[str, Any]]:
if side not in ["buy", "sell"]:
self.logger.error("无效的交易方向: %s", side)
def _tool_calculate_quantity(
self,
quantity: float,
price: float,
base_asset_precision: int,
quote_amount_precision: float,
) -> float:
"""
调整下单数量以满足最小成交额要求。
策略说明:
- 计算出的quantity如果乘以price后小于交易对的最小成交额(quoteAmountPrecision)
则将quantity增加一个最小单位(10^(-base_asset_precision)),确保下单金额满足交易所要求。
Args:
quantity: 初步计算出的下单数量
price: 当前价格
base_asset_precision: 交易对基础资产的小数精度
quote_amount_precision: 交易对最小成交额
Returns:
满足最小成交额要求的下单数量
"""
processed_quantity = round(quantity, base_asset_precision)
if processed_quantity * price < quote_amount_precision:
logger.info(
"计算的quantity小于最低要求%f * %f = %f < %f,进行调整",
processed_quantity,
price,
processed_quantity * price,
quote_amount_precision,
)
processed_quantity = round(
quantity + 10 ** (-base_asset_precision), base_asset_precision
)
logger.info("调整后的quantity: %f", processed_quantity)
return processed_quantity
def trade(
self, symbol: str, order_type: str, side: str, **kwargs
) -> Optional[Dict[str, Any]]:
"""
执行现货交易
Args:
symbol: 交易对 (如 BTCUSDC)
order_type: 订单类型 (LIMIT, MARKET等)
side: 交易方向 (BUY, SELL)
**kwargs: 其他订单参数
Returns:
订单信息字典或None(如果失败)
Raises:
ValueError: 当参数验证失败时
Exception: 当API调用失败时
"""
# 基本参数验证
if side not in ["BUY", "SELL"]:
logger.error("无效的交易方向: %s", side)
return None
order_type = order_type.upper()
processed_kwargs = kwargs.copy()
# 记录未经过偏移的价格以供LIMIT订单只有quoteOrderQty没有quantity的情况使用
# 参数有price时直接使用否则就为实时价格
clean_price = processed_kwargs.get("price")
if order_type == "limit" and "price" not in processed_kwargs:
# 处理无price的情况获取实时价格
if order_type in ["LIMIT", "LIMIT_MAKER"] and "price" not in processed_kwargs:
current_price = self.market.get_price(symbol)
if current_price is None:
self.logger.error("无法获取实时价格,交易取消")
logger.error("无法获取实时价格,交易取消")
return None
clean_price = current_price
# 防止挂单不成交
if side == "buy":
processed_kwargs["price"] = current_price * 1.001 # 买入加价0.1%
elif side == "sell":
processed_kwargs["price"] = current_price * 0.999 # 卖出减价0.1%
self.logger.info("使用调整0.1%%后价格作为限价: %f", processed_kwargs["price"])
if side == "BUY":
processed_kwargs["price"] = current_price * 1.01 # 买入加价1%
elif side == "SELL":
processed_kwargs["price"] = current_price * 0.99 # 卖出减价1%
logger.info("使用调整1%%后价格作为限价: %f", processed_kwargs["price"])
# 处理LIMIT订单只有quoteOrderQty没有quantity的情况
if (
order_type == "limit"
order_type in ["LIMIT", "LIMIT_MAKER"]
and "quoteOrderQty" in processed_kwargs
and "quantity" not in processed_kwargs
):
try:
exchange_info = self.market.get_exchange_info(symbol)
if not exchange_info:
return None
quote_amount = float(processed_kwargs["quoteOrderQty"])
if clean_price is None:
self.logger.error("无法获取价格来计算数量")
return None
price_for_calc = float(clean_price)
quantity = quote_amount / price_for_calc
self.logger.info("根据quoteOrderQty计算quantity: %f", quantity)
processed_kwargs["quantity"] = str(quantity)
quantity = quote_amount / clean_price
base_asset_precision = int(
exchange_info["symbols"][0]["baseAssetPrecision"]
)
quote_amount_precision = float(
exchange_info["symbols"][0]["quoteAmountPrecision"]
)
processed_quantity = self._tool_calculate_quantity(
quantity, clean_price, base_asset_precision, quote_amount_precision
)
logger.info("根据quoteOrderQty计算quantity: %f", processed_quantity)
processed_kwargs["quantity"] = str(processed_quantity)
processed_kwargs.pop("quoteOrderQty")
except (ValueError, KeyError, TypeError) as e:
self.logger.error("计算quantity失败: %s", str(e))
except (ValueError, KeyError) as e:
logger.error("计算quantity失败: %s", str(e))
return None
amount = processed_kwargs.get("quantity")
price = processed_kwargs.get("price")
if amount is not None:
amount = float(amount)
if price is not None:
price = float(price)
# 准备订单参数
base_params = {
"symbol": symbol,
"side": side,
"type": order_type,
**processed_kwargs,
}
params = {}
if order_type == "market" and "quoteOrderQty" in processed_kwargs:
current_price = self.market.get_price(symbol)
if current_price is None:
self.logger.error("无法获取实时价格,交易取消")
return None
clean_price = current_price
amount = float(processed_kwargs["quoteOrderQty"]) / clean_price
is_valid, error_msg = self._tool_validate_order_params(order_type, processed_kwargs)
# 验证参数
is_valid, error_msg = self._tool_validate_order_params(order_type, base_params)
if not is_valid:
self.logger.error("订单参数验证失败: %s", error_msg)
logger.error("订单参数验证失败: %s", error_msg)
return None
try:
self.logger.info("准备下单 %s %s 订单, 交易对: %s", side, order_type, symbol)
self.logger.info("订单参数: symbol=%s, type=%s, side=%s, amount=%s, price=%s, params=%s",
symbol, order_type, side, amount, price, params)
logger.info("准备下单 %s %s 订单, 交易对: %s", side, order_type, symbol)
logger.debug("订单参数: %s", base_params)
order = self.exchange.create_order(
symbol=symbol,
type=order_type,
side=side,
amount=amount,
price=price,
params=params,
)
order_id = order.get("id")
if not order_id:
self.logger.error("下单失败: 未获取到订单ID")
self.logger.error(order)
# 测试订单
test_result = self.trader.post_order_test(base_params.copy())
if test_result != {}:
logger.error("订单测试失败,参数有误: %s", test_result)
return None
self.logger.info("订单创建成功, 订单ID: %s", order_id)
self.logger.info("等待1秒后查询订单状态...")
logger.info("订单参数测试通过,准备正式下单")
# 正式下单
order = self.trader.post_order(base_params.copy())
order_id = order.get("orderId")
if not order_id:
logger.error("下单失败: 未获取到订单ID")
logger.error(base_params.copy())
logger.error(order)
return None
logger.info("订单创建成功, 订单ID: %s", order_id)
# 查询订单详情
logger.info("等待1秒后查询订单状态...")
time.sleep(1)
order_detail = self._api_get_order(symbol, order_id)
if not order_detail:
self.logger.error("获取订单详情失败")
logger.error("获取订单详情失败")
return None
# 如果不是FILLED则重复查询最多10次
retry_count = 0
filled_statuses = ("closed", "filled")
while order_detail.get("status", "").lower() not in filled_statuses and retry_count < 10:
while order_detail.get("status") != "FILLED" and retry_count < 10:
retry_count += 1
self.logger.info(
logger.info(
"订单未完成(状态: %s)等待1秒后第%s次重试查询...",
order_detail.get("status", "UNKNOWN"),
retry_count,
@@ -318,14 +470,15 @@ class BotSpotTrade:
time.sleep(1)
order_detail = self._api_get_order(symbol, order_id)
if not order_detail:
self.logger.error("获取订单详情失败")
logger.error("获取订单详情失败")
return None
if order_detail.get("status", "").lower() in filled_statuses:
# 记录交易
if order_detail.get("status") == "FILLED":
if not self._tool_record_transaction(order_detail):
self.logger.error("交易记录失败")
logger.error("交易记录失败")
else:
self.logger.warning(
logger.warning(
"订单未完成(状态: %s)未被记录到CSV。订单ID: %s",
order_detail.get("status", "UNKNOWN"),
order_id,
@@ -334,99 +487,114 @@ class BotSpotTrade:
return order_detail
except Exception as e:
self.logger.error("交易执行失败: %s", str(e))
logger.error("交易执行失败: %s", str(e))
return None
class TradingConfig:
"""交易配置管理类"""
"""交易配置管理类
负责加载和管理交易配置
方法:
- get_today_trades(): 获取今天需要执行的交易列表
- _load_config(): 加载JSON配置文件
"""
def __init__(self, config_file: str = "config/trading_config.json"):
"""
初始化交易配置
Args:
config_file: 配置文件路径
"""
self.config_file = config_file
self.logger = logging.getLogger(self.__class__.__module__ + '.' + self.__class__.__name__)
self.config_data = self._load_config()
def _load_config(self) -> Dict[str, Any]:
"""加载JSON配置文件"""
try:
with open(self.config_file, "r", encoding="utf-8") as f:
config = json.load(f)
self.logger.info("成功加载配置文件: %s", self.config_file)
logger.info("成功加载配置文件: %s", self.config_file)
return config
except FileNotFoundError:
self.logger.error("配置文件不存在: %s", self.config_file)
logger.error("配置文件不存在: %s", self.config_file)
return {}
except json.JSONDecodeError:
self.logger.error("配置文件格式错误不是有效的JSON")
logger.error("配置文件格式错误不是有效的JSON")
return {}
except Exception as e:
self.logger.error("加载配置文件时出错: %s", str(e))
logger.error("加载配置文件时出错: %s", str(e))
return {}
def get_today_trades(self) -> List[Dict[str, Any]]:
"""
获取今天需要执行的交易列表
Returns:
今天需要执行的交易配置列表
Example:
>>> config = TradingConfig()
>>> today_trades = config.get_today_trades()
>>> print(len(today_trades))
"""
if not self.config_data or "trades" not in self.config_data:
return []
today = date.today().isoformat()
today_trades = []
for trade in self.config_data["trades"]:
# 检查交易是否包含执行日期
if "execute_dates" not in trade:
continue
# 检查是否设置了每天执行(*)
if "*" in trade["execute_dates"]:
today_trades.append(trade)
continue
# 检查今天是否在执行日期列表中
if today in trade["execute_dates"]:
today_trades.append(trade)
return today_trades
def build_ccxt_exchange(api_config: Dict[str, Any], exchange_id: str) -> ccxt.Exchange:
"""
根据配置文件和交易所ID构建 ccxt 交易所实例
:param api_config: 包含 api_key, api_secret 等字段的字典
:param exchange_id: ccxt 交易所标识符(小写),如 "mexc", "binance", "bybit"
"""
api_key = api_config.get("api_key")
secret = api_config.get("api_secret")
def git_commit(repo_path: str = ".") -> str:
"""获取Git仓库版本"""
exchange_class = getattr(ccxt, exchange_id, None)
if exchange_class is None:
raise ValueError(f"不支持的交易所ID: {exchange_id},请检查配置文件中的 'exchange' 字段")
exchange_params = {
"apiKey": api_key,
"secret": secret,
"enableRatelimit": True,
}
if "password" in api_config:
exchange_params["password"] = api_config["password"]
return exchange_class(exchange_params)
def git_commit(repo_path: str = ".") -> Optional[str]:
try:
repo = git.Repo(repo_path)
return repo.head.commit.hexsha
except Exception:
except Exception as _:
return None
def main():
logger = logging.getLogger(f"{__name__}.main")
"""主函数"""
logger.info("=" * 40)
# 获取主程序Git仓库版本
app_commit = git_commit(".")[:10]
app_commit = git_commit(".")
app_commit_short = app_commit[:10] if app_commit else "unknown"
# 确保config目录存在
if not os.path.exists("config"):
logger.error("配置目录 config 不存在")
return
config_commit = git_commit("config")
config_commit_short = config_commit[:10] if config_commit else "unknown"
output_commit = git_commit("output")
output_commit_short = output_commit[:10] if output_commit else "unknown"
logger.info("主程序 %s, 配置 %s, 输出 %s", app_commit_short, config_commit_short, output_commit_short)
# 获取config, output仓库版本
config_commit = git_commit("config")[:10]
output_commit = git_commit("output")[:10]
logger.info("主程序 %s, 配置 %s, 输出 %s", app_commit, config_commit, output_commit)
# 获取config目录下所有json文件
config_files = list(Path("config").glob("*.json"))
if not config_files:
logger.info("配置目录中没有找到任何JSON文件")
return
@@ -435,22 +603,16 @@ def main():
for config_file in config_files:
try:
# 提取交易参数
logger.info("处理配置文件: %s", config_file)
config = TradingConfig(str(config_file))
exchange_id = config.config_data.get("exchange")
logger.info("使用交易所: %s", exchange_id)
api_conf = config.config_data.get("api", {})
exchange = build_ccxt_exchange(api_conf, exchange_id)
spot_trader = BotSpotTrade(
exchange=exchange,
symbol_mapping=config.config_data.get("symbol_mapping", {}),
config_file_name=os.path.basename(config_file).replace(".json", ""),
spot_trader = MexcSpotTrade(
config.config_data.get("api", {}),
config.config_data.get("symbol_mapping", {}),
os.path.basename(config_file).replace(".json", ""),
)
today_trades = config.get_today_trades()
if not today_trades:
logger.info("%s - 今天没有需要执行的交易", config_file)
continue
@@ -465,16 +627,21 @@ def main():
params = trade_config.get("params", {})
if not all([symbol, order_type, side]):
logger.error("%s - 交易配置缺少必要参数: %s", config_file, trade_config)
logger.error(
"%s - 交易配置缺少必要参数: %s", config_file, trade_config
)
continue
logger.info("%s - 执行交易: %s %s %s", config_file, symbol, order_type, side)
result = spot_trader.trade(
symbol=symbol,
order_type=order_type,
side=side,
**params,
logger.info(
"%s - 执行交易: %s %s %s", config_file, symbol, order_type, side
)
logger.debug("%s - 交易参数: %s", config_file, params)
# 执行交易
result = spot_trader.trade(
symbol=symbol, order_type=order_type, side=side, **params
)
if result:
logger.info("%s - 交易执行成功: %s", config_file, result)
else:

534
mexc_spot_v3.py Normal file
View File

@@ -0,0 +1,534 @@
import requests
import hmac
import hashlib
from urllib.parse import urlencode, quote
# ServerTime、Signature
class TOOL(object):
def _get_server_time(self):
return requests.request('get', 'https://api.mexc.com/api/v3/time').json()['serverTime']
def _sign_v3(self, req_time, sign_params=None):
if sign_params:
sign_params = urlencode(sign_params, quote_via=quote)
to_sign = "{}&timestamp={}".format(sign_params, req_time)
else:
to_sign = "timestamp={}".format(req_time)
sign = hmac.new(self.mexc_secret.encode('utf-8'), to_sign.encode('utf-8'), hashlib.sha256).hexdigest()
return sign
def public_request(self, method, url, params=None):
url = '{}{}'.format(self.hosts, url)
return requests.request(method, url, params=params)
def sign_request(self, method, url, params=None):
url = '{}{}'.format(self.hosts, url)
req_time = self._get_server_time()
if params:
params['signature'] = self._sign_v3(req_time=req_time, sign_params=params)
else:
params = {}
params['signature'] = self._sign_v3(req_time=req_time)
params['timestamp'] = req_time
headers = {
'x-mexc-apikey': self.mexc_key,
'Content-Type': 'application/json',
}
return requests.request(method, url, params=params, headers=headers)
# Market Data
class mexc_market(TOOL):
def __init__(self, config):
self.api = '/api/v3'
self.hosts = config["mexc_host"]
self.method = 'GET'
def get_ping(self):
"""test connectivity"""
url = '{}{}'.format(self.api, '/ping')
response = self.public_request(self.method, url)
return response.json()
def get_timestamp(self):
"""get sever time"""
url = '{}{}'.format(self.api, '/time')
response = self.public_request(self.method, url)
return response.json()
def get_defaultSymbols(self):
"""get defaultSymbols"""
url = '{}{}'.format(self.api, '/defaultSymbols')
response = self.public_request(self.method, url)
return response.json()
def get_exchangeInfo(self, params=None):
"""get exchangeInfo"""
url = '{}{}'.format(self.api, '/exchangeInfo')
response = self.public_request(self.method, url, params=params)
return response.json()
def get_depth(self, params):
"""get symbol depth"""
url = '{}{}'.format(self.api, '/depth')
response = self.public_request(self.method, url, params=params)
return response.json()
def get_deals(self, params):
"""get current trade deals list"""
url = '{}{}'.format(self.api, '/trades')
response = self.public_request(self.method, url, params=params)
return response.json()
def get_aggtrades(self, params):
"""get aggregate trades list"""
url = '{}{}'.format(self.api, '/aggTrades')
response = self.public_request(self.method, url, params=params)
return response.json()
def get_kline(self, params):
"""get k-line data"""
url = '{}{}'.format(self.api, '/klines')
response = self.public_request(self.method, url, params=params)
return response.json()
def get_avgprice(self, params):
"""get current average prcie(default : 5m)"""
url = '{}{}'.format(self.api, '/avgPrice')
response = self.public_request(self.method, url, params=params)
return response.json()
def get_24hr_ticker(self, params=None):
"""get 24hr prcie ticker change statistics"""
url = '{}{}'.format(self.api, '/ticker/24hr')
response = self.public_request(self.method, url, params=params)
return response.json()
def get_price(self, params=None):
"""get symbol price ticker"""
url = '{}{}'.format(self.api, '/ticker/price')
response = self.public_request(self.method, url, params=params)
return response.json()
def get_bookticker(self, params=None):
"""get symbol order book ticker"""
url = '{}{}'.format(self.api, '/ticker/bookTicker')
response = self.public_request(self.method, url, params=params)
return response.json()
def get_ETF_info(self, params=None):
"""get ETF information"""
url = '{}{}'.format(self.api, '/etf/info')
response = self.public_request(self.method, url, params=params)
return response.json()
# Spot Trade
class mexc_trade(TOOL):
def __init__(self, config):
self.api = '/api/v3'
self.hosts = config["mexc_host"]
self.mexc_key = config["api_key"]
self.mexc_secret = config["secret_key"]
def get_selfSymbols(self):
"""get currency information"""
method = 'GET'
url = '{}{}'.format(self.api, '/selfSymbols')
response = self.sign_request(method, url)
return response.json()
def post_order_test(self, params):
"""test new order"""
method = 'POST'
url = '{}{}'.format(self.api, '/order/test')
response = self.sign_request(method, url, params=params)
return response.json()
def post_order(self, params):
"""place order"""
method = 'POST'
url = '{}{}'.format(self.api, '/order')
response = self.sign_request(method, url, params=params)
return response.json()
def post_batchorders(self, params):
"""place batch orders(same symbol)"""
method = 'POST'
url = '{}{}'.format(self.api, '/batchOrders')
params = {"batchOrders": str(params)}
response = self.sign_request(method, url, params=params)
print(response.url)
return response.json()
def delete_order(self, params):
"""
cancel order
'origClientOrderId' or 'orderId' must be sent
"""
method = 'DELETE'
url = '{}{}'.format(self.api, '/order')
response = self.sign_request(method, url, params=params)
return response.json()
def delete_openorders(self, params):
"""
cancel all order for a single symbol
"""
method = 'DELETE'
url = '{}{}'.format(self.api, '/openOrders')
response = self.sign_request(method, url, params=params)
return response.json()
def get_order(self, params):
"""
get order
'origClientOrderId' or 'orderId' must be sent
"""
method = 'GET'
url = '{}{}'.format(self.api, '/order')
response = self.sign_request(method, url, params=params)
return response.json()
def get_openorders(self, params):
"""get current pending order """
method = 'GET'
url = '{}{}'.format(self.api, '/openOrders')
response = self.sign_request(method, url, params=params)
return response.json()
def get_allorders(self, params):
"""
get current all order
startTime and endTime need to use at the same time
"""
method = 'GET'
url = '{}{}'.format(self.api, '/allOrders')
response = self.sign_request(method, url, params=params)
return response.json()
def get_mytrades(self, params):
"""
get current all order
orderId need to use with symbol at the same time
"""
method = 'GET'
url = '{}{}'.format(self.api, '/myTrades')
response = self.sign_request(method, url, params=params)
return response.json()
def post_mxDeDuct(self, params):
"""Enable MX DeDuct"""
method = 'POST'
url = '{}{}'.format(self.api, '/mxDeduct/enable')
response = self.sign_request(method, url, params=params)
return response.json()
def get_mxDeDuct(self):
"""MX DeDuct status"""
method = 'GET'
url = '{}{}'.format(self.api, '/mxDeduct/enable')
response = self.sign_request(method, url)
return response.json()
def get_account_info(self):
"""get account information"""
method = 'GET'
url = '{}{}'.format(self.api, '/account')
response = self.sign_request(method, url)
return response.json()
# Wallet
class mexc_wallet(TOOL):
def __init__(self, config):
self.api = '/api/v3/capital'
self.hosts = config["mexc_host"]
self.mexc_key = config["api_key"]
self.mexc_secret = config["secret_key"]
def get_coinlist(self):
"""get currency information"""
method = 'GET'
url = '{}{}'.format(self.api, '/config/getall')
response = self.sign_request(method, url)
return response.json()
def post_withdraw(self, params):
"""withdraw"""
method = 'POST'
url = '{}{}'.format(self.api, '/withdraw/apply')
response = self.sign_request(method, url, params=params)
return response.json()
def cancel_withdraw(self, params):
"""withdraw"""
method = 'DELETE'
url = '{}{}'.format(self.api, '/withdraw')
response = self.sign_request(method, url, params=params)
return response.json()
def get_deposit_list(self, params):
"""deposit history list"""
method = 'GET'
url = '{}{}'.format(self.api, '/deposit/hisrec')
response = self.sign_request(method, url, params=params)
return response.json()
def get_withdraw_list(self, params):
"""withdraw history list"""
method = 'GET'
url = '{}{}'.format(self.api, '/withdraw/history')
response = self.sign_request(method, url, params=params)
return response.json()
def post_deposit_address(self, params):
"""generate deposit address"""
method = 'POST'
url = '{}{}'.format(self.api, '/deposit/address')
response = self.sign_request(method, url, params=params)
return response.json()
def get_deposit_address(self, params):
"""get deposit address"""
method = 'GET'
url = '{}{}'.format(self.api, '/deposit/address')
response = self.sign_request(method, url, params=params)
return response.json()
def get_withdraw_address(self, params):
"""get deposit address"""
method = 'GET'
url = '{}{}'.format(self.api, '/withdraw/address')
response = self.sign_request(method, url, params=params)
return response.json()
def post_transfer(self, params):
"""universal transfer"""
method = 'POST'
url = '{}{}'.format(self.api, '/transfer')
response = self.sign_request(method, url, params=params)
return response.json()
def get_transfer_list(self, params):
"""universal transfer history"""
method = 'GET'
url = '{}{}'.format(self.api, '/transfer')
response = self.sign_request(method, url, params=params)
return response.json()
def get_transfer_list_byId(self, params):
"""universal transfer history (by tranId)"""
method = 'GET'
url = '{}{}'.format(self.api, '/transfer/tranId')
response = self.sign_request(method, url, params=params)
return response.json()
def post_transfer_internal(self, params):
"""universal transfer"""
method = 'POST'
url = '{}{}'.format(self.api, '/transfer/internal')
response = self.sign_request(method, url, params=params)
return response.json()
def get_transfer_internal_list(self, params=None):
"""universal transfer"""
method = 'GET'
url = '{}{}'.format(self.api, '/transfer/internal')
response = self.sign_request(method, url, params=params)
return response.json()
def get_smallAssets_list(self):
"""small Assets convertible list"""
method = 'GET'
url = '{}{}'.format(self.api, '/convert/list')
response = self.sign_request(method, url)
return response.json()
def post_smallAssets_convert(self, params):
"""small Assets convert"""
method = 'POST'
url = '{}{}'.format(self.api, '/convert')
response = self.sign_request(method, url, params=params)
return response.json()
def get_smallAssets_history(self, params=None):
"""small Assets convertible history"""
method = 'GET'
url = '{}{}'.format(self.api, '/convert')
response = self.sign_request(method, url, params=params)
return response.json()
# Sub-Account
class mexc_subaccount(TOOL):
def __init__(self, config):
self.api = '/api/v3'
self.hosts = config["mexc_host"]
self.mexc_key = config["api_key"]
self.mexc_secret = config["secret_key"]
def post_virtualSubAccount(self, params):
"""create a sub-account"""
method = 'POST'
url = '{}{}'.format(self.api, '/sub-account/virtualSubAccount')
response = self.sign_request(method, url, params=params)
return response.json()
def get_SubAccountList(self, params=None):
"""get sub-account list"""
method = 'GET'
url = '{}{}'.format(self.api, '/sub-account/list')
response = self.sign_request(method, url, params=params)
return response.json()
def post_virtualApiKey(self, params):
"""create sub-account's apikey"""
method = 'POST'
url = '{}{}'.format(self.api, '/sub-account/apiKey')
response = self.sign_request(method, url, params=params)
return response.json()
def get_virtualApiKey(self, params):
"""get sub-account's apikey"""
method = 'GET'
url = '{}{}'.format(self.api, '/sub-account/apiKey')
response = self.sign_request(method, url, params=params)
return response.json()
def delete_virtualApiKey(self, params):
"""delete sub-account's apikey"""
method = 'DELETE'
url = '{}{}'.format(self.api, '/sub-account/apiKey')
response = self.sign_request(method, url, params=params)
return response.json()
def post_universalTransfer(self, params):
"""universal transfer between accounts"""
method = 'POST'
url = '{}{}'.format(self.api, '/capital/sub-account/universalTransfer')
response = self.sign_request(method, url, params=params)
return response.json()
def get_universalTransfer(self, params):
"""universal transfer history between accounts"""
method = 'GET'
url = '{}{}'.format(self.api, '/capital/sub-account/universalTransfer')
response = self.sign_request(method, url, params=params)
return response.json()
# Rebate
class mexc_rebate(TOOL):
def __init__(self, config):
self.api = '/api/v3/rebate'
self.hosts = config["mexc_host"]
self.mexc_key = config["api_key"]
self.mexc_secret = config["secret_key"]
def get_taxQuery(self, params=None):
"""get the rebate commission record"""
method = 'GET'
url = '{}{}'.format(self.api, '/taxQuery')
response = self.sign_request(method, url, params=params)
return response.json()
def get_rebate_detail(self, params=None):
"""get rebate record details"""
method = 'GET'
url = '{}{}'.format(self.api, '/detail')
response = self.sign_request(method, url, params=params)
return response.json()
def get_kickback_detail(self, params=None):
"""get self-return record details"""
method = 'GET'
url = '{}{}'.format(self.api, '/detail/kickback')
response = self.sign_request(method, url, params=params)
return response.json()
def get_inviter(self, params=None):
"""get self-return record details"""
method = 'GET'
url = '{}{}'.format(self.api, '/referCode')
response = self.sign_request(method, url, params=params)
return response.json()
def get_affiliate_commission(self, params=None):
"""get affiliate commission history"""
method = 'GET'
url = '{}{}'.format(self.api, '/affiliate/commission')
response = self.sign_request(method, url, params=params)
return response.json()
def get_affiliate_withdraw(self, params=None):
"""get affiliate withdraw history"""
method = 'GET'
url = '{}{}'.format(self.api, '/affiliate/withdraw')
response = self.sign_request(method, url, params=params)
return response.json()
def get_affiliate_commission_detail(self, params=None):
"""get affiliate commission details"""
method = 'GET'
url = '{}{}'.format(self.api, '/affiliate/commission/detail')
response = self.sign_request(method, url, params=params)
return response.json()
def get_affiliate_referral(self, params=None):
"""get affiliate referral"""
method = 'GET'
url = '{}{}'.format(self.api, '/affiliate/referral')
response = self.sign_request(method, url, params=params)
return response.json()
def get_affiliate_subaffiliates(self, params=None):
"""get affiliate subaffiliates"""
method = 'GET'
url = '{}{}'.format(self.api, '/affiliate/subaffiliates')
response = self.sign_request(method, url, params=params)
return response.json()
# WebSocket ListenKey
class mexc_listenkey(TOOL):
def __init__(self, config):
self.api = '/api/v3'
self.hosts = config["mexc_host"]
self.mexc_key = config["api_key"]
self.mexc_secret = config["secret_key"]
def post_listenKey(self):
""" generate ListenKey """
method = 'POST'
url = '{}{}'.format(self.api, '/userDataStream')
response = self.sign_request(method, url)
return response.json()
def get_listenKey(self):
""" get valid ListenKey """
method = 'GET'
url = '{}{}'.format(self.api, '/userDataStream')
response = self.sign_request(method, url)
return response.json()
def put_listenKey(self, params):
""" extend ListenKey validity """
method = 'PUT'
url = '{}{}'.format(self.api, '/userDataStream')
response = self.sign_request(method, url, params=params)
return response.json()
def delete_listenKey(self, params):
""" delete ListenKey """
method = 'DELETE'
url = '{}{}'.format(self.api, '/userDataStream')
response = self.sign_request(method, url, params=params)
return response.json()

View File

@@ -10,4 +10,3 @@ six
smmap
tzdata
urllib3
ccxt