Compare commits

..

12 Commits

8 changed files with 458 additions and 133 deletions

1
.gitignore vendored
View File

@@ -1,3 +1,4 @@
output/ output/
public/ public/
config/
__pycache__/ __pycache__/

View File

@@ -1,8 +1,8 @@
# MEXC 交易机器人 # MEXC 定投交易机器人
## 项目简介 ## 项目简介
MEXC 交易机器人是一个自动化交易工具,用于在 MEXC 交易所执行现货交易。它通过 JSON 配置文件管理交易指令,支持多种订单类型,并自动记录交易日志。 MEXC 定投交易机器人是一个自动化交易工具,用于在 MEXC 交易所执行现货定投交易。它通过 JSON 配置文件管理交易指令,支持多种订单类型,并自动记录交易日志。
## 主要功能 ## 主要功能
@@ -16,7 +16,7 @@ MEXC 交易机器人是一个自动化交易工具,用于在 MEXC 交易所执
## 安装与使用 ## 安装与使用
1. 创建配置文件`trading_config.json` 1. 创建配置文件`config/trading_config.json`(可有多个不同名字的`.json`文件,程序会自动遍历所有)
2. 运行程序: 2. 运行程序:
```bash ```bash
@@ -31,6 +31,11 @@ MEXC 交易机器人是一个自动化交易工具,用于在 MEXC 交易所执
```json ```json
{ {
"api": {
"mexc_host": "https://api.mexc.com",
"api_key": "mxxxxxxxxxx",
"secret_key": "xxxxxxxxxxxxxxxxx"
},
"symbol_mapping": { "symbol_mapping": {
"BTCUSDC": "BTCUSDT" "BTCUSDC": "BTCUSDT"
}, },
@@ -51,12 +56,14 @@ MEXC 交易机器人是一个自动化交易工具,用于在 MEXC 交易所执
### 配置字段详解 ### 配置字段详解
#### 1. 证券代码映射 (`symbol_mapping`) #### 1. API 配置 (`api`)
#### 2. 证券代码映射 (`symbol_mapping`)
- 类型:数组 - 类型:数组
- 描述API代码: CSV记录代码如`"BTCUSDC": "BTCUSDT"`代表向API请求`BTCUSDC`但CSV中记录证券代码`BTCUSDT` - 描述API 代码: CSV 记录代码,如`"BTCUSDC": "BTCUSDT"`代表向 API 请求`BTCUSDC`,但 CSV 中记录证券代码`BTCUSDT`
#### 2. 交易列表 (`trades`) #### 3. 交易列表 (`trades`)
- 类型:数组 - 类型:数组
- 描述:包含所有交易指令的列表 - 描述:包含所有交易指令的列表

View File

@@ -1,3 +0,0 @@
mexc_host = "https://api.mexc.com"
api_key = "mx0vglky5BuzlcK5HQ"
secret_key = "e2a0c4737b4643bbac4ad5f8b26dcce2"

286
main.py
View File

@@ -27,8 +27,10 @@ import logging
import os import os
import time import time
import json import json
from pathlib import Path
from datetime import datetime, date from datetime import datetime, date
from typing import Dict, Any, Optional, Tuple, List from typing import Dict, Any, Optional, Tuple, List
import git
import mexc_spot_v3 import mexc_spot_v3
@@ -43,7 +45,6 @@ logging.basicConfig(
logging.StreamHandler(), logging.StreamHandler(),
], ],
) )
logger = logging.getLogger(__name__)
class MexcSpotMarket: class MexcSpotMarket:
@@ -52,12 +53,41 @@ class MexcSpotMarket:
提供获取交易对价格等功能 提供获取交易对价格等功能
方法: 方法:
- get_exchange_info(symbol): 获取交易对信息
- get_price(symbol): 获取指定交易对的当前价格 - get_price(symbol): 获取指定交易对的当前价格
""" """
def __init__(self): def __init__(self, config):
"""初始化市场数据查询接口""" """初始化市场数据查询接口"""
self.market = mexc_spot_v3.mexc_market() self.market = mexc_spot_v3.mexc_market(config)
self.logger = logging.getLogger(self.__class__.__module__ + '.' + self.__class__.__name__)
def get_exchange_info(self, symbol: str) -> Optional[Dict[str, Any]]:
"""
获取交易对信息
Args:
symbol: 交易对,如 "BTCUSDT"
Returns:
交易对信息字典或None(如果失败)
"""
params = {"symbol": symbol}
try:
self.logger.info("查询交易对信息: %s", symbol)
exchange_info = self.market.get_exchangeInfo(params)
if not exchange_info or "symbols" not in exchange_info:
self.logger.error("获取交易对信息失败: %s", exchange_info)
return None
self.logger.info("获取交易对信息成功")
return exchange_info
except Exception as e:
self.logger.error("查询交易所信息失败: %s", str(e))
return None
def get_price(self, symbol: str) -> Optional[float]: def get_price(self, symbol: str) -> Optional[float]:
""" """
@@ -76,20 +106,20 @@ class MexcSpotMarket:
params = {"symbol": symbol} params = {"symbol": symbol}
try: try:
logger.info("查询交易对价格: %s", symbol) self.logger.info("查询交易对价格: %s", symbol)
price_data = self.market.get_price(params) price_data = self.market.get_price(params)
if not price_data or "price" not in price_data: if not price_data or "price" not in price_data:
logger.error("获取价格数据失败: %s", price_data) self.logger.error("获取价格数据失败: %s", price_data)
return None return None
price_str = price_data["price"] price_str = price_data["price"]
price = float(price_str) price = float(price_str)
logger.info("获取价格成功: %s = %f", symbol, price) self.logger.info("获取价格成功: %s = %f", symbol, price)
return price return price
except Exception as e: except Exception as e:
logger.error("查询价格失败: %s", str(e)) self.logger.error("查询价格失败: %s", str(e))
return None return None
@@ -123,12 +153,33 @@ class MexcSpotTrade:
"FILL_OR_KILL": ["quantity", "price"], "FILL_OR_KILL": ["quantity", "price"],
} }
def __init__(self): def __init__(self, config, symbol_mapping, config_file_name):
"""初始化交易机器人""" """初始化交易机器人"""
self.trader = mexc_spot_v3.mexc_trade() self.trader = mexc_spot_v3.mexc_trade(config)
self.market = MexcSpotMarket() self.market = MexcSpotMarket(config)
self.csv_file = "output/mexc_spot_trade.csv" self.csv_file = f"output/{config_file_name}.csv"
self.symbol_mapping = TradingConfig().config_data.get("symbol_mapping", {}) self.symbol_mapping = symbol_mapping
self.logger = logging.getLogger(self.__class__.__module__ + '.' + self.__class__.__name__)
def _api_get_balance(self) -> str:
"""
获取账户余额
Returns:
账户余额字典或None(如果失败)
"""
try:
self.logger.info("查询账户余额")
account_info = self.trader.get_account_info()
account_info_balance = account_info.get("balances", [])
balances = ""
for item in account_info_balance:
balances += f"{item['available']} {item['asset']} "
self.logger.info("获取账户余额成功")
return balances
except Exception as e:
self.logger.error("查询账户信息失败: %s", str(e))
return f"ERROR: {str(e)}"
def _api_get_order(self, symbol: str, order_id: str) -> Optional[Dict[str, Any]]: def _api_get_order(self, symbol: str, order_id: str) -> Optional[Dict[str, Any]]:
""" """
@@ -148,12 +199,12 @@ class MexcSpotTrade:
} }
try: try:
logger.info("查询订单状态, 订单ID: %s", order_id) self.logger.info("查询订单状态, 订单ID: %s", order_id)
order = self.trader.get_order(params) order = self.trader.get_order(params)
logger.info("订单状态: %s", order.get("status")) self.logger.info("订单状态: %s", order.get("status"))
return order return order
except Exception as e: except Exception as e:
logger.error("查询订单失败: %s", str(e)) self.logger.error("查询订单失败: %s", str(e))
return None return None
def _tool_map_symbol(self, symbol: str) -> str: def _tool_map_symbol(self, symbol: str) -> str:
@@ -221,6 +272,7 @@ class MexcSpotTrade:
executed_qty = order_data["executedQty"] executed_qty = order_data["executedQty"]
cummulative_quote_qty = order_data["cummulativeQuoteQty"] cummulative_quote_qty = order_data["cummulativeQuoteQty"]
side = order_data["side"] side = order_data["side"]
balances = self._api_get_balance()
# 确定交易类型显示 # 确定交易类型显示
trade_type = "买入" if side == "BUY" else "卖出" trade_type = "买入" if side == "BUY" else "卖出"
@@ -238,6 +290,7 @@ class MexcSpotTrade:
"资金账户", "资金账户",
"CEX", "CEX",
f"MEXC API - Order ID: {order_id}", f"MEXC API - Order ID: {order_id}",
balances
] ]
# 检查文件是否存在 # 检查文件是否存在
@@ -262,16 +315,55 @@ class MexcSpotTrade:
"现金账户", "现金账户",
"目标账户", "目标账户",
"备注", "备注",
"balances"
] ]
) )
writer.writerow(row) writer.writerow(row)
logger.info("交易记录成功, 订单ID: %s", order_id) self.logger.info("交易记录成功, 订单ID: %s", order_id)
return True return True
except Exception as e: except Exception as e:
logger.error("记录交易失败: %s", str(e)) self.logger.error("记录交易失败: %s", str(e))
return False return False
def _tool_calculate_quantity(
self,
quantity: float,
price: float,
base_asset_precision: int,
quote_amount_precision: float,
) -> float:
"""
调整下单数量以满足最小成交额要求。
策略说明:
- 计算出的quantity如果乘以price后小于交易对的最小成交额(quoteAmountPrecision)
则将quantity增加一个最小单位(10^(-base_asset_precision)),确保下单金额满足交易所要求。
Args:
quantity: 初步计算出的下单数量
price: 当前价格
base_asset_precision: 交易对基础资产的小数精度
quote_amount_precision: 交易对最小成交额
Returns:
满足最小成交额要求的下单数量
"""
processed_quantity = round(quantity, base_asset_precision)
if processed_quantity * price < quote_amount_precision:
self.logger.info(
"计算的quantity小于最低要求%f * %f = %f < %f,进行调整",
processed_quantity,
price,
processed_quantity * price,
quote_amount_precision,
)
processed_quantity = round(
quantity + 10 ** (-base_asset_precision), base_asset_precision
)
self.logger.info("调整后的quantity: %f", processed_quantity)
return processed_quantity
def trade( def trade(
self, symbol: str, order_type: str, side: str, **kwargs self, symbol: str, order_type: str, side: str, **kwargs
) -> Optional[Dict[str, Any]]: ) -> Optional[Dict[str, Any]]:
@@ -294,20 +386,29 @@ class MexcSpotTrade:
# 基本参数验证 # 基本参数验证
if side not in ["BUY", "SELL"]: if side not in ["BUY", "SELL"]:
logger.error("无效的交易方向: %s", side) self.logger.error("无效的交易方向: %s", side)
return None return None
order_type = order_type.upper() order_type = order_type.upper()
processed_kwargs = kwargs.copy() processed_kwargs = kwargs.copy()
# 记录未经过偏移的价格以供LIMIT订单只有quoteOrderQty没有quantity的情况使用
# 参数有price时直接使用否则就为实时价格
clean_price = processed_kwargs.get("price")
# 处理无price的情况获取实时价格 # 处理无price的情况获取实时价格
if order_type in ["LIMIT", "LIMIT_MAKER"] and "price" not in processed_kwargs: if order_type in ["LIMIT", "LIMIT_MAKER"] and "price" not in processed_kwargs:
current_price = self.market.get_price(symbol) current_price = self.market.get_price(symbol)
if current_price is None: if current_price is None:
logger.error("无法获取实时价格,交易取消") self.logger.error("无法获取实时价格,交易取消")
return None return None
processed_kwargs["price"] = current_price clean_price = current_price
logger.info("使用实时价格作为限价: %f", current_price) # 防止挂单不成交
if side == "BUY":
processed_kwargs["price"] = current_price * 1.01 # 买入加价1%
elif side == "SELL":
processed_kwargs["price"] = current_price * 0.99 # 卖出减价1%
self.logger.info("使用调整1%%后价格作为限价: %f", processed_kwargs["price"])
# 处理LIMIT订单只有quoteOrderQty没有quantity的情况 # 处理LIMIT订单只有quoteOrderQty没有quantity的情况
if ( if (
@@ -316,13 +417,23 @@ class MexcSpotTrade:
and "quantity" not in processed_kwargs and "quantity" not in processed_kwargs
): ):
try: try:
exchange_info = self.market.get_exchange_info(symbol)
quote_amount = float(processed_kwargs["quoteOrderQty"]) quote_amount = float(processed_kwargs["quoteOrderQty"])
price = float(processed_kwargs["price"]) quantity = quote_amount / clean_price
quantity = quote_amount / price base_asset_precision = int(
processed_kwargs["quantity"] = str(quantity) exchange_info["symbols"][0]["baseAssetPrecision"]
logger.info("根据quoteOrderQty计算quantity: %f", quantity) )
quote_amount_precision = float(
exchange_info["symbols"][0]["quoteAmountPrecision"]
)
processed_quantity = self._tool_calculate_quantity(
quantity, clean_price, base_asset_precision, quote_amount_precision
)
self.logger.info("根据quoteOrderQty计算quantity: %f", processed_quantity)
processed_kwargs["quantity"] = str(processed_quantity)
processed_kwargs.pop("quoteOrderQty")
except (ValueError, KeyError) as e: except (ValueError, KeyError) as e:
logger.error("计算quantity失败: %s", str(e)) self.logger.error("计算quantity失败: %s", str(e))
return None return None
# 准备订单参数 # 准备订单参数
@@ -336,46 +447,46 @@ class MexcSpotTrade:
# 验证参数 # 验证参数
is_valid, error_msg = self._tool_validate_order_params(order_type, base_params) is_valid, error_msg = self._tool_validate_order_params(order_type, base_params)
if not is_valid: if not is_valid:
logger.error("订单参数验证失败: %s", error_msg) self.logger.error("订单参数验证失败: %s", error_msg)
return None return None
try: try:
logger.info("准备下单 %s %s 订单, 交易对: %s", side, order_type, symbol) self.logger.info("准备下单 %s %s 订单, 交易对: %s", side, order_type, symbol)
logger.debug("订单参数: %s", base_params) self.logger.debug("订单参数: %s", base_params)
# 测试订单 # 测试订单
test_result = self.trader.post_order_test(base_params.copy()) test_result = self.trader.post_order_test(base_params.copy())
if test_result != {}: if test_result != {}:
logger.error("订单测试失败,参数有误: %s", test_result) self.logger.error("订单测试失败,参数有误: %s", test_result)
return None return None
logger.info("订单参数测试通过,准备正式下单") self.logger.info("订单参数测试通过,准备正式下单")
# 正式下单 # 正式下单
order = self.trader.post_order(base_params.copy()) order = self.trader.post_order(base_params.copy())
order_id = order.get("orderId") order_id = order.get("orderId")
if not order_id: if not order_id:
logger.error("下单失败: 未获取到订单ID") self.logger.error("下单失败: 未获取到订单ID")
logger.error(base_params.copy()) self.logger.error(base_params.copy())
logger.error(order) self.logger.error(order)
return None return None
logger.info("订单创建成功, 订单ID: %s", order_id) self.logger.info("订单创建成功, 订单ID: %s", order_id)
# 查询订单详情 # 查询订单详情
logger.info("等待1秒后查询订单状态...") self.logger.info("等待1秒后查询订单状态...")
time.sleep(1) time.sleep(1)
order_detail = self._api_get_order(symbol, order_id) order_detail = self._api_get_order(symbol, order_id)
if not order_detail: if not order_detail:
logger.error("获取订单详情失败") self.logger.error("获取订单详情失败")
return None return None
# 如果不是FILLED则重复查询最多10次 # 如果不是FILLED则重复查询最多10次
retry_count = 0 retry_count = 0
while order_detail.get("status") != "FILLED" and retry_count < 10: while order_detail.get("status") != "FILLED" and retry_count < 10:
retry_count += 1 retry_count += 1
logger.info( self.logger.info(
"订单未完成(状态: %s)等待1秒后第%s次重试查询...", "订单未完成(状态: %s)等待1秒后第%s次重试查询...",
order_detail.get("status", "UNKNOWN"), order_detail.get("status", "UNKNOWN"),
retry_count, retry_count,
@@ -383,15 +494,15 @@ class MexcSpotTrade:
time.sleep(1) time.sleep(1)
order_detail = self._api_get_order(symbol, order_id) order_detail = self._api_get_order(symbol, order_id)
if not order_detail: if not order_detail:
logger.error("获取订单详情失败") self.logger.error("获取订单详情失败")
return None return None
# 记录交易 # 记录交易
if order_detail.get("status") == "FILLED": if order_detail.get("status") == "FILLED":
if not self._tool_record_transaction(order_detail): if not self._tool_record_transaction(order_detail):
logger.error("交易记录失败") self.logger.error("交易记录失败")
else: else:
logger.warning( self.logger.warning(
"订单未完成(状态: %s)未被记录到CSV。订单ID: %s", "订单未完成(状态: %s)未被记录到CSV。订单ID: %s",
order_detail.get("status", "UNKNOWN"), order_detail.get("status", "UNKNOWN"),
order_id, order_id,
@@ -400,7 +511,7 @@ class MexcSpotTrade:
return order_detail return order_detail
except Exception as e: except Exception as e:
logger.error("交易执行失败: %s", str(e)) self.logger.error("交易执行失败: %s", str(e))
return None return None
@@ -414,7 +525,7 @@ class TradingConfig:
- _load_config(): 加载JSON配置文件 - _load_config(): 加载JSON配置文件
""" """
def __init__(self, config_file: str = "public/trading_config.json"): def __init__(self, config_file: str = "config/trading_config.json"):
""" """
初始化交易配置 初始化交易配置
@@ -423,6 +534,7 @@ class TradingConfig:
""" """
self.config_file = config_file self.config_file = config_file
self.logger = logging.getLogger(self.__class__.__module__ + '.' + self.__class__.__name__)
self.config_data = self._load_config() self.config_data = self._load_config()
def _load_config(self) -> Dict[str, Any]: def _load_config(self) -> Dict[str, Any]:
@@ -430,16 +542,16 @@ class TradingConfig:
try: try:
with open(self.config_file, "r", encoding="utf-8") as f: with open(self.config_file, "r", encoding="utf-8") as f:
config = json.load(f) config = json.load(f)
logger.info("成功加载配置文件: %s", self.config_file) self.logger.info("成功加载配置文件: %s", self.config_file)
return config return config
except FileNotFoundError: except FileNotFoundError:
logger.error("配置文件不存在: %s", self.config_file) self.logger.error("配置文件不存在: %s", self.config_file)
return {} return {}
except json.JSONDecodeError: except json.JSONDecodeError:
logger.error("配置文件格式错误不是有效的JSON") self.logger.error("配置文件格式错误不是有效的JSON")
return {} return {}
except Exception as e: except Exception as e:
logger.error("加载配置文件时出错: %s", str(e)) self.logger.error("加载配置文件时出错: %s", str(e))
return {} return {}
def get_today_trades(self) -> List[Dict[str, Any]]: def get_today_trades(self) -> List[Dict[str, Any]]:
@@ -478,39 +590,79 @@ class TradingConfig:
return today_trades return today_trades
def git_commit(repo_path: str = ".") -> str:
"""获取Git仓库版本"""
try:
repo = git.Repo(repo_path)
return repo.head.commit.hexsha
except Exception as _:
return None
def main(): def main():
"""主函数""" """主函数"""
# 初始化交易配置 logger = logging.getLogger(f"{__name__}.main")
config = TradingConfig()
# 获取今天需要执行的交易 logger.info("=" * 40)
# 获取主程序Git仓库版本
app_commit = git_commit(".")[:10]
# 确保config目录存在
if not os.path.exists("config"):
logger.error("配置目录 config 不存在")
return
# 获取config, output仓库版本
config_commit = git_commit("config")[:10]
output_commit = git_commit("output")[:10]
logger.info("主程序 %s, 配置 %s, 输出 %s", app_commit, config_commit, output_commit)
# 获取config目录下所有json文件
config_files = list(Path("config").glob("*.json"))
if not config_files:
logger.info("配置目录中没有找到任何JSON文件")
return
logger.info("找到 %d 个配置文件需要处理", len(config_files))
for config_file in config_files:
try:
# 提取交易参数
logger.info("处理配置文件: %s", config_file)
config = TradingConfig(str(config_file))
spot_trader = MexcSpotTrade(
config.config_data.get("api", {}),
config.config_data.get("symbol_mapping", {}),
os.path.basename(config_file).replace(".json", ""),
)
today_trades = config.get_today_trades() today_trades = config.get_today_trades()
if not today_trades: if not today_trades:
logger.info("今天没有需要执行的交易") logger.info("%s - 今天没有需要执行的交易", config_file)
return continue
logger.info("今天有 %d 个交易需要执行", len(today_trades)) logger.info("%s - 今天有 %d 个交易需要执行", config_file, len(today_trades))
# 初始化交易类
spot_trader = MexcSpotTrade()
# 执行每个交易
for trade_config in today_trades: for trade_config in today_trades:
try: try:
# 提取交易参数
symbol = trade_config.get("symbol") symbol = trade_config.get("symbol")
order_type = trade_config.get("order_type") order_type = trade_config.get("order_type")
side = trade_config.get("side") side = trade_config.get("side")
params = trade_config.get("params", {}) params = trade_config.get("params", {})
if not all([symbol, order_type, side]): if not all([symbol, order_type, side]):
logger.error("交易配置缺少必要参数: %s", trade_config) logger.error(
"%s - 交易配置缺少必要参数: %s", config_file, trade_config
)
continue continue
logger.info("执行交易: %s %s %s", symbol, order_type, side) logger.info(
logger.debug("交易参数: %s", params) "%s - 执行交易: %s %s %s", config_file, symbol, order_type, side
)
logger.debug("%s - 交易参数: %s", config_file, params)
# 执行交易 # 执行交易
result = spot_trader.trade( result = spot_trader.trade(
@@ -518,15 +670,19 @@ def main():
) )
if result: if result:
logger.info("交易执行成功: %s", result) logger.info("%s - 交易执行成功: %s", config_file, result)
else: else:
logger.error("交易执行失败") logger.error("%s - 交易执行失败", config_file)
except Exception as e: except Exception as e:
logger.error("执行交易时出错: %s", str(e)) logger.error("%s - 执行交易时出错: %s", config_file, str(e))
continue continue
logger.info("执行完毕") except Exception as e:
logger.error("处理配置文件 %s 时出错: %s", config_file, str(e))
continue
logger.info("所有配置文件处理完毕")
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -2,7 +2,6 @@ import requests
import hmac import hmac
import hashlib import hashlib
from urllib.parse import urlencode, quote from urllib.parse import urlencode, quote
import config
# ServerTime、Signature # ServerTime、Signature
class TOOL(object): class TOOL(object):
@@ -42,9 +41,9 @@ class TOOL(object):
# Market Data # Market Data
class mexc_market(TOOL): class mexc_market(TOOL):
def __init__(self): def __init__(self, config):
self.api = '/api/v3' self.api = '/api/v3'
self.hosts = config.mexc_host self.hosts = config["mexc_host"]
self.method = 'GET' self.method = 'GET'
def get_ping(self): def get_ping(self):
@@ -129,11 +128,11 @@ class mexc_market(TOOL):
# Spot Trade # Spot Trade
class mexc_trade(TOOL): class mexc_trade(TOOL):
def __init__(self): def __init__(self, config):
self.api = '/api/v3' self.api = '/api/v3'
self.hosts = config.mexc_host self.hosts = config["mexc_host"]
self.mexc_key = config.api_key self.mexc_key = config["api_key"]
self.mexc_secret = config.secret_key self.mexc_secret = config["secret_key"]
def get_selfSymbols(self): def get_selfSymbols(self):
"""get currency information""" """get currency information"""
@@ -246,11 +245,11 @@ class mexc_trade(TOOL):
# Wallet # Wallet
class mexc_wallet(TOOL): class mexc_wallet(TOOL):
def __init__(self): def __init__(self, config):
self.api = '/api/v3/capital' self.api = '/api/v3/capital'
self.hosts = config.mexc_host self.hosts = config["mexc_host"]
self.mexc_key = config.api_key self.mexc_key = config["api_key"]
self.mexc_secret = config.secret_key self.mexc_secret = config["secret_key"]
def get_coinlist(self): def get_coinlist(self):
"""get currency information""" """get currency information"""
@@ -368,11 +367,11 @@ class mexc_wallet(TOOL):
# Sub-Account # Sub-Account
class mexc_subaccount(TOOL): class mexc_subaccount(TOOL):
def __init__(self): def __init__(self, config):
self.api = '/api/v3' self.api = '/api/v3'
self.hosts = config.mexc_host self.hosts = config["mexc_host"]
self.mexc_key = config.api_key self.mexc_key = config["api_key"]
self.mexc_secret = config.secret_key self.mexc_secret = config["secret_key"]
def post_virtualSubAccount(self, params): def post_virtualSubAccount(self, params):
"""create a sub-account""" """create a sub-account"""
@@ -427,11 +426,11 @@ class mexc_subaccount(TOOL):
# Rebate # Rebate
class mexc_rebate(TOOL): class mexc_rebate(TOOL):
def __init__(self): def __init__(self, config):
self.api = '/api/v3/rebate' self.api = '/api/v3/rebate'
self.hosts = config.mexc_host self.hosts = config["mexc_host"]
self.mexc_key = config.api_key self.mexc_key = config["api_key"]
self.mexc_secret = config.secret_key self.mexc_secret = config["secret_key"]
def get_taxQuery(self, params=None): def get_taxQuery(self, params=None):
"""get the rebate commission record""" """get the rebate commission record"""
@@ -500,11 +499,11 @@ class mexc_rebate(TOOL):
# WebSocket ListenKey # WebSocket ListenKey
class mexc_listenkey(TOOL): class mexc_listenkey(TOOL):
def __init__(self): def __init__(self, config):
self.api = '/api/v3' self.api = '/api/v3'
self.hosts = config.mexc_host self.hosts = config["mexc_host"]
self.mexc_key = config.api_key self.mexc_key = config["api_key"]
self.mexc_secret = config.secret_key self.mexc_secret = config["secret_key"]
def post_listenKey(self): def post_listenKey(self):
""" generate ListenKey """ """ generate ListenKey """

12
requirements.txt Normal file
View File

@@ -0,0 +1,12 @@
certifi
charset-normalizer
gitdb
GitPython
icalendar
idna
python-dateutil
requests
six
smmap
tzdata
urllib3

129
tool_csv_merge.py Normal file
View File

@@ -0,0 +1,129 @@
"""CSV 文件处理模块
该模块用于合并同一证券在同一天的多笔交易记录,并生成汇总后的交易记录。
"""
import csv
import os
from collections import defaultdict
def process_csv(input_file, output_file):
"""处理CSV文件合并相同证券在同一天的交易记录
Args:
input_file (str): 输入CSV文件路径
output_file (str): 输出CSV文件路径
"""
merged_records = defaultdict(
lambda: {
"buy_shares": 0.0,
"buy_amount": 0.0,
"sell_shares": 0.0,
"sell_amount": 0.0,
"order_ids": set(),
"first_record": None,
"time_part": None,
}
)
with open(input_file, mode="r", newline="", encoding="utf-8") as infile:
reader = csv.DictReader(infile)
for row in reader:
datetime_str = row["日期"]
date_part, time_part = datetime_str.split("T")
order_id = row["备注"].split("Order ID: ")[-1].strip()
merge_key = (date_part, row["证券代码"])
record = merged_records[merge_key]
if record["first_record"] is None:
record["first_record"] = row
record["time_part"] = time_part
record["order_ids"].add(order_id)
if row["类型"] == "买入":
record["buy_shares"] += float(row["份额"])
record["buy_amount"] += float(row["净额"])
elif row["类型"] == "卖出":
record["sell_shares"] += float(row["份额"])
record["sell_amount"] += float(row["净额"])
output_rows = []
for key, record in merged_records.items():
date_part, symbol = key
first_row = record["first_record"]
net_shares = record["buy_shares"] - record["sell_shares"]
net_amount = record["buy_amount"] - record["sell_amount"]
if net_shares >= 0:
operation_type = "买入"
display_shares = net_shares
display_amount = net_amount
else:
operation_type = "卖出"
display_shares = -net_shares
display_amount = -net_amount
# 格式化为完整小数形式,不使用科学计数法
formatted_shares = (
f"{display_shares:f}".rstrip("0").rstrip(".")
if "." in f"{display_shares:f}"
else f"{display_shares:f}"
)
formatted_amount = (
f"{display_amount:f}".rstrip("0").rstrip(".")
if "." in f"{display_amount:f}"
else f"{display_amount:f}"
)
merged_row = {
"日期": f"{date_part}T{record['time_part']}",
"类型": operation_type,
"证券代码": symbol,
"份额": formatted_shares,
"净额": formatted_amount,
"现金账户": first_row["现金账户"],
"目标账户": first_row["目标账户"],
"备注": f"MEXC API - Order ID: {', '.join(sorted(record['order_ids']))}",
}
output_rows.append(merged_row)
# 写入输出文件
with open(output_file, mode="w", newline="", encoding="utf-8") as outfile:
fieldnames = [
"日期",
"类型",
"证券代码",
"份额",
"净额",
"现金账户",
"目标账户",
"备注",
]
writer = csv.DictWriter(outfile, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(output_rows)
def process_all_csvs(input_dir="output"):
"""处理指定目录下的所有CSV文件
Args:
input_dir (str): 包含CSV文件的目录路径
"""
for filename in os.listdir(input_dir):
if filename.endswith(".csv") and not filename.startswith("merged_"):
input_path = os.path.join(input_dir, filename)
output_path = os.path.join(input_dir, f"merged_{filename}")
process_csv(input_path, output_path)
print(f"处理完成: {filename} -> merged_{filename}")
if __name__ == "__main__":
process_all_csvs()
print("所有文件处理完成")

View File

@@ -13,7 +13,7 @@ logging.basicConfig(
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def load_config(config_path="trading_config.json"): def load_config(config_path="config/trading_config.json"):
"""加载交易配置文件。 """加载交易配置文件。
Args: Args:
@@ -37,12 +37,11 @@ def load_config(config_path="trading_config.json"):
raise raise
def create_event(symbol, comment, date_obj, description): def create_event(summary: str, date_obj, description):
"""创建单个日历事件。 """创建单个日历事件。
Args: Args:
symbol (str): 交易对符号 summary (str): 事件标题
comment (str): 交易备注
date_obj (date): 交易日期 date_obj (date): 交易日期
description (str): 事件描述 description (str): 事件描述
@@ -50,10 +49,13 @@ def create_event(symbol, comment, date_obj, description):
Event: 创建好的事件对象 Event: 创建好的事件对象
""" """
event = Event() event = Event()
event.add("uid", f"{symbol.lower()}-{date_obj.strftime('%Y%m%d')}@trade") event.add(
"uid",
f"{summary.lower().replace('/', '-')}-{date_obj.strftime('%Y%m%d')}@trade",
)
event.add("dtstamp", datetime.now()) event.add("dtstamp", datetime.now())
event.add("dtstart", date_obj) event.add("dtstart", date_obj)
event.add("summary", f"{symbol} 交易 ({comment})") event.add("summary", summary)
event.add("description", description) event.add("description", description)
return event return event
@@ -71,10 +73,8 @@ def generate_ics(config):
cal.add("prodid", "-//Trading Calendar//EN") cal.add("prodid", "-//Trading Calendar//EN")
cal.add("version", "2.0") cal.add("version", "2.0")
symbol_mapping = config.get("symbol_mapping", {})
for trade in config.get("trades", []): for trade in config.get("trades", []):
symbol = symbol_mapping.get(trade["symbol"], trade["symbol"]) symbol = trade["symbol"]
order_type = trade["order_type"] order_type = trade["order_type"]
side = trade["side"] side = trade["side"]
comment = trade["comment"] comment = trade["comment"]
@@ -82,25 +82,29 @@ def generate_ics(config):
# 构建描述 # 构建描述
amount = trade["params"].get("quoteOrderQty") amount = trade["params"].get("quoteOrderQty")
quantity = trade["params"].get("quantity") quantity = trade["params"].get("quantity")
description = f"交易类型: {order_type}\n方向: {side}"
if quantity:
description = f"{description}\n数量: {quantity}"
if amount:
description = f"{description}\n金额: {amount}"
description = f"{description}\n备注: {comment}"
if trade["execute_dates"] == ["*"]: if trade["execute_dates"] == ["*"]:
event = create_event(symbol, comment, datetime.now().date(), description) summary = f"{symbol}/{order_type}/{side}"
if amount:
summary = f"{summary}/{amount}{symbol[-4:]}"
if quantity:
summary = f"{summary}/{quantity}{symbol[:-4]}"
event = create_event(summary, datetime.now().date(), comment)
event.add("rrule", {"freq": "daily"}) event.add("rrule", {"freq": "daily"})
cal.add_component(event) cal.add_component(event)
logger.info("已创建项目:%s 交易 (%s) 每日", symbol, comment) logger.info("已创建项目:每日 %s", summary)
else: else:
for date_str in trade["execute_dates"]: for date_str in trade["execute_dates"]:
try: try:
date_obj = datetime.strptime(date_str, "%Y-%m-%d").date() date_obj = datetime.strptime(date_str, "%Y-%m-%d").date()
event = create_event(symbol, comment, date_obj, description) summary = f"{symbol}/{order_type}/{side}"
if amount:
summary = f"{summary}/{amount}{symbol[-4:]}"
if quantity:
summary = f"{summary}/{quantity}{symbol[:-4]}"
event = create_event(summary, date_obj, comment)
cal.add_component(event) cal.add_component(event)
logger.info("已创建项目:%s 交易 (%s) %s", symbol, comment, date_str) logger.info("已创建项目:%s %s", date_str, summary)
except ValueError as ex: except ValueError as ex:
logger.warning("跳过无效日期 %s: %s", date_str, ex) logger.warning("跳过无效日期 %s: %s", date_str, ex)
@@ -121,13 +125,33 @@ def save_ics(calendar, output_path="public/trading.ics"):
def main(): def main():
"""主执行函数。""" """主执行函数遍历config目录下的所有json文件并生成对应的ics文件"""
try: try:
config = load_config() # 遍历config目录下的所有json文件
for filename in os.listdir("config"):
if filename.endswith(".json"):
config_path = os.path.join("config", filename)
try:
# 加载配置
config = load_config(config_path)
# 生成ics文件名保留原文件名只改扩展名
ics_filename = os.path.splitext(filename)[0] + ".ics"
ics_path = os.path.join("public", ics_filename)
# 生成并保存ics文件
calendar = generate_ics(config) calendar = generate_ics(config)
save_ics(calendar) save_ics(calendar, ics_path)
except Exception as ex: # pylint: disable=broad-except
logger.error("生成失败: %s", ex, exc_info=True) logger.info("成功生成: %s%s", config_path, ics_path)
except Exception as ex:
logger.error("处理文件 %s 失败: %s", config_path, ex, exc_info=True)
continue # 继续处理下一个文件
logger.info("所有文件处理完成")
except Exception as ex:
logger.error("程序执行失败: %s", ex, exc_info=True)
raise raise