init
This commit is contained in:
234
main.py
Normal file
234
main.py
Normal file
@@ -0,0 +1,234 @@
|
||||
"""DEXRP价格数据收集与分析模块,自动获取交易数据并生成每日统计"""
|
||||
|
||||
import json
|
||||
import time
|
||||
import subprocess
|
||||
from datetime import datetime
|
||||
from collections import defaultdict
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
import requests
|
||||
|
||||
# 配置常量
|
||||
API_URL = "https://backend.dexrp.io/vending/last" # DEXRP API地址
|
||||
JSON_FILE = "price.json" # 价格数据存储文件
|
||||
HASH_FILE = "processed.json" # 已处理交易哈希记录文件
|
||||
|
||||
# 全局变量
|
||||
SEEN_TXHASHES = set() # 内存中的已处理交易哈希集合
|
||||
GIT_INTERVAL = 86400 # Git提交间隔(秒) 1天
|
||||
FETCH_INTERVAL = 10 # API轮询间隔(秒)
|
||||
LOGGER = None # 添加全局logger变量
|
||||
|
||||
|
||||
def setup_logging():
|
||||
"""配置日志记录"""
|
||||
global LOGGER # 声明使用全局logger
|
||||
log_file = Path("output.log")
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.WARNING, # 只记录WARNING及以上级别
|
||||
format="%(asctime)s - %(levelname)s - %(message)s",
|
||||
handlers=[
|
||||
logging.FileHandler(log_file, encoding="utf-8"),
|
||||
],
|
||||
)
|
||||
LOGGER = logging.getLogger(__name__) # 赋值给全局logger
|
||||
return LOGGER
|
||||
|
||||
|
||||
def fetch_latest_transactions():
|
||||
"""从API获取最新交易数据并过滤已处理记录"""
|
||||
try:
|
||||
response = requests.get(API_URL, timeout=10)
|
||||
response.raise_for_status()
|
||||
transactions = response.json()
|
||||
return [tx for tx in transactions if tx["transactionHash"] not in SEEN_TXHASHES]
|
||||
except requests.RequestException as e:
|
||||
LOGGER.error("API请求失败: %s", e)
|
||||
return None
|
||||
|
||||
|
||||
def calculate_daily_stats(transactions):
|
||||
"""计算每日统计数据(开盘价、收盘价、最高价、最低价、交易量)"""
|
||||
daily_data = defaultdict(
|
||||
lambda: {
|
||||
"open": None,
|
||||
"close": None,
|
||||
"high": float("-inf"),
|
||||
"low": float("inf"),
|
||||
"tokensSold": 0,
|
||||
}
|
||||
)
|
||||
|
||||
for tx in transactions:
|
||||
# 使用本地时区转换时间戳
|
||||
date = datetime.fromtimestamp(tx["blockTimestamp"]).strftime("%Y-%m-%d")
|
||||
price = tx.get("price", 0)
|
||||
tokens_sold = tx.get("tokensSold", 0)
|
||||
|
||||
if daily_data[date]["open"] is None:
|
||||
daily_data[date]["open"] = price
|
||||
daily_data[date]["close"] = price
|
||||
daily_data[date]["high"] = max(daily_data[date]["high"], price)
|
||||
daily_data[date]["low"] = min(daily_data[date]["low"], price)
|
||||
daily_data[date]["tokensSold"] += tokens_sold
|
||||
|
||||
return [
|
||||
{
|
||||
"date": date,
|
||||
"open": data["open"],
|
||||
"close": data["close"],
|
||||
"high": data["high"],
|
||||
"low": data["low"],
|
||||
"tokensSold": data["tokensSold"],
|
||||
}
|
||||
for date, data in daily_data.items()
|
||||
]
|
||||
|
||||
|
||||
def update_json_file(new_data):
|
||||
"""Update JSON file with new transaction data."""
|
||||
try:
|
||||
LOGGER.info("开始更新JSON文件,收到%d条新交易数据", len(new_data))
|
||||
# 读取现有数据,如果文件不存在则初始化为空数组
|
||||
try:
|
||||
with open(JSON_FILE, "r", encoding="utf-8") as file:
|
||||
existing_data = json.load(file)
|
||||
except FileNotFoundError:
|
||||
existing_data = []
|
||||
|
||||
# 确保existing_data是列表
|
||||
if not isinstance(existing_data, list):
|
||||
existing_data = []
|
||||
|
||||
# 计算每日统计数据
|
||||
processed_data = calculate_daily_stats(new_data)
|
||||
|
||||
# 合并数据而不是追加
|
||||
existing_dates = {item["date"]: item for item in existing_data}
|
||||
for new_item in processed_data:
|
||||
date = new_item["date"]
|
||||
if date in existing_dates:
|
||||
# 合并相同日期的记录
|
||||
existing_item = existing_dates[date]
|
||||
existing_item["close"] = new_item["close"]
|
||||
existing_item["high"] = max(existing_item["high"], new_item["high"])
|
||||
existing_item["low"] = min(existing_item["low"], new_item["low"])
|
||||
existing_item["tokensSold"] += new_item["tokensSold"]
|
||||
else:
|
||||
existing_data.append(new_item)
|
||||
existing_dates[date] = new_item
|
||||
|
||||
# 写回文件
|
||||
with open(JSON_FILE, "w", encoding="utf-8") as file:
|
||||
json.dump(existing_data, file, indent=4, ensure_ascii=False)
|
||||
|
||||
LOGGER.info(
|
||||
"成功更新%s,合并%d条记录,总计%d条记录",
|
||||
JSON_FILE,
|
||||
len(processed_data),
|
||||
len(existing_data),
|
||||
)
|
||||
except (FileNotFoundError, json.JSONDecodeError, IOError) as e:
|
||||
LOGGER.error("更新JSON文件时发生错误: %s,跳过本次更新", e)
|
||||
|
||||
|
||||
def git_commit_and_push():
|
||||
"""自动提交并推送数据更新到Git仓库"""
|
||||
try:
|
||||
subprocess.run(["git", "add", "."], check=True)
|
||||
subprocess.run(
|
||||
[
|
||||
"git",
|
||||
"commit",
|
||||
"--no-verify",
|
||||
"--no-gpg-sign",
|
||||
"-m",
|
||||
str(datetime.now()),
|
||||
],
|
||||
check=True,
|
||||
)
|
||||
subprocess.run(["git", "push"], check=True)
|
||||
LOGGER.info("Git提交成功")
|
||||
except subprocess.CalledProcessError as e:
|
||||
LOGGER.error("Git操作失败: %s", e)
|
||||
|
||||
def general_info():
|
||||
"""获取general信息"""
|
||||
LOGGER.info("获取general信息")
|
||||
try:
|
||||
process = subprocess.Popen(["python3", "general.py"])
|
||||
process.wait()
|
||||
except subprocess.CalledProcessError as e:
|
||||
LOGGER.error("获取general信息失败: %s", e)
|
||||
|
||||
def load_processed_hashes():
|
||||
"""Load processed transaction hashes from file."""
|
||||
try:
|
||||
with open(HASH_FILE, "r", encoding="utf-8") as file:
|
||||
return set(json.load(file))
|
||||
except (FileNotFoundError, json.JSONDecodeError):
|
||||
return set()
|
||||
|
||||
|
||||
def save_processed_hashes():
|
||||
"""Save processed transaction hashes to file."""
|
||||
try:
|
||||
# 读取现有数据
|
||||
try:
|
||||
with open(HASH_FILE, "r", encoding="utf-8") as file:
|
||||
existing_data = json.load(file)
|
||||
except (FileNotFoundError, json.JSONDecodeError):
|
||||
existing_data = []
|
||||
|
||||
# 合并新哈希并去重,保持原有顺序
|
||||
new_hashes = [h for h in SEEN_TXHASHES if h not in existing_data]
|
||||
updated_data = existing_data + new_hashes
|
||||
|
||||
# 写回文件
|
||||
with open(HASH_FILE, "w", encoding="utf-8") as file:
|
||||
json.dump(updated_data, file, indent=4, ensure_ascii=False)
|
||||
|
||||
LOGGER.info(
|
||||
"成功更新%s,新增%d条哈希,总计%d条记录",
|
||||
HASH_FILE,
|
||||
len(new_hashes),
|
||||
len(updated_data),
|
||||
)
|
||||
except IOError as e:
|
||||
LOGGER.error("保存已处理交易哈希时发生错误: %s", e)
|
||||
|
||||
|
||||
def main():
|
||||
"""主循环,定期获取数据并更新"""
|
||||
global SEEN_TXHASHES, LOGGER # 添加logger到全局声明
|
||||
setup_logging() # 初始化logger
|
||||
LOGGER.info("程序启动")
|
||||
SEEN_TXHASHES = load_processed_hashes()
|
||||
last_git_time = 0
|
||||
|
||||
while True:
|
||||
setup_logging()
|
||||
try:
|
||||
if new_transactions := fetch_latest_transactions():
|
||||
LOGGER.info("获取到%d条新交易", len(new_transactions))
|
||||
SEEN_TXHASHES.update(tx["transactionHash"] for tx in new_transactions)
|
||||
save_processed_hashes()
|
||||
update_json_file(new_transactions)
|
||||
|
||||
current_time = time.time()
|
||||
if current_time - last_git_time >= GIT_INTERVAL:
|
||||
general_info()
|
||||
git_commit_and_push()
|
||||
last_git_time = current_time
|
||||
|
||||
time.sleep(FETCH_INTERVAL)
|
||||
except Exception as e:
|
||||
LOGGER.error("错误: %s,继续运行...", e)
|
||||
continue
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Reference in New Issue
Block a user