98 lines
2.9 KiB
Python
98 lines
2.9 KiB
Python
|
import requests
|
|||
|
import pymysql
|
|||
|
from datetime import datetime
|
|||
|
from w3lib.html import remove_tags
|
|||
|
import pandas as pd
|
|||
|
import time
|
|||
|
|
|||
|
def parse_treasury_data(data):
    """Turn the whitespace-split page text into a DataFrame of holdings.

    The token stream is expected to contain the literal token ``"Country"``
    followed by 13 month labels, and after that a sequence of records, each
    made of one or more name tokens followed by 13 numeric tokens.

    Args:
        data: list of string tokens (the page text after tag removal and
            ``str.split()``).

    Returns:
        pandas.DataFrame whose first column is ``"Country"`` and whose other
        columns are the month labels taken from the header. Values are kept
        as the original strings.

    Raises:
        ValueError: if the ``"Country"`` header token is not present.
    """
    month_count = 13  # number of month columns following "Country"

    # Locate the header row.
    try:
        header_index = data.index("Country")
    except ValueError:
        raise ValueError(
            "'Country' header token not found in the page text"
        ) from None

    header_end = header_index + 1 + month_count
    columns = data[header_index:header_end]  # Country + 13 months
    rows = data[header_end:]

    def is_number(token):
        # Digits with at most one decimal point, e.g. "1061.7" or "25".
        return token.replace(".", "", 1).isdigit()

    result = []
    total = len(rows)
    i = 0
    while i < total:
        # Collect the (possibly multi-token) country name.
        country_parts = []
        while i < total and not is_number(rows[i]):
            country_parts.append(rows[i])
            i += 1

        # Take the 13 values that should follow the name.
        values = rows[i:i + month_count]
        if len(values) < month_count:
            # Too few tokens left for another complete record.
            break

        if not country_parts or not all(is_number(v) for v in values):
            # A stray number (e.g. a year inside trailing footnote text) is
            # not the start of a record: drop it and resynchronize at the
            # next token instead of swallowing 13 arbitrary tokens.
            i += 1
            continue

        country = " ".join(country_parts).replace(",", "")
        result.append([country] + values)
        i += month_count

    # Build the DataFrame.
    df = pd.DataFrame(result, columns=columns)

    # =================== Name cleanup ===================
    rename_map = {
        "Of Which: Foreign Official": "Foreign Official",
        "Of Which: Foreign Official Treasury Bills": "Treasury Bills",
        "Of Which: Foreign Official T-Bonds & Notes": "T-Bonds & Notes",
    }
    df["Country"] = df["Country"].replace(rename_map)

    return df
|
|||
|
|
|||
|
|
|||
|
def run_job():
    """Scrape the TIC table and insert months not yet stored in the database.

    Downloads the ``slt_table5.html`` page, parses it with
    ``parse_treasury_data`` and, for every month column that is newer than
    the latest ``date`` found in table ``FBI``, inserts one
    ``(date, name, value)`` row per record. Each month is committed
    separately.

    Raises:
        requests.RequestException: on network failure, timeout or a non-2xx
            HTTP status.
        ValueError: if the page cannot be parsed or a column label is not in
            ``%Y-%m`` format.
        pymysql.MySQLError: on database errors.
    """
    print("=== 开始爬取并更新数据库 ===")

    # =================== Fetch the page =====================
    # A timeout keeps a stalled connection from blocking the scheduler loop
    # forever; raise_for_status stops an error page from being parsed.
    response = requests.get(
        "https://ticdata.treasury.gov/resource-center/data-chart-center/tic/Documents/slt_table5.html",
        timeout=30,
    )
    response.raise_for_status()
    tokens = remove_tags(response.text).split()

    df = parse_treasury_data(tokens)

    # =================== Connect to the database =====================
    # NOTE(review): credentials are hard-coded in source; consider moving
    # them to environment variables or a config file kept out of the repo.
    db = pymysql.connect(
        host="127.0.0.1",
        user="root",
        password="2GS@bPYcgiMyL14A",
        database="Macroeconomics",
        port=4423
    )
    try:
        cursor = db.cursor()
        try:
            # Latest date already present in the table (None if empty).
            cursor.execute("SELECT date FROM FBI ORDER BY date DESC LIMIT 1")
            result = cursor.fetchone()
            latest_date_in_db = result[0] if result else None
            # The driver returns datetime.date for DATE columns and
            # datetime.datetime for DATETIME columns; comparing the two
            # directly raises TypeError, so normalize to a plain date.
            if isinstance(latest_date_in_db, datetime):
                latest_date_in_db = latest_date_in_db.date()

            # =================== Back-fill logic =====================
            insert_sql = "INSERT INTO FBI (date, name, value) VALUES (%s, %s, %s)"
            countries = df["Country"].tolist()

            for col in df.columns[1:]:  # every month column
                col_date = datetime.strptime(col, "%Y-%m")

                # Skip months the database already has.
                if latest_date_in_db and col_date.date() <= latest_date_in_db:
                    continue

                print(f"正在插入 {col} 的数据...")
                month_key = col_date.strftime("%Y-%m-01")
                params = [
                    (month_key, country, value)
                    for country, value in zip(countries, df[col].tolist())
                ]
                cursor.executemany(insert_sql, params)

                db.commit()
                print(f"{col} 插入完成")
        finally:
            cursor.close()
    finally:
        # Always release the connection, even when a query fails.
        db.close()

    print("=== 本次任务完成 ===\n")
|
|||
|
|
|||
|
|
|||
|
# =================== 循环执行 =====================
|
|||
|
if __name__ == "__main__":
    import traceback

    # Run the job forever, once every 6 hours.
    while True:
        try:
            run_job()
        except Exception:
            # A transient failure (network outage, database restart, page
            # layout change) must not end the scheduler; report it and try
            # again on the next cycle. Ctrl-C still exits because
            # KeyboardInterrupt is not an Exception subclass.
            traceback.print_exc()
        print("休眠 21600 秒(6 小时)...\n")
        time.sleep(21600)  # 6 hours
|