diff --git a/.gitignore b/.gitignore index bf0824e..186e949 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,4 @@ -*.log \ No newline at end of file +*.log +*.csv +*.json +*.bat \ No newline at end of file diff --git a/uniswap-v2/.gitignore b/uniswap-v2/.gitignore deleted file mode 100644 index 0bfa346..0000000 --- a/uniswap-v2/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -data/ -config/ -*.bat \ No newline at end of file diff --git a/uniswap-v2/config/chains.json b/uniswap-v2/config/chains.json.txt similarity index 100% rename from uniswap-v2/config/chains.json rename to uniswap-v2/config/chains.json.txt diff --git a/uniswap-v2/config/positions.json b/uniswap-v2/config/positions.json.txt similarity index 100% rename from uniswap-v2/config/positions.json rename to uniswap-v2/config/positions.json.txt diff --git a/volo-vault/config/config.json.txt b/volo-vault/config/config.json.txt new file mode 100644 index 0000000..b47a3e8 --- /dev/null +++ b/volo-vault/config/config.json.txt @@ -0,0 +1,8 @@ +[ + { + "address": "0x", + "vault_id": [ + "0x" + ] + } +] \ No newline at end of file diff --git a/volo-vault/main.py b/volo-vault/main.py new file mode 100644 index 0000000..98a594c --- /dev/null +++ b/volo-vault/main.py @@ -0,0 +1,225 @@ +import json +import csv +import os +import logging +from datetime import datetime +import requests +from decimal import Decimal + + +class VoloVaultTracker: + def __init__(self, config_dir="config"): + self.config_dir = config_dir + self.setup_logging() + self.load_configs() + + def setup_logging(self): + """设置日志系统""" + os.makedirs("logs", exist_ok=True) + + log_format = "%(asctime)s - %(levelname)s - %(message)s" + log_filename = f"logs/{datetime.now().strftime('%Y-%m-%d')}.log" + + logging.basicConfig( + level=logging.INFO, + format=log_format, + handlers=[ + logging.FileHandler(log_filename, encoding="utf-8"), + logging.StreamHandler(), + ], + ) + + self.logger = logging.getLogger(__name__) + self.logger.info("初始化日志系统") + + def get_local_timestamp(self): + """获取本地时区的时间戳""" + return datetime.now().strftime("%Y-%m-%dT%H:%M") + + def load_configs(self): + """从JSON文件加载配置""" + try: + with open(f"{self.config_dir}/config.json", "r", encoding="utf-8") as f: + self.config = json.load(f) + self.logger.info(f"加载配置: {len(self.config)} 个地址") + except FileNotFoundError: + self.logger.error("配置文件缺失: config.json") + self.config = [] + except json.JSONDecodeError as e: + self.logger.error(f"配置文件格式错误 - config.json: {e}") + self.config = [] + + os.makedirs("data", exist_ok=True) + + def get_user_position(self, address): + """获取用户仓位数据""" + try: + url = f"https://vault-api.volosui.com/api/v1/users/{address}/position" + response = requests.get(url) + response.raise_for_status() + return response.json() + except Exception as e: + self.logger.error(f"获取用户仓位数据失败 {address}: {e}") + return None + + def get_vaults_info(self): + """获取所有金库信息""" + try: + url = "https://vault-api.volosui.com/api/v1/vaults" + response = requests.get(url) + response.raise_for_status() + return response.json() + except Exception as e: + self.logger.error(f"获取金库信息失败: {e}") + return None + + def read_previous_data(self, file_path): + """读取之前的数据用于计算变化量""" + if not os.path.exists(file_path): + return None + + try: + with open(file_path, "r", encoding="utf-8") as f: + reader = csv.DictReader(f) + rows = list(reader) + if rows: + return rows[-1] + except Exception as e: + self.logger.error(f"读取历史数据失败 {file_path}: {e}") + return None + + def save_user_position_data(self, address, vault_id, position_data, vault_info): + """保存用户仓位数据到CSV""" + vault_name = vault_info["name"] + data_file = f"data/data_{address}_{vault_name}_{vault_id}.csv" + + previous_data = self.read_previous_data(data_file) + + pool_share_token_balance_change = "0" + pool_share_token_usd_change = "0" + yield_lifetime_amount_change = "0" + yield_lifetime_usd_change = "0" + + if previous_data: + prev_balance = Decimal(previous_data["代币数量"]) + prev_usd = Decimal(previous_data["代币价值USD"]) + prev_yield_amount = Decimal(previous_data["累计收益数量"]) + prev_yield_usd = Decimal(previous_data["累计收益价值USD"]) + + pool_share_token_balance_change = str(Decimal(str(position_data["poolShareTokenBalance"])) - prev_balance) + pool_share_token_usd_change = str(Decimal(str(position_data["poolShareTokenUsd"])) - prev_usd) + yield_lifetime_amount_change = str(Decimal(str(position_data["yieldLifetimeAmount"])) - prev_yield_amount) + yield_lifetime_usd_change = str(Decimal(str(position_data["yieldLifetimeUsd"])) - prev_yield_usd) + + data_row = { + "时间": self.get_local_timestamp(), + "实时APR": str(position_data["vaultApr"]), + "份额": str(position_data["shares"]), + "代币价格": str(position_data["tokenPrice"]), + "代币数量": str(position_data["poolShareTokenBalance"]), + "代币数量变化量": pool_share_token_balance_change, + "代币价值USD": str(position_data["poolShareTokenUsd"]), + "代币价值USD变化量": pool_share_token_usd_change, + "累计收益数量": str(position_data["yieldLifetimeAmount"]), + "累计收益数量变化量": yield_lifetime_amount_change, + "累计收益价值USD": str(position_data["yieldLifetimeUsd"]), + "累计收益价值USD变化量": yield_lifetime_usd_change + } + + file_exists = os.path.exists(data_file) + with open(data_file, "a", newline="", encoding="utf-8") as f: + writer = csv.DictWriter(f, fieldnames=data_row.keys()) + if not file_exists: + writer.writeheader() + writer.writerow(data_row) + + self.logger.info(f"保存用户仓位数据: {os.path.basename(data_file)}") + + def save_vault_data(self, vault_info): + """保存金库数据到CSV""" + vault_id = vault_info["id"] + vault_name = vault_info["name"] + vault_file = f"data/vault_{vault_name}_{vault_id}.csv" + + data_row = { + "时间": self.get_local_timestamp(), + "实时APR": str(vault_info.get("instantAPR", "")), + "7天APY": str(vault_info.get("apy7d", {}).get("value", "")), + "30天APY": str(vault_info.get("apy30d", {}).get("value", "")), + "总份额": str(vault_info.get("totalShares", "")), + "代币价格": str(vault_info.get("coinPrice", "")), + "总代币数量": str(vault_info.get("totalStaked", "")), + "总代币价值USD": str(vault_info.get("totalStakedUsd", "")), + "上限代币数量": str(vault_info.get("stakeCapAmount", "")) + } + + file_exists = os.path.exists(vault_file) + with open(vault_file, "a", newline="", encoding="utf-8") as f: + writer = csv.DictWriter(f, fieldnames=data_row.keys()) + if not file_exists: + writer.writeheader() + writer.writerow(data_row) + + self.logger.info(f"保存金库数据: {os.path.basename(vault_file)}") + + def track_all_positions(self): + """跟踪所有仓位""" + self.logger.info(f"开始处理 {len(self.config)} 个地址") + + vaults_info = self.get_vaults_info() + if not vaults_info: + self.logger.error("获取金库信息失败,无法继续处理") + return + + for vault in vaults_info.get("data", []): + self.save_vault_data(vault) + + success_count = 0 + + for i, config_item in enumerate(self.config, 1): + address = config_item["address"] + self.logger.info(f"[{i}/{len(self.config)}] 处理地址: {address}") + + position_data = self.get_user_position(address) + if not position_data: + self.logger.error(f"获取用户仓位数据失败: {address}") + continue + + target_vault_ids = config_item.get("vault_id", []) + for vault_id in target_vault_ids: + found_position = None + for position in position_data: + if position.get("vaultId") == vault_id: + found_position = position + break + + if found_position: + vault_info = None + for vault in vaults_info.get("data", []): + if vault["id"] == vault_id: + vault_info = vault + break + + if vault_info: + self.save_user_position_data(address, vault_id, found_position, vault_info) + success_count += 1 + else: + self.logger.warning(f"未找到金库信息: {vault_id}") + else: + self.logger.warning(f"未找到vault_id {vault_id} 在地址 {address} 的仓位中") + + self.logger.info(f"处理完成: 成功 {success_count} 个仓位") + + +def main(): + """主函数""" + try: + tracker = VoloVaultTracker("config") + tracker.track_all_positions() + except Exception as e: + logging.error(f"程序执行失败: {e}") + raise + + +if __name__ == "__main__": + main() \ No newline at end of file