From 3714ea9a885656a78b5f7d04e85017bdd6aae64b Mon Sep 17 00:00:00 2001 From: Zichao Lin Date: Fri, 11 Jul 2025 21:40:49 +0800 Subject: [PATCH] initial commit --- .gitignore | 2 + config.py | 3 + main.py | 313 ++++++++++++++++++++++++++++ mexc_spot_v3.py | 535 ++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 853 insertions(+) create mode 100644 .gitignore create mode 100644 config.py create mode 100644 main.py create mode 100644 mexc_spot_v3.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1f93644 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +output/ +__pycache__/ \ No newline at end of file diff --git a/config.py b/config.py new file mode 100644 index 0000000..e0647bf --- /dev/null +++ b/config.py @@ -0,0 +1,3 @@ +mexc_host = "https://api.mexc.com" +api_key = "mx0vglky5BuzlcK5HQ" +secret_key = "e2a0c4737b4643bbac4ad5f8b26dcce2" diff --git a/main.py b/main.py new file mode 100644 index 0000000..f83f9f7 --- /dev/null +++ b/main.py @@ -0,0 +1,313 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +MEXC 交易机器人 + +功能: +1. 支持多种订单类型 (LIMIT, MARKET, LIMIT_MAKER, IMMEDIATE_OR_CANCEL, FILL_OR_KILL) +2. 证券代码映射功能 +3. 完整的交易记录和日志 +""" + +import csv +import logging +import os +from datetime import datetime +from typing import Dict, Any, Optional, Tuple + +import mexc_spot_v3 + +os.makedirs("output", exist_ok=True) + +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + handlers=[ + logging.FileHandler("output/mexc_trading_bot.log", encoding="utf-8"), + logging.StreamHandler(), + ], +) +logger = logging.getLogger(__name__) + + +class MexcSpotMarket: + """MEXC 市场数据查询类""" + + def __init__(self): + """初始化市场数据查询接口""" + self.market = mexc_spot_v3.mexc_market() + + def get_price(self, symbol: str) -> Optional[float]: + """ + 获取指定交易对的当前价格 + + Args: + symbol: 交易对,如 "BTCUSDT" + + Returns: + 当前价格(浮点数)或None(如果失败) + """ + params = {"symbol": symbol} + + try: + logger.info("查询交易对价格: %s", symbol) + price_data = self.market.get_price(params) + + if not price_data or "price" not in price_data: + logger.error("获取价格数据失败: %s", price_data) + return None + + price_str = price_data["price"] + price = float(price_str) + logger.info("获取价格成功: %s = %f", symbol, price) + return price + + except Exception as e: # pylint: disable=W0703 + logger.error("查询价格失败: %s", str(e)) + return None + + +class MexcSpotTrade: + """MEXC 现货交易类""" + + # 证券代码映射 (API代码: CSV记录代码) + SYMBOL_MAPPING = { + "BTCUSDC": "BTCUSDT", + # 可以在此添加更多映射 + } + + # 订单类型与必需参数 + ORDER_TYPE_REQUIREMENTS = { + "LIMIT": ["quantity", "price"], + "MARKET": ["quantity", "quoteOrderQty"], # 任选其一 + "LIMIT_MAKER": ["quantity", "price"], + "IMMEDIATE_OR_CANCEL": ["quantity", "price"], + "FILL_OR_KILL": ["quantity", "price"], + } + + def __init__(self): + """初始化交易机器人""" + self.trader = mexc_spot_v3.mexc_trade() + self.csv_file = "output/mexc_spot_trade.csv" + + def _api_get_order(self, symbol: str, order_id: str) -> Optional[Dict[str, Any]]: + """ + 查询订单状态 + + Args: + symbol: 交易对 + order_id: 订单ID + + Returns: + 订单详情字典或None(如果失败) + """ + params = { + "symbol": symbol, + "orderId": order_id, + } + + try: + logger.info("查询订单状态, 订单ID: %s", order_id) + order = self.trader.get_order(params) + logger.info("订单状态: %s", order.get("status")) + return order + except Exception as e: # pylint: disable=W0703 + logger.error("查询订单失败: %s", str(e)) + return None + + def _tool_map_symbol(self, symbol: str) -> str: + """映射证券代码用于记录""" + return self.SYMBOL_MAPPING.get(symbol, symbol) + + def _tool_validate_order_params( + self, order_type: str, params: Dict[str, Any] + ) -> Tuple[bool, str]: + """ + 验证订单参数是否符合要求 + + Args: + order_type: 订单类型 + params: 订单参数 + + Returns: + 元组(是否有效, 错误信息) + """ + required_params = self.ORDER_TYPE_REQUIREMENTS.get(order_type, []) + + if not required_params: + return False, f"未知的订单类型: {order_type}" + + # 特殊处理MARKET订单 + if order_type == "MARKET": + if "quantity" not in params and "quoteOrderQty" not in params: + return False, "MARKET订单需要quantity或quoteOrderQty参数" + return True, "" + + # 检查其他订单类型的必需参数 + missing_params = [p for p in required_params if p not in params] + if missing_params: + return False, f"{order_type}订单缺少必需参数: {', '.join(missing_params)}" + + return True, "" + + def _tool_record_transaction(self, order_data: Dict[str, Any]) -> bool: + """ + 记录交易到CSV文件 + + Args: + order_data: 订单数据字典 + + Returns: + 是否成功记录 + """ + try: + original_symbol = order_data["symbol"] + mapped_symbol = self._tool_map_symbol(original_symbol) + order_id = order_data["orderId"] + executed_qty = order_data["executedQty"] + cummulative_quote_qty = order_data["cummulativeQuoteQty"] + side = order_data["side"] + + # 确定交易类型显示 + trade_type = "买入" if side == "BUY" else "卖出" + + timestamp = datetime.fromtimestamp(order_data["time"] / 1000).strftime( + "%Y-%m-%dT%H:%M" + ) + + row = [ + timestamp, + trade_type, + mapped_symbol, + executed_qty, + cummulative_quote_qty, + "资金账户", + "FW#id", + f"MEXC API - Order ID: {order_id}", + ] + + # 检查文件是否存在 + file_exists = False + try: + with open(self.csv_file, "r", encoding="utf-8") as f: + file_exists = True + except FileNotFoundError: + pass + + # 写入CSV + with open(self.csv_file, "a", newline="", encoding="utf-8") as f: + writer = csv.writer(f) + if not file_exists: + writer.writerow( + [ + "日期", + "类型", + "证券代码", + "份额", + "净额", + "现金账户", + "目标账户", + "备注", + ] + ) + writer.writerow(row) + + logger.info("交易记录成功, 订单ID: %s", order_id) + return True + except Exception as e: # pylint: disable=W0703 + logger.error("记录交易失败: %s", str(e)) + return False + + def trade( + self, symbol: str, order_type: str, side: str, **kwargs + ) -> Optional[Dict[str, Any]]: + """ + 执行现货交易 + + Args: + symbol: 交易对 (如 BTCUSDC) + order_type: 订单类型 (LIMIT, MARKET等) + side: 交易方向 (BUY, SELL) + **kwargs: 其他订单参数 + + Returns: + 订单信息字典或None(如果失败) + """ + # 基本参数验证 + if side not in ["BUY", "SELL"]: + logger.error("无效的交易方向: %s", side) + return None + + order_type = order_type.upper() + + # 准备订单参数 + base_params = {"symbol": symbol, "side": side, "type": order_type, **kwargs} + + # 验证参数 + is_valid, error_msg = self._tool_validate_order_params(order_type, base_params) + if not is_valid: + logger.error("订单参数验证失败: %s", error_msg) + return None + + try: + logger.info("准备下单 %s %s 订单, 交易对: %s", side, order_type, symbol) + + # 测试订单 + test_result = self.trader.post_order_test(base_params.copy()) + if test_result != {}: + logger.error("订单测试失败,参数有误: %s", test_result) + return None + + logger.info("订单参数测试通过,准备正式下单") + + # 正式下单 + order = self.trader.post_order(base_params.copy()) + order_id = order.get("orderId") + + if not order_id: + logger.error("下单失败: 未获取到订单ID") + logger.error(base_params.copy()) + logger.error(order) + return None + + logger.info("订单创建成功, 订单ID: %s", order_id) + + # 查询订单详情 + order_detail = self._api_get_order(symbol, order_id) + if not order_detail: + logger.error("获取订单详情失败") + return None + + # 记录交易 + if order_detail.get("status") == "FILLED": + self._tool_record_transaction(order_detail) + + return order_detail + + except Exception as e: # pylint: disable=W0703 + logger.error("交易执行失败: %s", str(e)) + return None + + +def main(): + """主函数""" + spot_trader = MexcSpotTrade() + + result = spot_trader.trade( + symbol="BTCUSDC", + order_type="MARKET", + side="BUY", + quoteOrderQty="1.5", + ) + + if result: + logger.info("交易执行成功") + logger.info("订单详情: %s", result) + else: + logger.error("交易执行失败") + + +if __name__ == "__main__": + main() diff --git a/mexc_spot_v3.py b/mexc_spot_v3.py new file mode 100644 index 0000000..77a751b --- /dev/null +++ b/mexc_spot_v3.py @@ -0,0 +1,535 @@ +import requests +import hmac +import hashlib +from urllib.parse import urlencode, quote +import config + +# ServerTime、Signature +class TOOL(object): + + def _get_server_time(self): + return requests.request('get', 'https://api.mexc.com/api/v3/time').json()['serverTime'] + + def _sign_v3(self, req_time, sign_params=None): + if sign_params: + sign_params = urlencode(sign_params, quote_via=quote) + to_sign = "{}×tamp={}".format(sign_params, req_time) + else: + to_sign = "timestamp={}".format(req_time) + sign = hmac.new(self.mexc_secret.encode('utf-8'), to_sign.encode('utf-8'), hashlib.sha256).hexdigest() + return sign + + def public_request(self, method, url, params=None): + url = '{}{}'.format(self.hosts, url) + return requests.request(method, url, params=params) + + def sign_request(self, method, url, params=None): + url = '{}{}'.format(self.hosts, url) + req_time = self._get_server_time() + if params: + params['signature'] = self._sign_v3(req_time=req_time, sign_params=params) + else: + params = {} + params['signature'] = self._sign_v3(req_time=req_time) + params['timestamp'] = req_time + headers = { + 'x-mexc-apikey': self.mexc_key, + 'Content-Type': 'application/json', + } + return requests.request(method, url, params=params, headers=headers) + + +# Market Data +class mexc_market(TOOL): + + def __init__(self): + self.api = '/api/v3' + self.hosts = config.mexc_host + self.method = 'GET' + + def get_ping(self): + """test connectivity""" + url = '{}{}'.format(self.api, '/ping') + response = self.public_request(self.method, url) + return response.json() + + def get_timestamp(self): + """get sever time""" + url = '{}{}'.format(self.api, '/time') + response = self.public_request(self.method, url) + return response.json() + + def get_defaultSymbols(self): + """get defaultSymbols""" + url = '{}{}'.format(self.api, '/defaultSymbols') + response = self.public_request(self.method, url) + return response.json() + + def get_exchangeInfo(self, params=None): + """get exchangeInfo""" + url = '{}{}'.format(self.api, '/exchangeInfo') + response = self.public_request(self.method, url, params=params) + return response.json() + + def get_depth(self, params): + """get symbol depth""" + url = '{}{}'.format(self.api, '/depth') + response = self.public_request(self.method, url, params=params) + return response.json() + + def get_deals(self, params): + """get current trade deals list""" + url = '{}{}'.format(self.api, '/trades') + response = self.public_request(self.method, url, params=params) + return response.json() + + def get_aggtrades(self, params): + """get aggregate trades list""" + url = '{}{}'.format(self.api, '/aggTrades') + response = self.public_request(self.method, url, params=params) + return response.json() + + def get_kline(self, params): + """get k-line data""" + url = '{}{}'.format(self.api, '/klines') + response = self.public_request(self.method, url, params=params) + return response.json() + + def get_avgprice(self, params): + """get current average prcie(default : 5m)""" + url = '{}{}'.format(self.api, '/avgPrice') + response = self.public_request(self.method, url, params=params) + return response.json() + + def get_24hr_ticker(self, params=None): + """get 24hr prcie ticker change statistics""" + url = '{}{}'.format(self.api, '/ticker/24hr') + response = self.public_request(self.method, url, params=params) + return response.json() + + def get_price(self, params=None): + """get symbol price ticker""" + url = '{}{}'.format(self.api, '/ticker/price') + response = self.public_request(self.method, url, params=params) + return response.json() + + def get_bookticker(self, params=None): + """get symbol order book ticker""" + url = '{}{}'.format(self.api, '/ticker/bookTicker') + response = self.public_request(self.method, url, params=params) + return response.json() + + def get_ETF_info(self, params=None): + """get ETF information""" + url = '{}{}'.format(self.api, '/etf/info') + response = self.public_request(self.method, url, params=params) + return response.json() + + +# Spot Trade +class mexc_trade(TOOL): + + def __init__(self): + self.api = '/api/v3' + self.hosts = config.mexc_host + self.mexc_key = config.api_key + self.mexc_secret = config.secret_key + + def get_selfSymbols(self): + """get currency information""" + method = 'GET' + url = '{}{}'.format(self.api, '/selfSymbols') + response = self.sign_request(method, url) + return response.json() + + def post_order_test(self, params): + """test new order""" + method = 'POST' + url = '{}{}'.format(self.api, '/order/test') + response = self.sign_request(method, url, params=params) + return response.json() + + def post_order(self, params): + """place order""" + method = 'POST' + url = '{}{}'.format(self.api, '/order') + response = self.sign_request(method, url, params=params) + return response.json() + + def post_batchorders(self, params): + """place batch orders(same symbol)""" + method = 'POST' + url = '{}{}'.format(self.api, '/batchOrders') + params = {"batchOrders": str(params)} + response = self.sign_request(method, url, params=params) + print(response.url) + return response.json() + + def delete_order(self, params): + """ + cancel order + 'origClientOrderId' or 'orderId' must be sent + """ + method = 'DELETE' + url = '{}{}'.format(self.api, '/order') + response = self.sign_request(method, url, params=params) + return response.json() + + def delete_openorders(self, params): + """ + cancel all order for a single symbol + """ + method = 'DELETE' + url = '{}{}'.format(self.api, '/openOrders') + response = self.sign_request(method, url, params=params) + return response.json() + + def get_order(self, params): + """ + get order + 'origClientOrderId' or 'orderId' must be sent + """ + method = 'GET' + url = '{}{}'.format(self.api, '/order') + response = self.sign_request(method, url, params=params) + return response.json() + + def get_openorders(self, params): + """get current pending order """ + method = 'GET' + url = '{}{}'.format(self.api, '/openOrders') + response = self.sign_request(method, url, params=params) + return response.json() + + def get_allorders(self, params): + """ + get current all order + startTime and endTime need to use at the same time + """ + method = 'GET' + url = '{}{}'.format(self.api, '/allOrders') + response = self.sign_request(method, url, params=params) + return response.json() + + def get_mytrades(self, params): + """ + get current all order + orderId need to use with symbol at the same time + """ + method = 'GET' + url = '{}{}'.format(self.api, '/myTrades') + response = self.sign_request(method, url, params=params) + return response.json() + + def post_mxDeDuct(self, params): + """Enable MX DeDuct""" + method = 'POST' + url = '{}{}'.format(self.api, '/mxDeduct/enable') + response = self.sign_request(method, url, params=params) + return response.json() + + def get_mxDeDuct(self): + """MX DeDuct status""" + method = 'GET' + url = '{}{}'.format(self.api, '/mxDeduct/enable') + response = self.sign_request(method, url) + return response.json() + + def get_account_info(self): + """get account information""" + method = 'GET' + url = '{}{}'.format(self.api, '/account') + response = self.sign_request(method, url) + return response.json() + + +# Wallet +class mexc_wallet(TOOL): + + def __init__(self): + self.api = '/api/v3/capital' + self.hosts = config.mexc_host + self.mexc_key = config.api_key + self.mexc_secret = config.secret_key + + def get_coinlist(self): + """get currency information""" + method = 'GET' + url = '{}{}'.format(self.api, '/config/getall') + response = self.sign_request(method, url) + return response.json() + + def post_withdraw(self, params): + """withdraw""" + method = 'POST' + url = '{}{}'.format(self.api, '/withdraw/apply') + response = self.sign_request(method, url, params=params) + return response.json() + + def cancel_withdraw(self, params): + """withdraw""" + method = 'DELETE' + url = '{}{}'.format(self.api, '/withdraw') + response = self.sign_request(method, url, params=params) + return response.json() + + def get_deposit_list(self, params): + """deposit history list""" + method = 'GET' + url = '{}{}'.format(self.api, '/deposit/hisrec') + response = self.sign_request(method, url, params=params) + return response.json() + + def get_withdraw_list(self, params): + """withdraw history list""" + method = 'GET' + url = '{}{}'.format(self.api, '/withdraw/history') + response = self.sign_request(method, url, params=params) + return response.json() + + def post_deposit_address(self, params): + """generate deposit address""" + method = 'POST' + url = '{}{}'.format(self.api, '/deposit/address') + response = self.sign_request(method, url, params=params) + return response.json() + + def get_deposit_address(self, params): + """get deposit address""" + method = 'GET' + url = '{}{}'.format(self.api, '/deposit/address') + response = self.sign_request(method, url, params=params) + return response.json() + + def get_withdraw_address(self, params): + """get deposit address""" + method = 'GET' + url = '{}{}'.format(self.api, '/withdraw/address') + response = self.sign_request(method, url, params=params) + return response.json() + + def post_transfer(self, params): + """universal transfer""" + method = 'POST' + url = '{}{}'.format(self.api, '/transfer') + response = self.sign_request(method, url, params=params) + return response.json() + + def get_transfer_list(self, params): + """universal transfer history""" + method = 'GET' + url = '{}{}'.format(self.api, '/transfer') + response = self.sign_request(method, url, params=params) + return response.json() + + def get_transfer_list_byId(self, params): + """universal transfer history (by tranId)""" + method = 'GET' + url = '{}{}'.format(self.api, '/transfer/tranId') + response = self.sign_request(method, url, params=params) + return response.json() + + def post_transfer_internal(self, params): + """universal transfer""" + method = 'POST' + url = '{}{}'.format(self.api, '/transfer/internal') + response = self.sign_request(method, url, params=params) + return response.json() + + def get_transfer_internal_list(self, params=None): + """universal transfer""" + method = 'GET' + url = '{}{}'.format(self.api, '/transfer/internal') + response = self.sign_request(method, url, params=params) + return response.json() + + def get_smallAssets_list(self): + """small Assets convertible list""" + method = 'GET' + url = '{}{}'.format(self.api, '/convert/list') + response = self.sign_request(method, url) + return response.json() + + def post_smallAssets_convert(self, params): + """small Assets convert""" + method = 'POST' + url = '{}{}'.format(self.api, '/convert') + response = self.sign_request(method, url, params=params) + return response.json() + + def get_smallAssets_history(self, params=None): + """small Assets convertible history""" + method = 'GET' + url = '{}{}'.format(self.api, '/convert') + response = self.sign_request(method, url, params=params) + return response.json() + + +# Sub-Account +class mexc_subaccount(TOOL): + + def __init__(self): + self.api = '/api/v3' + self.hosts = config.mexc_host + self.mexc_key = config.api_key + self.mexc_secret = config.secret_key + + def post_virtualSubAccount(self, params): + """create a sub-account""" + method = 'POST' + url = '{}{}'.format(self.api, '/sub-account/virtualSubAccount') + response = self.sign_request(method, url, params=params) + return response.json() + + def get_SubAccountList(self, params=None): + """get sub-account list""" + method = 'GET' + url = '{}{}'.format(self.api, '/sub-account/list') + response = self.sign_request(method, url, params=params) + return response.json() + + def post_virtualApiKey(self, params): + """create sub-account's apikey""" + method = 'POST' + url = '{}{}'.format(self.api, '/sub-account/apiKey') + response = self.sign_request(method, url, params=params) + return response.json() + + def get_virtualApiKey(self, params): + """get sub-account's apikey""" + method = 'GET' + url = '{}{}'.format(self.api, '/sub-account/apiKey') + response = self.sign_request(method, url, params=params) + return response.json() + + def delete_virtualApiKey(self, params): + """delete sub-account's apikey""" + method = 'DELETE' + url = '{}{}'.format(self.api, '/sub-account/apiKey') + response = self.sign_request(method, url, params=params) + return response.json() + + def post_universalTransfer(self, params): + """universal transfer between accounts""" + method = 'POST' + url = '{}{}'.format(self.api, '/capital/sub-account/universalTransfer') + response = self.sign_request(method, url, params=params) + return response.json() + + def get_universalTransfer(self, params): + """universal transfer history between accounts""" + method = 'GET' + url = '{}{}'.format(self.api, '/capital/sub-account/universalTransfer') + response = self.sign_request(method, url, params=params) + return response.json() + + +# Rebate +class mexc_rebate(TOOL): + + def __init__(self): + self.api = '/api/v3/rebate' + self.hosts = config.mexc_host + self.mexc_key = config.api_key + self.mexc_secret = config.secret_key + + def get_taxQuery(self, params=None): + """get the rebate commission record""" + method = 'GET' + url = '{}{}'.format(self.api, '/taxQuery') + response = self.sign_request(method, url, params=params) + return response.json() + + def get_rebate_detail(self, params=None): + """get rebate record details""" + method = 'GET' + url = '{}{}'.format(self.api, '/detail') + response = self.sign_request(method, url, params=params) + return response.json() + + def get_kickback_detail(self, params=None): + """get self-return record details""" + method = 'GET' + url = '{}{}'.format(self.api, '/detail/kickback') + response = self.sign_request(method, url, params=params) + return response.json() + + def get_inviter(self, params=None): + """get self-return record details""" + method = 'GET' + url = '{}{}'.format(self.api, '/referCode') + response = self.sign_request(method, url, params=params) + return response.json() + + def get_affiliate_commission(self, params=None): + """get affiliate commission history""" + method = 'GET' + url = '{}{}'.format(self.api, '/affiliate/commission') + response = self.sign_request(method, url, params=params) + return response.json() + + def get_affiliate_withdraw(self, params=None): + """get affiliate withdraw history""" + method = 'GET' + url = '{}{}'.format(self.api, '/affiliate/withdraw') + response = self.sign_request(method, url, params=params) + return response.json() + + def get_affiliate_commission_detail(self, params=None): + """get affiliate commission details""" + method = 'GET' + url = '{}{}'.format(self.api, '/affiliate/commission/detail') + response = self.sign_request(method, url, params=params) + return response.json() + + def get_affiliate_referral(self, params=None): + """get affiliate referral""" + method = 'GET' + url = '{}{}'.format(self.api, '/affiliate/referral') + response = self.sign_request(method, url, params=params) + return response.json() + + def get_affiliate_subaffiliates(self, params=None): + """get affiliate subaffiliates""" + method = 'GET' + url = '{}{}'.format(self.api, '/affiliate/subaffiliates') + response = self.sign_request(method, url, params=params) + return response.json() + + +# WebSocket ListenKey +class mexc_listenkey(TOOL): + + def __init__(self): + self.api = '/api/v3' + self.hosts = config.mexc_host + self.mexc_key = config.api_key + self.mexc_secret = config.secret_key + + def post_listenKey(self): + """ generate ListenKey """ + method = 'POST' + url = '{}{}'.format(self.api, '/userDataStream') + response = self.sign_request(method, url) + return response.json() + + def get_listenKey(self): + """ get valid ListenKey """ + method = 'GET' + url = '{}{}'.format(self.api, '/userDataStream') + response = self.sign_request(method, url) + return response.json() + + def put_listenKey(self, params): + """ extend ListenKey validity """ + method = 'PUT' + url = '{}{}'.format(self.api, '/userDataStream') + response = self.sign_request(method, url, params=params) + return response.json() + + def delete_listenKey(self, params): + """ delete ListenKey """ + method = 'DELETE' + url = '{}{}'.format(self.api, '/userDataStream') + response = self.sign_request(method, url, params=params) + return response.json()