[volo-vault]: add support for Volo Vault on Sui

This commit is contained in:
2025-10-04 15:34:06 +08:00
parent 34df2b6120
commit 3a12f39425
6 changed files with 237 additions and 4 deletions

5
.gitignore vendored
View File

@@ -1 +1,4 @@
*.log
*.log
*.csv
*.json
*.bat

View File

@@ -1,3 +0,0 @@
data/
config/
*.bat

View File

@@ -0,0 +1,8 @@
[
{
"address": "0x",
"vault_id": [
"0x"
]
}
]

225
volo-vault/main.py Normal file
View File

@@ -0,0 +1,225 @@
import json
import csv
import os
import logging
from datetime import datetime
import requests
from decimal import Decimal
class VoloVaultTracker:
def __init__(self, config_dir="config"):
self.config_dir = config_dir
self.setup_logging()
self.load_configs()
def setup_logging(self):
"""设置日志系统"""
os.makedirs("logs", exist_ok=True)
log_format = "%(asctime)s - %(levelname)s - %(message)s"
log_filename = f"logs/{datetime.now().strftime('%Y-%m-%d')}.log"
logging.basicConfig(
level=logging.INFO,
format=log_format,
handlers=[
logging.FileHandler(log_filename, encoding="utf-8"),
logging.StreamHandler(),
],
)
self.logger = logging.getLogger(__name__)
self.logger.info("初始化日志系统")
def get_local_timestamp(self):
"""获取本地时区的时间戳"""
return datetime.now().strftime("%Y-%m-%dT%H:%M")
def load_configs(self):
"""从JSON文件加载配置"""
try:
with open(f"{self.config_dir}/config.json", "r", encoding="utf-8") as f:
self.config = json.load(f)
self.logger.info(f"加载配置: {len(self.config)} 个地址")
except FileNotFoundError:
self.logger.error("配置文件缺失: config.json")
self.config = []
except json.JSONDecodeError as e:
self.logger.error(f"配置文件格式错误 - config.json: {e}")
self.config = []
os.makedirs("data", exist_ok=True)
def get_user_position(self, address):
"""获取用户仓位数据"""
try:
url = f"https://vault-api.volosui.com/api/v1/users/{address}/position"
response = requests.get(url)
response.raise_for_status()
return response.json()
except Exception as e:
self.logger.error(f"获取用户仓位数据失败 {address}: {e}")
return None
def get_vaults_info(self):
"""获取所有金库信息"""
try:
url = "https://vault-api.volosui.com/api/v1/vaults"
response = requests.get(url)
response.raise_for_status()
return response.json()
except Exception as e:
self.logger.error(f"获取金库信息失败: {e}")
return None
def read_previous_data(self, file_path):
"""读取之前的数据用于计算变化量"""
if not os.path.exists(file_path):
return None
try:
with open(file_path, "r", encoding="utf-8") as f:
reader = csv.DictReader(f)
rows = list(reader)
if rows:
return rows[-1]
except Exception as e:
self.logger.error(f"读取历史数据失败 {file_path}: {e}")
return None
def save_user_position_data(self, address, vault_id, position_data, vault_info):
"""保存用户仓位数据到CSV"""
vault_name = vault_info["name"]
data_file = f"data/data_{address}_{vault_name}_{vault_id}.csv"
previous_data = self.read_previous_data(data_file)
pool_share_token_balance_change = "0"
pool_share_token_usd_change = "0"
yield_lifetime_amount_change = "0"
yield_lifetime_usd_change = "0"
if previous_data:
prev_balance = Decimal(previous_data["代币数量"])
prev_usd = Decimal(previous_data["代币价值USD"])
prev_yield_amount = Decimal(previous_data["累计收益数量"])
prev_yield_usd = Decimal(previous_data["累计收益价值USD"])
pool_share_token_balance_change = str(Decimal(str(position_data["poolShareTokenBalance"])) - prev_balance)
pool_share_token_usd_change = str(Decimal(str(position_data["poolShareTokenUsd"])) - prev_usd)
yield_lifetime_amount_change = str(Decimal(str(position_data["yieldLifetimeAmount"])) - prev_yield_amount)
yield_lifetime_usd_change = str(Decimal(str(position_data["yieldLifetimeUsd"])) - prev_yield_usd)
data_row = {
"时间": self.get_local_timestamp(),
"实时APR": str(position_data["vaultApr"]),
"份额": str(position_data["shares"]),
"代币价格": str(position_data["tokenPrice"]),
"代币数量": str(position_data["poolShareTokenBalance"]),
"代币数量变化量": pool_share_token_balance_change,
"代币价值USD": str(position_data["poolShareTokenUsd"]),
"代币价值USD变化量": pool_share_token_usd_change,
"累计收益数量": str(position_data["yieldLifetimeAmount"]),
"累计收益数量变化量": yield_lifetime_amount_change,
"累计收益价值USD": str(position_data["yieldLifetimeUsd"]),
"累计收益价值USD变化量": yield_lifetime_usd_change
}
file_exists = os.path.exists(data_file)
with open(data_file, "a", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=data_row.keys())
if not file_exists:
writer.writeheader()
writer.writerow(data_row)
self.logger.info(f"保存用户仓位数据: {os.path.basename(data_file)}")
def save_vault_data(self, vault_info):
"""保存金库数据到CSV"""
vault_id = vault_info["id"]
vault_name = vault_info["name"]
vault_file = f"data/vault_{vault_name}_{vault_id}.csv"
data_row = {
"时间": self.get_local_timestamp(),
"实时APR": str(vault_info.get("instantAPR", "")),
"7天APY": str(vault_info.get("apy7d", {}).get("value", "")),
"30天APY": str(vault_info.get("apy30d", {}).get("value", "")),
"总份额": str(vault_info.get("totalShares", "")),
"代币价格": str(vault_info.get("coinPrice", "")),
"总代币数量": str(vault_info.get("totalStaked", "")),
"总代币价值USD": str(vault_info.get("totalStakedUsd", "")),
"上限代币数量": str(vault_info.get("stakeCapAmount", ""))
}
file_exists = os.path.exists(vault_file)
with open(vault_file, "a", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=data_row.keys())
if not file_exists:
writer.writeheader()
writer.writerow(data_row)
self.logger.info(f"保存金库数据: {os.path.basename(vault_file)}")
def track_all_positions(self):
"""跟踪所有仓位"""
self.logger.info(f"开始处理 {len(self.config)} 个地址")
vaults_info = self.get_vaults_info()
if not vaults_info:
self.logger.error("获取金库信息失败,无法继续处理")
return
for vault in vaults_info.get("data", []):
self.save_vault_data(vault)
success_count = 0
for i, config_item in enumerate(self.config, 1):
address = config_item["address"]
self.logger.info(f"[{i}/{len(self.config)}] 处理地址: {address}")
position_data = self.get_user_position(address)
if not position_data:
self.logger.error(f"获取用户仓位数据失败: {address}")
continue
target_vault_ids = config_item.get("vault_id", [])
for vault_id in target_vault_ids:
found_position = None
for position in position_data:
if position.get("vaultId") == vault_id:
found_position = position
break
if found_position:
vault_info = None
for vault in vaults_info.get("data", []):
if vault["id"] == vault_id:
vault_info = vault
break
if vault_info:
self.save_user_position_data(address, vault_id, found_position, vault_info)
success_count += 1
else:
self.logger.warning(f"未找到金库信息: {vault_id}")
else:
self.logger.warning(f"未找到vault_id {vault_id} 在地址 {address} 的仓位中")
self.logger.info(f"处理完成: 成功 {success_count} 个仓位")
def main():
"""主函数"""
try:
tracker = VoloVaultTracker("config")
tracker.track_all_positions()
except Exception as e:
logging.error(f"程序执行失败: {e}")
raise
if __name__ == "__main__":
main()