Compare commits

..

12 Commits

8 changed files with 458 additions and 133 deletions

1
.gitignore vendored
View File

@@ -1,3 +1,4 @@
output/
public/
config/
__pycache__/

View File

@@ -1,8 +1,8 @@
# MEXC 交易机器人
# MEXC 定投交易机器人
## 项目简介
MEXC 交易机器人是一个自动化交易工具,用于在 MEXC 交易所执行现货交易。它通过 JSON 配置文件管理交易指令,支持多种订单类型,并自动记录交易日志。
MEXC 定投交易机器人是一个自动化交易工具,用于在 MEXC 交易所执行现货定投交易。它通过 JSON 配置文件管理交易指令,支持多种订单类型,并自动记录交易日志。
## 主要功能
@@ -16,7 +16,7 @@ MEXC 交易机器人是一个自动化交易工具,用于在 MEXC 交易所执
## 安装与使用
1. 创建配置文件`trading_config.json`
1. 创建配置文件`config/trading_config.json`(可有多个不同名字的`.json`文件,程序会自动遍历所有)
2. 运行程序:
```bash
@@ -31,6 +31,11 @@ MEXC 交易机器人是一个自动化交易工具,用于在 MEXC 交易所执
```json
{
"api": {
"mexc_host": "https://api.mexc.com",
"api_key": "mxxxxxxxxxx",
"secret_key": "xxxxxxxxxxxxxxxxx"
},
"symbol_mapping": {
"BTCUSDC": "BTCUSDT"
},
@@ -51,12 +56,14 @@ MEXC 交易机器人是一个自动化交易工具,用于在 MEXC 交易所执
### 配置字段详解
#### 1. 证券代码映射 (`symbol_mapping`)
#### 1. API 配置 (`api`)
#### 2. 证券代码映射 (`symbol_mapping`)
- 类型:数组
- 描述API 代码: CSV 记录代码,如`"BTCUSDC": "BTCUSDT"`代表向 API 请求`BTCUSDC`,但 CSV 中记录证券代码`BTCUSDT`
#### 2. 交易列表 (`trades`)
#### 3. 交易列表 (`trades`)
- 类型:数组
- 描述:包含所有交易指令的列表

View File

@@ -1,3 +0,0 @@
mexc_host = "https://api.mexc.com"
api_key = "mx0vglky5BuzlcK5HQ"
secret_key = "e2a0c4737b4643bbac4ad5f8b26dcce2"

286
main.py
View File

@@ -27,8 +27,10 @@ import logging
import os
import time
import json
from pathlib import Path
from datetime import datetime, date
from typing import Dict, Any, Optional, Tuple, List
import git
import mexc_spot_v3
@@ -43,7 +45,6 @@ logging.basicConfig(
logging.StreamHandler(),
],
)
logger = logging.getLogger(__name__)
class MexcSpotMarket:
@@ -52,12 +53,41 @@ class MexcSpotMarket:
提供获取交易对价格等功能
方法:
- get_exchange_info(symbol): 获取交易对信息
- get_price(symbol): 获取指定交易对的当前价格
"""
def __init__(self):
def __init__(self, config):
"""初始化市场数据查询接口"""
self.market = mexc_spot_v3.mexc_market()
self.market = mexc_spot_v3.mexc_market(config)
self.logger = logging.getLogger(self.__class__.__module__ + '.' + self.__class__.__name__)
def get_exchange_info(self, symbol: str) -> Optional[Dict[str, Any]]:
"""
获取交易对信息
Args:
symbol: 交易对,如 "BTCUSDT"
Returns:
交易对信息字典或None(如果失败)
"""
params = {"symbol": symbol}
try:
self.logger.info("查询交易对信息: %s", symbol)
exchange_info = self.market.get_exchangeInfo(params)
if not exchange_info or "symbols" not in exchange_info:
self.logger.error("获取交易对信息失败: %s", exchange_info)
return None
self.logger.info("获取交易对信息成功")
return exchange_info
except Exception as e:
self.logger.error("查询交易所信息失败: %s", str(e))
return None
def get_price(self, symbol: str) -> Optional[float]:
"""
@@ -76,20 +106,20 @@ class MexcSpotMarket:
params = {"symbol": symbol}
try:
logger.info("查询交易对价格: %s", symbol)
self.logger.info("查询交易对价格: %s", symbol)
price_data = self.market.get_price(params)
if not price_data or "price" not in price_data:
logger.error("获取价格数据失败: %s", price_data)
self.logger.error("获取价格数据失败: %s", price_data)
return None
price_str = price_data["price"]
price = float(price_str)
logger.info("获取价格成功: %s = %f", symbol, price)
self.logger.info("获取价格成功: %s = %f", symbol, price)
return price
except Exception as e:
logger.error("查询价格失败: %s", str(e))
self.logger.error("查询价格失败: %s", str(e))
return None
@@ -123,12 +153,33 @@ class MexcSpotTrade:
"FILL_OR_KILL": ["quantity", "price"],
}
def __init__(self):
def __init__(self, config, symbol_mapping, config_file_name):
"""初始化交易机器人"""
self.trader = mexc_spot_v3.mexc_trade()
self.market = MexcSpotMarket()
self.csv_file = "output/mexc_spot_trade.csv"
self.symbol_mapping = TradingConfig().config_data.get("symbol_mapping", {})
self.trader = mexc_spot_v3.mexc_trade(config)
self.market = MexcSpotMarket(config)
self.csv_file = f"output/{config_file_name}.csv"
self.symbol_mapping = symbol_mapping
self.logger = logging.getLogger(self.__class__.__module__ + '.' + self.__class__.__name__)
def _api_get_balance(self) -> str:
"""
获取账户余额
Returns:
账户余额字典或None(如果失败)
"""
try:
self.logger.info("查询账户余额")
account_info = self.trader.get_account_info()
account_info_balance = account_info.get("balances", [])
balances = ""
for item in account_info_balance:
balances += f"{item['available']} {item['asset']} "
self.logger.info("获取账户余额成功")
return balances
except Exception as e:
self.logger.error("查询账户信息失败: %s", str(e))
return f"ERROR: {str(e)}"
def _api_get_order(self, symbol: str, order_id: str) -> Optional[Dict[str, Any]]:
"""
@@ -148,12 +199,12 @@ class MexcSpotTrade:
}
try:
logger.info("查询订单状态, 订单ID: %s", order_id)
self.logger.info("查询订单状态, 订单ID: %s", order_id)
order = self.trader.get_order(params)
logger.info("订单状态: %s", order.get("status"))
self.logger.info("订单状态: %s", order.get("status"))
return order
except Exception as e:
logger.error("查询订单失败: %s", str(e))
self.logger.error("查询订单失败: %s", str(e))
return None
def _tool_map_symbol(self, symbol: str) -> str:
@@ -221,6 +272,7 @@ class MexcSpotTrade:
executed_qty = order_data["executedQty"]
cummulative_quote_qty = order_data["cummulativeQuoteQty"]
side = order_data["side"]
balances = self._api_get_balance()
# 确定交易类型显示
trade_type = "买入" if side == "BUY" else "卖出"
@@ -238,6 +290,7 @@ class MexcSpotTrade:
"资金账户",
"CEX",
f"MEXC API - Order ID: {order_id}",
balances
]
# 检查文件是否存在
@@ -262,16 +315,55 @@ class MexcSpotTrade:
"现金账户",
"目标账户",
"备注",
"balances"
]
)
writer.writerow(row)
logger.info("交易记录成功, 订单ID: %s", order_id)
self.logger.info("交易记录成功, 订单ID: %s", order_id)
return True
except Exception as e:
logger.error("记录交易失败: %s", str(e))
self.logger.error("记录交易失败: %s", str(e))
return False
def _tool_calculate_quantity(
self,
quantity: float,
price: float,
base_asset_precision: int,
quote_amount_precision: float,
) -> float:
"""
调整下单数量以满足最小成交额要求。
策略说明:
- 计算出的quantity如果乘以price后小于交易对的最小成交额(quoteAmountPrecision)
则将quantity增加一个最小单位(10^(-base_asset_precision)),确保下单金额满足交易所要求。
Args:
quantity: 初步计算出的下单数量
price: 当前价格
base_asset_precision: 交易对基础资产的小数精度
quote_amount_precision: 交易对最小成交额
Returns:
满足最小成交额要求的下单数量
"""
processed_quantity = round(quantity, base_asset_precision)
if processed_quantity * price < quote_amount_precision:
self.logger.info(
"计算的quantity小于最低要求%f * %f = %f < %f,进行调整",
processed_quantity,
price,
processed_quantity * price,
quote_amount_precision,
)
processed_quantity = round(
quantity + 10 ** (-base_asset_precision), base_asset_precision
)
self.logger.info("调整后的quantity: %f", processed_quantity)
return processed_quantity
def trade(
self, symbol: str, order_type: str, side: str, **kwargs
) -> Optional[Dict[str, Any]]:
@@ -294,20 +386,29 @@ class MexcSpotTrade:
# 基本参数验证
if side not in ["BUY", "SELL"]:
logger.error("无效的交易方向: %s", side)
self.logger.error("无效的交易方向: %s", side)
return None
order_type = order_type.upper()
processed_kwargs = kwargs.copy()
# 记录未经过偏移的价格以供LIMIT订单只有quoteOrderQty没有quantity的情况使用
# 参数有price时直接使用否则就为实时价格
clean_price = processed_kwargs.get("price")
# 处理无price的情况获取实时价格
if order_type in ["LIMIT", "LIMIT_MAKER"] and "price" not in processed_kwargs:
current_price = self.market.get_price(symbol)
if current_price is None:
logger.error("无法获取实时价格,交易取消")
self.logger.error("无法获取实时价格,交易取消")
return None
processed_kwargs["price"] = current_price
logger.info("使用实时价格作为限价: %f", current_price)
clean_price = current_price
# 防止挂单不成交
if side == "BUY":
processed_kwargs["price"] = current_price * 1.01 # 买入加价1%
elif side == "SELL":
processed_kwargs["price"] = current_price * 0.99 # 卖出减价1%
self.logger.info("使用调整1%%后价格作为限价: %f", processed_kwargs["price"])
# 处理LIMIT订单只有quoteOrderQty没有quantity的情况
if (
@@ -316,13 +417,23 @@ class MexcSpotTrade:
and "quantity" not in processed_kwargs
):
try:
exchange_info = self.market.get_exchange_info(symbol)
quote_amount = float(processed_kwargs["quoteOrderQty"])
price = float(processed_kwargs["price"])
quantity = quote_amount / price
processed_kwargs["quantity"] = str(quantity)
logger.info("根据quoteOrderQty计算quantity: %f", quantity)
quantity = quote_amount / clean_price
base_asset_precision = int(
exchange_info["symbols"][0]["baseAssetPrecision"]
)
quote_amount_precision = float(
exchange_info["symbols"][0]["quoteAmountPrecision"]
)
processed_quantity = self._tool_calculate_quantity(
quantity, clean_price, base_asset_precision, quote_amount_precision
)
self.logger.info("根据quoteOrderQty计算quantity: %f", processed_quantity)
processed_kwargs["quantity"] = str(processed_quantity)
processed_kwargs.pop("quoteOrderQty")
except (ValueError, KeyError) as e:
logger.error("计算quantity失败: %s", str(e))
self.logger.error("计算quantity失败: %s", str(e))
return None
# 准备订单参数
@@ -336,46 +447,46 @@ class MexcSpotTrade:
# 验证参数
is_valid, error_msg = self._tool_validate_order_params(order_type, base_params)
if not is_valid:
logger.error("订单参数验证失败: %s", error_msg)
self.logger.error("订单参数验证失败: %s", error_msg)
return None
try:
logger.info("准备下单 %s %s 订单, 交易对: %s", side, order_type, symbol)
logger.debug("订单参数: %s", base_params)
self.logger.info("准备下单 %s %s 订单, 交易对: %s", side, order_type, symbol)
self.logger.debug("订单参数: %s", base_params)
# 测试订单
test_result = self.trader.post_order_test(base_params.copy())
if test_result != {}:
logger.error("订单测试失败,参数有误: %s", test_result)
self.logger.error("订单测试失败,参数有误: %s", test_result)
return None
logger.info("订单参数测试通过,准备正式下单")
self.logger.info("订单参数测试通过,准备正式下单")
# 正式下单
order = self.trader.post_order(base_params.copy())
order_id = order.get("orderId")
if not order_id:
logger.error("下单失败: 未获取到订单ID")
logger.error(base_params.copy())
logger.error(order)
self.logger.error("下单失败: 未获取到订单ID")
self.logger.error(base_params.copy())
self.logger.error(order)
return None
logger.info("订单创建成功, 订单ID: %s", order_id)
self.logger.info("订单创建成功, 订单ID: %s", order_id)
# 查询订单详情
logger.info("等待1秒后查询订单状态...")
self.logger.info("等待1秒后查询订单状态...")
time.sleep(1)
order_detail = self._api_get_order(symbol, order_id)
if not order_detail:
logger.error("获取订单详情失败")
self.logger.error("获取订单详情失败")
return None
# 如果不是FILLED则重复查询最多10次
retry_count = 0
while order_detail.get("status") != "FILLED" and retry_count < 10:
retry_count += 1
logger.info(
self.logger.info(
"订单未完成(状态: %s)等待1秒后第%s次重试查询...",
order_detail.get("status", "UNKNOWN"),
retry_count,
@@ -383,15 +494,15 @@ class MexcSpotTrade:
time.sleep(1)
order_detail = self._api_get_order(symbol, order_id)
if not order_detail:
logger.error("获取订单详情失败")
self.logger.error("获取订单详情失败")
return None
# 记录交易
if order_detail.get("status") == "FILLED":
if not self._tool_record_transaction(order_detail):
logger.error("交易记录失败")
self.logger.error("交易记录失败")
else:
logger.warning(
self.logger.warning(
"订单未完成(状态: %s)未被记录到CSV。订单ID: %s",
order_detail.get("status", "UNKNOWN"),
order_id,
@@ -400,7 +511,7 @@ class MexcSpotTrade:
return order_detail
except Exception as e:
logger.error("交易执行失败: %s", str(e))
self.logger.error("交易执行失败: %s", str(e))
return None
@@ -414,7 +525,7 @@ class TradingConfig:
- _load_config(): 加载JSON配置文件
"""
def __init__(self, config_file: str = "public/trading_config.json"):
def __init__(self, config_file: str = "config/trading_config.json"):
"""
初始化交易配置
@@ -423,6 +534,7 @@ class TradingConfig:
"""
self.config_file = config_file
self.logger = logging.getLogger(self.__class__.__module__ + '.' + self.__class__.__name__)
self.config_data = self._load_config()
def _load_config(self) -> Dict[str, Any]:
@@ -430,16 +542,16 @@ class TradingConfig:
try:
with open(self.config_file, "r", encoding="utf-8") as f:
config = json.load(f)
logger.info("成功加载配置文件: %s", self.config_file)
self.logger.info("成功加载配置文件: %s", self.config_file)
return config
except FileNotFoundError:
logger.error("配置文件不存在: %s", self.config_file)
self.logger.error("配置文件不存在: %s", self.config_file)
return {}
except json.JSONDecodeError:
logger.error("配置文件格式错误不是有效的JSON")
self.logger.error("配置文件格式错误不是有效的JSON")
return {}
except Exception as e:
logger.error("加载配置文件时出错: %s", str(e))
self.logger.error("加载配置文件时出错: %s", str(e))
return {}
def get_today_trades(self) -> List[Dict[str, Any]]:
@@ -478,39 +590,79 @@ class TradingConfig:
return today_trades
def git_commit(repo_path: str = ".") -> str:
"""获取Git仓库版本"""
try:
repo = git.Repo(repo_path)
return repo.head.commit.hexsha
except Exception as _:
return None
def main():
"""主函数"""
# 初始化交易配置
config = TradingConfig()
logger = logging.getLogger(f"{__name__}.main")
# 获取今天需要执行的交易
logger.info("=" * 40)
# 获取主程序Git仓库版本
app_commit = git_commit(".")[:10]
# 确保config目录存在
if not os.path.exists("config"):
logger.error("配置目录 config 不存在")
return
# 获取config, output仓库版本
config_commit = git_commit("config")[:10]
output_commit = git_commit("output")[:10]
logger.info("主程序 %s, 配置 %s, 输出 %s", app_commit, config_commit, output_commit)
# 获取config目录下所有json文件
config_files = list(Path("config").glob("*.json"))
if not config_files:
logger.info("配置目录中没有找到任何JSON文件")
return
logger.info("找到 %d 个配置文件需要处理", len(config_files))
for config_file in config_files:
try:
# 提取交易参数
logger.info("处理配置文件: %s", config_file)
config = TradingConfig(str(config_file))
spot_trader = MexcSpotTrade(
config.config_data.get("api", {}),
config.config_data.get("symbol_mapping", {}),
os.path.basename(config_file).replace(".json", ""),
)
today_trades = config.get_today_trades()
if not today_trades:
logger.info("今天没有需要执行的交易")
return
logger.info("%s - 今天没有需要执行的交易", config_file)
continue
logger.info("今天有 %d 个交易需要执行", len(today_trades))
logger.info("%s - 今天有 %d 个交易需要执行", config_file, len(today_trades))
# 初始化交易类
spot_trader = MexcSpotTrade()
# 执行每个交易
for trade_config in today_trades:
try:
# 提取交易参数
symbol = trade_config.get("symbol")
order_type = trade_config.get("order_type")
side = trade_config.get("side")
params = trade_config.get("params", {})
if not all([symbol, order_type, side]):
logger.error("交易配置缺少必要参数: %s", trade_config)
logger.error(
"%s - 交易配置缺少必要参数: %s", config_file, trade_config
)
continue
logger.info("执行交易: %s %s %s", symbol, order_type, side)
logger.debug("交易参数: %s", params)
logger.info(
"%s - 执行交易: %s %s %s", config_file, symbol, order_type, side
)
logger.debug("%s - 交易参数: %s", config_file, params)
# 执行交易
result = spot_trader.trade(
@@ -518,15 +670,19 @@ def main():
)
if result:
logger.info("交易执行成功: %s", result)
logger.info("%s - 交易执行成功: %s", config_file, result)
else:
logger.error("交易执行失败")
logger.error("%s - 交易执行失败", config_file)
except Exception as e:
logger.error("执行交易时出错: %s", str(e))
logger.error("%s - 执行交易时出错: %s", config_file, str(e))
continue
logger.info("执行完毕")
except Exception as e:
logger.error("处理配置文件 %s 时出错: %s", config_file, str(e))
continue
logger.info("所有配置文件处理完毕")
if __name__ == "__main__":

