# mexc-spot-grid-bot/main.py
# Grid trading bot for the MEXC spot market.
import csv
import logging
import os
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

import mexc_spot_v3
# Configure logging: every record goes both to a UTF-8 log file and to the
# console, at INFO level and above.
LOG_FILE = "output/mexc_spot_grid_bot.log"

# logging.FileHandler opens the file immediately and does not create missing
# parent directories, so make sure the target directory exists first.
os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler(LOG_FILE, encoding="utf-8"),
        logging.StreamHandler(),
    ],
)

# Module-wide logger used by everything in this file.
logger = logging.getLogger(__name__)
@dataclass
class Order:
    """A single limit order tracked by the bot.

    Instances are created by the bot right after it places an order and are
    kept in its ``active_orders`` mapping; ``status`` is later overwritten
    with the status reported by the exchange.
    """

    order_id: str  # exchange order ID
    price: float  # limit price of the order
    quantity: float  # order quantity
    side: str  # direction: BUY / SELL
    status: str  # order status: NEW / FILLED / CANCELED
    filled_time: Optional[float] = None  # timestamp recorded when a fill is seen
class GridTradingBot:
    """Grid trading bot for a single MEXC spot trading pair.

    The bot keeps up to ``grid_count`` limit orders on each side of the
    market. Neighbouring grid prices differ by a factor of
    ``1 + grid_percentage``. The main loop (``run``) polls order statuses,
    reacts to fills, resets the grid when the price leaves it, and tops the
    grid up again, once per second.

    Exchange calls are retried a bounded number of times
    (``MAX_API_RETRIES``) with a pause (``RETRY_DELAY_SECONDS``) between
    attempts, so a persistent outage surfaces as a logged failure instead of
    unbounded recursion.
    """

    # Maximum attempts for one exchange call before the failure is reported.
    MAX_API_RETRIES = 5
    # Pause between two attempts of the same exchange call, in seconds.
    RETRY_DELAY_SECONDS = 1.0
    # Header row written at the top of a new (or empty) trade-record CSV file.
    _CSV_HEADER = (
        "日期",
        "类型",
        "证券代码",
        "份额",
        "净额",
        "现金账户",
        "目标账户",
        "备注",
    )

    def __init__(self, conf: Dict):
        """Create a bot from a configuration mapping.

        Args:
            conf: Settings. Required keys: ``symbol``, ``csv_symbol``,
                ``csv_file``, ``grid_percentage``, ``grid_count``,
                ``order_amount``, ``min_order_value``, ``reserve_base`` and
                ``reserve_quote``. Optional keys ``base_currency`` and
                ``quote_currency`` override the asset names that are
                otherwise derived from ``symbol``.

        Raises:
            KeyError: If a required key is missing from ``conf``.
        """
        self.config = conf
        self.symbol = conf["symbol"]  # trading pair symbol
        self.csv_symbol = conf["csv_symbol"]  # symbol written to the CSV records
        self.csv_file = conf["csv_file"]  # path of the trade-record file
        self.grid_percentage = conf["grid_percentage"]  # spacing between grid levels
        self.grid_count = conf["grid_count"]  # number of grid levels per side
        self.order_amount = conf["order_amount"]  # quantity of every order
        self.min_order_value = conf["min_order_value"]  # minimum order value
        self.reserve_base = conf["reserve_base"]  # base currency kept untouched
        self.reserve_quote = conf["reserve_quote"]  # quote currency kept untouched
        # Runtime state
        self.active_orders: Dict[str, "Order"] = {}  # tracked orders by ID
        self.current_buy_levels = 0  # number of tracked buy orders
        self.current_sell_levels = 0  # number of tracked sell orders
        self.api_trade = mexc_spot_v3.mexc_trade()  # trading API client
        self.api_market = mexc_spot_v3.mexc_market()  # market-data API client
        self.running = False  # main-loop flag
        logger.info("网格交易机器人初始化完成,交易对: %s", self.symbol)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _with_retries(self, operation, failure_msg: str, *msg_args):
        """Call ``operation()`` with a bounded number of retries.

        Args:
            operation: Zero-argument callable performing one exchange request.
            failure_msg: ``logging``-style format string for the warning
                logged after each failed attempt; its last placeholder
                receives the error text.
            *msg_args: Values for the placeholders preceding the error text.

        Returns:
            Whatever ``operation`` returns.

        Raises:
            Exception: The error of the final attempt once all attempts
                have failed.
        """
        attempts = max(1, int(self.MAX_API_RETRIES))
        for attempt in range(1, attempts + 1):
            try:
                return operation()
            except Exception as e:
                logger.warning(failure_msg, *msg_args, str(e))
                if attempt == attempts:
                    raise
                time.sleep(self.RETRY_DELAY_SECONDS)

    def _sync_levels(self) -> None:
        """Recompute both level counters from the tracked orders.

        Deriving the counters from ``active_orders`` keeps them correct after
        cancellations, failed placements and replacement orders.
        """
        self.current_buy_levels = len(self.get_active_orders_by_side("BUY"))
        self.current_sell_levels = len(self.get_active_orders_by_side("SELL"))

    def _place_and_track(self, price: float, side: str) -> Optional[str]:
        """Place an order and, on success, start tracking it.

        Returns:
            The new order ID, or ``None`` when no order was placed.
        """
        order_id = self.api_place_order(price, side)
        if order_id:
            self.active_orders[order_id] = Order(
                order_id=order_id,
                price=price,
                quantity=self.order_amount,
                side=side,
                status="NEW",
            )
        return order_id

    # ------------------------------------------------------------------
    # Trade records
    # ------------------------------------------------------------------
    def record_transaction(self, order_response: Dict[str, Any]) -> bool:
        """Append one executed order to the CSV trade record.

        Args:
            order_response: Order details as returned by the order query;
                the keys ``side``, ``updateTime`` (milliseconds),
                ``executedQty``, ``cummulativeQuoteQty`` and ``orderId`` are
                read.

        Returns:
            ``True`` when the row was written, ``False`` on any error (the
            error is logged, never raised).
        """
        try:
            trade_type = "买入" if order_response["side"] == "BUY" else "卖出"
            timestamp = datetime.fromtimestamp(
                order_response["updateTime"] / 1000
            ).strftime("%Y-%m-%dT%H:%M")
            row = [
                timestamp,
                trade_type,
                self.csv_symbol,
                order_response["executedQty"],
                order_response["cummulativeQuoteQty"],
                "资金账户",
                "CEX",
                f"MEXC API - Order ID: {order_response['orderId']}",
            ]
            # Create the target directory on first use.
            directory = os.path.dirname(self.csv_file)
            if directory:
                os.makedirs(directory, exist_ok=True)
            # A missing or empty file still needs its header row.
            needs_header = (
                not os.path.exists(self.csv_file)
                or os.path.getsize(self.csv_file) == 0
            )
            with open(self.csv_file, "a", newline="", encoding="utf-8") as f:
                writer = csv.writer(f)
                if needs_header:
                    writer.writerow(self._CSV_HEADER)
                writer.writerow(row)
            logger.info(
                "交易记录已保存: %s %s %s",
                trade_type,
                self.csv_symbol,
                order_response["orderId"],
            )
            return True
        except Exception as e:
            logger.error("交易记录保存失败: %s", str(e))
            return False

    # ------------------------------------------------------------------
    # Exchange access
    # ------------------------------------------------------------------
    def api_get_price(self) -> float:
        """Return the current market price of ``symbol``.

        Raises:
            Exception: The last error if every attempt failed.
        """

        def fetch() -> float:
            ticker = self.api_market.get_price({"symbol": self.symbol})
            return float(ticker["price"])

        price = self._with_retries(fetch, "获取价格失败,重试中... 错误: %s")
        logger.debug("当前市场价格: %f", price)
        return price

    def api_get_balances(self) -> Tuple[float, float]:
        """Return the free ``(base, quote)`` balances.

        The asset names default to ``symbol`` split four characters from the
        end; set ``base_currency`` / ``quote_currency`` in the config for
        pairs whose quote asset is not four characters long. An asset that
        is absent from the account response counts as ``0.0``.

        Raises:
            Exception: The last error if every attempt failed.
        """
        base_currency = self.config.get("base_currency", self.symbol[:-4])
        quote_currency = self.config.get("quote_currency", self.symbol[-4:])

        def fetch() -> Tuple[float, float]:
            base_free, quote_free = 0.0, 0.0
            account = self.api_trade.get_account_info()
            for balance in account.get("balances", []):
                if balance["asset"] == base_currency:
                    base_free = float(balance["free"])
                elif balance["asset"] == quote_currency:
                    quote_free = float(balance["free"])
            return base_free, quote_free

        base_available, quote_available = self._with_retries(
            fetch, "获取余额失败,重试中... 错误: %s"
        )
        logger.debug(
            "当前余额 - %s: %f, %s: %f",
            base_currency,
            base_available,
            quote_currency,
            quote_available,
        )
        return base_available, quote_available

    def calculate_order_value(self, price: float) -> float:
        """Return the value of one order (price times ``order_amount``)."""
        return price * self.order_amount

    def is_order_value_valid(self, price: float) -> bool:
        """Return whether an order at ``price`` reaches ``min_order_value``."""
        return self.calculate_order_value(price) >= self.min_order_value

    def calculate_grid_price(self, base_price: float, level: int, side: str) -> float:
        """Return the price ``level`` grid steps away from ``base_price``.

        Args:
            base_price: Price to start from.
            level: Number of grid steps.
            side: ``"BUY"`` steps downwards, ``"SELL"`` steps upwards.

        Raises:
            ValueError: If ``side`` is neither ``"BUY"`` nor ``"SELL"``.
        """
        step = (1 + self.grid_percentage) ** level
        if side == "BUY":
            return base_price / step
        if side == "SELL":
            return base_price * step
        raise ValueError(f"无效的交易方向: {side}")

    def api_place_order(self, price: float, side: str) -> Optional[str]:
        """Place a LIMIT order for ``order_amount`` at ``price``.

        Returns:
            The exchange order ID, or ``None`` when the order value is below
            the minimum, the exchange response carries no ``orderId``, or
            every attempt failed.
        """
        if not self.is_order_value_valid(price):
            logger.warning(
                "订单价值 %f 低于最小值 %f",
                price * self.order_amount,
                self.min_order_value,
            )
            return None
        request = {
            "symbol": self.symbol,
            "side": side,
            "type": "LIMIT",
            "price": price,
            "quantity": self.order_amount,
        }
        try:
            # NOTE(review): a retry after a lost response could presumably
            # duplicate an order that did reach the exchange - confirm.
            order_detail = self._with_retries(
                lambda: self.api_trade.post_order(request),
                "下单异常,重试中... 错误: %s",
            )
            order_id = order_detail.get("orderId")
        except Exception as e:
            logger.error("下单失败,已放弃重试: %s", str(e))
            return None
        if not order_id:
            logger.error("下单失败: %r", order_detail)
            return None
        logger.info(
            "成功下单 %s - 价格: %f 数量: %f 订单ID: %s",
            side,
            price,
            self.order_amount,
            order_id,
        )
        return order_id

    def api_cancel_order(self, order_id: str) -> bool:
        """Cancel one order on the exchange.

        Returns:
            ``True`` when the cancel request went through, ``False`` when
            every attempt failed (the failure is logged, not raised).
        """
        try:
            self._with_retries(
                lambda: self.api_trade.delete_order(
                    {"symbol": self.symbol, "orderId": order_id}
                ),
                "取消订单失败,重试中... 订单ID: %s 错误: %s",
                order_id,
            )
        except Exception as e:
            logger.error("取消订单失败,已放弃: 订单ID: %s 错误: %s", order_id, str(e))
            return False
        logger.info("订单已取消: %s", order_id)
        return True

    def api_cancel_all_orders(self) -> bool:
        """Cancel every open order of ``symbol`` and clear the local state.

        The local state is only cleared when the exchange call succeeded.

        Returns:
            ``True`` on success, ``False`` when every attempt failed.
        """
        try:
            self._with_retries(
                lambda: self.api_trade.delete_openorders({"symbol": self.symbol}),
                "取消所有订单失败,重试中... 错误: %s",
            )
        except Exception as e:
            logger.error("取消所有订单失败,已放弃: %s", str(e))
            return False
        self.active_orders.clear()
        self.current_buy_levels = 0
        self.current_sell_levels = 0
        logger.info("已取消所有订单")
        return True

    def api_update_order_statuses(self):
        """Refresh the status of every tracked order from the exchange.

        A newly FILLED order is recorded to the CSV file and left in
        ``active_orders`` for ``adjust_grid_for_filled``; a newly CANCELED
        order is dropped. A failed query for one order is logged and does
        not stop the remaining orders from being refreshed.
        """
        for side in ("BUY", "SELL"):
            # Closest-to-market first: buys high to low, sells low to high.
            orders = sorted(
                self.get_active_orders_by_side(side),
                key=lambda o: o.price,
                reverse=(side == "BUY"),
            )
            for order in orders:
                try:
                    order_info = self.api_trade.get_order(
                        {"symbol": self.symbol, "orderId": order.order_id}
                    )
                    previous_status = order.status
                    order.status = order_info["status"]
                    if previous_status == order.status:
                        continue
                    logger.info(
                        "订单状态更新: %s %s -> %s",
                        order.order_id,
                        previous_status,
                        order.status,
                    )
                    if order.status == "FILLED":
                        order.filled_time = time.time()
                        self.record_transaction(order_info)
                        logger.info(
                            "订单已成交: %s 价格: %f 数量: %f",
                            order.order_id,
                            order.price,
                            order.quantity,
                        )
                    if order.status == "CANCELED":
                        self.active_orders.pop(order.order_id, None)
                except Exception as e:
                    logger.warning(
                        "获取订单状态失败: %s 错误: %s", order.order_id, str(e)
                    )
        # Dropped orders must free their grid level again.
        self._sync_levels()

    # ------------------------------------------------------------------
    # Local order book views
    # ------------------------------------------------------------------
    def get_active_orders_by_side(self, side: str) -> List["Order"]:
        """Return all tracked orders whose side equals ``side``."""
        return [o for o in self.active_orders.values() if o.side == side]

    def get_extreme_prices(self) -> Tuple[Optional[float], Optional[float]]:
        """Return ``(lowest buy price, highest sell price)`` of tracked orders.

        Either element is ``None`` when that side has no tracked order.
        """
        buy_prices = [o.price for o in self.get_active_orders_by_side("BUY")]
        sell_prices = [o.price for o in self.get_active_orders_by_side("SELL")]
        lowest_buy = min(buy_prices) if buy_prices else None
        highest_sell = max(sell_prices) if sell_prices else None
        return lowest_buy, highest_sell

    # ------------------------------------------------------------------
    # Grid management
    # ------------------------------------------------------------------
    def initialize_grid(self):
        """Place the first buy and sell level around the market price.

        Each side is only placed when the balance left after the order stays
        at or above the configured reserve and the order value is valid. If
        at least one order is tracked afterwards, the grid is extended to
        ``grid_count`` levels. Errors are logged, not raised; the main loop
        tries again on its next pass.
        """
        try:
            market_price = self.api_get_price()
            logger.info("开始初始化网格,当前价格: %f", market_price)
            buy_price = self.calculate_grid_price(market_price, 1, "BUY")
            sell_price = self.calculate_grid_price(market_price, 1, "SELL")
            base_balance, quote_balance = self.api_get_balances()

            # First buy level
            can_buy = (
                quote_balance - self.order_amount * buy_price >= self.reserve_quote
                and self.is_order_value_valid(buy_price)
            )
            if can_buy:
                self._place_and_track(buy_price, "BUY")

            # First sell level
            can_sell = (
                base_balance - self.order_amount >= self.reserve_base
                and self.is_order_value_valid(sell_price)
            )
            if can_sell:
                self._place_and_track(sell_price, "SELL")

            self._sync_levels()
            if self.active_orders:
                self.extend_grid()
            else:
                # Do not call extend_grid with an empty book: it would come
                # straight back here.
                logger.warning("未能挂出任何订单,等待下一轮重试")
        except Exception as e:
            logger.error("网格初始化失败: %s", str(e))

    def extend_grid(self):
        """Top both sides of the grid up to ``grid_count`` levels.

        When no order is tracked on either side, all exchange orders are
        cancelled and the grid is initialised again from the current price.
        Errors are logged, not raised.
        """
        try:
            self._sync_levels()
            lowest_buy, highest_sell = self.get_extreme_prices()
            if lowest_buy is None and highest_sell is None:
                logger.warning("两侧均无挂单,重置网格")
                if self.api_cancel_all_orders():
                    self.initialize_grid()
                return
            self._extend_buy_side()
            self._extend_sell_side()
        except Exception as e:
            logger.error("扩展网格失败: %s", str(e))

    def _extend_buy_side(self) -> None:
        """Add buy orders below the lowest buy until the side is full."""
        while self.current_buy_levels < self.grid_count:
            lowest_buy, highest_sell = self.get_extreme_prices()
            _, quote_balance = self.api_get_balances()
            if lowest_buy is None:
                if highest_sell is None:
                    break
                # No buys yet: derive a starting point from the sell side.
                lowest_buy = self.calculate_grid_price(
                    highest_sell, self.grid_count, "BUY"
                )
            new_buy_price = self.calculate_grid_price(lowest_buy, 1, "BUY")
            required_quote = new_buy_price * self.order_amount
            if quote_balance - required_quote < self.reserve_quote:
                logger.debug("报价货币余额不足,无法扩展买单网格")
                break
            if not self.is_order_value_valid(new_buy_price):
                logger.warning(
                    "买单价值 %f 低于最小值 %f",
                    new_buy_price * self.order_amount,
                    self.min_order_value,
                )
                break
            if not self._place_and_track(new_buy_price, "BUY"):
                # Nothing changed, so another pass would repeat the same
                # rejected request forever.
                break
            self.current_buy_levels += 1
            logger.info(
                "扩展买单网格到层级 %d 价格: %f",
                self.current_buy_levels,
                new_buy_price,
            )

    def _extend_sell_side(self) -> None:
        """Add sell orders above the highest sell until the side is full."""
        while self.current_sell_levels < self.grid_count:
            lowest_buy, highest_sell = self.get_extreme_prices()
            base_balance, _ = self.api_get_balances()
            if highest_sell is None:
                if lowest_buy is None:
                    break
                # No sells yet: derive a starting point from the buy side.
                highest_sell = self.calculate_grid_price(
                    lowest_buy, self.grid_count, "SELL"
                )
            new_sell_price = self.calculate_grid_price(highest_sell, 1, "SELL")
            if base_balance - self.order_amount < self.reserve_base:
                logger.debug("基础货币余额不足,无法扩展卖单网格")
                break
            if not self.is_order_value_valid(new_sell_price):
                logger.warning(
                    "卖单价值 %f 低于最小值 %f",
                    new_sell_price * self.order_amount,
                    self.min_order_value,
                )
                break
            if not self._place_and_track(new_sell_price, "SELL"):
                # See _extend_buy_side: a rejected order must end the loop.
                break
            self.current_sell_levels += 1
            logger.info(
                "扩展卖单网格到层级 %d 价格: %f",
                self.current_sell_levels,
                new_sell_price,
            )

    def adjust_grid_for_filled(self):
        """Shift the grid after fills.

        Every tracked FILLED order is removed. For each one, the farthest
        open order on the opposite side is cancelled and a new opposite
        order is placed one grid step away from the fill price. Errors are
        logged, not raised; the level counters are re-synchronised in every
        case.
        """
        try:
            filled_orders = [
                o for o in self.active_orders.values() if o.status == "FILLED"
            ]
            for order in filled_orders:
                logger.info(
                    "处理成交订单: %s %s %f", order.order_id, order.side, order.price
                )
                self.active_orders.pop(order.order_id, None)
                if order.side == "BUY":
                    self._rebalance_after_fill(order, "SELL")
                elif order.side == "SELL":
                    self._rebalance_after_fill(order, "BUY")
        except Exception as e:
            logger.error("调整网格失败: %s", str(e))
        finally:
            self._sync_levels()

    def _rebalance_after_fill(self, filled: "Order", opposite_side: str) -> None:
        """Replace the farthest ``opposite_side`` order with one near the fill."""
        # Orders that are themselves filled are still waiting to be
        # processed and must not be cancelled or forgotten.
        candidates = [
            o
            for o in self.get_active_orders_by_side(opposite_side)
            if o.status != "FILLED"
        ]
        if candidates:
            # Farthest from the market: highest sell or lowest buy.
            pick = max if opposite_side == "SELL" else min
            farthest = pick(candidates, key=lambda o: o.price)
            if self.api_cancel_order(farthest.order_id):
                self.active_orders.pop(farthest.order_id, None)
            else:
                # Keep tracking it so the next status poll can resolve it.
                logger.warning("撤单失败,继续跟踪订单: %s", farthest.order_id)
        new_price = self.calculate_grid_price(filled.price, 1, opposite_side)
        if self.is_order_value_valid(new_price):
            self._place_and_track(new_price, opposite_side)

    def adjust_grid_for_violation(self):
        """Reset the grid when the market price has left it.

        Only applies while exactly one side has no tracked order: the
        threshold lies ``grid_count + 1`` steps from the far end of the
        remaining side. The grid is re-initialised only after all orders
        were cancelled successfully. Errors are logged, not raised.
        """
        try:
            market_price = self.api_get_price()
            lowest_buy, highest_sell = self.get_extreme_prices()
            reset_needed = False
            if lowest_buy is None and highest_sell is not None:
                # Price fell below the lowest buy level.
                threshold = self.calculate_grid_price(
                    highest_sell, self.grid_count + 1, "BUY"
                )
                if market_price < threshold:
                    logger.warning(
                        "价格跌破网格下限(%f < %f),重置网格",
                        market_price,
                        threshold,
                    )
                    reset_needed = True
            elif highest_sell is None and lowest_buy is not None:
                # Price rose above the highest sell level.
                threshold = self.calculate_grid_price(
                    lowest_buy, self.grid_count + 1, "SELL"
                )
                if market_price > threshold:
                    logger.warning(
                        "价格突破网格上限(%f > %f),重置网格",
                        market_price,
                        threshold,
                    )
                    reset_needed = True
            if reset_needed and self.api_cancel_all_orders():
                self.initialize_grid()
        except Exception as e:
            logger.error("网格边界检查失败: %s", str(e))

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    def run(self):
        """Run the main loop until ``running`` is cleared or Ctrl-C is hit.

        ``stop`` is always called on the way out.
        """
        self.running = True
        logger.info("启动网格交易机器人,交易对: %s", self.symbol)
        try:
            self.initialize_grid()
            while self.running:
                try:
                    self.api_update_order_statuses()
                    self.adjust_grid_for_filled()
                    self.adjust_grid_for_violation()
                    self.extend_grid()
                    # Throttle the loop.
                    time.sleep(1)
                except Exception as e:
                    logger.error("主循环异常: %s,3秒后重试...", str(e))
                    time.sleep(3)
        except KeyboardInterrupt:
            logger.info("接收到键盘中断信号,停止机器人...")
        finally:
            self.stop()

    def stop(self):
        """Clear the run flag and cancel all open orders."""
        self.running = False
        self.api_cancel_all_orders()
        logger.info("网格交易机器人已停止")
# Default settings used when this module is run as a script.
DEFAULT_CONFIG = {
    "symbol": "BTCUSDC",  # trading pair symbol
    "csv_symbol": "BTCUSDT",  # symbol written to the CSV records
    "csv_file": "output/mexc_spot_grid_trades.csv",  # trade-record file
    "grid_percentage": 0.0005,  # spacing between grid levels
    "grid_count": 5,  # grid levels per side
    "order_amount": 0.00001,  # quantity of every order
    "min_order_value": 1,  # minimum order value
    "reserve_base": 0,  # base currency kept untouched
    "reserve_quote": 0,  # quote currency kept untouched
}


def main() -> None:
    """Build a bot from a copy of ``DEFAULT_CONFIG`` and run it."""
    logger.info("创建网格交易机器人实例")
    bot = GridTradingBot(dict(DEFAULT_CONFIG))
    bot.run()


if __name__ == "__main__":
    main()