"""DEXRP价格数据收集与分析模块,自动获取交易数据并生成每日统计"""

import json
import time
import subprocess
from datetime import datetime
from collections import defaultdict
import logging
from pathlib import Path

import requests

# Configuration constants
API_URL = "https://backend.dexrp.io/vending/last"  # DEXRP API endpoint
JSON_FILE = "price.json"  # price data storage file
HASH_FILE = "processed.json"  # record of already-processed transaction hashes

# Global state
SEEN_TXHASHES = set()  # in-memory set of processed transaction hashes
GIT_INTERVAL = 86400  # Git commit interval in seconds (1 day)
FETCH_INTERVAL = 10  # API polling interval in seconds
LOGGER = None  # module-level logger, assigned in setup_logging()
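
# The code below assumes each API transaction record is shaped roughly like
# this sketch (field names inferred from how the code reads them; not an
# official API schema, and the values are made up):
#
#     {
#         "transactionHash": "0xabc123...",
#         "blockTimestamp": 1718000000,  # Unix timestamp in seconds
#         "price": 0.0123,
#         "tokensSold": 5000.0
#     }

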
def setup_logging():
    """Configure file logging and assign the module-level LOGGER."""
    global LOGGER
    log_file = Path("output.log")

    logging.basicConfig(
        level=logging.WARNING,  # record only WARNING and above; note that the
        # LOGGER.info() calls throughout this module are filtered out at this level
        format="%(asctime)s - %(levelname)s - %(message)s",
        handlers=[
            logging.FileHandler(log_file, encoding="utf-8"),
        ],
    )
    LOGGER = logging.getLogger(__name__)
    return LOGGER
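
# With the format configured above, a line in output.log looks like
# (illustrative timestamp and message):
#
#     2024-06-10 12:00:00,123 - ERROR - API request failed: ...

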
def fetch_latest_transactions():
    """Fetch the latest transactions from the API, filtering out already-processed ones."""
    try:
        response = requests.get(API_URL, timeout=10)
        response.raise_for_status()
        transactions = response.json()
        return [tx for tx in transactions if tx["transactionHash"] not in SEEN_TXHASHES]
    except requests.RequestException as e:
        LOGGER.error("API request failed: %s", e)
        return None


def calculate_daily_stats(transactions):
    """Compute per-day statistics (open, close, high, low, volume).

    Assumes `transactions` is ordered oldest-to-newest, since open/close
    are taken from the first/last transaction seen for each date.
    """
    daily_data = defaultdict(
        lambda: {
            "open": None,
            "close": None,
            "high": float("-inf"),
            "low": float("inf"),
            "tokensSold": 0,
        }
    )

    for tx in transactions:
        # Convert the block timestamp to a date string in the local timezone
        date = datetime.fromtimestamp(tx["blockTimestamp"]).strftime("%Y-%m-%d")
        price = tx.get("price", 0)
        tokens_sold = tx.get("tokensSold", 0)

        if daily_data[date]["open"] is None:
            daily_data[date]["open"] = price
        daily_data[date]["close"] = price
        daily_data[date]["high"] = max(daily_data[date]["high"], price)
        daily_data[date]["low"] = min(daily_data[date]["low"], price)
        daily_data[date]["tokensSold"] += tokens_sold

    return [
        {
            "date": date,
            "open": data["open"],
            "close": data["close"],
            "high": data["high"],
            "low": data["low"],
            "tokensSold": data["tokensSold"],
        }
        for date, data in daily_data.items()
    ]
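
# A minimal sketch of one calculate_daily_stats() record (illustrative
# values only):
#
#     {
#         "date": "2024-06-10",
#         "open": 0.0120,
#         "close": 0.0125,
#         "high": 0.0130,
#         "low": 0.0118,
#         "tokensSold": 12345.0
#     }

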
def update_json_file(new_data):
    """Update JSON file with new transaction data."""
    try:
        LOGGER.info("Updating JSON file with %d new transactions", len(new_data))
        # Read existing data; start with an empty list if the file does not exist
        try:
            with open(JSON_FILE, "r", encoding="utf-8") as file:
                existing_data = json.load(file)
        except FileNotFoundError:
            existing_data = []

        # Ensure existing_data is a list
        if not isinstance(existing_data, list):
            existing_data = []

        # Compute daily statistics for the new transactions
        processed_data = calculate_daily_stats(new_data)

        # Merge by date instead of blindly appending
        existing_dates = {item["date"]: item for item in existing_data}
        for new_item in processed_data:
            date = new_item["date"]
            if date in existing_dates:
                # Merge records that share a date
                existing_item = existing_dates[date]
                existing_item["close"] = new_item["close"]
                existing_item["high"] = max(existing_item["high"], new_item["high"])
                existing_item["low"] = min(existing_item["low"], new_item["low"])
                existing_item["tokensSold"] += new_item["tokensSold"]
            else:
                existing_data.append(new_item)
                existing_dates[date] = new_item

        # Write the merged data back to disk
        with open(JSON_FILE, "w", encoding="utf-8") as file:
            json.dump(existing_data, file, indent=4, ensure_ascii=False)

        LOGGER.info(
            "Updated %s: merged %d records, %d total",
            JSON_FILE,
            len(processed_data),
            len(existing_data),
        )
    except (FileNotFoundError, json.JSONDecodeError, IOError) as e:
        LOGGER.error("Error updating JSON file: %s; skipping this update", e)
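
# Merge semantics for same-date records, in brief: "close" is overwritten by
# the newer batch, "high"/"low" are widened, and "tokensSold" accumulates.
# A tiny worked example (made-up numbers):
#
#     existing: {"date": "2024-06-10", "open": 0.010, "close": 0.011,
#                "high": 0.012, "low": 0.009, "tokensSold": 100}
#     new:      {"date": "2024-06-10", "open": 0.011, "close": 0.013,
#                "high": 0.014, "low": 0.010, "tokensSold": 50}
#     merged:   {"date": "2024-06-10", "open": 0.010, "close": 0.013,
#                "high": 0.014, "low": 0.009, "tokensSold": 150}

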
def git_commit_and_push():
    """Commit and push data updates to the Git repository."""
    try:
        subprocess.run(["git", "add", "."], check=True)
        subprocess.run(
            [
                "git",
                "commit",
                "--no-verify",  # skip pre-commit hooks
                "--no-gpg-sign",  # skip commit signing
                "-m",
                str(datetime.now()),  # timestamp as the commit message
            ],
            check=True,
        )
        subprocess.run(["git", "push"], check=True)
        LOGGER.info("Git commit succeeded")
    except subprocess.CalledProcessError as e:
        LOGGER.error("Git operation failed: %s", e)
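
# Shell equivalent of git_commit_and_push(), for reference:
#
#     git add .
#     git commit --no-verify --no-gpg-sign -m "<current timestamp>"
#     git push

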
def general_info():
    """Run general.py to refresh general info."""
    LOGGER.info("Fetching general info")
    try:
        # subprocess.run(check=True) raises CalledProcessError on a non-zero
        # exit status; the previous Popen/wait pair never did, so the except
        # clause below was unreachable
        subprocess.run(["python3", "general.py"], check=True)
    except subprocess.CalledProcessError as e:
        LOGGER.error("Failed to fetch general info: %s", e)


def load_processed_hashes():
    """Load processed transaction hashes from file."""
    try:
        with open(HASH_FILE, "r", encoding="utf-8") as file:
            return set(json.load(file))
    except (FileNotFoundError, json.JSONDecodeError):
        return set()
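
# processed.json is a flat JSON array of transaction hash strings, e.g.
# (illustrative hashes):
#
#     ["0xabc123...", "0xdef456...", "0x789abc..."]

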
def save_processed_hashes():
    """Save processed transaction hashes to file."""
    try:
        # Read the existing hashes
        try:
            with open(HASH_FILE, "r", encoding="utf-8") as file:
                existing_data = json.load(file)
        except (FileNotFoundError, json.JSONDecodeError):
            existing_data = []

        # Append new hashes, deduplicating while preserving the existing order
        new_hashes = [h for h in SEEN_TXHASHES if h not in existing_data]
        updated_data = existing_data + new_hashes

        # Write the updated list back to disk
        with open(HASH_FILE, "w", encoding="utf-8") as file:
            json.dump(updated_data, file, indent=4, ensure_ascii=False)

        LOGGER.info(
            "Updated %s: %d new hashes, %d total",
            HASH_FILE,
            len(new_hashes),
            len(updated_data),
        )
    except IOError as e:
        LOGGER.error("Error saving processed transaction hashes: %s", e)


def main():
    """Main loop: periodically fetch data and update the files."""
    global SEEN_TXHASHES
    setup_logging()  # initialize the module-level LOGGER
    LOGGER.info("Program started")
    SEEN_TXHASHES = load_processed_hashes()
    last_git_time = 0

    while True:
        # setup_logging() used to be re-invoked here each iteration; once the
        # root logger has handlers, logging.basicConfig() is a no-op, so the
        # call was redundant and has been dropped
        try:
            if new_transactions := fetch_latest_transactions():
                LOGGER.info("Fetched %d new transactions", len(new_transactions))
                SEEN_TXHASHES.update(tx["transactionHash"] for tx in new_transactions)
                save_processed_hashes()
                update_json_file(new_transactions)

            current_time = time.time()
            if current_time - last_git_time >= GIT_INTERVAL:
                general_info()
                git_commit_and_push()
                last_git_time = current_time

            time.sleep(FETCH_INTERVAL)
        except Exception as e:
            LOGGER.error("Error: %s; continuing...", e)
            time.sleep(FETCH_INTERVAL)  # back off before retrying instead of spinning


if __name__ == "__main__":
    main()