"""DEXRP价格数据收集与分析模块,自动获取交易数据并生成每日统计""" import json import time import subprocess from datetime import datetime from collections import defaultdict import logging from pathlib import Path import requests # 配置常量 API_URL = "https://backend.dexrp.io/vending/last" # DEXRP API地址 JSON_FILE = "price.json" # 价格数据存储文件 HASH_FILE = "processed.json" # 已处理交易哈希记录文件 LOG_DIR = "logs" LOG_FILE_FORMAT = "%Y-%m-%d.log" # 全局变量 SEEN_TXHASHES = set() # 内存中的已处理交易哈希集合 GIT_INTERVAL = 3600 # Git提交间隔(秒) FETCH_INTERVAL = 10 # API轮询间隔(秒) LOGGER = None # 添加全局logger变量 def setup_logging(): """配置日志记录""" global LOGGER # 声明使用全局logger Path(LOG_DIR).mkdir(exist_ok=True) log_file = Path(LOG_DIR) / datetime.now().strftime(LOG_FILE_FORMAT) logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", handlers=[ logging.FileHandler(log_file, encoding="utf-8"), logging.StreamHandler(), ], ) LOGGER = logging.getLogger(__name__) # 赋值给全局logger return LOGGER def fetch_latest_transactions(): """从API获取最新交易数据并过滤已处理记录""" try: response = requests.get(API_URL, timeout=10) response.raise_for_status() transactions = response.json() # transactions = json.loads( # open("D:/DOWNLOADS/last.json", "r", encoding="utf-8").read() # ) return [tx for tx in transactions if tx["transactionHash"] not in SEEN_TXHASHES] except requests.RequestException as e: LOGGER.error("API请求失败: %s", e) return None def calculate_daily_stats(transactions): """计算每日统计数据(开盘价、收盘价、最高价、最低价、交易量)""" daily_data = defaultdict( lambda: { "open": None, "close": None, "high": float("-inf"), "low": float("inf"), "tokensSold": 0, } ) for tx in transactions: # 使用本地时区转换时间戳 date = datetime.fromtimestamp(tx["blockTimestamp"]).strftime("%Y-%m-%d") price = tx.get("price", 0) tokens_sold = tx.get("tokensSold", 0) if daily_data[date]["open"] is None: daily_data[date]["open"] = price daily_data[date]["close"] = price daily_data[date]["high"] = max(daily_data[date]["high"], price) daily_data[date]["low"] = min(daily_data[date]["low"], price) daily_data[date]["tokensSold"] += tokens_sold return [ { "date": date, "open": data["open"], "close": data["close"], "high": data["high"], "low": data["low"], "tokensSold": data["tokensSold"], } for date, data in daily_data.items() ] def update_json_file(new_data): """Update JSON file with new transaction data.""" try: LOGGER.info("开始更新JSON文件,收到%d条新交易数据", len(new_data)) # 读取现有数据,如果文件不存在则初始化为空数组 try: with open(JSON_FILE, "r", encoding="utf-8") as file: existing_data = json.load(file) except FileNotFoundError: existing_data = [] # 确保existing_data是列表 if not isinstance(existing_data, list): existing_data = [] # 计算每日统计数据 processed_data = calculate_daily_stats(new_data) # 合并数据而不是追加 existing_dates = {item["date"]: item for item in existing_data} for new_item in processed_data: date = new_item["date"] if date in existing_dates: # 合并相同日期的记录 existing_item = existing_dates[date] existing_item["close"] = new_item["close"] existing_item["high"] = max(existing_item["high"], new_item["high"]) existing_item["low"] = min(existing_item["low"], new_item["low"]) existing_item["tokensSold"] += new_item["tokensSold"] else: existing_data.append(new_item) existing_dates[date] = new_item # 写回文件 with open(JSON_FILE, "w", encoding="utf-8") as file: json.dump(existing_data, file, indent=4, ensure_ascii=False) LOGGER.info( "成功更新%s,合并%d条记录,总计%d条记录", JSON_FILE, len(processed_data), len(existing_data) ) except (FileNotFoundError, json.JSONDecodeError, IOError) as e: LOGGER.error("更新JSON文件时发生错误: %s,跳过本次更新", e) def git_commit_and_push(): """自动提交并推送数据更新到Git仓库""" try: subprocess.run(["git", "add", "."], check=True) subprocess.run( [ "git", "commit", "--no-verify", "--no-gpg-sign", "-m", f"Auto update at {datetime.now()}", ], check=True, ) subprocess.run(["git", "push"], check=True) LOGGER.info("Git提交成功") except subprocess.CalledProcessError as e: LOGGER.error("Git操作失败: %s", e) def load_processed_hashes(): """Load processed transaction hashes from file.""" try: with open(HASH_FILE, "r", encoding="utf-8") as file: return set(json.load(file)) except (FileNotFoundError, json.JSONDecodeError): return set() def save_processed_hashes(): """Save processed transaction hashes to file.""" try: with open(HASH_FILE, "w", encoding="utf-8") as file: json.dump(list(SEEN_TXHASHES), file, indent=4, ensure_ascii=False) LOGGER.info("成功保存已处理交易哈希到%s", HASH_FILE) except IOError as e: LOGGER.error("保存已处理交易哈希时发生错误: %s", e) def main(): """主循环,定期获取数据并更新""" global SEEN_TXHASHES, LOGGER # 添加logger到全局声明 setup_logging() # 初始化logger LOGGER.info("程序启动") SEEN_TXHASHES = load_processed_hashes() last_git_time = 0 while True: setup_logging() try: LOGGER.info("获取最新交易数据...") if new_transactions := fetch_latest_transactions(): LOGGER.info("获取到%d条新交易", len(new_transactions)) SEEN_TXHASHES.update(tx["transactionHash"] for tx in new_transactions) save_processed_hashes() update_json_file(new_transactions) current_time = time.time() if current_time - last_git_time >= GIT_INTERVAL: git_commit_and_push() last_git_time = current_time time.sleep(FETCH_INTERVAL) except Exception as e: LOGGER.error("错误: %s,继续运行...", e) continue if __name__ == "__main__": main()