coinbus-data/coinbus/Macroeconomic_PCE_v3.py

118 lines
4.3 KiB
Python
Raw Permalink Normal View History

2025-08-29 08:52:11 +00:00
import requests
import pymysql
from datetime import datetime
import time
# Key sent as the UserID query parameter to the BEA data API.
# NOTE(review): this credential is committed in source; consider loading it from config.
BEA_USER_ID = "146B5757-D9E3-442C-B6AC-ADE9E6B71114"
# Years requested on the very first run (the initial backfill), as strings.
YEARS = [str(year) for year in range(2023, 2026)]
# Pause between polling passes: 6 hours, expressed in seconds.
SLEEP_SECONDS = 6 * 60 * 60
def get_bea_data(year, timeout=30):
    """Fetch the quarterly rows of NIPA table T10105 for one year from the BEA API.

    Args:
        year: Value sent verbatim as the ``Year`` query parameter (callers in
            this script pass strings such as "2024").
        timeout: Seconds to wait for the HTTP request. Without a timeout a
            stalled connection would block the polling loop indefinitely.

    Returns:
        The object found at ``BEAAPI.Results.Data`` in the JSON response.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
        requests.RequestException: on connection failure or timeout.
        ValueError: if the body is not valid JSON.
        RuntimeError: if the JSON has no ``BEAAPI.Results.Data`` entry.
    """
    params = {
        "UserID": BEA_USER_ID,
        "method": "GetData",
        "datasetname": "NIPA",
        "TableName": "T10105",
        "Frequency": "Q",
        "Year": year,
        "ResultFormat": "JSON",
    }
    response = requests.get("https://apps.bea.gov/api/data", params=params, timeout=timeout)
    response.raise_for_status()
    payload = response.json()
    try:
        return payload["BEAAPI"]["Results"]["Data"]
    except (KeyError, TypeError):
        pass
    # NOTE(review): the BEA API appears to report request problems (bad key,
    # bad parameter, no data yet for the year) as an "Error" object inside the
    # JSON body rather than via the HTTP status; confirm against the BEA API
    # user guide. Only that object is included in the message, not the whole
    # payload, since the payload may echo the UserID back.
    bea = payload.get("BEAAPI") if isinstance(payload, dict) else None
    details = None
    if isinstance(bea, dict):
        results = bea.get("Results")
        details = bea.get("Error") or (
            results.get("Error") if isinstance(results, dict) else None
        )
    raise RuntimeError(f"BEA API returned no Data for year {year}: {details!r}")
def update_database(cursor, data):
    """Insert quarters from a BEA T10105 response that are not stored yet.

    Matching rows are split across three tables (PCE, GPDI, NETEXP). For each
    table, a quarter whose period already appears in its ``times`` column is
    skipped; existing rows are never updated. This function does not commit.

    Args:
        cursor: DB-API cursor used for the SELECT and INSERT statements.
        data: Iterable of BEA row dicts. Every row must contain ``TimePeriod``,
            ``LineDescription`` and ``DataValue``; ``LineNumber`` is read only
            for rows whose description is "Services".
    """
    # table -> (query listing stored periods, insert statement, value columns);
    # dict order fixes the order in which tables are queried and written.
    table_specs = {
        "PCE": (
            "SELECT times FROM PCE",
            "INSERT INTO PCE (times, PCE, PCEDG, PCEND, PCES) VALUES (%s,%s,%s,%s,%s)",
            ("PCE", "PCEDG", "PCEND", "PCES"),
        ),
        "GPDI": (
            "SELECT times FROM GPDI",
            "INSERT INTO GPDI (times, GPDI, FPI, CBI) VALUES (%s,%s,%s,%s)",
            ("GPDI", "FPI", "CBI"),
        ),
        "NETEXP": (
            "SELECT times FROM NETEXP",
            "INSERT INTO NETEXP (times, NETEXP, IMPGS, EXPGS) VALUES (%s,%s,%s,%s)",
            ("NETEXP", "IMPGS", "EXPGS"),
        ),
    }
    # BEA line description -> (destination table, destination column)
    routing = {
        "Personal consumption expenditures": ("PCE", "PCE"),
        "Durable goods": ("PCE", "PCEDG"),
        "Nondurable goods": ("PCE", "PCEND"),
        "Services": ("PCE", "PCES"),
        "Gross private domestic investment": ("GPDI", "GPDI"),
        "Fixed investment": ("GPDI", "FPI"),
        "Change in private inventories": ("GPDI", "CBI"),
        "Net exports of goods and services": ("NETEXP", "NETEXP"),
        "Imports": ("NETEXP", "IMPGS"),
        "Exports": ("NETEXP", "EXPGS"),
    }

    # Periods already present in each table.
    stored_periods = {}
    for table, (select_sql, _, _) in table_specs.items():
        cursor.execute(select_sql)
        stored_periods[table] = {row[0] for row in cursor.fetchall()}

    # Group values as table -> period -> {column: value}.
    # NOTE(review): DataValue is stored exactly as received (a string). If BEA
    # formats it with thousands separators and the columns are numeric, confirm
    # MySQL converts it as intended.
    rows_by_table = {table: {} for table in table_specs}
    for entry in data:
        period = entry["TimePeriod"]
        label = entry["LineDescription"]
        value = entry["DataValue"]
        destination = routing.get(label)
        if destination is None:
            continue
        # More than one line may carry the label "Services"; only line "6" is kept as PCES.
        if label == "Services" and entry["LineNumber"] != '6':
            continue
        table, column = destination
        rows_by_table[table].setdefault(period, {})[column] = value

    # Insert only periods the table does not have yet; absent columns become NULL.
    for table, (_, insert_sql, columns) in table_specs.items():
        known = stored_periods[table]
        for period, values in rows_by_table[table].items():
            if period in known:
                continue
            cursor.execute(insert_sql, (period, *(values.get(col) for col in columns)))
