434 lines
16 KiB
Python
434 lines
16 KiB
Python
import json
|
|
from decimal import Decimal
|
|
import csv
|
|
import os
|
|
import logging
|
|
from datetime import datetime
|
|
from web3 import Web3
|
|
|
|
|
|
class UniversalLPPositionTracker:
    """Track liquidity-pool reserves and per-user LP positions across chains.

    Chains are read from ``<config_dir>/chains.json`` and positions from
    ``<config_dir>/positions.json``.  For every configured position the
    tracker reads the pair contract (``getReserves`` / ``token0`` /
    ``token1`` plus the ERC-20 view functions), derives the pool totals and
    the user's pro-rata share, and appends a row to CSV files under
    ``data/`` whenever the total amount changed since the last saved row.
    """

    # Minimal ERC-20 view functions used by the tracker.
    ERC20_ABI = [
        {
            "constant": True,
            "inputs": [],
            "name": "symbol",
            "outputs": [{"name": "", "type": "string"}],
            "type": "function",
        },
        {
            "constant": True,
            "inputs": [],
            "name": "decimals",
            "outputs": [{"name": "", "type": "uint8"}],
            "type": "function",
        },
        {
            "constant": True,
            "inputs": [{"name": "account", "type": "address"}],
            "name": "balanceOf",
            "outputs": [{"name": "", "type": "uint256"}],
            "type": "function",
        },
        {
            "constant": True,
            "inputs": [],
            "name": "totalSupply",
            "outputs": [{"name": "", "type": "uint256"}],
            "type": "function",
        },
    ]

    # Pair-specific view functions (reserves and the two token addresses).
    PAIR_ABI = [
        {
            "constant": True,
            "inputs": [],
            "name": "getReserves",
            "outputs": [
                {"name": "reserve0", "type": "uint112"},
                {"name": "reserve1", "type": "uint112"},
                {"name": "blockTimestampLast", "type": "uint32"},
            ],
            "type": "function",
        },
        {
            "constant": True,
            "inputs": [],
            "name": "token0",
            "outputs": [{"name": "", "type": "address"}],
            "type": "function",
        },
        {
            "constant": True,
            "inputs": [],
            "name": "token1",
            "outputs": [{"name": "", "type": "address"}],
            "type": "function",
        },
    ]

    # LP token amounts are scaled by this many decimals (previously a bare 10**18).
    LP_TOKEN_DECIMALS = 18

    # Keys every entry of positions.json must provide.
    _REQUIRED_POSITION_KEYS = ("chain", "pool_address", "user_address")

    def __init__(self, config_dir="config"):
        """Configure logging and load the chain/position configuration.

        Args:
            config_dir: Directory containing ``chains.json`` and
                ``positions.json``.
        """
        self.config_dir = config_dir
        self.setup_logging()
        self.load_configs()

    def setup_logging(self):
        """Send INFO-level logs to ``logs/<YYYY-MM-DD>.log`` and to the console."""
        os.makedirs("logs", exist_ok=True)

        log_filename = f"logs/{datetime.now().strftime('%Y-%m-%d')}.log"
        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s - %(levelname)s - %(message)s",
            handlers=[
                logging.FileHandler(log_filename, encoding="utf-8"),
                logging.StreamHandler(),
            ],
        )

        self.logger = logging.getLogger(__name__)
        self.logger.info("初始化日志系统")

    def get_local_timestamp(self):
        """Return the current local time as ``YYYY-MM-DDTHH:MM`` (minute resolution)."""
        return datetime.now().strftime("%Y-%m-%dT%H:%M")

    def _load_json_config(self, filename, fallback, loaded_message):
        """Load one JSON file from the config directory.

        Args:
            filename: File name inside ``self.config_dir``.
            fallback: Value returned when the file is missing or not valid JSON.
            loaded_message: ``str.format`` template receiving the number of
                loaded entries; logged at INFO level on success.

        Returns:
            The parsed JSON value, or ``fallback`` after logging an error.
        """
        try:
            with open(f"{self.config_dir}/{filename}", "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except FileNotFoundError:
            self.logger.error(f"配置文件缺失: {filename}")
            return fallback
        except json.JSONDecodeError as e:
            self.logger.error(f"配置文件格式错误 - {filename}: {e}")
            return fallback

        self.logger.info(loaded_message.format(len(loaded)))
        return loaded

    def load_configs(self):
        """Load ``chains.json`` / ``positions.json`` and ensure ``data/`` exists.

        A missing or malformed file is logged and replaced by an empty
        configuration (``{}`` for chains, ``[]`` for positions).
        """
        self.chain_configs = self._load_json_config(
            "chains.json", {}, "加载区块链配置: {} 条链"
        )
        self.positions_config = self._load_json_config(
            "positions.json", [], "加载仓位配置: {} 个仓位"
        )
        os.makedirs("data", exist_ok=True)

    def get_token_info(self, w3, token_address):
        """Read a token's symbol and decimals from its contract.

        Args:
            w3: Connected Web3 client.
            token_address: Address of the ERC-20 token.

        Returns:
            Dict with ``address``, ``symbol`` and ``decimals``.  On any
            failure a warning is logged and the placeholder symbol
            ``"UNKNOWN"`` with 18 decimals is returned instead.
        """
        try:
            token_contract = w3.eth.contract(
                address=w3.to_checksum_address(token_address), abi=self.ERC20_ABI
            )
            symbol = token_contract.functions.symbol().call()
            decimals = token_contract.functions.decimals().call()
        except Exception as e:
            self.logger.warning(f"获取代币信息失败 {token_address}: {e}")
            return {"address": token_address, "symbol": "UNKNOWN", "decimals": 18}

        return {"address": token_address, "symbol": symbol, "decimals": decimals}

    def _connect(self, chain_name):
        """Return a connected Web3 client for ``chain_name``, or None.

        Every failure (unknown chain, chain entry without ``rpc_url``,
        unreachable node) is logged before None is returned.
        """
        if chain_name not in self.chain_configs:
            self.logger.error(f"不支持的区块链: {chain_name}")
            return None

        try:
            rpc_url = self.chain_configs[chain_name]["rpc_url"]
        except (KeyError, TypeError):
            # Previously an uncaught KeyError that aborted the whole run.
            self.logger.error(f"区块链配置缺少 rpc_url: {chain_name}")
            return None

        w3 = Web3(Web3.HTTPProvider(rpc_url))
        if not w3.is_connected():
            self.logger.error(f"区块链连接失败: {chain_name}")
            return None
        return w3

    def _pair_contract(self, w3, pair_address):
        """Build the pair contract object exposing the pair and ERC-20 views."""
        return w3.eth.contract(
            address=w3.to_checksum_address(pair_address),
            abi=self.PAIR_ABI + self.ERC20_ABI,
        )

    def _read_pair_state(self, w3, pair_contract):
        """Read reserves, both tokens' metadata and the LP total supply.

        Returns:
            Tuple ``(reserve0, reserve1, token0_info, token1_info,
            lp_total_supply)`` with raw (unscaled) integer amounts.
        """
        reserves = pair_contract.functions.getReserves().call()
        token0_info = self.get_token_info(w3, pair_contract.functions.token0().call())
        token1_info = self.get_token_info(w3, pair_contract.functions.token1().call())
        lp_total_supply = pair_contract.functions.totalSupply().call()
        return reserves[0], reserves[1], token0_info, token1_info, lp_total_supply

    def get_pool_tvl_data(self, position_config):
        """Fetch the whole pool's reserves and LP supply.

        Args:
            position_config: Position entry providing ``chain`` and
                ``pool_address``.

        Returns:
            Dict with ``timestamp``, ``chain``, ``pair_address``,
            ``lp_total_supply`` and per-token ``info`` / ``total_amount``
            (Decimal, scaled by the token's decimals), or None when the chain
            is unsupported, unreachable, or any contract call fails.
        """
        chain_name = position_config["chain"]
        pair_address = position_config["pool_address"]

        w3 = self._connect(chain_name)
        if w3 is None:
            return None

        try:
            pair_contract = self._pair_contract(w3, pair_address)
            self.logger.info(f"获取资金池数据: {pair_address}")

            (
                reserve0,
                reserve1,
                token0_info,
                token1_info,
                lp_total_supply,
            ) = self._read_pair_state(w3, pair_contract)

            token0_amount = Decimal(str(reserve0)) / (10 ** token0_info["decimals"])
            token1_amount = Decimal(str(reserve1)) / (10 ** token1_info["decimals"])
            lp_total_supply_formatted = Decimal(str(lp_total_supply)) / (
                10**self.LP_TOKEN_DECIMALS
            )

            self.logger.info(
                f"资金池数据 - LP: {lp_total_supply_formatted}, {token0_info['symbol']}: {token0_amount}, {token1_info['symbol']}: {token1_amount}"
            )

            return {
                "timestamp": self.get_local_timestamp(),
                "chain": chain_name,
                "pair_address": pair_address,
                "lp_total_supply": lp_total_supply_formatted,
                "token0": {"info": token0_info, "total_amount": token0_amount},
                "token1": {"info": token1_info, "total_amount": token1_amount},
            }
        except Exception as e:
            self.logger.error(f"获取资金池数据失败 {pair_address}: {e}")
            return None

    def get_user_position_data(self, position_config):
        """Fetch one user's share of a pool.

        The user's token amounts are the pool reserves multiplied by
        ``user LP balance / LP total supply`` (zero when the supply is zero).

        Args:
            position_config: Position entry providing ``chain``,
                ``pool_address`` and ``user_address``.

        Returns:
            Dict with ``timestamp``, ``chain``, ``user_address``,
            ``pair_address``, ``lp_amount_formatted``, per-token ``info`` /
            ``amount_formatted`` and ``total_amount`` (sum of both token
            amounts), or None when the chain is unsupported, unreachable, or
            any contract call fails.
        """
        chain_name = position_config["chain"]
        pair_address = position_config["pool_address"]
        user_address = position_config["user_address"]

        w3 = self._connect(chain_name)
        if w3 is None:
            return None

        try:
            pair_contract = self._pair_contract(w3, pair_address)
            self.logger.info(f"获取用户仓位: {user_address} -> {pair_address}")

            user_lp_balance = pair_contract.functions.balanceOf(
                w3.to_checksum_address(user_address)
            ).call()
            (
                reserve0,
                reserve1,
                token0_info,
                token1_info,
                lp_total_supply,
            ) = self._read_pair_state(w3, pair_contract)

            if lp_total_supply > 0:
                user_share = Decimal(str(user_lp_balance)) / Decimal(
                    str(lp_total_supply)
                )
            else:
                # Avoid dividing by zero for an empty pool.
                user_share = Decimal("0")
                self.logger.warning("LP总供应量为0")

            user_token0_amount = (Decimal(str(reserve0)) * user_share) / (
                10 ** token0_info["decimals"]
            )
            user_token1_amount = (Decimal(str(reserve1)) * user_share) / (
                10 ** token1_info["decimals"]
            )
            lp_amount = Decimal(str(user_lp_balance)) / (10**self.LP_TOKEN_DECIMALS)
            total_amount = user_token0_amount + user_token1_amount

            self.logger.info(
                f"用户仓位 - LP: {lp_amount}, {token0_info['symbol']}: {user_token0_amount}, {token1_info['symbol']}: {user_token1_amount}, 总计: {total_amount}"
            )

            return {
                "timestamp": self.get_local_timestamp(),
                "chain": chain_name,
                "user_address": user_address,
                "pair_address": pair_address,
                "lp_amount_formatted": lp_amount,
                "token0": {"info": token0_info, "amount_formatted": user_token0_amount},
                "token1": {"info": token1_info, "amount_formatted": user_token1_amount},
                "total_amount": total_amount,
            }
        except Exception as e:
            self.logger.error(f"获取用户仓位失败 {pair_address}: {e}")
            return None

    def read_previous_data(self, file_path):
        """Return the last row of a CSV file as a dict.

        Args:
            file_path: Path of the CSV file to read.

        Returns:
            The last data row keyed by the header, or None when the file does
            not exist, has no data rows, or cannot be read (the error is
            logged).
        """
        if not os.path.exists(file_path):
            return None

        try:
            # newline="" lets the csv module handle line endings itself.
            with open(file_path, "r", newline="", encoding="utf-8") as f:
                last_row = None
                # Only the final row matters, so do not keep the others.
                for last_row in csv.DictReader(f):
                    pass
                return last_row
        except Exception as e:
            self.logger.error(f"读取历史数据失败 {file_path}: {e}")
            return None

    def _sum_previous_columns(self, previous_row, columns):
        """Sum ``columns`` of a previously saved CSV row as Decimals.

        Returns:
            The sum, or None (after a warning) when a column is missing or
            its value is not a valid number, e.g. because a token symbol
            differs from the one the file was created with.
        """
        try:
            values = [Decimal(previous_row[column]) for column in columns]
        except (KeyError, TypeError, ValueError, ArithmeticError) as e:
            # decimal.InvalidOperation is an ArithmeticError subclass.
            self.logger.warning(f"历史数据无法解析, 视为无历史记录: {e!r}")
            return None

        total = values[0]
        for value in values[1:]:
            total += value
        return total

    def _append_csv_row(self, path, row):
        """Append ``row`` to the CSV at ``path``; write the header for a new or empty file."""
        needs_header = not os.path.exists(path) or os.path.getsize(path) == 0
        with open(path, "a", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=list(row))
            if needs_header:
                writer.writeheader()
            writer.writerow(row)

    def save_tvl_data(self, tvl_data):
        """Append the pool totals to ``data/tvl_<pool_address>.csv``.

        Nothing is written when the sum of both token totals equals the sum
        stored in the file's last row.

        Args:
            tvl_data: Dict as returned by ``get_pool_tvl_data``.
        """
        tvl_file = f"data/tvl_{tvl_data['pair_address']}.csv"
        token0, token1 = tvl_data["token0"], tvl_data["token1"]
        symbol0, symbol1 = token0["info"]["symbol"], token1["info"]["symbol"]
        current_total = Decimal(token0["total_amount"]) + Decimal(
            token1["total_amount"]
        )

        previous_total = None
        previous_data = self.read_previous_data(tvl_file)
        if previous_data:
            previous_total = self._sum_previous_columns(
                previous_data, (symbol0, symbol1)
            )

        if previous_total is None:
            amount_change = current_total
        else:
            # current - previous, the same sign convention as save_user_data.
            amount_change = current_total - previous_total
            self.logger.info(f"数量变化: {amount_change}")

        if amount_change == 0:
            return

        tvl_row = {
            "时间": tvl_data["timestamp"],
            "lptoken": str(tvl_data["lp_total_supply"]),
            f"{symbol0}": str(token0["total_amount"]),
            f"{symbol1}": str(token1["total_amount"]),
        }
        self._append_csv_row(tvl_file, tvl_row)
        self.logger.info(f"保存TVL数据: {tvl_file}")

    def save_user_data(self, user_data):
        """Append a user's position to its data and portfolio CSV files.

        Writes ``data/data_<user>_<pool>.csv`` (amounts, total and change)
        and ``data/portfolio_<user>_<pool>.csv`` (date, type, net change).
        Nothing is written when the total equals the last saved total.

        Args:
            user_data: Dict as returned by ``get_user_position_data``.
        """
        user_address = user_data["user_address"]
        pool_address = user_data["pair_address"]
        data_file = f"data/data_{user_address}_{pool_address}.csv"
        portfolio_file = f"data/portfolio_{user_address}_{pool_address}.csv"

        previous_total = None
        previous_data = self.read_previous_data(data_file)
        if previous_data:
            previous_total = self._sum_previous_columns(previous_data, ("总数量",))

        if previous_total is None:
            amount_change = user_data["total_amount"]
        else:
            amount_change = user_data["total_amount"] - previous_total
            self.logger.info(f"数量变化: {amount_change}")

        if amount_change == 0:
            return

        token0, token1 = user_data["token0"], user_data["token1"]
        data_row = {
            "时间": user_data["timestamp"],
            "lptoken": str(user_data["lp_amount_formatted"]),
            f'{token0["info"]["symbol"]}': str(token0["amount_formatted"]),
            f'{token1["info"]["symbol"]}': str(token1["amount_formatted"]),
            "总数量": str(user_data["total_amount"]),
            "变化量": str(amount_change),
        }
        self._append_csv_row(data_file, data_row)
        self.logger.info(f"保存用户数据: {os.path.basename(data_file)}")

        portfolio_row = {
            "日期": user_data["timestamp"],
            "类型": "利息",
            "净额": str(amount_change),
        }
        self._append_csv_row(portfolio_file, portfolio_row)
        self.logger.info(f"保存投资组合数据: {os.path.basename(portfolio_file)}")

    def track_all_positions(self):
        """Process every configured position and log a summary.

        For each position the pool totals are fetched and saved once per pool
        address, then the user's position is fetched and saved.  A position
        whose entry is malformed or whose pool data cannot be fetched is
        logged and skipped; the remaining positions are still processed.
        """
        total = len(self.positions_config)
        self.logger.info(f"开始处理 {total} 个仓位")

        # NOTE(review): pools are de-duplicated by address only, so the same
        # address configured on two chains is fetched for the first chain only.
        processed_pools = set()
        success_count = 0

        for i, position_config in enumerate(self.positions_config, 1):
            missing = (
                [
                    key
                    for key in self._REQUIRED_POSITION_KEYS
                    if key not in position_config
                ]
                if isinstance(position_config, dict)
                else list(self._REQUIRED_POSITION_KEYS)
            )
            if missing:
                # Previously an uncaught KeyError that stopped all remaining positions.
                self.logger.error(f"[{i}/{total}] 仓位配置无效, 缺少字段: {missing}")
                continue

            user_addr = position_config["user_address"]
            pool_addr = position_config["pool_address"]
            self.logger.info(f"[{i}/{total}] 处理仓位 {user_addr} -> {pool_addr}")

            if pool_addr not in processed_pools:
                tvl_data = self.get_pool_tvl_data(position_config)
                if not tvl_data:
                    self.logger.error("资金池数据获取失败,跳过该仓位")
                    continue
                self.save_tvl_data(tvl_data)
                processed_pools.add(pool_addr)

            user_data = self.get_user_position_data(position_config)
            if user_data:
                self.save_user_data(user_data)
                success_count += 1
            else:
                self.logger.error("用户仓位数据获取失败")

        self.logger.info(
            f"处理完成: 成功 {success_count}/{len(self.positions_config)} 个仓位, {len(processed_pools)} 个资金池"
        )
|
|
|
|
|
|
def main():
    """Run the tracker over every configured position.

    Builds a ``UniversalLPPositionTracker`` from the ``config`` directory and
    processes all positions.  Any exception is logged through the ``logging``
    module and then re-raised to the caller.
    """
    try:
        UniversalLPPositionTracker("config").track_all_positions()
    except Exception as exc:
        logging.error(f"程序执行失败: {exc}")
        raise
|
|
|
|
|
|
# Run the tracker only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|