98 lines
2.9 KiB
Python
98 lines
2.9 KiB
Python
import requests
|
||
import pymysql
|
||
from datetime import datetime
|
||
from w3lib.html import remove_tags
|
||
import pandas as pd
|
||
import time
|
||
|
||
def parse_treasury_data(data):
|
||
# 找到列头位置
|
||
header_index = data.index("Country")
|
||
columns = data[header_index:header_index+14] # Country + 13个月
|
||
rows = data[header_index+14:]
|
||
|
||
result = []
|
||
i = 0
|
||
while i < len(rows):
|
||
# 拼接国家名
|
||
country_parts = []
|
||
while i < len(rows) and not rows[i].replace('.', '', 1).isdigit():
|
||
country_parts.append(rows[i])
|
||
i += 1
|
||
country = " ".join(country_parts).replace(",", "")
|
||
|
||
# 取13个数值
|
||
values = rows[i:i+13]
|
||
i += 13
|
||
|
||
if len(values) == 13:
|
||
result.append([country] + values)
|
||
|
||
# 转成 DataFrame
|
||
df = pd.DataFrame(result, columns=columns)
|
||
|
||
# =================== 名称清洗 ===================
|
||
rename_map = {
|
||
"Of Which: Foreign Official": "Foreign Official",
|
||
"Of Which: Foreign Official Treasury Bills": "Treasury Bills",
|
||
"Of Which: Foreign Official T-Bonds & Notes": "T-Bonds & Notes"
|
||
}
|
||
df["Country"] = df["Country"].replace(rename_map)
|
||
|
||
return df
|
||
|
||
|
||
def run_job():
|
||
print("=== 开始爬取并更新数据库 ===")
|
||
|
||
# =================== 爬取网页 =====================
|
||
page = requests.get("https://ticdata.treasury.gov/resource-center/data-chart-center/tic/Documents/slt_table5.html")
|
||
page = remove_tags(str(page.text))
|
||
page = page.split()
|
||
|
||
df = parse_treasury_data(page)
|
||
|
||
# =================== 连接数据库 =====================
|
||
db = pymysql.connect(
|
||
host="127.0.0.1",
|
||
user="root",
|
||
password="2GS@bPYcgiMyL14A",
|
||
database="Macroeconomics",
|
||
port=4423
|
||
)
|
||
cursor = db.cursor()
|
||
|
||
# 查询数据库中最新日期
|
||
cursor.execute("SELECT date FROM FBI ORDER BY date DESC LIMIT 1")
|
||
result = cursor.fetchone()
|
||
latest_date_in_db = result[0] if result else None # datetime 类型或 None
|
||
|
||
# =================== 补齐逻辑 =====================
|
||
for col in df.columns[1:]: # 遍历所有月份列
|
||
col_date = datetime.strptime(col, "%Y-%m")
|
||
|
||
# 如果数据库已有该日期,跳过
|
||
if latest_date_in_db and col_date <= latest_date_in_db:
|
||
continue
|
||
|
||
print(f"正在插入 {col} 的数据...")
|
||
insert_sql = "INSERT INTO FBI (date, name, value) VALUES (%s, %s, %s)"
|
||
for _, row in df.iterrows():
|
||
country = row["Country"]
|
||
value = row[col]
|
||
cursor.execute(insert_sql, (col_date.strftime("%Y-%m-01"), country, value))
|
||
|
||
db.commit()
|
||
print(f"{col} 插入完成")
|
||
|
||
cursor.close()
|
||
db.close()
|
||
print("=== 本次任务完成 ===\n")
|
||
|
||
|
||
# =================== 循环执行 =====================
|
||
if __name__ == "__main__":
|
||
while True:
|
||
run_job()
|
||
print("休眠 21600 秒(6 小时)...\n")
|
||
time.sleep(21600) # 6小时 |