feat(uniswap-v2): add support for uniswap v2 positions

This commit is contained in:
2025-09-27 11:00:35 +08:00
commit 1eed2c4a61
8 changed files with 557 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
*.log

8
.pylintrc Normal file
View File

@@ -0,0 +1,8 @@
[MASTER]
disable=
broad-exception-caught,
logging-fstring-interpolation,
line-too-long,
missing-class-docstring,
missing-module-docstring,
missing-function-docstring,

1
requirements.txt Normal file
View File

@@ -0,0 +1 @@
web3

2
uniswap-v2/.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
data/
*.bat

View File

@@ -0,0 +1,10 @@
{
"polygon": {
"rpc_url": "https://polygon-bor-rpc.publicnode.com",
"uniswap_v2_factory": "0x5757371414417b8C6CAad45bAeF941aBc7d3Ab32"
},
"polygon1111": {
"rpc_url": "https://polygon-bor-rpc.publicnode.com",
"uniswap_v2_factory": "0x5757371414417b8C6CAad45bAeF941aBc7d3Ab32"
}
}

View File

@@ -0,0 +1,14 @@
[
{
"name(optional)": "DAI-USDT0",
"chain": "polygon",
"user_address": "0x",
"pool_address": "0x"
},
{
"name(optional)": "DAI-USDC",
"chain": "polygon",
"user_address": "0x",
"pool_address": "0x"
}
]

433
uniswap-v2/main.py Normal file
View File

