Files
dexrp-price/main.py
2025-07-30 15:44:16 +08:00

241 lines
7.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""DEXRP价格数据收集与分析模块自动获取交易数据并生成每日统计"""
import json
import time
import subprocess
from datetime import datetime
from collections import defaultdict
import logging
from pathlib import Path
import requests
# Configuration constants
API_URL = "https://backend.dexrp.io/vending/last"  # DEXRP API endpoint that is polled
JSON_FILE = "price.json"  # File storing the aggregated price data
HASH_FILE = "processed.json"  # File recording hashes of already-processed transactions
LOG_DIR = "logs"  # Directory created by setup_logging(); the log itself goes to output.log
LOG_FILE_FORMAT = "%Y-%m-%d.log"  # NOTE(review): not referenced anywhere in the visible code
# Global state
SEEN_TXHASHES = set()  # In-memory set of already-processed transaction hashes
GIT_INTERVAL = 3600  # Interval between Git commits (seconds)
FETCH_INTERVAL = 10  # API polling interval (seconds)
LOGGER = None  # Module-wide logger; assigned by setup_logging()
def setup_logging():
    """Configure file logging once and return the module logger.

    Creates ``LOG_DIR`` if it does not exist and, when the root logger has no
    handlers yet, attaches a UTF-8 file handler writing to ``output.log`` at
    WARNING level. Repeated calls are safe: they only refresh the global
    ``LOGGER`` reference.

    Returns:
        logging.Logger: The logger that was stored in the global ``LOGGER``.
    """
    global LOGGER
    Path(LOG_DIR).mkdir(exist_ok=True)
    # basicConfig() ignores its arguments once the root logger has handlers,
    # so constructing a FileHandler on a repeat call would only open (and
    # leak) one more descriptor on the log file.
    if not logging.getLogger().handlers:
        logging.basicConfig(
            level=logging.WARNING,  # Only WARNING and above are recorded
            format="%(asctime)s - %(levelname)s - %(message)s",
            handlers=[
                logging.FileHandler(Path("output.log"), encoding="utf-8"),
            ],
        )
    LOGGER = logging.getLogger(__name__)
    return LOGGER
def fetch_latest_transactions():
    """Fetch the latest transactions from the API and drop already-seen ones.

    Returns:
        list | None: The transactions whose ``transactionHash`` is not in
        ``SEEN_TXHASHES`` (possibly an empty list), or ``None`` when the
        request fails, the body is not valid JSON, or the decoded payload is
        not a list.
    """
    try:
        response = requests.get(API_URL, timeout=10)
        response.raise_for_status()
        transactions = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers JSON decoding errors on requests releases whose
        # decode error is not a RequestException subclass.
        LOGGER.error("API请求失败: %s", e)
        return None
    if not isinstance(transactions, list):
        # Iterating e.g. an error object (dict) would fail with a confusing
        # TypeError in the filter below; treat it as a failed fetch instead.
        LOGGER.error("API返回数据格式异常: %s", type(transactions).__name__)
        return None
    return [tx for tx in transactions if tx["transactionHash"] not in SEEN_TXHASHES]
def calculate_daily_stats(transactions):
    """Aggregate transactions into per-day open/close/high/low/volume records.

    Transactions are processed in ascending ``blockTimestamp`` order, so
    ``open`` is the earliest and ``close`` the latest price of each day no
    matter in which order the caller supplies them.

    Args:
        transactions: Iterable of dicts. Each must contain ``blockTimestamp``
            (passed to ``datetime.fromtimestamp``); ``price`` and
            ``tokensSold`` are optional and default to 0.

    Returns:
        list[dict]: One record per calendar day, in ascending date order,
        with keys ``date`` (``YYYY-MM-DD``), ``open``, ``close``, ``high``,
        ``low`` and ``tokensSold``. Empty input yields an empty list.

    Raises:
        KeyError: If a transaction has no ``blockTimestamp``.
    """
    daily_data = defaultdict(
        lambda: {
            "open": None,
            "close": None,
            "high": float("-inf"),
            "low": float("inf"),
            "tokensSold": 0,
        }
    )
    # sorted() is stable, so transactions sharing a timestamp keep their
    # relative input order.
    for tx in sorted(transactions, key=lambda item: item["blockTimestamp"]):
        # A naive fromtimestamp() converts using the machine's local timezone.
        date = datetime.fromtimestamp(tx["blockTimestamp"]).strftime("%Y-%m-%d")
        price = tx.get("price", 0)
        day = daily_data[date]
        if day["open"] is None:
            day["open"] = price
        day["close"] = price
        day["high"] = max(day["high"], price)
        day["low"] = min(day["low"], price)
        day["tokensSold"] += tx.get("tokensSold", 0)
    return [{"date": date, **day} for date, day in daily_data.items()]
def update_json_file(new_data):
    """Merge freshly fetched transactions into the daily records in JSON_FILE.

    The records currently stored in ``JSON_FILE`` are loaded (a missing file
    or non-list content counts as no records), ``new_data`` is aggregated
    with ``calculate_daily_stats`` and the result is merged by date: for a
    date that already exists ``close`` is overwritten, ``high``/``low`` are
    widened and ``tokensSold`` is summed; unseen dates are appended. The
    merged list is then written back to ``JSON_FILE``.

    Args:
        new_data: Sized collection of transaction dicts accepted by
            ``calculate_daily_stats``.

    OS-level I/O errors and JSON decoding errors are logged and the update
    is skipped; nothing is raised for them.
    """
    try:
        LOGGER.info("开始更新JSON文件收到%d条新交易数据", len(new_data))

        # Load what is on disk; fall back to an empty list when unusable.
        try:
            with open(JSON_FILE, "r", encoding="utf-8") as source:
                records = json.load(source)
        except FileNotFoundError:
            records = []
        else:
            if not isinstance(records, list):
                records = []

        daily_stats = calculate_daily_stats(new_data)

        # Index stored records by date so same-day stats merge in place.
        by_date = {record["date"]: record for record in records}
        for stats in daily_stats:
            day = stats["date"]
            if day not in by_date:
                records.append(stats)
                by_date[day] = stats
                continue
            merged = by_date[day]
            merged["close"] = stats["close"]
            merged["high"] = max(merged["high"], stats["high"])
            merged["low"] = min(merged["low"], stats["low"])
            merged["tokensSold"] += stats["tokensSold"]

        with open(JSON_FILE, "w", encoding="utf-8") as target:
            json.dump(records, target, indent=4, ensure_ascii=False)

        LOGGER.info(
            "成功更新%s,合并%d条记录,总计%d条记录",
            JSON_FILE,
            len(daily_stats),
            len(records),
        )
    except (OSError, json.JSONDecodeError) as e:
        # OSError also covers FileNotFoundError and the IOError alias.
        LOGGER.error("更新JSON文件时发生错误: %s,跳过本次更新", e)
def git_commit_and_push():
    """Stage everything, commit with a timestamped message, and push.

    Runs ``git add .``, ``git commit`` (hooks and GPG signing disabled) and
    ``git push`` in that order. The first command that exits non-zero stops
    the sequence and is logged as an error instead of being raised.
    """
    try:
        subprocess.run(["git", "add", "."], check=True)
        commit_message = f"Auto update at {datetime.now()}"
        commit_command = [
            "git",
            "commit",
            "--no-verify",
            "--no-gpg-sign",
            "-m",
            commit_message,
        ]
        subprocess.run(commit_command, check=True)
        subprocess.run(["git", "push"], check=True)
    except subprocess.CalledProcessError as e:
        LOGGER.error("Git操作失败: %s", e)
    else:
        LOGGER.info("Git提交成功")
def general_info():
    """Run ``general.py`` with ``python3`` and wait for it to finish.

    A non-zero exit status of the child process is logged as an error
    rather than raised.
    """
    LOGGER.info("获取general信息")
    try:
        # run(check=True) raises CalledProcessError on a non-zero exit code.
        # Popen().wait() never raises it, which made this handler unreachable
        # and let failures of general.py pass unnoticed.
        subprocess.run(["python3", "general.py"], check=True)
    except subprocess.CalledProcessError as e:
        LOGGER.error("获取general信息失败: %s", e)
def load_processed_hashes():
    """Return the transaction hashes stored in HASH_FILE as a set.

    Returns:
        set: The decoded file content converted to a set, or an empty set
        when the file is missing or does not contain valid JSON.
    """
    try:
        with open(HASH_FILE, "r", encoding="utf-8") as source:
            stored = json.load(source)
    except (FileNotFoundError, json.JSONDecodeError):
        return set()
    return set(stored)
def save_processed_hashes():
    """Append the hashes in SEEN_TXHASHES that are not yet on disk to HASH_FILE.

    The hashes already stored in ``HASH_FILE`` keep their order; hashes that
    are new are appended after them (in set iteration order) and the whole
    list is written back. A missing, undecodable or non-list file is treated
    as empty. I/O errors are logged, not raised.
    """
    try:
        # Read what is already stored.
        try:
            with open(HASH_FILE, "r", encoding="utf-8") as file:
                existing_data = json.load(file)
        except (FileNotFoundError, json.JSONDecodeError):
            existing_data = []
        # Anything but a list cannot be extended below; start over, as
        # update_json_file does for its own data file.
        if not isinstance(existing_data, list):
            existing_data = []
        # Build the lookup set once: testing membership against the list
        # itself is quadratic in the ever-growing number of stored hashes.
        known_hashes = set(existing_data)
        new_hashes = [h for h in SEEN_TXHASHES if h not in known_hashes]
        updated_data = existing_data + new_hashes
        # Write the merged list back.
        with open(HASH_FILE, "w", encoding="utf-8") as file:
            json.dump(updated_data, file, indent=4, ensure_ascii=False)
        LOGGER.info(
            "成功更新%s,新增%d条哈希,总计%d条记录",
            HASH_FILE,
            len(new_hashes),
            len(updated_data),
        )
    except IOError as e:
        LOGGER.error("保存已处理交易哈希时发生错误: %s", e)
def main():
    """Run the polling loop: fetch, persist, and periodically commit.

    On start-up logging is configured and the processed hashes are loaded
    into ``SEEN_TXHASHES``. Each iteration fetches unseen transactions,
    records their hashes, saves the hash file and updates the price file.
    Whenever at least ``GIT_INTERVAL`` seconds have passed since the last
    Git run (which includes the very first iteration), ``general_info`` and
    ``git_commit_and_push`` are called. The loop never returns; any
    ``Exception`` raised inside an iteration is logged and the loop goes on.
    """
    global SEEN_TXHASHES
    setup_logging()  # Initialises the global LOGGER
    LOGGER.info("程序启动")
    SEEN_TXHASHES = load_processed_hashes()
    last_git_time = 0
    while True:
        try:
            if new_transactions := fetch_latest_transactions():
                LOGGER.info("获取到%d条新交易", len(new_transactions))
                SEEN_TXHASHES.update(tx["transactionHash"] for tx in new_transactions)
                save_processed_hashes()
                update_json_file(new_transactions)
            current_time = time.time()
            if current_time - last_git_time >= GIT_INTERVAL:
                general_info()
                git_commit_and_push()
                last_git_time = current_time
        except Exception as e:
            LOGGER.error("错误: %s,继续运行...", e)
        # Sleep after every iteration, failed ones included; otherwise a
        # persistent error turns the loop into a busy loop hitting the API.
        time.sleep(FETCH_INTERVAL)
# Start the polling loop only when the file is executed as a script.
if __name__ == "__main__":
    main()