"""Track liquidity-pool reserves and per-user LP positions and append them to CSV files.

The pair contracts are queried through ``getReserves`` / ``token0`` / ``token1``
plus the ERC-20 ``balanceOf`` / ``totalSupply`` calls declared in the ABIs below.
Chains and positions are read from ``<config_dir>/chains.json`` and
``<config_dir>/positions.json``; results are written under ``data/`` and logs
under ``logs/`` (both relative to the current working directory).
"""

import csv
import json
import logging
import os
from datetime import datetime
from decimal import Decimal, InvalidOperation

try:
    from web3 import Web3
except ImportError:  # pragma: no cover - depends on the environment
    # web3 is only required for the on-chain queries. Keeping the module
    # importable lets the CSV/history helpers be used (and tested) offline;
    # _connect() raises ImportError as soon as an RPC connection is needed.
    Web3 = None


class UniversalLPPositionTracker:
    """Collect pool TVL and user LP positions for the configured pools."""

    ERC20_ABI = [
        {
            "constant": True,
            "inputs": [],
            "name": "symbol",
            "outputs": [{"name": "", "type": "string"}],
            "type": "function",
        },
        {
            "constant": True,
            "inputs": [],
            "name": "decimals",
            "outputs": [{"name": "", "type": "uint8"}],
            "type": "function",
        },
        {
            "constant": True,
            "inputs": [{"name": "account", "type": "address"}],
            "name": "balanceOf",
            "outputs": [{"name": "", "type": "uint256"}],
            "type": "function",
        },
        {
            "constant": True,
            "inputs": [],
            "name": "totalSupply",
            "outputs": [{"name": "", "type": "uint256"}],
            "type": "function",
        },
    ]

    PAIR_ABI = [
        {
            "constant": True,
            "inputs": [],
            "name": "getReserves",
            "outputs": [
                {"name": "reserve0", "type": "uint112"},
                {"name": "reserve1", "type": "uint112"},
                {"name": "blockTimestampLast", "type": "uint32"},
            ],
            "type": "function",
        },
        {
            "constant": True,
            "inputs": [],
            "name": "token0",
            "outputs": [{"name": "", "type": "address"}],
            "type": "function",
        },
        {
            "constant": True,
            "inputs": [],
            "name": "token1",
            "outputs": [{"name": "", "type": "address"}],
            "type": "function",
        },
    ]

    # Decimals used to format LP token amounts (the code has always assumed 18).
    LP_DECIMALS = 18

    # Keys every entry of positions.json must provide.
    REQUIRED_POSITION_KEYS = ("chain", "pool_address", "user_address")

    def __init__(self, config_dir="config"):
        """Configure logging and load the JSON configuration from *config_dir*."""
        self.config_dir = config_dir
        self.setup_logging()
        self.load_configs()

    # ------------------------------------------------------------------
    # Setup
    # ------------------------------------------------------------------
    def setup_logging(self):
        """Configure root logging to a dated file under ``logs/`` and to the console."""
        os.makedirs("logs", exist_ok=True)

        log_format = "%(asctime)s - %(levelname)s - %(message)s"
        log_filename = f"logs/{datetime.now().strftime('%Y-%m-%d')}.log"

        logging.basicConfig(
            level=logging.INFO,
            format=log_format,
            handlers=[
                logging.FileHandler(log_filename, encoding="utf-8"),
                logging.StreamHandler(),
            ],
        )
        self.logger = logging.getLogger(__name__)
        self.logger.info("初始化日志系统")

    def get_local_timestamp(self):
        """Return the current local time formatted as ``YYYY-MM-DDTHH:MM``."""
        return datetime.now().strftime("%Y-%m-%dT%H:%M")

    def _load_json_config(self, filename, expected_type):
        """Load ``<config_dir>/<filename>``; return None if missing, invalid or mistyped.

        Every failure is logged. Because the top-level type is validated, a
        ``None`` return always means "could not load".
        """
        path = os.path.join(self.config_dir, filename)
        try:
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except FileNotFoundError:
            self.logger.error(f"配置文件缺失: {filename}")
            return None
        except json.JSONDecodeError as e:
            self.logger.error(f"配置文件格式错误 - {filename}: {e}")
            return None

        if not isinstance(data, expected_type):
            # Valid JSON of the wrong shape would otherwise crash much later.
            self.logger.error(
                f"配置文件格式错误 - {filename}: 顶层类型应为 {expected_type.__name__}"
            )
            return None
        return data

    def load_configs(self):
        """Load ``chains.json`` (dict) and ``positions.json`` (list).

        A file that cannot be loaded is replaced by an empty dict/list so the
        tracker still starts. Also makes sure the ``data`` directory exists.
        """
        chains = self._load_json_config("chains.json", dict)
        if chains is None:
            self.chain_configs = {}
        else:
            self.chain_configs = chains
            self.logger.info(f"加载区块链配置: {len(self.chain_configs)} 条链")

        positions = self._load_json_config("positions.json", list)
        if positions is None:
            self.positions_config = []
        else:
            self.positions_config = positions
            self.logger.info(f"加载仓位配置: {len(self.positions_config)} 个仓位")

        os.makedirs("data", exist_ok=True)

    # ------------------------------------------------------------------
    # On-chain reads
    # ------------------------------------------------------------------
    def get_token_info(self, w3, token_address):
        """Return ``{"address", "symbol", "decimals"}`` for an ERC-20 token.

        Best effort: if either call fails, a warning is logged and the
        placeholder symbol ``"UNKNOWN"`` with 18 decimals is returned.
        """
        try:
            token_contract = w3.eth.contract(
                address=w3.to_checksum_address(token_address), abi=self.ERC20_ABI
            )
            symbol = token_contract.functions.symbol().call()
            decimals = token_contract.functions.decimals().call()
            return {"address": token_address, "symbol": symbol, "decimals": decimals}
        except Exception as e:
            self.logger.warning(f"获取代币信息失败 {token_address}: {e}")
            return {"address": token_address, "symbol": "UNKNOWN", "decimals": 18}

    def _connect(self, chain_name):
        """Return a connected Web3 client for *chain_name*, or None (after logging).

        Raises:
            ImportError: if the ``web3`` package is not installed.
        """
        if chain_name not in self.chain_configs:
            self.logger.error(f"不支持的区块链: {chain_name}")
            return None
        if Web3 is None:
            raise ImportError("未安装 web3 库,无法查询链上数据 (pip install web3)")

        chain_config = self.chain_configs[chain_name]
        w3 = Web3(Web3.HTTPProvider(chain_config["rpc_url"]))
        if not w3.is_connected():
            self.logger.error(f"区块链连接失败: {chain_name}")
            return None
        return w3

    def _pair_contract(self, w3, pair_address):
        """Build the pair contract object (pair ABI plus ERC-20 ABI for the LP token)."""
        return w3.eth.contract(
            address=w3.to_checksum_address(pair_address),
            abi=self.PAIR_ABI + self.ERC20_ABI,
        )

    def _read_pool_state(self, w3, pair_contract):
        """Read raw reserves, LP total supply and both tokens' metadata from the pair."""
        reserves = pair_contract.functions.getReserves().call()
        token0_address = pair_contract.functions.token0().call()
        token1_address = pair_contract.functions.token1().call()
        return {
            "reserve0": reserves[0],
            "reserve1": reserves[1],
            "token0_info": self.get_token_info(w3, token0_address),
            "token1_info": self.get_token_info(w3, token1_address),
            "lp_total_supply": pair_contract.functions.totalSupply().call(),
        }

    def get_pool_tvl_data(self, position_config):
        """Return the total reserves and LP supply of the pool in *position_config*.

        Args:
            position_config: mapping with at least ``chain`` and ``pool_address``.

        Returns:
            A dict with ``timestamp``, ``chain``, ``pair_address``,
            ``lp_total_supply`` and ``token0`` / ``token1`` (each holding
            ``info`` and ``total_amount``), or None when the chain is not
            configured, the RPC is unreachable or a contract call fails.
        """
        chain_name = position_config["chain"]
        pair_address = position_config["pool_address"]

        w3 = self._connect(chain_name)
        if w3 is None:
            return None

        try:
            pair_contract = self._pair_contract(w3, pair_address)
            self.logger.info(f"获取资金池数据: {pair_address}")

            state = self._read_pool_state(w3, pair_contract)
            token0_info = state["token0_info"]
            token1_info = state["token1_info"]

            token0_amount = Decimal(state["reserve0"]) / (10 ** token0_info["decimals"])
            token1_amount = Decimal(state["reserve1"]) / (10 ** token1_info["decimals"])
            lp_total_supply_formatted = Decimal(state["lp_total_supply"]) / (
                10**self.LP_DECIMALS
            )

            self.logger.info(
                f"资金池数据 - LP: {lp_total_supply_formatted}, {token0_info['symbol']}: {token0_amount}, {token1_info['symbol']}: {token1_amount}"
            )

            return {
                "timestamp": self.get_local_timestamp(),
                "chain": chain_name,
                "pair_address": pair_address,
                "lp_total_supply": lp_total_supply_formatted,
                "token0": {"info": token0_info, "total_amount": token0_amount},
                "token1": {"info": token1_info, "total_amount": token1_amount},
            }
        except Exception as e:
            self.logger.error(f"获取资金池数据失败 {pair_address}: {e}")
            return None

    def get_user_position_data(self, position_config):
        """Return the user's LP balance and pro-rata share of both reserves.

        Args:
            position_config: mapping with ``chain``, ``pool_address`` and
                ``user_address``.

        Returns:
            A dict with ``timestamp``, ``chain``, ``user_address``,
            ``pair_address``, ``lp_amount_formatted``, ``token0`` / ``token1``
            (each holding ``info`` and ``amount_formatted``) and
            ``total_amount`` (the plain sum of both token amounts), or None
            when the chain is not configured, the RPC is unreachable or a
            contract call fails.
        """
        chain_name = position_config["chain"]
        pair_address = position_config["pool_address"]
        user_address = position_config["user_address"]

        w3 = self._connect(chain_name)
        if w3 is None:
            return None

        try:
            pair_contract = self._pair_contract(w3, pair_address)
            self.logger.info(f"获取用户仓位: {user_address} -> {pair_address}")

            user_lp_balance = pair_contract.functions.balanceOf(
                w3.to_checksum_address(user_address)
            ).call()
            state = self._read_pool_state(w3, pair_contract)
            token0_info = state["token0_info"]
            token1_info = state["token1_info"]
            lp_total_supply = state["lp_total_supply"]

            if lp_total_supply > 0:
                user_share = Decimal(user_lp_balance) / Decimal(lp_total_supply)
            else:
                # An empty pool has no shares; avoid dividing by zero.
                user_share = Decimal("0")
                self.logger.warning("LP总供应量为0")

            user_token0_amount = (Decimal(state["reserve0"]) * user_share) / (
                10 ** token0_info["decimals"]
            )
            user_token1_amount = (Decimal(state["reserve1"]) * user_share) / (
                10 ** token1_info["decimals"]
            )
            lp_amount = Decimal(user_lp_balance) / (10**self.LP_DECIMALS)
            total_amount = user_token0_amount + user_token1_amount

            self.logger.info(
                f"用户仓位 - LP: {lp_amount}, {token0_info['symbol']}: {user_token0_amount}, {token1_info['symbol']}: {user_token1_amount}, 总计: {total_amount}"
            )

            return {
                "timestamp": self.get_local_timestamp(),
                "chain": chain_name,
                "user_address": user_address,
                "pair_address": pair_address,
                "lp_amount_formatted": lp_amount,
                "token0": {"info": token0_info, "amount_formatted": user_token0_amount},
                "token1": {"info": token1_info, "amount_formatted": user_token1_amount},
                "total_amount": total_amount,
            }
        except Exception as e:
            self.logger.error(f"获取用户仓位失败 {pair_address}: {e}")
            return None

    # ------------------------------------------------------------------
    # CSV persistence
    # ------------------------------------------------------------------
    def read_previous_data(self, file_path):
        """Return the last data row of the CSV at *file_path* as a dict.

        Returns None when the file does not exist, has no data rows, or cannot
        be read (the read error is logged).
        """
        if not os.path.exists(file_path):
            return None

        last_row = None
        try:
            with open(file_path, "r", newline="", encoding="utf-8") as f:
                # Stream the file; only the final row is needed.
                for last_row in csv.DictReader(f):
                    pass
        except (OSError, ValueError, csv.Error) as e:
            self.logger.error(f"读取历史数据失败 {file_path}: {e}")
            return None
        return last_row

    def _parse_previous_total(self, previous_row, columns, file_path):
        """Sum *columns* of *previous_row* as Decimals; None if missing or unparsable."""
        try:
            return sum((Decimal(previous_row[name]) for name in columns), Decimal(0))
        except (KeyError, TypeError, InvalidOperation) as e:
            self.logger.warning(f"历史数据无法解析 {file_path}: {e!r}")
            return None

    @staticmethod
    def _token_columns(symbol0, symbol1):
        """Return two distinct CSV column names for the token symbols.

        Identical symbols (e.g. both fell back to ``UNKNOWN``) would collide as
        dict keys and silently drop one column, so they get ``_0`` / ``_1``.
        """
        symbol0, symbol1 = str(symbol0), str(symbol1)
        if symbol0 == symbol1:
            return f"{symbol0}_0", f"{symbol1}_1"
        return symbol0, symbol1

    def _append_csv_row(self, file_path, row):
        """Append *row* to the CSV, creating the directory and header when needed."""
        directory = os.path.dirname(file_path)
        if directory:
            os.makedirs(directory, exist_ok=True)

        # An existing but empty file still needs its header row.
        needs_header = (
            not os.path.exists(file_path) or os.path.getsize(file_path) == 0
        )
        with open(file_path, "a", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=list(row))
            if needs_header:
                writer.writeheader()
            writer.writerow(row)

    def save_tvl_data(self, tvl_data):
        """Append the pool totals to ``data/tvl_<pool>.csv`` when they changed.

        Nothing is written if the sum of both token amounts equals the sum in
        the previous row (or is zero on the first write). A previous row that
        cannot be compared (missing columns, corrupt values) is treated as
        "no history", so the new row is recorded instead of crashing.
        """
        pool_address = tvl_data["pair_address"]
        tvl_file = f"data/tvl_{pool_address}.csv"

        column0, column1 = self._token_columns(
            tvl_data["token0"]["info"]["symbol"],
            tvl_data["token1"]["info"]["symbol"],
        )
        amount0 = tvl_data["token0"]["total_amount"]
        amount1 = tvl_data["token1"]["total_amount"]
        current_total = Decimal(amount0) + Decimal(amount1)

        previous_total = None
        previous_data = self.read_previous_data(tvl_file)
        if previous_data:
            previous_total = self._parse_previous_total(
                previous_data, (column0, column1), tvl_file
            )

        if previous_total is None:
            amount_change = current_total
        else:
            # current - previous, the same sign convention as save_user_data.
            amount_change = current_total - previous_total
            self.logger.info(f"数量变化: {amount_change}")

        if amount_change == 0:
            return

        tvl_row = {
            "时间": tvl_data["timestamp"],
            "lptoken": str(tvl_data["lp_total_supply"]),
            column0: str(amount0),
            column1: str(amount1),
        }
        self._append_csv_row(tvl_file, tvl_row)
        self.logger.info(f"保存TVL数据: {tvl_file}")

    def save_user_data(self, user_data):
        """Append the user's position to the data CSV and the change to the portfolio CSV.

        Files are ``data/data_<user>_<pool>.csv`` and
        ``data/portfolio_<user>_<pool>.csv``. Nothing is written when the total
        did not change. If a previous row exists but its total cannot be
        parsed, saving is skipped with an error: the change amount would be
        wrong and it is recorded in the portfolio file.
        """
        user_address = user_data["user_address"]
        pool_address = user_data["pair_address"]
        data_file = f"data/data_{user_address}_{pool_address}.csv"
        portfolio_file = f"data/portfolio_{user_address}_{pool_address}.csv"

        total_amount = user_data["total_amount"]
        previous_data = self.read_previous_data(data_file)
        if previous_data:
            prev_total_amount = self._parse_previous_total(
                previous_data, ("总数量",), data_file
            )
            if prev_total_amount is None:
                self.logger.error(
                    f"历史数据无法解析,跳过保存以避免记录错误的变化量: {data_file}"
                )
                return
            amount_change = total_amount - prev_total_amount
            self.logger.info(f"数量变化: {amount_change}")
        else:
            # First record: the whole amount counts as the change.
            amount_change = total_amount

        if amount_change == 0:
            return

        column0, column1 = self._token_columns(
            user_data["token0"]["info"]["symbol"],
            user_data["token1"]["info"]["symbol"],
        )
        data_row = {
            "时间": user_data["timestamp"],
            "lptoken": str(user_data["lp_amount_formatted"]),
            column0: str(user_data["token0"]["amount_formatted"]),
            column1: str(user_data["token1"]["amount_formatted"]),
            "总数量": str(total_amount),
            "变化量": str(amount_change),
        }
        self._append_csv_row(data_file, data_row)
        self.logger.info(f"保存用户数据: {os.path.basename(data_file)}")

        portfolio_row = {
            "日期": user_data["timestamp"],
            "类型": "利息",
            "净额": str(amount_change),
        }
        self._append_csv_row(portfolio_file, portfolio_row)
        self.logger.info(f"保存投资组合数据: {os.path.basename(portfolio_file)}")

    # ------------------------------------------------------------------
    # Orchestration
    # ------------------------------------------------------------------
    def track_all_positions(self):
        """Process every configured position.

        Pool TVL is fetched once per pool address. A malformed position entry
        is logged and skipped so it cannot prevent the remaining positions
        from being processed.
        """
        total = len(self.positions_config)
        self.logger.info(f"开始处理 {total} 个仓位")

        processed_pools = set()
        success_count = 0

        for i, position_config in enumerate(self.positions_config, 1):
            if isinstance(position_config, dict):
                missing = [
                    key
                    for key in self.REQUIRED_POSITION_KEYS
                    if key not in position_config
                ]
            else:
                missing = list(self.REQUIRED_POSITION_KEYS)
            if missing:
                self.logger.error(
                    f"[{i}/{total}] 仓位配置无效,缺少字段: {', '.join(missing)}"
                )
                continue

            user_addr = position_config["user_address"]
            pool_addr = position_config["pool_address"]
            self.logger.info(f"[{i}/{total}] 处理仓位 {user_addr} -> {pool_addr}")

            if pool_addr not in processed_pools:
                tvl_data = self.get_pool_tvl_data(position_config)
                if not tvl_data:
                    self.logger.error("资金池数据获取失败,跳过该仓位")
                    continue
                self.save_tvl_data(tvl_data)
                processed_pools.add(pool_addr)

            user_data = self.get_user_position_data(position_config)
            if user_data:
                self.save_user_data(user_data)
                success_count += 1
            else:
                self.logger.error("用户仓位数据获取失败")

        self.logger.info(
            f"处理完成: 成功 {success_count}/{total} 个仓位, {len(processed_pools)} 个资金池"
        )


def main():
    """Entry point: track every configured position, logging and re-raising failures."""
    try:
        tracker = UniversalLPPositionTracker("config")
        tracker.track_all_positions()
    except Exception as e:
        logging.error(f"程序执行失败: {e}")
        raise


if __name__ == "__main__":
    main()