@@ -0,0 +1,433 @@
import json
from decimal import Decimal
import csv
import os
import logging
from datetime import datetime
from web3 import Web3
class UniversalLPPositionTracker:
ERC20_ABI = [
{
"constant": True,
"inputs": [],
"name": "symbol",
"outputs": [{"name": "", "type": "string"}],
"type": "function",
},
{
"constant": True,
"inputs": [],
"name": "decimals",
"outputs": [{"name": "", "type": "uint8"}],
"type": "function",
},
{
"constant": True,
"inputs": [{"name": "account", "type": "address"}],
"name": "balanceOf",
"outputs": [{"name": "", "type": "uint256"}],
"type": "function",
},
{
"constant": True,
"inputs": [],
"name": "totalSupply",
"outputs": [{"name": "", "type": "uint256"}],
"type": "function",
},
]
PAIR_ABI = [
{
"constant": True,
"inputs": [],
"name": "getReserves",
"outputs": [
{"name": "reserve0", "type": "uint112"},
{"name": "reserve1", "type": "uint112"},
{"name": "blockTimestampLast", "type": "uint32"},
],
"type": "function",
},
{
"constant": True,
"inputs": [],
"name": "token0",
"outputs": [{"name": "", "type": "address"}],
"type": "function",
},
{
"constant": True,
"inputs": [],
"name": "token1",
"outputs": [{"name": "", "type": "address"}],
"type": "function",
},
]
def __init__(self, config_dir="config"):
self.config_dir = config_dir
self.setup_logging()
self.load_configs()
def setup_logging(self):
"""设置日志系统"""
os.makedirs("logs", exist_ok=True)
log_format = "%(asctime)s - %(levelname)s - %(message)s"
log_filename = f"logs/{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
logging.basicConfig(
level=logging.INFO,
format=log_format,
handlers=[
logging.FileHandler(log_filename, encoding="utf-8"),
logging.StreamHandler(),
],
)
self.logger = logging.getLogger(__name__)
self.logger.info("初始化日志系统")
def get_local_timestamp(self):
"""获取本地时区的时间戳"""
return datetime.now().strftime("%Y-%m-%dT%H:%M")
def load_configs(self):
"""从JSON文件加载配置"""
try:
with open(f"{self.config_dir}/chains.json", "r", encoding="utf-8") as f:
self.chain_configs = json.load(f)
self.logger.info(f"加载区块链配置: {len(self.chain_configs)} 条链")
except FileNotFoundError:
self.logger.error("配置文件缺失: chains.json")
self.chain_configs = {}
except json.JSONDecodeError as e:
self.logger.error(f"配置文件格式错误 - chains.json: {e}")
self.chain_configs = {}
try:
with open(f"{self.config_dir}/positions.json", "r", encoding="utf-8") as f:
self.positions_config = json.load(f)
self.logger.info(f"加载仓位配置: {len(self.positions_config)} 个仓位")
except FileNotFoundError:
self.logger.error("配置文件缺失: positions.json")
self.positions_config = []
except json.JSONDecodeError as e:
self.logger.error(f"配置文件格式错误 - positions.json: {e}")
self.positions_config = []
os.makedirs("data", exist_ok=True)
def get_token_info(self, w3, token_address):
"""获取代币信息"""
try:
token_contract = w3.eth.contract(
address=w3.to_checksum_address(token_address), abi=self.ERC20_ABI
)
symbol = token_contract.functions.symbol().call()
decimals = token_contract.functions.decimals().call()
return {"address": token_address, "symbol": symbol, "decimals": decimals}
except Exception as e:
self.logger.warning(f"获取代币信息失败 {token_address}: {e}")
return {"address": token_address, "symbol": "UNKNOWN", "decimals": 18}
def get_pool_tvl_data(self, position_config):
"""获取整个资金池的TVL数据"""
chain_name = position_config["chain"]
pair_address = position_config["pool_address"]
if chain_name not in self.chain_configs:
self.logger.error(f"不支持的区块链: {chain_name}")
return None
chain_config = self.chain_configs[chain_name]
w3 = Web3(Web3.HTTPProvider(chain_config["rpc_url"]))
if not w3.is_connected():
self.logger.error(f"区块链连接失败: {chain_name}")
return None
try:
pair_contract = w3.eth.contract(
address=w3.to_checksum_address(pair_address),
abi=self.PAIR_ABI + self.ERC20_ABI,
)
self.logger.info(f"获取资金池数据: {pair_address}")
reserves = pair_contract.functions.getReserves().call()
reserve0, reserve1 = reserves[0], reserves[1]
token0_address = pair_contract.functions.token0().call()
token1_address = pair_contract.functions.token1().call()
token0_info = self.get_token_info(w3, token0_address)
token1_info = self.get_token_info(w3, token1_address)
lp_total_supply = pair_contract.functions.totalSupply().call()
token0_amount = Decimal(str(reserve0)) / (10 ** token0_info["decimals"])
token1_amount = Decimal(str(reserve1)) / (10 ** token1_info["decimals"])
lp_total_supply_formatted = Decimal(str(lp_total_supply)) / (10**18)
self.logger.info(
f"资金池数据 - LP: {lp_total_supply_formatted}, {token0_info['symbol']}: {token0_amount}, {token1_info['symbol']}: {token1_amount}"
)
return {
"timestamp": self.get_local_timestamp(),
"chain": chain_name,
"pair_address": pair_address,
"lp_total_supply": lp_total_supply_formatted,
"token0": {"info": token0_info, "total_amount": token0_amount},
"token1": {"info": token1_info, "total_amount": token1_amount},
}
except Exception as e:
self.logger.error(f"获取资金池数据失败 {pair_address}: {e}")
return None
def get_user_position_data(self, position_config):
"""获取用户仓位数据"""
chain_name = position_config["chain"]
pair_address = position_config["pool_address"]
user_address = position_config["user_address"]
if chain_name not in self.chain_configs:
self.logger.error(f"不支持的区块链: {chain_name}")
return None
chain_config = self.chain_configs[chain_name]
w3 = Web3(Web3.HTTPProvider(chain_config["rpc_url"]))
if not w3.is_connected():
self.logger.error(f"区块链连接失败: {chain_name}")
return None
try:
pair_contract = w3.eth.contract(
address=w3.to_checksum_address(pair_address),
abi=self.PAIR_ABI + self.ERC20_ABI,
)
self.logger.info(f"获取用户仓位: {user_address} -> {pair_address}")
user_lp_balance = pair_contract.functions.balanceOf(
w3.to_checksum_address(user_address)
).call()
lp_total_supply = pair_contract.functions.totalSupply().call()
reserves = pair_contract.functions.getReserves().call()
reserve0, reserve1 = reserves[0], reserves[1]
token0_address = pair_contract.functions.token0().call()
token1_address = pair_contract.functions.token1().call()
token0_info = self.get_token_info(w3, token0_address)
token1_info = self.get_token_info(w3, token1_address)
if lp_total_supply > 0:
user_share = Decimal(str(user_lp_balance)) / Decimal(
str(lp_total_supply)
)
else:
user_share = Decimal("0")
self.logger.warning("LP总供应量为0")
user_token0_amount = (Decimal(str(reserve0)) * user_share) / (
10 ** token0_info["decimals"]
)
user_token1_amount = (Decimal(str(reserve1)) * user_share) / (
10 ** token1_info["decimals"]
)
lp_amount = Decimal(str(user_lp_balance)) / (10**18)
total_amount = user_token0_amount + user_token1_amount
self.logger.info(
f"用户仓位 - LP: {lp_amount}, {token0_info['symbol']}: {user_token0_amount}, {token1_info['symbol']}: {user_token1_amount}, 总计: {total_amount}"
)
return {
"timestamp": self.get_local_timestamp(),
"chain": chain_name,
"user_address": user_address,
"pair_address": pair_address,
"lp_amount_formatted": lp_amount,
"token0": {"info": token0_info, "amount_formatted": user_token0_amount},
"token1": {"info": token1_info, "amount_formatted": user_token1_amount},
"total_amount": total_amount,
}
except Exception as e:
self.logger.error(f"获取用户仓位失败 {pair_address}: {e}")
return None
def read_previous_data(self, file_path):
"""读取之前的数据用于计算变化量"""
if not os.path.exists(file_path):
return None
try:
with open(file_path, "r", encoding="utf-8") as f:
reader = csv.DictReader(f)
rows = list(reader)
if rows:
return rows[-1]
except Exception as e:
self.logger.error(f"读取历史数据失败 {file_path}: {e}")
return None
def save_tvl_data(self, tvl_data):
"""保存TVL数据到单独的CSV文件"""
pool_address = tvl_data["pair_address"]
tvl_file = f"data/tvl_{pool_address}.csv"
previous_data = self.read_previous_data(tvl_file)
if previous_data:
prev_total_amount = Decimal(
previous_data[tvl_data["token0"]["info"]["symbol"]]
) + Decimal(previous_data[tvl_data["token1"]["info"]["symbol"]])
amount_change = (
Decimal(previous_data[tvl_data["token0"]["info"]["symbol"]])
+ Decimal(previous_data[tvl_data["token1"]["info"]["symbol"]])
- Decimal(tvl_data["token0"]["total_amount"])
- Decimal(tvl_data["token1"]["total_amount"])
)
self.logger.info(f"数量变化: {amount_change}")
else:
amount_change = Decimal(tvl_data["token0"]["total_amount"]) + Decimal(
tvl_data["token1"]["total_amount"]
)
if amount_change == 0:
return
tvl_row = {
"时间": tvl_data["timestamp"],
"lptoken": str(tvl_data["lp_total_supply"]),
f'{tvl_data["token0"]["info"]["symbol"]}': str(
tvl_data["token0"]["total_amount"]
),
f'{tvl_data["token1"]["info"]["symbol"]}': str(
tvl_data["token1"]["total_amount"]
),
}
file_exists = os.path.exists(tvl_file)
with open(tvl_file, "a", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=tvl_row.keys())
if not file_exists:
writer.writeheader()
writer.writerow(tvl_row)
self.logger.info(f"保存TVL数据: {tvl_file}")
def save_user_data(self, user_data):
"""保存用户仓位数据到CSV"""
user_address = user_data["user_address"]
pool_address = user_data["pair_address"]
data_file = f"data/data_{user_address}_{pool_address}.csv"
portfolio_file = f"data/portfolio_{user_address}_{pool_address}.csv"
previous_data = self.read_previous_data(data_file)
if previous_data:
prev_total_amount = Decimal(previous_data["总数量"])
amount_change = user_data["total_amount"] - prev_total_amount
self.logger.info(f"数量变化: {amount_change}")
else:
amount_change = user_data["total_amount"]
if amount_change == 0:
return
data_row = {
"时间": user_data["timestamp"],
"lptoken": str(user_data["lp_amount_formatted"]),
f'{user_data["token0"]["info"]["symbol"]}': str(
user_data["token0"]["amount_formatted"]
),
f'{user_data["token1"]["info"]["symbol"]}': str(
user_data["token1"]["amount_formatted"]
),
"总数量": str(user_data["total_amount"]),
"变化量": str(amount_change),
}
file_exists = os.path.exists(data_file)
with open(data_file, "a", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=data_row.keys())
if not file_exists:
writer.writeheader()
writer.writerow(data_row)
self.logger.info(f"保存用户数据: {os.path.basename(data_file)}")
portfolio_row = {
"日期": user_data["timestamp"],
"类型": "利息",
"净额": str(amount_change),
}
portfolio_exists = os.path.exists(portfolio_file)
with open(portfolio_file, "a", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=portfolio_row.keys())
if not portfolio_exists:
writer.writeheader()
writer.writerow(portfolio_row)
self.logger.info(f"保存投资组合数据: {os.path.basename(portfolio_file)}")
def track_all_positions(self):
"""跟踪所有仓位"""
self.logger.info(f"开始处理 {len(self.positions_config)} 个仓位")
processed_pools = set()
success_count = 0
for i, position_config in enumerate(self.positions_config, 1):
user_addr = position_config["user_address"]
pool_addr = position_config["pool_address"]
self.logger.info(
f"[{i}/{len(self.positions_config)}] 处理仓位 {user_addr} -> {pool_addr}"
)
pool_key = position_config["pool_address"]
if pool_key not in processed_pools:
tvl_data = self.get_pool_tvl_data(position_config)
if tvl_data:
self.save_tvl_data(tvl_data)
processed_pools.add(pool_key)
else:
self.logger.error("资金池数据获取失败,跳过该仓位")
continue
user_data = self.get_user_position_data(position_config)
if user_data:
self.save_user_data(user_data)
success_count += 1
else:
self.logger.error("用户仓位数据获取失败")
self.logger.info(
f"处理完成: 成功 {success_count}/{len(self.positions_config)} 个仓位, {len(processed_pools)} 个资金池"
)
def main():
"""主函数"""
try:
tracker = UniversalLPPositionTracker("config")
tracker.track_all_positions()
except Exception as e:
logging.error(f"程序执行失败: {e}")
raise
if __name__ == "__main__":
main()