def _years_to_fetch(first_run, current_year=None):
    """Return the year strings that ``run_job`` should request from the BEA API.

    Args:
        first_run: True for the initial backfill, False for periodic refreshes.
        current_year: Calendar year to treat as "now"; defaults to the year of
            the local clock.

    Returns:
        On the first run, every year in ``YEARS`` followed by any later years up
        to ``current_year``. Otherwise, the previous and the current year.
    """
    if current_year is None:
        current_year = datetime.now().year
    if first_run:
        # YEARS is a fixed list; extend it so a (re)start in a later year still
        # backfills the years that began after the list was written.
        newer = [str(y) for y in range(int(YEARS[-1]) + 1, current_year + 1)]
        return list(YEARS) + newer
    # Polling only YEARS[-1] would stop picking up new quarters once the calendar
    # moves past it. The previous year is kept too because, presumably, its last
    # quarter is published after the year rolls over; already-stored quarters
    # are skipped by update_database, so the extra request is harmless.
    return [str(current_year - 1), str(current_year)]


def run_job(first_run=False):
    """Run one fetch-and-update pass against the Macroeconomics database.

    Args:
        first_run: When True, backfill all configured years; otherwise refresh
            only recent years (see ``_years_to_fetch``).

    Errors are printed rather than raised, so a calling loop keeps running.
    Each year is committed separately, so years processed before a failure
    remain committed.
    """
    print(f"[{datetime.now()}] 开始抓取 BEA 数据并更新数据库...")
    db = None
    cursor = None
    try:
        # TODO(review): credentials are hard-coded; load them from config/env.
        db = pymysql.connect(
            host="127.0.0.1",
            user="root",
            password="2GS@bPYcgiMyL14A",
            database="Macroeconomics",
            port=4423,
        )
        cursor = db.cursor()
        for year in _years_to_fetch(first_run):
            data = get_bea_data(year)
            update_database(cursor, data)
            db.commit()
            print(f"[{datetime.now()}] {year} 数据更新完成")
    except pymysql.MySQLError as e:
        print(f"[{datetime.now()}] 数据库错误: {e}")
    except Exception as e:
        print(f"[{datetime.now()}] 其他错误: {e}")
    finally:
        # Explicit None checks replace the fragile `'name' in locals()` test.
        if cursor is not None:
            cursor.close()
        if db is not None:
            db.close()
        print(f"[{datetime.now()}] 本次任务完成。\n")
if __name__ == "__main__":
    # Poll forever: the first pass backfills, later passes refresh recent data only.
    first_run = True
    while True:
        run_job(first_run)
        first_run = False  # later passes call run_job with first_run=False
        # The old message interpolated SLEEP_SECONDS and then appended "6小时",
        # printing "216006小时"; show seconds and the derived hours instead.
        print(f"[{datetime.now()}] 休眠 {SLEEP_SECONDS} 秒 ({SLEEP_SECONDS / 3600:g} 小时)...\n")
        time.sleep(SLEEP_SECONDS)