coinbus-data/coinbus/Macroeconomic_FBI_v2.py

98 lines
2.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import pymysql
from datetime import datetime
from w3lib.html import remove_tags
import pandas as pd
import time
def parse_treasury_data(data):
# 找到列头位置
header_index = data.index("Country")
columns = data[header_index:header_index+14] # Country + 13个月
rows = data[header_index+14:]
result = []
i = 0
while i < len(rows):
# 拼接国家名
country_parts = []
while i < len(rows) and not rows[i].replace('.', '', 1).isdigit():
country_parts.append(rows[i])
i += 1
country = " ".join(country_parts).replace(",", "")
# 取13个数值
values = rows[i:i+13]
i += 13
if len(values) == 13:
result.append([country] + values)
# 转成 DataFrame
df = pd.DataFrame(result, columns=columns)
# =================== 名称清洗 ===================
rename_map = {
"Of Which: Foreign Official": "Foreign Official",
"Of Which: Foreign Official Treasury Bills": "Treasury Bills",
"Of Which: Foreign Official T-Bonds &amp; Notes": "T-Bonds & Notes"
}
df["Country"] = df["Country"].replace(rename_map)
return df
def run_job():
print("=== 开始爬取并更新数据库 ===")
# =================== 爬取网页 =====================
page = requests.get("https://ticdata.treasury.gov/resource-center/data-chart-center/tic/Documents/slt_table5.html")
page = remove_tags(str(page.text))
page = page.split()
df = parse_treasury_data(page)
# =================== 连接数据库 =====================
db = pymysql.connect(
host="127.0.0.1",
user="root",
password="2GS@bPYcgiMyL14A",
database="Macroeconomics",
port=4423
)
cursor = db.cursor()
# 查询数据库中最新日期
cursor.execute("SELECT date FROM FBI ORDER BY date DESC LIMIT 1")
result = cursor.fetchone()
latest_date_in_db = result[0] if result else None # datetime 类型或 None
# =================== 补齐逻辑 =====================
for col in df.columns[1:]: # 遍历所有月份列
col_date = datetime.strptime(col, "%Y-%m")
# 如果数据库已有该日期,跳过
if latest_date_in_db and col_date <= latest_date_in_db:
continue
print(f"正在插入 {col} 的数据...")
insert_sql = "INSERT INTO FBI (date, name, value) VALUES (%s, %s, %s)"
for _, row in df.iterrows():
country = row["Country"]
value = row[col]
cursor.execute(insert_sql, (col_date.strftime("%Y-%m-01"), country, value))
db.commit()
print(f"{col} 插入完成")
cursor.close()
db.close()
print("=== 本次任务完成 ===\n")
# =================== 循环执行 =====================
if __name__ == "__main__":
while True:
run_job()
print("休眠 21600 秒6 小时)...\n")
time.sleep(21600) # 6小时