View File

@@ -2,7 +2,6 @@ import requests
import hmac
import hashlib
from urllib.parse import urlencode, quote
import config
# ServerTime、Signature
class TOOL(object):
@@ -42,9 +41,9 @@ class TOOL(object):
# Market Data
class mexc_market(TOOL):
def __init__(self):
def __init__(self, config):
self.api = '/api/v3'
self.hosts = config.mexc_host
self.hosts = config["mexc_host"]
self.method = 'GET'
def get_ping(self):
@@ -129,11 +128,11 @@ class mexc_market(TOOL):
# Spot Trade
class mexc_trade(TOOL):
def __init__(self):
def __init__(self, config):
self.api = '/api/v3'
self.hosts = config.mexc_host
self.mexc_key = config.api_key
self.mexc_secret = config.secret_key
self.hosts = config["mexc_host"]
self.mexc_key = config["api_key"]
self.mexc_secret = config["secret_key"]
def get_selfSymbols(self):
"""get currency information"""
@@ -246,11 +245,11 @@ class mexc_trade(TOOL):
# Wallet
class mexc_wallet(TOOL):
def __init__(self):
def __init__(self, config):
self.api = '/api/v3/capital'
self.hosts = config.mexc_host
self.mexc_key = config.api_key
self.mexc_secret = config.secret_key
self.hosts = config["mexc_host"]
self.mexc_key = config["api_key"]
self.mexc_secret = config["secret_key"]
def get_coinlist(self):
"""get currency information"""
@@ -368,11 +367,11 @@ class mexc_wallet(TOOL):
# Sub-Account
class mexc_subaccount(TOOL):
def __init__(self):
def __init__(self, config):
self.api = '/api/v3'
self.hosts = config.mexc_host
self.mexc_key = config.api_key
self.mexc_secret = config.secret_key
self.hosts = config["mexc_host"]
self.mexc_key = config["api_key"]
self.mexc_secret = config["secret_key"]
def post_virtualSubAccount(self, params):
"""create a sub-account"""
@@ -427,11 +426,11 @@ class mexc_subaccount(TOOL):
# Rebate
class mexc_rebate(TOOL):
def __init__(self):
def __init__(self, config):
self.api = '/api/v3/rebate'
self.hosts = config.mexc_host
self.mexc_key = config.api_key
self.mexc_secret = config.secret_key
self.hosts = config["mexc_host"]
self.mexc_key = config["api_key"]
self.mexc_secret = config["secret_key"]
def get_taxQuery(self, params=None):
"""get the rebate commission record"""
@@ -500,11 +499,11 @@ class mexc_rebate(TOOL):
# WebSocket ListenKey
class mexc_listenkey(TOOL):
def __init__(self):
def __init__(self, config):
self.api = '/api/v3'
self.hosts = config.mexc_host
self.mexc_key = config.api_key
self.mexc_secret = config.secret_key
self.hosts = config["mexc_host"]
self.mexc_key = config["api_key"]
self.mexc_secret = config["secret_key"]
def post_listenKey(self):
""" generate ListenKey """

