commit 1eed2c4a6172ce04a5c6dcd72ff20934ee4c9ba7 Author: Zichao Lin Date: Sat Sep 27 11:00:35 2025 +0800 feat(uniswap-v2): add support for uniswap v2 positions diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..bf0824e --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.log \ No newline at end of file diff --git a/.pylintrc b/.pylintrc new file mode 100644 index 0000000..b556b89 --- /dev/null +++ b/.pylintrc @@ -0,0 +1,8 @@ +[MASTER] +disable= + broad-exception-caught, + logging-fstring-interpolation, + line-too-long, + missing-class-docstring, + missing-module-docstring, + missing-function-docstring, \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..3290557 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +web3 \ No newline at end of file diff --git a/uniswap-v2/.gitignore b/uniswap-v2/.gitignore new file mode 100644 index 0000000..77f030e --- /dev/null +++ b/uniswap-v2/.gitignore @@ -0,0 +1,2 @@ +data/ +*.bat \ No newline at end of file diff --git a/uniswap-v2/config/chains.json b/uniswap-v2/config/chains.json new file mode 100644 index 0000000..03844e2 --- /dev/null +++ b/uniswap-v2/config/chains.json @@ -0,0 +1,10 @@ +{ + "polygon": { + "rpc_url": "https://polygon-bor-rpc.publicnode.com", + "uniswap_v2_factory": "0x5757371414417b8C6CAad45bAeF941aBc7d3Ab32" + }, + "polygon1111": { + "rpc_url": "https://polygon-bor-rpc.publicnode.com", + "uniswap_v2_factory": "0x5757371414417b8C6CAad45bAeF941aBc7d3Ab32" + } +} \ No newline at end of file diff --git a/uniswap-v2/config/positions.json b/uniswap-v2/config/positions.json new file mode 100644 index 0000000..f1d1dfe --- /dev/null +++ b/uniswap-v2/config/positions.json @@ -0,0 +1,14 @@ +[ + { + "name(optional)": "DAI-USDT0", + "chain": "polygon", + "user_address": "0x", + "pool_address": "0x" + }, + { + "name(optional)": "DAI-USDC", + "chain": "polygon", + "user_address": "0x", + "pool_address": "0x" + } +] \ No newline at end of file diff --git a/uniswap-v2/main.py b/uniswap-v2/main.py new file mode 100644 index 0000000..bee2d6a --- /dev/null +++ b/uniswap-v2/main.py @@ -0,0 +1,433 @@ +import json +from decimal import Decimal +import csv +import os +import logging +from datetime import datetime +from web3 import Web3 + + +class UniversalLPPositionTracker: + ERC20_ABI = [ + { + "constant": True, + "inputs": [], + "name": "symbol", + "outputs": [{"name": "", "type": "string"}], + "type": "function", + }, + { + "constant": True, + "inputs": [], + "name": "decimals", + "outputs": [{"name": "", "type": "uint8"}], + "type": "function", + }, + { + "constant": True, + "inputs": [{"name": "account", "type": "address"}], + "name": "balanceOf", + "outputs": [{"name": "", "type": "uint256"}], + "type": "function", + }, + { + "constant": True, + "inputs": [], + "name": "totalSupply", + "outputs": [{"name": "", "type": "uint256"}], + "type": "function", + }, + ] + + PAIR_ABI = [ + { + "constant": True, + "inputs": [], + "name": "getReserves", + "outputs": [ + {"name": "reserve0", "type": "uint112"}, + {"name": "reserve1", "type": "uint112"}, + {"name": "blockTimestampLast", "type": "uint32"}, + ], + "type": "function", + }, + { + "constant": True, + "inputs": [], + "name": "token0", + "outputs": [{"name": "", "type": "address"}], + "type": "function", + }, + { + "constant": True, + "inputs": [], + "name": "token1", + "outputs": [{"name": "", "type": "address"}], + "type": "function", + }, + ] + + def __init__(self, config_dir="config"): + self.config_dir = config_dir + self.setup_logging() + self.load_configs() + + def setup_logging(self): + """设置日志系统""" + os.makedirs("logs", exist_ok=True) + + log_format = "%(asctime)s - %(levelname)s - %(message)s" + log_filename = f"logs/{datetime.now().strftime('%Y%m%d_%H%M%S')}.log" + + logging.basicConfig( + level=logging.INFO, + format=log_format, + handlers=[ + logging.FileHandler(log_filename, encoding="utf-8"), + logging.StreamHandler(), + ], + ) + + self.logger = logging.getLogger(__name__) + self.logger.info("初始化日志系统") + + def get_local_timestamp(self): + """获取本地时区的时间戳""" + return datetime.now().strftime("%Y-%m-%dT%H:%M") + + def load_configs(self): + """从JSON文件加载配置""" + try: + with open(f"{self.config_dir}/chains.json", "r", encoding="utf-8") as f: + self.chain_configs = json.load(f) + self.logger.info(f"加载区块链配置: {len(self.chain_configs)} 条链") + except FileNotFoundError: + self.logger.error("配置文件缺失: chains.json") + self.chain_configs = {} + except json.JSONDecodeError as e: + self.logger.error(f"配置文件格式错误 - chains.json: {e}") + self.chain_configs = {} + + try: + with open(f"{self.config_dir}/positions.json", "r", encoding="utf-8") as f: + self.positions_config = json.load(f) + self.logger.info(f"加载仓位配置: {len(self.positions_config)} 个仓位") + except FileNotFoundError: + self.logger.error("配置文件缺失: positions.json") + self.positions_config = [] + except json.JSONDecodeError as e: + self.logger.error(f"配置文件格式错误 - positions.json: {e}") + self.positions_config = [] + + os.makedirs("data", exist_ok=True) + + def get_token_info(self, w3, token_address): + """获取代币信息""" + try: + token_contract = w3.eth.contract( + address=w3.to_checksum_address(token_address), abi=self.ERC20_ABI + ) + + symbol = token_contract.functions.symbol().call() + decimals = token_contract.functions.decimals().call() + + return {"address": token_address, "symbol": symbol, "decimals": decimals} + except Exception as e: + self.logger.warning(f"获取代币信息失败 {token_address}: {e}") + return {"address": token_address, "symbol": "UNKNOWN", "decimals": 18} + + def get_pool_tvl_data(self, position_config): + """获取整个资金池的TVL数据""" + chain_name = position_config["chain"] + pair_address = position_config["pool_address"] + + if chain_name not in self.chain_configs: + self.logger.error(f"不支持的区块链: {chain_name}") + return None + + chain_config = self.chain_configs[chain_name] + w3 = Web3(Web3.HTTPProvider(chain_config["rpc_url"])) + + if not w3.is_connected(): + self.logger.error(f"区块链连接失败: {chain_name}") + return None + + try: + pair_contract = w3.eth.contract( + address=w3.to_checksum_address(pair_address), + abi=self.PAIR_ABI + self.ERC20_ABI, + ) + + self.logger.info(f"获取资金池数据: {pair_address}") + + reserves = pair_contract.functions.getReserves().call() + reserve0, reserve1 = reserves[0], reserves[1] + + token0_address = pair_contract.functions.token0().call() + token1_address = pair_contract.functions.token1().call() + + token0_info = self.get_token_info(w3, token0_address) + token1_info = self.get_token_info(w3, token1_address) + + lp_total_supply = pair_contract.functions.totalSupply().call() + + token0_amount = Decimal(str(reserve0)) / (10 ** token0_info["decimals"]) + token1_amount = Decimal(str(reserve1)) / (10 ** token1_info["decimals"]) + lp_total_supply_formatted = Decimal(str(lp_total_supply)) / (10**18) + + self.logger.info( + f"资金池数据 - LP: {lp_total_supply_formatted}, {token0_info['symbol']}: {token0_amount}, {token1_info['symbol']}: {token1_amount}" + ) + + return { + "timestamp": self.get_local_timestamp(), + "chain": chain_name, + "pair_address": pair_address, + "lp_total_supply": lp_total_supply_formatted, + "token0": {"info": token0_info, "total_amount": token0_amount}, + "token1": {"info": token1_info, "total_amount": token1_amount}, + } + + except Exception as e: + self.logger.error(f"获取资金池数据失败 {pair_address}: {e}") + return None + + def get_user_position_data(self, position_config): + """获取用户仓位数据""" + chain_name = position_config["chain"] + pair_address = position_config["pool_address"] + user_address = position_config["user_address"] + + if chain_name not in self.chain_configs: + self.logger.error(f"不支持的区块链: {chain_name}") + return None + + chain_config = self.chain_configs[chain_name] + w3 = Web3(Web3.HTTPProvider(chain_config["rpc_url"])) + + if not w3.is_connected(): + self.logger.error(f"区块链连接失败: {chain_name}") + return None + + try: + pair_contract = w3.eth.contract( + address=w3.to_checksum_address(pair_address), + abi=self.PAIR_ABI + self.ERC20_ABI, + ) + + self.logger.info(f"获取用户仓位: {user_address} -> {pair_address}") + + user_lp_balance = pair_contract.functions.balanceOf( + w3.to_checksum_address(user_address) + ).call() + + lp_total_supply = pair_contract.functions.totalSupply().call() + reserves = pair_contract.functions.getReserves().call() + reserve0, reserve1 = reserves[0], reserves[1] + + token0_address = pair_contract.functions.token0().call() + token1_address = pair_contract.functions.token1().call() + token0_info = self.get_token_info(w3, token0_address) + token1_info = self.get_token_info(w3, token1_address) + + if lp_total_supply > 0: + user_share = Decimal(str(user_lp_balance)) / Decimal( + str(lp_total_supply) + ) + else: + user_share = Decimal("0") + self.logger.warning("LP总供应量为0") + + user_token0_amount = (Decimal(str(reserve0)) * user_share) / ( + 10 ** token0_info["decimals"] + ) + user_token1_amount = (Decimal(str(reserve1)) * user_share) / ( + 10 ** token1_info["decimals"] + ) + lp_amount = Decimal(str(user_lp_balance)) / (10**18) + + total_amount = user_token0_amount + user_token1_amount + + self.logger.info( + f"用户仓位 - LP: {lp_amount}, {token0_info['symbol']}: {user_token0_amount}, {token1_info['symbol']}: {user_token1_amount}, 总计: {total_amount}" + ) + + return { + "timestamp": self.get_local_timestamp(), + "chain": chain_name, + "user_address": user_address, + "pair_address": pair_address, + "lp_amount_formatted": lp_amount, + "token0": {"info": token0_info, "amount_formatted": user_token0_amount}, + "token1": {"info": token1_info, "amount_formatted": user_token1_amount}, + "total_amount": total_amount, + } + + except Exception as e: + self.logger.error(f"获取用户仓位失败 {pair_address}: {e}") + return None + + def read_previous_data(self, file_path): + """读取之前的数据用于计算变化量""" + if not os.path.exists(file_path): + return None + + try: + with open(file_path, "r", encoding="utf-8") as f: + reader = csv.DictReader(f) + rows = list(reader) + if rows: + return rows[-1] + except Exception as e: + self.logger.error(f"读取历史数据失败 {file_path}: {e}") + return None + + def save_tvl_data(self, tvl_data): + """保存TVL数据到单独的CSV文件""" + pool_address = tvl_data["pair_address"] + tvl_file = f"data/tvl_{pool_address}.csv" + + previous_data = self.read_previous_data(tvl_file) + + if previous_data: + prev_total_amount = Decimal( + previous_data[tvl_data["token0"]["info"]["symbol"]] + ) + Decimal(previous_data[tvl_data["token1"]["info"]["symbol"]]) + amount_change = ( + Decimal(previous_data[tvl_data["token0"]["info"]["symbol"]]) + + Decimal(previous_data[tvl_data["token1"]["info"]["symbol"]]) + - Decimal(tvl_data["token0"]["total_amount"]) + - Decimal(tvl_data["token1"]["total_amount"]) + ) + self.logger.info(f"数量变化: {amount_change}") + else: + amount_change = Decimal(tvl_data["token0"]["total_amount"]) + Decimal( + tvl_data["token1"]["total_amount"] + ) + + if amount_change == 0: + return + + tvl_row = { + "时间": tvl_data["timestamp"], + "lptoken": str(tvl_data["lp_total_supply"]), + f'{tvl_data["token0"]["info"]["symbol"]}': str( + tvl_data["token0"]["total_amount"] + ), + f'{tvl_data["token1"]["info"]["symbol"]}': str( + tvl_data["token1"]["total_amount"] + ), + } + + file_exists = os.path.exists(tvl_file) + with open(tvl_file, "a", newline="", encoding="utf-8") as f: + writer = csv.DictWriter(f, fieldnames=tvl_row.keys()) + if not file_exists: + writer.writeheader() + writer.writerow(tvl_row) + self.logger.info(f"保存TVL数据: {tvl_file}") + + def save_user_data(self, user_data): + """保存用户仓位数据到CSV""" + user_address = user_data["user_address"] + pool_address = user_data["pair_address"] + + data_file = f"data/data_{user_address}_{pool_address}.csv" + portfolio_file = f"data/portfolio_{user_address}_{pool_address}.csv" + + previous_data = self.read_previous_data(data_file) + + if previous_data: + prev_total_amount = Decimal(previous_data["总数量"]) + amount_change = user_data["total_amount"] - prev_total_amount + self.logger.info(f"数量变化: {amount_change}") + else: + amount_change = user_data["total_amount"] + + if amount_change == 0: + return + + data_row = { + "时间": user_data["timestamp"], + "lptoken": str(user_data["lp_amount_formatted"]), + f'{user_data["token0"]["info"]["symbol"]}': str( + user_data["token0"]["amount_formatted"] + ), + f'{user_data["token1"]["info"]["symbol"]}': str( + user_data["token1"]["amount_formatted"] + ), + "总数量": str(user_data["total_amount"]), + "变化量": str(amount_change), + } + + file_exists = os.path.exists(data_file) + with open(data_file, "a", newline="", encoding="utf-8") as f: + writer = csv.DictWriter(f, fieldnames=data_row.keys()) + if not file_exists: + writer.writeheader() + writer.writerow(data_row) + + self.logger.info(f"保存用户数据: {os.path.basename(data_file)}") + + portfolio_row = { + "日期": user_data["timestamp"], + "类型": "利息", + "净额": str(amount_change), + } + + portfolio_exists = os.path.exists(portfolio_file) + with open(portfolio_file, "a", newline="", encoding="utf-8") as f: + writer = csv.DictWriter(f, fieldnames=portfolio_row.keys()) + if not portfolio_exists: + writer.writeheader() + writer.writerow(portfolio_row) + self.logger.info(f"保存投资组合数据: {os.path.basename(portfolio_file)}") + + def track_all_positions(self): + """跟踪所有仓位""" + self.logger.info(f"开始处理 {len(self.positions_config)} 个仓位") + + processed_pools = set() + success_count = 0 + + for i, position_config in enumerate(self.positions_config, 1): + user_addr = position_config["user_address"] + pool_addr = position_config["pool_address"] + self.logger.info( + f"[{i}/{len(self.positions_config)}] 处理仓位 {user_addr} -> {pool_addr}" + ) + + pool_key = position_config["pool_address"] + if pool_key not in processed_pools: + tvl_data = self.get_pool_tvl_data(position_config) + if tvl_data: + self.save_tvl_data(tvl_data) + processed_pools.add(pool_key) + else: + self.logger.error("资金池数据获取失败,跳过该仓位") + continue + + user_data = self.get_user_position_data(position_config) + if user_data: + self.save_user_data(user_data) + success_count += 1 + else: + self.logger.error("用户仓位数据获取失败") + + self.logger.info( + f"处理完成: 成功 {success_count}/{len(self.positions_config)} 个仓位, {len(processed_pools)} 个资金池" + ) + + +def main(): + """主函数""" + try: + tracker = UniversalLPPositionTracker("config") + tracker.track_all_positions() + except Exception as e: + logging.error(f"程序执行失败: {e}") + raise + + +if __name__ == "__main__": + main() diff --git a/uniswap-v2/merge.py b/uniswap-v2/merge.py new file mode 100644 index 0000000..ed2257d --- /dev/null +++ b/uniswap-v2/merge.py @@ -0,0 +1,88 @@ +import os +import csv +from decimal import Decimal +from datetime import datetime + + +def merge_portfolio_files(): + data_dir = "data" + if not os.path.exists(data_dir): + print(f"目录 {data_dir} 不存在") + return + + portfolio_files = [] + for filename in os.listdir(data_dir): + if filename.startswith("portfolio_") and filename.endswith(".csv"): + portfolio_files.append(filename) + + if not portfolio_files: + print("未找到portfolio_*.csv文件") + return + + for input_filename in portfolio_files: + output_filename = input_filename.replace("portfolio_", "merged_") + input_path = os.path.join(data_dir, input_filename) + output_path = os.path.join(data_dir, output_filename) + + try: + process_single_file(input_path, output_path) + print(f"成功处理: {input_filename} -> {output_filename}") + except Exception as e: + print(f"处理文件 {input_filename} 时出错: {e}") + + +def process_single_file(input_path, output_path): + date_data = {} # 格式: {日期: {'amount': 净额总和, 'last_time': 最后时间}} + + with open(input_path, "r", encoding="utf-8") as infile: + reader = csv.DictReader(infile) + + if not all(field in reader.fieldnames for field in ["日期", "类型", "净额"]): + raise ValueError("CSV文件格式不正确,需要的列:日期,类型,净额") + + for row in reader: + full_datetime_str = row["日期"] + try: + full_datetime = datetime.fromisoformat( + full_datetime_str.replace("Z", "+00:00") + ) + except ValueError: + print(f"警告:跳过无效的日期格式: {full_datetime_str}") + continue + + date_only = full_datetime.date().isoformat() + + try: + amount = Decimal(row["净额"]) + except ValueError: + print(f"警告:跳过无效的净额值: {row['净额']}") + continue + + if date_only in date_data: + date_data[date_only]["amount"] += amount + if full_datetime > date_data[date_only]["last_time"]: + date_data[date_only]["last_time"] = full_datetime + else: + date_data[date_only] = {"amount": amount, "last_time": full_datetime} + + sorted_dates = sorted(date_data.keys()) + + with open(output_path, "w", encoding="utf-8", newline="") as outfile: + writer = csv.writer(outfile) + writer.writerow(["日期", "类型", "净额"]) + + for date in sorted_dates: + data = date_data[date] + last_datetime_str = data["last_time"].strftime("%Y-%m-%dT%H:%M") + writer.writerow([last_datetime_str, "利息", data['amount']]) + + +def main(): + """主函数""" + print("开始处理portfolio文件...") + merge_portfolio_files() + print("处理完成!") + + +if __name__ == "__main__": + main()