coinbus-data/coinbus/exchangeRate_lyq.py

117 lines
3.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import pymysql
import time
from datetime import datetime
# Target currency list (RUB is still listed here so that every currency
# goes through the same handling downstream).
symbols = [
    "EUR", "GBP", "JPY", "CAD",
    "SEK", "CHF", "CNY", "RUB",
]

# Database connection settings, passed as keyword arguments to the DB driver.
# NOTE(review): credentials are hard-coded in source; consider loading them
# from the environment or a secrets store instead.
db_config = dict(
    host="127.0.0.1",
    user="root",
    password="2GS@bPYcgiMyL14A",
    database="Macroeconomics",
    port=4423,
)
def fetch_rates_frankfurter():
    """Fetch the latest USD-based exchange rates from the Frankfurter API.

    Every currency in the module-level ``symbols`` list except RUB is
    requested. The request is attempted up to five times, pausing one second
    between attempts.

    Returns:
        dict: The ``rates`` mapping from the JSON response (currency code ->
        rate), or an empty dict when every attempt failed.
    """
    base = "USD"
    targets = ",".join(s for s in symbols if s != "RUB")
    url = f"https://api.frankfurter.app/latest?from={base}&to=" + targets
    retries = 5
    while retries > 0:
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            data = response.json()
            rates = data.get("rates", {})
            if not rates:
                # Treat an empty payload like any other failure so it is retried.
                raise ValueError("接口返回空数据")
            return rates
        except Exception as e:
            # Deliberately broad: any network, HTTP, or parsing problem
            # should trigger a retry rather than abort the caller.
            retries -= 1
            print(f"Frankfurter 请求失败,重试中... 剩余次数: {retries}, 错误: {e}")
            # Only wait when another attempt will actually follow.
            if retries > 0:
                time.sleep(1)
    print("Frankfurter 多次重试后失败,返回空数据")
    return {}
def fetch_rub():
    """Fetch the USD -> RUB rate from open.er-api.com.

    Returns:
        The value stored under ``rates["RUB"]`` in the JSON response, or
        ``None`` when the request fails, the response does not report
        ``result == "success"``, or the RUB rate is missing or falsy.
    """
    url = "https://open.er-api.com/v6/latest/USD"
    try:
        response = requests.get(url, timeout=10)
        # Check the HTTP status before parsing, as fetch_rates_frankfurter
        # does, so an error response is reported as such instead of being
        # judged by whatever its body happens to parse to.
        response.raise_for_status()
        data = response.json()
        if data.get("result") == "success":
            rub_rate = data["rates"].get("RUB")
            if rub_rate:
                return rub_rate
    except Exception as e:
        # Best-effort: the caller handles a missing RUB rate (None).
        print(f"获取 RUB 失败: {e}")
    return None
def calc_dxy(rates):
    """Approximate the U.S. Dollar Index (DXY) from USD-based spot rates.

    The index is a weighted *geometric* product of six currency rates::

        DXY = 50.14348112 * prod(rate_usd_to_ccy ** weight)

    ``rates`` holds units of foreign currency per 1 USD, so every exponent is
    positive (the published formula's negative exponents on EURUSD/GBPUSD are
    the same thing expressed with inverted quotes). A weighted arithmetic
    mean of the raw rates is not meaningful here because the rates live on
    very different scales (e.g. JPY vs. EUR).

    Args:
        rates: Mapping of currency code -> rate quoted as foreign currency
            per 1 USD. Extra currencies are ignored.

    Returns:
        float | None: The index value, or ``None`` when any of the six
        component rates is missing, zero, or negative (a partial basket
        would not be comparable with the full index).
    """
    weights = {
        "EUR": 0.576,
        "JPY": 0.136,
        "GBP": 0.119,
        "CAD": 0.091,
        "SEK": 0.042,
        "CHF": 0.036,
    }
    # Scaling constant of the index formula.
    index = 50.14348112
    for ccy, w in weights.items():
        rate = rates.get(ccy)
        # Missing / zero / negative rates make the product undefined.
        if not rate or rate <= 0:
            return None
        index *= rate ** w
    return index
def save_to_db(rates, dxy):
    """Insert the current rates (and DXY, if available) into ``exchangeRate``.

    One row is written per value: each currency in the module-level
    ``symbols`` list that has a rate is stored under the symbol ``USD<ccy>``,
    and ``dxy`` is stored under ``DXY``. Values are rounded to 5 decimals and
    all rows share one timestamp truncated to the minute.

    Any database error, including a failure to connect, is printed and
    swallowed so a database problem does not propagate to the caller.

    Args:
        rates: Mapping of currency code -> rate.
        dxy: Dollar-index value, or ``None`` to skip the DXY row.
    """
    # NOTE(review): datetime.now() is naive local time — confirm the table
    # expects local time rather than UTC.
    current_time = datetime.now().replace(second=0, microsecond=0)

    data = {}
    for ccy in symbols:
        rate = rates.get(ccy)
        if rate is not None:
            data[f"USD{ccy}"] = round(rate, 5)
    if dxy is not None:
        data["DXY"] = round(dxy, 5)

    query = """INSERT INTO exchangeRate (date, symbol, _value) VALUES (%s, %s, %s)"""
    connection = None
    try:
        # Connect inside the try block so connection failures are handled
        # like write failures instead of escaping to the caller.
        connection = pymysql.connect(**db_config)
        with connection.cursor() as cursor:
            for symbol, value in data.items():
                cursor.execute(query, (current_time, symbol, value))
        connection.commit()
        print(f"{current_time} 数据写入数据库成功")
    except Exception as e:
        print(f"写入数据库失败: {e}")
    finally:
        # connection stays None when connect() itself failed.
        if connection is not None:
            connection.close()
if __name__ == "__main__":
    # Poll both rate sources forever, persisting one snapshot per cycle.
    while True:
        rates = fetch_rates_frankfurter()

        # RUB comes from a separate source; merge it in when available.
        rub = fetch_rub()
        if not rub:
            print("未获取到 RUB 汇率")
        else:
            rates["RUB"] = rub

        if not rates:
            print("未获取到汇率数据")
        else:
            dxy = calc_dxy(rates)
            print(f"汇率数据: {rates}")
            dxy_message = (
                f"美元指数近似值: {dxy:.5f}" if dxy else "美元指数近似值 无法计算"
            )
            print(dxy_message)
            save_to_db(rates, dxy)

        time.sleep(1800)  # run once every 30 minutes