Compare commits

..

9 Commits

5 changed files with 335 additions and 64 deletions

View File

@@ -1,8 +1,8 @@
# MEXC 交易机器人
# MEXC 定投交易机器人
## 项目简介
MEXC 交易机器人是一个自动化交易工具,用于在 MEXC 交易所执行现货交易。它通过 JSON 配置文件管理交易指令,支持多种订单类型,并自动记录交易日志。
MEXC 定投交易机器人是一个自动化交易工具,用于在 MEXC 交易所执行现货定投交易。它通过 JSON 配置文件管理交易指令,支持多种订单类型,并自动记录交易日志。
## 主要功能

212
main.py
View File

@@ -30,6 +30,7 @@ import json
from pathlib import Path
from datetime import datetime, date
from typing import Dict, Any, Optional, Tuple, List
import git
import mexc_spot_v3
@@ -44,7 +45,6 @@ logging.basicConfig(
logging.StreamHandler(),
],
)
logger = logging.getLogger(__name__)
class MexcSpotMarket:
@@ -53,12 +53,41 @@ class MexcSpotMarket:
提供获取交易对价格等功能
方法:
- get_exchange_info(symbol): 获取交易对信息
- get_price(symbol): 获取指定交易对的当前价格
"""
def __init__(self, config):
"""初始化市场数据查询接口"""
self.market = mexc_spot_v3.mexc_market(config)
self.logger = logging.getLogger(self.__class__.__module__ + '.' + self.__class__.__name__)
def get_exchange_info(self, symbol: str) -> Optional[Dict[str, Any]]:
"""
获取交易对信息
Args:
symbol: 交易对,如 "BTCUSDT"
Returns:
交易对信息字典或None(如果失败)
"""
params = {"symbol": symbol}
try:
self.logger.info("查询交易对信息: %s", symbol)
exchange_info = self.market.get_exchangeInfo(params)
if not exchange_info or "symbols" not in exchange_info:
self.logger.error("获取交易对信息失败: %s", exchange_info)
return None
self.logger.info("获取交易对信息成功")
return exchange_info
except Exception as e:
self.logger.error("查询交易所信息失败: %s", str(e))
return None
def get_price(self, symbol: str) -> Optional[float]:
"""
@@ -77,20 +106,20 @@ class MexcSpotMarket:
params = {"symbol": symbol}
try:
logger.info("查询交易对价格: %s", symbol)
self.logger.info("查询交易对价格: %s", symbol)
price_data = self.market.get_price(params)
if not price_data or "price" not in price_data:
logger.error("获取价格数据失败: %s", price_data)
self.logger.error("获取价格数据失败: %s", price_data)
return None
price_str = price_data["price"]
price = float(price_str)
logger.info("获取价格成功: %s = %f", symbol, price)
self.logger.info("获取价格成功: %s = %f", symbol, price)
return price
except Exception as e:
logger.error("查询价格失败: %s", str(e))
self.logger.error("查询价格失败: %s", str(e))
return None
@@ -124,12 +153,33 @@ class MexcSpotTrade:
"FILL_OR_KILL": ["quantity", "price"],
}
def __init__(self, config, symbol_mapping):
def __init__(self, config, symbol_mapping, config_file_name):
"""初始化交易机器人"""
self.trader = mexc_spot_v3.mexc_trade(config)
self.market = MexcSpotMarket(config)
self.csv_file = "output/mexc_spot_trade.csv"
self.csv_file = f"output/{config_file_name}.csv"
self.symbol_mapping = symbol_mapping
self.logger = logging.getLogger(self.__class__.__module__ + '.' + self.__class__.__name__)
def _api_get_balance(self) -> str:
"""
获取账户余额
Returns:
账户余额字典或None(如果失败)
"""
try:
self.logger.info("查询账户余额")
account_info = self.trader.get_account_info()
account_info_balance = account_info.get("balances", [])
balances = ""
for item in account_info_balance:
balances += f"{item['available']} {item['asset']} "
self.logger.info("获取账户余额成功")
return balances
except Exception as e:
self.logger.error("查询账户信息失败: %s", str(e))
return f"ERROR: {str(e)}"
def _api_get_order(self, symbol: str, order_id: str) -> Optional[Dict[str, Any]]:
"""
@@ -149,12 +199,12 @@ class MexcSpotTrade:
}
try:
logger.info("查询订单状态, 订单ID: %s", order_id)
self.logger.info("查询订单状态, 订单ID: %s", order_id)
order = self.trader.get_order(params)
logger.info("订单状态: %s", order.get("status"))
self.logger.info("订单状态: %s", order.get("status"))
return order
except Exception as e:
logger.error("查询订单失败: %s", str(e))
self.logger.error("查询订单失败: %s", str(e))
return None
def _tool_map_symbol(self, symbol: str) -> str:
@@ -222,6 +272,7 @@ class MexcSpotTrade:
executed_qty = order_data["executedQty"]
cummulative_quote_qty = order_data["cummulativeQuoteQty"]
side = order_data["side"]
balances = self._api_get_balance()
# 确定交易类型显示
trade_type = "买入" if side == "BUY" else "卖出"
@@ -239,6 +290,7 @@ class MexcSpotTrade:
"资金账户",
"CEX",
f"MEXC API - Order ID: {order_id}",
balances
]
# 检查文件是否存在
@@ -263,16 +315,55 @@ class MexcSpotTrade:
"现金账户",
"目标账户",
"备注",
"balances"
]
)
writer.writerow(row)
logger.info("交易记录成功, 订单ID: %s", order_id)
self.logger.info("交易记录成功, 订单ID: %s", order_id)
return True
except Exception as e:
logger.error("记录交易失败: %s", str(e))
self.logger.error("记录交易失败: %s", str(e))
return False
def _tool_calculate_quantity(
self,
quantity: float,
price: float,
base_asset_precision: int,
quote_amount_precision: float,
) -> float:
"""
调整下单数量以满足最小成交额要求。
策略说明:
- 计算出的quantity如果乘以price后小于交易对的最小成交额(quoteAmountPrecision)
则将quantity增加一个最小单位(10^(-base_asset_precision)),确保下单金额满足交易所要求。
Args:
quantity: 初步计算出的下单数量
price: 当前价格
base_asset_precision: 交易对基础资产的小数精度
quote_amount_precision: 交易对最小成交额
Returns:
满足最小成交额要求的下单数量
"""
processed_quantity = round(quantity, base_asset_precision)
if processed_quantity * price < quote_amount_precision:
self.logger.info(
"计算的quantity小于最低要求%f * %f = %f < %f,进行调整",
processed_quantity,
price,
processed_quantity * price,
quote_amount_precision,
)
processed_quantity = round(
quantity + 10 ** (-base_asset_precision), base_asset_precision
)
self.logger.info("调整后的quantity: %f", processed_quantity)
return processed_quantity
def trade(
self, symbol: str, order_type: str, side: str, **kwargs
) -> Optional[Dict[str, Any]]:
@@ -295,24 +386,29 @@ class MexcSpotTrade:
# 基本参数验证
if side not in ["BUY", "SELL"]:
logger.error("无效的交易方向: %s", side)
self.logger.error("无效的交易方向: %s", side)
return None
order_type = order_type.upper()
processed_kwargs = kwargs.copy()
# 记录未经过偏移的价格以供LIMIT订单只有quoteOrderQty没有quantity的情况使用
# 参数有price时直接使用否则就为实时价格
clean_price = processed_kwargs.get("price")
# 处理无price的情况获取实时价格
if order_type in ["LIMIT", "LIMIT_MAKER"] and "price" not in processed_kwargs:
current_price = self.market.get_price(symbol)
if current_price is None:
logger.error("无法获取实时价格,交易取消")
self.logger.error("无法获取实时价格,交易取消")
return None
clean_price = current_price
# 防止挂单不成交
if side == "BUY":
processed_kwargs["price"] = current_price * 1.01 # 买入加价0.5%
processed_kwargs["price"] = current_price * 1.01 # 买入加价1%
elif side == "SELL":
processed_kwargs["price"] = current_price * 0.91 # 卖出减价0.5%
logger.info("使用调整0.5%%后价格作为限价: %f", processed_kwargs["price"])
processed_kwargs["price"] = current_price * 0.99 # 卖出减价1%
self.logger.info("使用调整1%%后价格作为限价: %f", processed_kwargs["price"])
# 处理LIMIT订单只有quoteOrderQty没有quantity的情况
if (
@@ -321,14 +417,23 @@ class MexcSpotTrade:
and "quantity" not in processed_kwargs
):
try:
exchange_info = self.market.get_exchange_info(symbol)
quote_amount = float(processed_kwargs["quoteOrderQty"])
price = float(processed_kwargs["price"])
quantity = quote_amount / price
processed_kwargs["quantity"] = str(quantity)
quantity = quote_amount / clean_price
base_asset_precision = int(
exchange_info["symbols"][0]["baseAssetPrecision"]
)
quote_amount_precision = float(
exchange_info["symbols"][0]["quoteAmountPrecision"]
)
processed_quantity = self._tool_calculate_quantity(
quantity, clean_price, base_asset_precision, quote_amount_precision
)
self.logger.info("根据quoteOrderQty计算quantity: %f", processed_quantity)
processed_kwargs["quantity"] = str(processed_quantity)
processed_kwargs.pop("quoteOrderQty")
logger.info("根据quoteOrderQty计算quantity: %f", quantity)
except (ValueError, KeyError) as e:
logger.error("计算quantity失败: %s", str(e))
self.logger.error("计算quantity失败: %s", str(e))
return None
# 准备订单参数
@@ -342,46 +447,46 @@ class MexcSpotTrade:
# 验证参数
is_valid, error_msg = self._tool_validate_order_params(order_type, base_params)
if not is_valid:
logger.error("订单参数验证失败: %s", error_msg)
self.logger.error("订单参数验证失败: %s", error_msg)
return None
try:
logger.info("准备下单 %s %s 订单, 交易对: %s", side, order_type, symbol)
logger.debug("订单参数: %s", base_params)
self.logger.info("准备下单 %s %s 订单, 交易对: %s", side, order_type, symbol)
self.logger.debug("订单参数: %s", base_params)
# 测试订单
test_result = self.trader.post_order_test(base_params.copy())
if test_result != {}:
logger.error("订单测试失败,参数有误: %s", test_result)
self.logger.error("订单测试失败,参数有误: %s", test_result)
return None
logger.info("订单参数测试通过,准备正式下单")
self.logger.info("订单参数测试通过,准备正式下单")
# 正式下单
order = self.trader.post_order(base_params.copy())
order_id = order.get("orderId")
if not order_id:
logger.error("下单失败: 未获取到订单ID")
logger.error(base_params.copy())
logger.error(order)
self.logger.error("下单失败: 未获取到订单ID")
self.logger.error(base_params.copy())
self.logger.error(order)
return None
logger.info("订单创建成功, 订单ID: %s", order_id)
self.logger.info("订单创建成功, 订单ID: %s", order_id)
# 查询订单详情
logger.info("等待1秒后查询订单状态...")
self.logger.info("等待1秒后查询订单状态...")
time.sleep(1)
order_detail = self._api_get_order(symbol, order_id)
if not order_detail:
logger.error("获取订单详情失败")
self.logger.error("获取订单详情失败")
return None
# 如果不是FILLED则重复查询最多10次
retry_count = 0
while order_detail.get("status") != "FILLED" and retry_count < 10:
retry_count += 1
logger.info(
self.logger.info(
"订单未完成(状态: %s)等待1秒后第%s次重试查询...",
order_detail.get("status", "UNKNOWN"),
retry_count,
@@ -389,15 +494,15 @@ class MexcSpotTrade:
time.sleep(1)
order_detail = self._api_get_order(symbol, order_id)
if not order_detail:
logger.error("获取订单详情失败")
self.logger.error("获取订单详情失败")
return None
# 记录交易
if order_detail.get("status") == "FILLED":
if not self._tool_record_transaction(order_detail):
logger.error("交易记录失败")
self.logger.error("交易记录失败")
else:
logger.warning(
self.logger.warning(
"订单未完成(状态: %s)未被记录到CSV。订单ID: %s",
order_detail.get("status", "UNKNOWN"),
order_id,
@@ -406,7 +511,7 @@ class MexcSpotTrade:
return order_detail
except Exception as e:
logger.error("交易执行失败: %s", str(e))
self.logger.error("交易执行失败: %s", str(e))
return None
@@ -429,6 +534,7 @@ class TradingConfig:
"""
self.config_file = config_file
self.logger = logging.getLogger(self.__class__.__module__ + '.' + self.__class__.__name__)
self.config_data = self._load_config()
def _load_config(self) -> Dict[str, Any]:
@@ -436,16 +542,16 @@ class TradingConfig:
try:
with open(self.config_file, "r", encoding="utf-8") as f:
config = json.load(f)
logger.info("成功加载配置文件: %s", self.config_file)
self.logger.info("成功加载配置文件: %s", self.config_file)
return config
except FileNotFoundError:
logger.error("配置文件不存在: %s", self.config_file)
self.logger.error("配置文件不存在: %s", self.config_file)
return {}
except json.JSONDecodeError:
logger.error("配置文件格式错误不是有效的JSON")
self.logger.error("配置文件格式错误不是有效的JSON")
return {}
except Exception as e:
logger.error("加载配置文件时出错: %s", str(e))
self.logger.error("加载配置文件时出错: %s", str(e))
return {}
def get_today_trades(self) -> List[Dict[str, Any]]:
@@ -484,14 +590,35 @@ class TradingConfig:
return today_trades
def git_commit(repo_path: str = ".") -> str:
"""获取Git仓库版本"""
try:
repo = git.Repo(repo_path)
return repo.head.commit.hexsha
except Exception as _:
return None
def main():
"""主函数"""
logger = logging.getLogger(f"{__name__}.main")
logger.info("=" * 40)
# 获取主程序Git仓库版本
app_commit = git_commit(".")[:10]
# 确保config目录存在
if not os.path.exists("config"):
logger.error("配置目录 config 不存在")
return
# 获取config, output仓库版本
config_commit = git_commit("config")[:10]
output_commit = git_commit("output")[:10]
logger.info("主程序 %s, 配置 %s, 输出 %s", app_commit, config_commit, output_commit)
# 获取config目录下所有json文件
config_files = list(Path("config").glob("*.json"))
@@ -509,6 +636,7 @@ def main():
spot_trader = MexcSpotTrade(
config.config_data.get("api", {}),
config.config_data.get("symbol_mapping", {}),
os.path.basename(config_file).replace(".json", ""),
)
today_trades = config.get_today_trades()

