117 lines
3.4 KiB
Python
117 lines
3.4 KiB
Python
import os
import time
from datetime import datetime

import pymysql
import requests
|
||
|
||
# Target currencies (RUB is still listed here so the logic stays uniform).
symbols = [
    "EUR",
    "GBP",
    "JPY",
    "CAD",
    "SEK",
    "CHF",
    "CNY",
    "RUB",
]
|
||
|
||
# Database configuration, passed as keyword arguments to pymysql.connect().
# Every setting can be overridden with an FX_DB_* environment variable; the
# fallbacks are the values that used to be hard-coded here, so deployments
# that set no variables behave exactly as before.
# NOTE(review): the fallback password is still committed in source; consider
# rotating it and removing the default once the environment provides it.
db_config = {
    "host": os.environ.get("FX_DB_HOST", "127.0.0.1"),
    "user": os.environ.get("FX_DB_USER", "root"),
    "password": os.environ.get("FX_DB_PASSWORD", "2GS@bPYcgiMyL14A"),
    "database": os.environ.get("FX_DB_NAME", "Macroeconomics"),
    # Environment values are strings; the driver expects an integer port.
    "port": int(os.environ.get("FX_DB_PORT", "4423")),
}
|
||
|
||
def fetch_rates_frankfurter():
    """Fetch the latest USD-based exchange rates from the Frankfurter API.

    Every currency in the module-level ``symbols`` list except RUB is
    requested. The request is attempted up to five times, pausing one second
    between attempts (but not after the final one).

    Returns:
        dict: The ``rates`` mapping from the JSON response, or an empty dict
        when every attempt failed or returned no rates.
    """
    base = "USD"
    targets = ",".join(s for s in symbols if s != "RUB")
    url = f"https://api.frankfurter.app/latest?from={base}&to=" + targets
    max_attempts = 5
    for attempt in range(1, max_attempts + 1):
        retries = max_attempts - attempt  # attempts left after this one
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            data = response.json()
            rates = data.get("rates", {})
            if not rates:
                raise ValueError("接口返回空数据")
            return rates
        except Exception as e:
            # Deliberately broad: any failure (network, HTTP status, bad
            # JSON, empty payload) just consumes one attempt.
            print(f"Frankfurter 请求失败,重试中... 剩余次数: {retries}, 错误: {e}")
            # Only wait when another attempt will actually follow.
            if retries > 0:
                time.sleep(1)
    print("Frankfurter 多次重试后失败,返回空数据")
    return {}
|
||
|
||
def fetch_rub():
    """Fetch the USD-to-RUB rate from open.er-api.com.

    Returns:
        The RUB value from the response's ``rates`` mapping, or ``None`` when
        the request fails, the payload's ``result`` field is not
        ``"success"``, or the RUB entry is missing or falsy.
    """
    url = "https://open.er-api.com/v6/latest/USD"
    try:
        response = requests.get(url, timeout=10)
        # Report HTTP-level failures with their status instead of letting
        # them pass silently or show up as a JSON decoding error.
        response.raise_for_status()
        data = response.json()
        if data.get("result") == "success":
            rub_rate = data["rates"].get("RUB")
            if rub_rate:
                return rub_rate
    except Exception as e:
        # Best effort: the caller treats None as "RUB unavailable".
        print(f"获取 RUB 失败: {e}")
    return None
|
||
|
||
def calc_dxy(rates):
    """Return a weighted average of the rates of six fixed currencies.

    Each of EUR, JPY, GBP, CAD, SEK and CHF that has a truthy entry in
    ``rates`` contributes ``rate * weight``; the total is divided by the sum
    of the weights that were actually used, so missing currencies are
    skipped and the remaining weights are renormalised.

    NOTE(review): this is an arithmetic weighted mean of the raw quotes.
    The published U.S. Dollar Index is, as far as I know, a geometric
    weighted product with a scaling constant -- confirm that this
    simplification is intended before relying on the absolute level.

    Args:
        rates: Mapping of currency code to its rate.

    Returns:
        The weighted average, or ``None`` when none of the six currencies
        has a usable rate.
    """
    basket = (
        ("EUR", 0.576),
        ("JPY", 0.136),
        ("GBP", 0.119),
        ("CAD", 0.091),
        ("SEK", 0.042),
        ("CHF", 0.036),
    )
    numerator = 0
    denominator = 0
    for code, weight in basket:
        quote = rates.get(code)
        if not quote:
            # Missing (or zero) quote: leave it out of both sums.
            continue
        numerator += quote * weight
        denominator += weight
    if denominator <= 0:
        return None
    return numerator / denominator
|
||
|
||
def save_to_db(rates, dxy):
    """Insert the current rates (and the index value, if any) into MySQL.

    One row per value is written to the ``exchangeRate`` table with columns
    ``(date, symbol, _value)``. Symbols are ``USD<ccy>`` for every currency
    in the module-level ``symbols`` list that has a rate, plus ``DXY`` when
    ``dxy`` is not ``None``. Values are rounded to five decimals and all rows
    share one timestamp: the naive local time truncated to the minute.

    Database errors, including a failure to connect, are printed rather
    than raised so that one bad cycle does not stop the caller.

    Args:
        rates: Mapping of currency code to rate.
        dxy: Index value to store under ``DXY``, or ``None`` to skip it.
    """
    current_time = datetime.now().replace(second=0, microsecond=0)

    data = {
        f"USD{ccy}": round(rates[ccy], 5)
        for ccy in symbols
        if rates.get(ccy) is not None
    }
    if dxy is not None:
        data["DXY"] = round(dxy, 5)

    query = """INSERT INTO exchangeRate (date, symbol, _value) VALUES (%s, %s, %s)"""
    rows = [(current_time, symbol, value) for symbol, value in data.items()]

    connection = None
    try:
        # Connect inside the try block: a refused or timed-out connection is
        # reported like any other write failure instead of propagating.
        connection = pymysql.connect(**db_config)
        with connection.cursor() as cursor:
            cursor.executemany(query, rows)
        connection.commit()
        print(f"{current_time} 数据写入数据库成功")
    except Exception as e:
        print(f"写入数据库失败: {e}")
    finally:
        # connection stays None when connect() itself failed.
        if connection is not None:
            connection.close()
|
||
|
||
if __name__ == "__main__":
    # Poll forever: fetch the rates, derive the index, store both, then wait.
    while True:
        rates = fetch_rates_frankfurter()

        # RUB comes from a separate source and is merged into the same dict.
        rub = fetch_rub()
        if not rub:
            print("未获取到 RUB 汇率")
        else:
            rates["RUB"] = rub

        if not rates:
            print("未获取到汇率数据")
        else:
            dxy = calc_dxy(rates)
            print(f"汇率数据: {rates}")
            print(f"美元指数近似值: {dxy:.5f}" if dxy else "美元指数近似值 无法计算")
            save_to_db(rates, dxy)

        time.sleep(1800)  # wait 30 minutes before the next cycle