#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 定投交易机器人 功能: 1. 从JSON配置文件读取交易指令 2. 支持按日期执行多个交易(*表示每天执行) 3. 支持多种订单类型 (limit, market) 4. 当无price时自动获取实时价格作为限价 5. limit订单支持只提供quoteOrderQty自动计算quantity 6. 证券代码映射功能 7. 完整的交易记录和日志 使用示例: python main.py """ import csv import logging import os import time import json from pathlib import Path from datetime import datetime, date from typing import Dict, Any, Optional, Tuple, List import git import ccxt os.makedirs("output", exist_ok=True) # 配置日志 logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=[ logging.FileHandler("output/trading_bot.log", encoding="utf-8"), logging.StreamHandler(), ], ) class BotSpotMarket: """市场数据查询类""" def __init__(self, exchange: ccxt.Exchange): self.exchange = exchange self.logger = logging.getLogger(self.__class__.__module__ + '.' + self.__class__.__name__) try: self.exchange.load_markets() self.logger.info("加载市场信息成功") except Exception as e: self.logger.error("加载市场信息失败: %s", str(e)) def get_exchange_info(self, symbol: str) -> Optional[Dict[str, Any]]: """获取交易对信息,返回格式与原始代码兼容""" try: self.logger.info("查询交易对信息: %s", symbol) if symbol not in self.exchange.markets: self.logger.warning("市场信息中未找到 %s,尝试重新加载", symbol) self.exchange.load_markets() market = self.exchange.markets.get(symbol) if not market: self.logger.error("交易对 %s 不存在", symbol) return None exchange_info = { "symbols": [ { "baseAssetPrecision": market["precision"]["amount"], "quoteAmountPrecision": str(market["limits"]["cost"]["min"] or 0), } ] } self.logger.info("获取交易对信息成功: %s", symbol) return exchange_info except Exception as e: self.logger.error("查询交易所信息失败: %s", str(e)) return None def get_price(self, symbol: str) -> Optional[float]: """获取指定交易对的当前价格""" try: self.logger.info("查询交易对价格: %s", symbol) ticker = self.exchange.fetch_ticker(symbol) if not ticker or "last" not in ticker: self.logger.error("获取价格数据失败: %s", ticker) return None price = float(ticker["last"]) self.logger.info("获取价格成功: %s = %f", symbol, price) return price except Exception as e: self.logger.error("查询价格失败: %s", str(e)) return None class BotSpotTrade: """现货交易类""" ORDER_TYPE_REQUIREMENTS = { "limit": ["quantity", "price", "quoteOrderQty"], "market": ["quantity", "quoteOrderQty"] } def __init__(self, exchange: ccxt.Exchange, symbol_mapping: Dict[str, str], config_file_name: str): self.exchange = exchange self.market = BotSpotMarket(exchange) self.csv_file = f"output/{config_file_name}.csv" self.symbol_mapping = symbol_mapping self.logger = logging.getLogger(self.__class__.__module__ + '.' + self.__class__.__name__) def _api_get_balance(self) -> str: try: self.logger.info("查询账户余额") balance = self.exchange.fetch_balance() free = balance.get("free", {}) balances = "" for asset, amount in free.items(): if amount and amount > 0: balances += f"{amount} {asset} " self.logger.info("获取账户余额成功") return balances except Exception as e: self.logger.error("查询账户信息失败: %s", str(e)) return f"ERROR: {str(e)}" def _api_get_order(self, symbol: str, order_id: str) -> Optional[Dict[str, Any]]: try: self.logger.info("查询订单状态, 订单ID: %s", order_id) order = self.exchange.fetch_order(order_id, symbol) self.logger.info("订单状态: %s", order.get("status")) return order except Exception as e: self.logger.error("查询订单失败: %s", str(e)) return None def _tool_map_symbol(self, symbol: str) -> str: return self.symbol_mapping.get(symbol, symbol) def _tool_validate_order_params(self, order_type: str, params: Dict[str, Any]) -> Tuple[bool, str]: required_params = self.ORDER_TYPE_REQUIREMENTS.get(order_type, []) if not required_params: return False, f"未知的订单类型: {order_type}" if order_type == "limit": if "price" not in params: return False, "limit订单需要price参数" if "quantity" not in params and "quoteOrderQty" not in params: return False, "limit订单需要quantity或quoteOrderQty参数" return True, "" if order_type == "market": if "quantity" not in params and "quoteOrderQty" not in params: return False, "market订单需要quantity或quoteOrderQty参数" return True, "" missing_params = [p for p in required_params if p not in params] if missing_params: return False, f"{order_type}订单缺少必需参数: {', '.join(missing_params)}" return True, "" def _tool_sci_to_decimal(self, num, precision = 30): if isinstance(num, str): num = float(num) result = f"{num:.{precision}f}" result = result.rstrip('0').rstrip('.') return result def _tool_record_transaction(self, order_data: Dict[str, Any]) -> bool: try: original_symbol = order_data["symbol"] mapped_symbol = self._tool_map_symbol(original_symbol) order_id = order_data["id"] executed_qty = self._tool_sci_to_decimal(order_data.get("filled", 0.0)) cummulative_quote_qty = self._tool_sci_to_decimal(order_data.get("cost", 0.0)) side = order_data["side"] balances = self._api_get_balance() trade_type = "买入" if side == "buy" else "卖出" timestamp = datetime.fromtimestamp(order_data["timestamp"] / 1000).strftime("%Y-%m-%dT%H:%M") row = [ timestamp, trade_type, mapped_symbol, executed_qty, cummulative_quote_qty, "资金账户", "CEX", f"DCA Order ID: {order_id}", balances, ] file_exists = False try: with open(self.csv_file, "r", encoding="utf-8") as f: file_exists = True except FileNotFoundError: pass with open(self.csv_file, "a", newline="", encoding="utf-8") as f: writer = csv.writer(f) if not file_exists: writer.writerow([ "日期", "类型", "证券代码", "份额", "净额", "现金账户", "目标账户", "备注", "balances", ]) writer.writerow(row) self.logger.info("交易记录成功, 订单ID: %s", order_id) return True except Exception as e: self.logger.error("记录交易失败: %s", str(e)) return False def trade(self, symbol: str, order_type: str, side: str, **kwargs) -> Optional[Dict[str, Any]]: if side not in ["buy", "sell"]: self.logger.error("无效的交易方向: %s", side) return None processed_kwargs = kwargs.copy() clean_price = processed_kwargs.get("price") if order_type == "limit" and "price" not in processed_kwargs: current_price = self.market.get_price(symbol) if current_price is None: self.logger.error("无法获取实时价格,交易取消") return None clean_price = current_price # 防止挂单不成交 if side == "buy": processed_kwargs["price"] = current_price * 1.001 # 买入加价0.1% elif side == "sell": processed_kwargs["price"] = current_price * 0.999 # 卖出减价0.1% self.logger.info("使用调整0.1%%后价格作为限价: %f", processed_kwargs["price"]) if ( order_type == "limit" and "quoteOrderQty" in processed_kwargs and "quantity" not in processed_kwargs ): try: exchange_info = self.market.get_exchange_info(symbol) if not exchange_info: return None quote_amount = float(processed_kwargs["quoteOrderQty"]) if clean_price is None: self.logger.error("无法获取价格来计算数量") return None price_for_calc = float(clean_price) quantity = quote_amount / price_for_calc self.logger.info("根据quoteOrderQty计算quantity: %f", quantity) processed_kwargs["quantity"] = str(quantity) processed_kwargs.pop("quoteOrderQty") except (ValueError, KeyError, TypeError) as e: self.logger.error("计算quantity失败: %s", str(e)) return None amount = processed_kwargs.get("quantity") price = processed_kwargs.get("price") if amount is not None: amount = float(amount) if price is not None: price = float(price) params = {} if order_type == "market" and "quoteOrderQty" in processed_kwargs: current_price = self.market.get_price(symbol) if current_price is None: self.logger.error("无法获取实时价格,交易取消") return None clean_price = current_price amount = float(processed_kwargs["quoteOrderQty"]) / clean_price is_valid, error_msg = self._tool_validate_order_params(order_type, processed_kwargs) if not is_valid: self.logger.error("订单参数验证失败: %s", error_msg) return None try: self.logger.info("准备下单 %s %s 订单, 交易对: %s", side, order_type, symbol) self.logger.info("订单参数: symbol=%s, type=%s, side=%s, amount=%s, price=%s, params=%s", symbol, order_type, side, amount, price, params) order = self.exchange.create_order( symbol=symbol, type=order_type, side=side, amount=amount, price=price, params=params, ) order_id = order.get("id") if not order_id: self.logger.error("下单失败: 未获取到订单ID") self.logger.error(order) return None self.logger.info("订单创建成功, 订单ID: %s", order_id) self.logger.info("等待1秒后查询订单状态...") time.sleep(1) order_detail = self._api_get_order(symbol, order_id) if not order_detail: self.logger.error("获取订单详情失败") return None retry_count = 0 filled_statuses = ("closed", "filled") while order_detail.get("status", "").lower() not in filled_statuses and retry_count < 10: retry_count += 1 self.logger.info( "订单未完成(状态: %s),等待1秒后第%s次重试查询...", order_detail.get("status", "UNKNOWN"), retry_count, ) time.sleep(1) order_detail = self._api_get_order(symbol, order_id) if not order_detail: self.logger.error("获取订单详情失败") return None if order_detail.get("status", "").lower() in filled_statuses: if not self._tool_record_transaction(order_detail): self.logger.error("交易记录失败") else: self.logger.warning( "订单未完成(状态: %s),未被记录到CSV。订单ID: %s", order_detail.get("status", "UNKNOWN"), order_id, ) return order_detail except Exception as e: self.logger.error("交易执行失败: %s", str(e)) return None class TradingConfig: """交易配置管理类""" def __init__(self, config_file: str = "config/trading_config.json"): self.config_file = config_file self.logger = logging.getLogger(self.__class__.__module__ + '.' + self.__class__.__name__) self.config_data = self._load_config() def _load_config(self) -> Dict[str, Any]: try: with open(self.config_file, "r", encoding="utf-8") as f: config = json.load(f) self.logger.info("成功加载配置文件: %s", self.config_file) return config except FileNotFoundError: self.logger.error("配置文件不存在: %s", self.config_file) return {} except json.JSONDecodeError: self.logger.error("配置文件格式错误,不是有效的JSON") return {} except Exception as e: self.logger.error("加载配置文件时出错: %s", str(e)) return {} def get_today_trades(self) -> List[Dict[str, Any]]: if not self.config_data or "trades" not in self.config_data: return [] today = date.today().isoformat() today_trades = [] for trade in self.config_data["trades"]: if "execute_dates" not in trade: continue if "*" in trade["execute_dates"]: today_trades.append(trade) continue if today in trade["execute_dates"]: today_trades.append(trade) return today_trades def build_ccxt_exchange(api_config: Dict[str, Any], exchange_id: str) -> ccxt.Exchange: """ 根据配置文件和交易所ID构建 ccxt 交易所实例 :param api_config: 包含 api_key, api_secret 等字段的字典 :param exchange_id: ccxt 交易所标识符(小写),如 "mexc", "binance", "bybit" """ api_key = api_config.get("api_key") secret = api_config.get("api_secret") exchange_class = getattr(ccxt, exchange_id, None) if exchange_class is None: raise ValueError(f"不支持的交易所ID: {exchange_id},请检查配置文件中的 'exchange' 字段") exchange_params = { "apiKey": api_key, "secret": secret, "enableRatelimit": True, } if "password" in api_config: exchange_params["password"] = api_config["password"] return exchange_class(exchange_params) def git_commit(repo_path: str = ".") -> Optional[str]: try: repo = git.Repo(repo_path) return repo.head.commit.hexsha except Exception: return None def main(): logger = logging.getLogger(f"{__name__}.main") logger.info("=" * 40) app_commit = git_commit(".") app_commit_short = app_commit[:10] if app_commit else "unknown" if not os.path.exists("config"): logger.error("配置目录 config 不存在") return config_commit = git_commit("config") config_commit_short = config_commit[:10] if config_commit else "unknown" output_commit = git_commit("output") output_commit_short = output_commit[:10] if output_commit else "unknown" logger.info("主程序 %s, 配置 %s, 输出 %s", app_commit_short, config_commit_short, output_commit_short) config_files = list(Path("config").glob("*.json")) if not config_files: logger.info("配置目录中没有找到任何JSON文件") return logger.info("找到 %d 个配置文件需要处理", len(config_files)) for config_file in config_files: try: logger.info("处理配置文件: %s", config_file) config = TradingConfig(str(config_file)) exchange_id = config.config_data.get("exchange") logger.info("使用交易所: %s", exchange_id) api_conf = config.config_data.get("api", {}) exchange = build_ccxt_exchange(api_conf, exchange_id) spot_trader = BotSpotTrade( exchange=exchange, symbol_mapping=config.config_data.get("symbol_mapping", {}), config_file_name=os.path.basename(config_file).replace(".json", ""), ) today_trades = config.get_today_trades() if not today_trades: logger.info("%s - 今天没有需要执行的交易", config_file) continue logger.info("%s - 今天有 %d 个交易需要执行", config_file, len(today_trades)) for trade_config in today_trades: try: symbol = trade_config.get("symbol") order_type = trade_config.get("order_type") side = trade_config.get("side") params = trade_config.get("params", {}) if not all([symbol, order_type, side]): logger.error("%s - 交易配置缺少必要参数: %s", config_file, trade_config) continue logger.info("%s - 执行交易: %s %s %s", config_file, symbol, order_type, side) result = spot_trader.trade( symbol=symbol, order_type=order_type, side=side, **params, ) if result: logger.info("%s - 交易执行成功: %s", config_file, result) else: logger.error("%s - 交易执行失败", config_file) except Exception as e: logger.error("%s - 执行交易时出错: %s", config_file, str(e)) continue except Exception as e: logger.error("处理配置文件 %s 时出错: %s", config_file, str(e)) continue logger.info("所有配置文件处理完毕") if __name__ == "__main__": main()