12
requirements.txt Normal file
View File

@@ -0,0 +1,12 @@
certifi
charset-normalizer
gitdb
GitPython
icalendar
idna
python-dateutil
requests
six
smmap
tzdata
urllib3

129
tool_csv_merge.py Normal file
View File

@@ -0,0 +1,129 @@
"""CSV 文件处理模块
该模块用于合并同一证券在同一天的多笔交易记录,并生成汇总后的交易记录。
"""
import csv
import os
from collections import defaultdict
def process_csv(input_file, output_file):
"""处理CSV文件合并相同证券在同一天的交易记录
Args:
input_file (str): 输入CSV文件路径
output_file (str): 输出CSV文件路径
"""
merged_records = defaultdict(
lambda: {
"buy_shares": 0.0,
"buy_amount": 0.0,
"sell_shares": 0.0,
"sell_amount": 0.0,
"order_ids": set(),
"first_record": None,
"time_part": None,
}
)
with open(input_file, mode="r", newline="", encoding="utf-8") as infile:
reader = csv.DictReader(infile)
for row in reader:
datetime_str = row["日期"]
date_part, time_part = datetime_str.split("T")
order_id = row["备注"].split("Order ID: ")[-1].strip()
merge_key = (date_part, row["证券代码"])
record = merged_records[merge_key]
if record["first_record"] is None:
record["first_record"] = row
record["time_part"] = time_part
record["order_ids"].add(order_id)
if row["类型"] == "买入":
record["buy_shares"] += float(row["份额"])
record["buy_amount"] += float(row["净额"])
elif row["类型"] == "卖出":
record["sell_shares"] += float(row["份额"])
record["sell_amount"] += float(row["净额"])
output_rows = []
for key, record in merged_records.items():
date_part, symbol = key
first_row = record["first_record"]
net_shares = record["buy_shares"] - record["sell_shares"]
net_amount = record["buy_amount"] - record["sell_amount"]
if net_shares >= 0:
operation_type = "买入"
display_shares = net_shares
display_amount = net_amount
else:
operation_type = "卖出"
display_shares = -net_shares
display_amount = -net_amount
# 格式化为完整小数形式,不使用科学计数法
formatted_shares = (
f"{display_shares:f}".rstrip("0").rstrip(".")
if "." in f"{display_shares:f}"
else f"{display_shares:f}"
)
formatted_amount = (
f"{display_amount:f}".rstrip("0").rstrip(".")
if "." in f"{display_amount:f}"
else f"{display_amount:f}"
)
merged_row = {
"日期": f"{date_part}T{record['time_part']}",
"类型": operation_type,
"证券代码": symbol,
"份额": formatted_shares,
"净额": formatted_amount,
"现金账户": first_row["现金账户"],
"目标账户": first_row["目标账户"],
"备注": f"MEXC API - Order ID: {', '.join(sorted(record['order_ids']))}",
}
output_rows.append(merged_row)
# 写入输出文件
with open(output_file, mode="w", newline="", encoding="utf-8") as outfile:
fieldnames = [
"日期",
"类型",
"证券代码",
"份额",
"净额",
"现金账户",
"目标账户",
"备注",
]
writer = csv.DictWriter(outfile, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(output_rows)
def process_all_csvs(input_dir="output"):
"""处理指定目录下的所有CSV文件
Args:
input_dir (str): 包含CSV文件的目录路径
"""
for filename in os.listdir(input_dir):
if filename.endswith(".csv") and not filename.startswith("merged_"):
input_path = os.path.join(input_dir, filename)
output_path = os.path.join(input_dir, f"merged_{filename}")
process_csv(input_path, output_path)
print(f"处理完成: {filename} -> merged_{filename}")
if __name__ == "__main__":
process_all_csvs()
print("所有文件处理完成")

View File

@@ -13,7 +13,7 @@ logging.basicConfig(
logger = logging.getLogger(__name__)
def load_config(config_path="trading_config.json"):
def load_config(config_path="config/trading_config.json"):
"""加载交易配置文件。
Args:
@@ -37,12 +37,11 @@ def load_config(config_path="trading_config.json"):
raise
def create_event(symbol, comment, date_obj, description):
def create_event(summary: str, date_obj, description):
"""创建单个日历事件。
Args:
symbol (str): 交易对符号
comment (str): 交易备注
summary (str): 事件标题
date_obj (date): 交易日期
description (str): 事件描述
@@ -50,10 +49,13 @@ def create_event(symbol, comment, date_obj, description):
Event: 创建好的事件对象
"""
event = Event()
event.add("uid", f"{symbol.lower()}-{date_obj.strftime('%Y%m%d')}@trade")
event.add(
"uid",
f"{summary.lower().replace('/', '-')}-{date_obj.strftime('%Y%m%d')}@trade",
)
event.add("dtstamp", datetime.now())
event.add("dtstart", date_obj)
event.add("summary", f"{symbol} 交易 ({comment})")
event.add("summary", summary)
event.add("description", description)
return event
@@ -71,10 +73,8 @@ def generate_ics(config):
cal.add("prodid", "-//Trading Calendar//EN")
cal.add("version", "2.0")
symbol_mapping = config.get("symbol_mapping", {})
for trade in config.get("trades", []):
symbol = symbol_mapping.get(trade["symbol"], trade["symbol"])
symbol = trade["symbol"]
order_type = trade["order_type"]
side = trade["side"]
comment = trade["comment"]
@@ -82,25 +82,29 @@ def generate_ics(config):
# 构建描述
amount = trade["params"].get("quoteOrderQty")
quantity = trade["params"].get("quantity")
description = f"交易类型: {order_type}\n方向: {side}"
if quantity:
description = f"{description}\n数量: {quantity}"
if amount:
description = f"{description}\n金额: {amount}"
description = f"{description}\n备注: {comment}"
if trade["execute_dates"] == ["*"]:
event = create_event(symbol, comment, datetime.now().date(), description)
summary = f"{symbol}/{order_type}/{side}"
if amount:
summary = f"{summary}/{amount}{symbol[-4:]}"
if quantity:
summary = f"{summary}/{quantity}{symbol[:-4]}"
event = create_event(summary, datetime.now().date(), comment)
event.add("rrule", {"freq": "daily"})
cal.add_component(event)
logger.info("已创建项目:%s 交易 (%s) 每日", symbol, comment)
logger.info("已创建项目:每日 %s", summary)
else:
for date_str in trade["execute_dates"]:
try:
date_obj = datetime.strptime(date_str, "%Y-%m-%d").date()
event = create_event(symbol, comment, date_obj, description)
summary = f"{symbol}/{order_type}/{side}"
if amount:
summary = f"{summary}/{amount}{symbol[-4:]}"
if quantity:
summary = f"{summary}/{quantity}{symbol[:-4]}"
event = create_event(summary, date_obj, comment)
cal.add_component(event)
logger.info("已创建项目:%s 交易 (%s) %s", symbol, comment, date_str)
logger.info("已创建项目:%s %s", date_str, summary)
except ValueError as ex:
logger.warning("跳过无效日期 %s: %s", date_str, ex)
@@ -121,13 +125,33 @@ def save_ics(calendar, output_path="public/trading.ics"):
def main():
"""主执行函数。"""
"""主执行函数遍历config目录下的所有json文件并生成对应的ics文件"""
try:
config = load_config()
# 遍历config目录下的所有json文件
for filename in os.listdir("config"):
if filename.endswith(".json"):
config_path = os.path.join("config", filename)
try:
# 加载配置
config = load_config(config_path)
# 生成ics文件名保留原文件名只改扩展名
ics_filename = os.path.splitext(filename)[0] + ".ics"
ics_path = os.path.join("public", ics_filename)
# 生成并保存ics文件
calendar = generate_ics(config)
save_ics(calendar)
except Exception as ex: # pylint: disable=broad-except
logger.error("生成失败: %s", ex, exc_info=True)
save_ics(calendar, ics_path)
logger.info("成功生成: %s%s", config_path, ics_path)
except Exception as ex:
logger.error("处理文件 %s 失败: %s", config_path, ex, exc_info=True)
continue # 继续处理下一个文件
logger.info("所有文件处理完成")
except Exception as ex:
logger.error("程序执行失败: %s", ex, exc_info=True)
raise