118 lines
4.3 KiB
Python
118 lines
4.3 KiB
Python
import os
import time
from datetime import datetime

import pymysql
import requests
|
||
|
||
# BEA API key, read from the environment so it does not have to live in source.
# NOTE(review): the fallback is the key that was previously committed here —
# rotate it and remove the default once BEA_USER_ID is set in the environment.
BEA_USER_ID = os.environ.get("BEA_USER_ID", "146B5757-D9E3-442C-B6AC-ADE9E6B71114")

# Years fetched in full on the first run; later runs only fetch the newest year.
YEARS = ["2023", "2024", "2025"]

# Pause between runs, in seconds (21600 s = 6 hours).
SLEEP_SECONDS = 21600
|
||
|
||
def get_bea_data(year, timeout=30):
    """Fetch the quarterly rows of BEA NIPA table T10105 for one year.

    Args:
        year: Year to request (e.g. ``"2024"``), sent as the API's ``Year``
            query parameter.
        timeout: Seconds to wait for the HTTP connect/read before giving up.
            Without a timeout a stalled request would block the scheduling
            loop indefinitely.

    Returns:
        The list stored at ``BEAAPI.Results.Data`` in the JSON response.

    Raises:
        requests.RequestException: network failure, timeout, or a non-2xx
            HTTP status.
        RuntimeError: the response carries no ``Data`` list (for example,
            the API reported an error in the body instead).
    """
    params = {
        "UserID": BEA_USER_ID,
        "method": "GetData",
        "datasetname": "NIPA",
        "TableName": "T10105",
        "Frequency": "Q",
        "Year": year,
        "ResultFormat": "JSON",
    }
    # Let requests build and URL-encode the query string.
    response = requests.get(
        "https://apps.bea.gov/api/data", params=params, timeout=timeout
    )
    response.raise_for_status()

    beaapi = response.json().get("BEAAPI", {})
    results = beaapi.get("Results", {})
    data = results.get("Data") if isinstance(results, dict) else None
    if data is None:
        # Surface whatever error object the API returned instead of a bare
        # KeyError on 'Data'.
        error = results.get("Error") if isinstance(results, dict) else None
        raise RuntimeError(
            f"BEA API returned no data for year {year}: "
            f"{error or beaapi.get('Error') or 'unknown error'}"
        )
    return data
|
||
|
||
def update_database(cursor, data):
    """Insert quarters that are missing from the PCE, GPDI and NETEXP tables.

    Each row of ``data`` is routed by its ``LineDescription`` to a
    (table, column) pair. Rows with any other description are ignored.
    "Services" is only accepted when ``LineNumber`` is ``'6'`` (presumably
    because that description is not unique within the BEA table).

    For every table, each ``TimePeriod`` that is not already present in the
    table's ``times`` column is inserted once. Rows that already exist are
    never updated, and the caller is responsible for committing.

    Args:
        cursor: DB-API cursor that accepts the ``%s`` paramstyle (run_job
            passes a pymysql cursor).
        data: Iterable of dicts with ``TimePeriod``, ``LineDescription`` and
            ``DataValue`` keys; ``LineNumber`` is also read for "Services".
    """
    # NOTE(review): DataValue is stored exactly as received (a string from the
    # JSON payload). If the API formats numbers with thousands separators and
    # these columns are numeric, MySQL may reject or truncate them — confirm.

    # Per table: the query for stored periods, the insert, and its value columns.
    tables = (
        (
            "SELECT times FROM PCE",
            "INSERT INTO PCE (times, PCE, PCEDG, PCEND, PCES) VALUES (%s,%s,%s,%s,%s)",
            ("PCE", "PCEDG", "PCEND", "PCES"),
        ),
        (
            "SELECT times FROM GPDI",
            "INSERT INTO GPDI (times, GPDI, FPI, CBI) VALUES (%s,%s,%s,%s)",
            ("GPDI", "FPI", "CBI"),
        ),
        (
            "SELECT times FROM NETEXP",
            "INSERT INTO NETEXP (times, NETEXP, IMPGS, EXPGS) VALUES (%s,%s,%s,%s)",
            ("NETEXP", "IMPGS", "EXPGS"),
        ),
    )
    # LineDescription -> (index into `tables`, column name).
    routes = {
        "Personal consumption expenditures": (0, "PCE"),
        "Durable goods": (0, "PCEDG"),
        "Nondurable goods": (0, "PCEND"),
        "Services": (0, "PCES"),
        "Gross private domestic investment": (1, "GPDI"),
        "Fixed investment": (1, "FPI"),
        "Change in private inventories": (1, "CBI"),
        "Net exports of goods and services": (2, "NETEXP"),
        "Imports": (2, "IMPGS"),
        "Exports": (2, "EXPGS"),
    }

    # Snapshot the periods already stored in each table.
    already_stored = []
    for select_sql, _insert_sql, _columns in tables:
        cursor.execute(select_sql)
        already_stored.append({record[0] for record in cursor.fetchall()})

    # Pivot API rows into one {period: {column: value}} mapping per table.
    pivots = [{}, {}, {}]
    for row in data:
        period = row["TimePeriod"]
        description = row["LineDescription"]
        value = row["DataValue"]
        target = routes.get(description)
        if target is None:
            continue
        if description == "Services" and row["LineNumber"] != '6':
            continue
        index, column = target
        pivots[index].setdefault(period, {})[column] = value

    # Insert only the periods each table does not have yet.
    for (_select_sql, insert_sql, columns), stored, pivot in zip(
        tables, already_stored, pivots
    ):
        for period, values in pivot.items():
            if period in stored:
                continue
            cursor.execute(
                insert_sql, (period, *(values.get(column) for column in columns))
            )