88
uniswap-v2/merge.py Normal file
View File

@@ -0,0 +1,88 @@
import os
import csv
from decimal import Decimal
from datetime import datetime
def merge_portfolio_files():
data_dir = "data"
if not os.path.exists(data_dir):
print(f"目录 {data_dir} 不存在")
return
portfolio_files = []
for filename in os.listdir(data_dir):
if filename.startswith("portfolio_") and filename.endswith(".csv"):
portfolio_files.append(filename)
if not portfolio_files:
print("未找到portfolio_*.csv文件")
return
for input_filename in portfolio_files:
output_filename = input_filename.replace("portfolio_", "merged_")
input_path = os.path.join(data_dir, input_filename)
output_path = os.path.join(data_dir, output_filename)
try:
process_single_file(input_path, output_path)
print(f"成功处理: {input_filename} -> {output_filename}")
except Exception as e:
print(f"处理文件 {input_filename} 时出错: {e}")
def process_single_file(input_path, output_path):
date_data = {} # 格式: {日期: {'amount': 净额总和, 'last_time': 最后时间}}
with open(input_path, "r", encoding="utf-8") as infile:
reader = csv.DictReader(infile)
if not all(field in reader.fieldnames for field in ["日期", "类型", "净额"]):
raise ValueError("CSV文件格式不正确需要的列日期,类型,净额")
for row in reader:
full_datetime_str = row["日期"]
try:
full_datetime = datetime.fromisoformat(
full_datetime_str.replace("Z", "+00:00")
)
except ValueError:
print(f"警告:跳过无效的日期格式: {full_datetime_str}")
continue
date_only = full_datetime.date().isoformat()
try:
amount = Decimal(row["净额"])
except ValueError:
print(f"警告:跳过无效的净额值: {row['净额']}")
continue
if date_only in date_data:
date_data[date_only]["amount"] += amount
if full_datetime > date_data[date_only]["last_time"]:
date_data[date_only]["last_time"] = full_datetime
else:
date_data[date_only] = {"amount": amount, "last_time": full_datetime}
sorted_dates = sorted(date_data.keys())
with open(output_path, "w", encoding="utf-8", newline="") as outfile:
writer = csv.writer(outfile)
writer.writerow(["日期", "类型", "净额"])
for date in sorted_dates:
data = date_data[date]
last_datetime_str = data["last_time"].strftime("%Y-%m-%dT%H:%M")
writer.writerow([last_datetime_str, "利息", data['amount']])
def main():
"""主函数"""
print("开始处理portfolio文件...")
merge_portfolio_files()
print("处理完成!")
if __name__ == "__main__":
main()