12
requirements.txt Normal file
View File

@@ -0,0 +1,12 @@
certifi
charset-normalizer
gitdb
GitPython
icalendar
idna
python-dateutil
requests
six
smmap
tzdata
urllib3

129
tool_csv_merge.py Normal file
View File

@@ -0,0 +1,129 @@
"""CSV 文件处理模块
该模块用于合并同一证券在同一天的多笔交易记录,并生成汇总后的交易记录。
"""
import csv
import os
from collections import defaultdict
def process_csv(input_file, output_file):
"""处理CSV文件合并相同证券在同一天的交易记录
Args:
input_file (str): 输入CSV文件路径
output_file (str): 输出CSV文件路径
"""
merged_records = defaultdict(
lambda: {
"buy_shares": 0.0,
"buy_amount": 0.0,
"sell_shares": 0.0,
"sell_amount": 0.0,
"order_ids": set(),
"first_record": None,
"time_part": None,
}
)
with open(input_file, mode="r", newline="", encoding="utf-8") as infile:
reader = csv.DictReader(infile)
for row in reader:
datetime_str = row["日期"]
date_part, time_part = datetime_str.split("T")
order_id = row["备注"].split("Order ID: ")[-1].strip()
merge_key = (date_part, row["证券代码"])
record = merged_records[merge_key]
if record["first_record"] is None:
record["first_record"] = row
record["time_part"] = time_part
record["order_ids"].add(order_id)
if row["类型"] == "买入":
record["buy_shares"] += float(row["份额"])
record["buy_amount"] += float(row["净额"])
elif row["类型"] == "卖出":
record["sell_shares"] += float(row["份额"])
record["sell_amount"] += float(row["净额"])
output_rows = []
for key, record in merged_records.items():
date_part, symbol = key
first_row = record["first_record"]
net_shares = record["buy_shares"] - record["sell_shares"]
net_amount = record["buy_amount"] - record["sell_amount"]
if net_shares >= 0:
operation_type = "买入"
display_shares = net_shares
display_amount = net_amount
else:
operation_type = "卖出"
display_shares = -net_shares
display_amount = -net_amount
# 格式化为完整小数形式,不使用科学计数法
formatted_shares = (
f"{display_shares:f}".rstrip("0").rstrip(".")
if "." in f"{display_shares:f}"
else f"{display_shares:f}"
)
formatted_amount = (
f"{display_amount:f}".rstrip("0").rstrip(".")
if "." in f"{display_amount:f}"
else f"{display_amount:f}"
)
merged_row = {
"日期": f"{date_part}T{record['time_part']}",
"类型": operation_type,
"证券代码": symbol,
"份额": formatted_shares,
"净额": formatted_amount,
"现金账户": first_row["现金账户"],
"目标账户": first_row["目标账户"],
"备注": f"MEXC API - Order ID: {', '.join(sorted(record['order_ids']))}",
}
output_rows.append(merged_row)
# 写入输出文件
with open(output_file, mode="w", newline="", encoding="utf-8") as outfile:
fieldnames = [
"日期",
"类型",
"证券代码",
"份额",
"净额",
"现金账户",
"目标账户",
"备注",
]
writer = csv.DictWriter(outfile, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(output_rows)
def process_all_csvs(input_dir="output"):
"""处理指定目录下的所有CSV文件
Args:
input_dir (str): 包含CSV文件的目录路径
"""
for filename in os.listdir(input_dir):
if filename.endswith(".csv") and not filename.startswith("merged_"):
input_path = os.path.join(input_dir, filename)
output_path = os.path.join(input_dir, f"merged_{filename}")
process_csv(input_path, output_path)
print(f"处理完成: {filename} -> merged_{filename}")
if __name__ == "__main__":
process_all_csvs()
print("所有文件处理完成")

View File

@@ -37,12 +37,11 @@ def load_config(config_path="config/trading_config.json"):
raise
def create_event(symbol, comment, date_obj, description):
def create_event(summary: str, date_obj, description):
"""创建单个日历事件。
Args:
symbol (str): 交易对符号
comment (str): 交易备注
summary (str): 事件标题
date_obj (date): 交易日期
description (str): 事件描述
@@ -50,10 +49,13 @@ def create_event(symbol, comment, date_obj, description):
Event: 创建好的事件对象
"""
event = Event()
event.add("uid", f"{symbol.lower()}-{date_obj.strftime('%Y%m%d')}@trade")
event.add(
"uid",
f"{summary.lower().replace('/', '-')}-{date_obj.strftime('%Y%m%d')}@trade",
)
event.add("dtstamp", datetime.now())
event.add("dtstart", date_obj)
event.add("summary", f"{symbol} 交易 ({comment})")
event.add("summary", summary)
event.add("description", description)
return event
@@ -71,10 +73,8 @@ def generate_ics(config):
cal.add("prodid", "-//Trading Calendar//EN")
cal.add("version", "2.0")
symbol_mapping = config.get("symbol_mapping", {})
for trade in config.get("trades", []):
symbol = symbol_mapping.get(trade["symbol"], trade["symbol"])
symbol = trade["symbol"]
order_type = trade["order_type"]
side = trade["side"]
comment = trade["comment"]
@@ -82,27 +82,29 @@ def generate_ics(config):
# 构建描述
amount = trade["params"].get("quoteOrderQty")
quantity = trade["params"].get("quantity")
description = f"交易类型: {order_type}\n方向: {side}"
if quantity:
description = f"{description}\n数量: {quantity}"
if amount:
description = f"{description}\n金额: {amount}"
description = f"{description}\n备注: {comment}"
if trade["execute_dates"] == ["*"]:
event = create_event(symbol, comment, datetime.now().date(), description)
summary = f"{symbol}/{order_type}/{side}"
if amount:
summary = f"{summary}/{amount}{symbol[-4:]}"
if quantity:
summary = f"{summary}/{quantity}{symbol[:-4]}"
event = create_event(summary, datetime.now().date(), comment)
event.add("rrule", {"freq": "daily"})
cal.add_component(event)
logger.info("已创建项目:%s 交易 (%s) 每日", symbol, comment)
logger.info("已创建项目:每日 %s", summary)
else:
for date_str in trade["execute_dates"]:
try:
date_obj = datetime.strptime(date_str, "%Y-%m-%d").date()
event = create_event(symbol, comment, date_obj, description)
summary = f"{symbol}/{order_type}/{side}"
if amount:
summary = f"{summary}/{amount}{symbol[-4:]}"
if quantity:
summary = f"{summary}/{quantity}{symbol[:-4]}"
event = create_event(summary, date_obj, comment)
cal.add_component(event)
logger.info(
"已创建项目:%s 交易 (%s) %s", symbol, comment, date_str
)
logger.info("已创建项目:%s %s", date_str, summary)
except ValueError as ex:
logger.warning("跳过无效日期 %s: %s", date_str, ex)