|
||
|
||
def _years_to_fetch(first_run, current_year):
    """Return the BEA ``Year`` strings to request on this run.

    While ``current_year`` is not past ``YEARS[-1]``, this returns exactly
    what the job always fetched. That is all of ``YEARS`` on the first run
    and ``[YEARS[-1]]`` afterwards. Once the calendar moves past the last
    configured year, the window rolls forward instead of staying pinned to
    a stale year.

    Args:
        first_run: True for the initial backfill run.
        current_year: Calendar year used as the upper bound.

    Returns:
        Ascending list of four-digit year strings.
    """
    last_configured = int(YEARS[-1])
    newest = max(last_configured, current_year)
    if first_run:
        # Backfill every configured year plus any year that has started since.
        return list(YEARS) + [str(y) for y in range(last_configured + 1, newest + 1)]
    # Keep the previous year in the window as well. Presumably its last
    # quarter is only published after the year ends (BEA timing — verify).
    oldest = max(last_configured, newest - 1)
    return [str(y) for y in range(oldest, newest + 1)]


def run_job(first_run=False):
    """Fetch BEA data once and insert any missing quarters into MySQL.

    Connection settings are read from the ``MYSQL_HOST``, ``MYSQL_USER``,
    ``MYSQL_PASSWORD``, ``MYSQL_DATABASE`` and ``MYSQL_PORT`` environment
    variables. Each falls back to the value previously hard-coded here.

    Errors are printed, never raised, so the caller's scheduling loop keeps
    running. Each year is committed separately. If a year fails, its changes
    are never committed and the remaining years of this run are skipped.

    Args:
        first_run: When True, fetch every configured year. Otherwise fetch
            only the most recent year(s) (see ``_years_to_fetch``).
    """
    print(f"[{datetime.now()}] 开始抓取 BEA 数据并更新数据库...")
    # Initialised up front so `finally` knows exactly what was opened.
    db = None
    cursor = None
    try:
        # NOTE(review): the fallbacks are the credentials previously committed
        # in this file — rotate the password and drop the defaults once the
        # environment variables are configured.
        db = pymysql.connect(
            host=os.environ.get("MYSQL_HOST", "127.0.0.1"),
            user=os.environ.get("MYSQL_USER", "root"),
            password=os.environ.get("MYSQL_PASSWORD", "2GS@bPYcgiMyL14A"),
            database=os.environ.get("MYSQL_DATABASE", "Macroeconomics"),
            port=int(os.environ.get("MYSQL_PORT", "4423")),
        )
        cursor = db.cursor()

        for year in _years_to_fetch(first_run, datetime.now().year):
            data = get_bea_data(year)
            update_database(cursor, data)
            # Commit per year so earlier years persist even if a later one fails.
            db.commit()
            print(f"[{datetime.now()}] {year} 数据更新完成")

    except pymysql.MySQLError as e:
        print(f"[{datetime.now()}] 数据库错误: {e}")
    except Exception as e:
        # Broad on purpose: this is the boundary the scheduling loop calls,
        # and a failed run should not stop the next one.
        print(f"[{datetime.now()}] 其他错误: {e}")
    finally:
        if cursor is not None:
            cursor.close()
        if db is not None:
            db.close()

    print(f"[{datetime.now()}] 本次任务完成。\n")
|
||
|
||
if __name__ == "__main__":
    # Initial pass: fetch every configured year.
    run_job(True)
    # Then repeat forever: sleep, then an incremental refresh that only
    # fetches the latest year(s).
    while True:
        print(f"[{datetime.now()}] 休眠 {SLEEP_SECONDS} 秒(6小时)...\n")
        time.sleep(SLEEP_SECONDS)
        run_job(False)
|