diff --git a/main.py b/main.py new file mode 100644 index 0000000..4f1c2b0 --- /dev/null +++ b/main.py @@ -0,0 +1,182 @@ +"""DEXRP价格数据收集与分析模块,自动获取交易数据并生成每日统计""" + +import json +import time +import subprocess +from datetime import datetime, timezone +from collections import defaultdict + +import requests + +# 配置常量 +API_URL = "https://backend.dexrp.io/vending/last" # DEXRP API地址 +JSON_FILE = "price.json" # 价格数据存储文件 +HASH_FILE = "processed.json" # 已处理交易哈希记录文件 + +# 全局变量 +SEEN_TXHASHES = set() # 内存中的已处理交易哈希集合 +GIT_INTERVAL = 300 # Git提交间隔(秒) +FETCH_INTERVAL = 10 # API轮询间隔(秒) + + +def fetch_latest_transactions(): + """从API获取最新交易数据并过滤已处理记录""" + try: + response = requests.get(API_URL, timeout=10) + response.raise_for_status() + transactions = response.json() + # transactions = json.loads( + # open("D:/DOWNLOADS/last.json", "r", encoding="utf-8").read() + # ) + return [tx for tx in transactions if tx["transactionHash"] not in SEEN_TXHASHES] + except requests.RequestException as e: + print(f"API请求失败: {e}") + return None + + +def calculate_daily_stats(transactions): + """计算每日统计数据(开盘价、收盘价、最高价、最低价、交易量)""" + daily_data = defaultdict( + lambda: { + "open": None, + "close": None, + "high": float("-inf"), + "low": float("inf"), + "tokensSold": 0, + } + ) + + for tx in transactions: + date = datetime.fromtimestamp(tx["blockTimestamp"], timezone.utc).strftime("%Y-%m-%d") + price = tx.get("price", 0) + tokens_sold = tx.get("tokensSold", 0) + + if daily_data[date]["open"] is None: + daily_data[date]["open"] = price + daily_data[date]["close"] = price + daily_data[date]["high"] = max(daily_data[date]["high"], price) + daily_data[date]["low"] = min(daily_data[date]["low"], price) + daily_data[date]["tokensSold"] += tokens_sold + + return [ + { + "date": date, + "open": data["open"], + "close": data["close"], + "high": data["high"], + "low": data["low"], + "tokensSold": data["tokensSold"], + } + for date, data in daily_data.items() + ] + + +def update_json_file(new_data): + """Update JSON file with new transaction data.""" + try: + print(f"开始更新JSON文件,收到{len(new_data)}条新交易数据") + # 读取现有数据,如果文件不存在则初始化为空数组 + try: + with open(JSON_FILE, "r", encoding="utf-8") as file: + existing_data = json.load(file) + except FileNotFoundError: + existing_data = [] + + # 确保existing_data是列表 + if not isinstance(existing_data, list): + existing_data = [] + + # 计算每日统计数据 + processed_data = calculate_daily_stats(new_data) + + # 合并数据而不是追加 + existing_dates = {item["date"]: item for item in existing_data} + for new_item in processed_data: + date = new_item["date"] + if date in existing_dates: + # 合并相同日期的记录 + existing_item = existing_dates[date] + existing_item["close"] = new_item["close"] + existing_item["high"] = max(existing_item["high"], new_item["high"]) + existing_item["low"] = min(existing_item["low"], new_item["low"]) + existing_item["tokensSold"] += new_item["tokensSold"] + else: + existing_data.append(new_item) + existing_dates[date] = new_item + + # 写回文件 + with open(JSON_FILE, "w", encoding="utf-8") as file: + json.dump(existing_data, file, indent=4, ensure_ascii=False) + + print( + f"成功更新{JSON_FILE},合并{len(processed_data)}条记录,总计{len(existing_data)}条记录" + ) + except (FileNotFoundError, json.JSONDecodeError, IOError) as e: + print(f"更新JSON文件时发生错误: {e},跳过本次更新") + + +def git_commit_and_push(): + """自动提交并推送数据更新到Git仓库""" + try: + subprocess.run(["git", "add", "."], check=True) + subprocess.run( + [ + "git", + "commit", + "--no-verify", + "--no-gpg-sign", + "-m", + f"Auto update at {datetime.now()}", + ], + check=True, + ) + subprocess.run(["git", "push"], check=True) + print("Git提交成功") + except subprocess.CalledProcessError as e: + print(f"Git操作失败: {e}") + + +def load_processed_hashes(): + """Load processed transaction hashes from file.""" + try: + with open(HASH_FILE, "r", encoding="utf-8") as file: + return set(json.load(file)) + except (FileNotFoundError, json.JSONDecodeError): + return set() + + +def save_processed_hashes(): + """Save processed transaction hashes to file.""" + with open(HASH_FILE, "w", encoding="utf-8") as file: + json.dump(list(SEEN_TXHASHES), file) + + +def main(): + """主循环,定期获取数据并更新""" + global SEEN_TXHASHES + SEEN_TXHASHES = load_processed_hashes() + last_git_time = 0 + + while True: + try: + current_time = time.time() + print(f"{datetime.now()}: 获取最新交易数据...") + + if new_transactions := fetch_latest_transactions(): + print(f"获取到{len(new_transactions)}条新交易") + SEEN_TXHASHES.update(tx["transactionHash"] for tx in new_transactions) + save_processed_hashes() + update_json_file(new_transactions) + + if current_time - last_git_time >= GIT_INTERVAL: + git_commit_and_push() + last_git_time = current_time + + time.sleep(FETCH_INTERVAL) + except Exception as e: + print(f"错误: {e},继续运行...") + continue + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/price.csv b/price.csv deleted file mode 100644 index dc25f80..0000000 --- a/price.csv +++ /dev/null @@ -1,2 +0,0 @@ -日期,当日最高,当日最低,报价,成交量 -20250727,0.03515,0.03515,0.03515,0 \ No newline at end of file