coinbus-data/coinbus/Macroeconomic_PCE_v3.py

118 lines
4.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import pymysql
from datetime import datetime
import time
# BEA API key sent as the UserID query parameter by get_bea_data.
# NOTE(review): this is a credential committed to source; consider loading it
# from the environment or an untracked config file instead.
BEA_USER_ID = "146B5757-D9E3-442C-B6AC-ADE9E6B71114"
# Years backfilled on the very first run of the job.
YEARS = [str(year) for year in range(2023, 2026)]
SLEEP_SECONDS = 6 * 60 * 60  # pause between job runs: 6 hours
def get_bea_data(year):
"""抓取指定年份的季度数据"""
url = (
f'https://apps.bea.gov/api/data?UserID={BEA_USER_ID}'
f'&method=GetData&datasetname=NIPA&TableName=T10105&Frequency=Q'
f'&Year={year}&ResultFormat=JSON'
)
response = requests.get(url)
return response.json()['BEAAPI']['Results']['Data']
def update_database(cursor, data):
    """Insert BEA T10105 quarters that are missing from the PCE, GPDI and NETEXP tables.

    Args:
        cursor: DB-API cursor (only ``execute`` and ``fetchall`` are used).
            This function does not commit; the caller is responsible for that.
        data: iterable of BEA rows, each a mapping with ``TimePeriod``,
            ``LineDescription`` and ``DataValue`` keys (``LineNumber`` is read
            for "Services" rows).

    A quarter whose ``times`` value is already present in a table is skipped for
    that table. A series not seen for a quarter is passed to the INSERT as None.
    """
    # Per target table: INSERT statement and its value columns following ``times``.
    tables = {
        "PCE": (
            "INSERT INTO PCE (times, PCE, PCEDG, PCEND, PCES) VALUES (%s,%s,%s,%s,%s)",
            ("PCE", "PCEDG", "PCEND", "PCES"),
        ),
        "GPDI": (
            "INSERT INTO GPDI (times, GPDI, FPI, CBI) VALUES (%s,%s,%s,%s)",
            ("GPDI", "FPI", "CBI"),
        ),
        "NETEXP": (
            "INSERT INTO NETEXP (times, NETEXP, IMPGS, EXPGS) VALUES (%s,%s,%s,%s)",
            ("NETEXP", "IMPGS", "EXPGS"),
        ),
    }
    # BEA line description -> (target table, target column).
    routes = {
        "Personal consumption expenditures": ("PCE", "PCE"),
        "Durable goods": ("PCE", "PCEDG"),
        "Nondurable goods": ("PCE", "PCEND"),
        "Services": ("PCE", "PCES"),
        "Gross private domestic investment": ("GPDI", "GPDI"),
        "Fixed investment": ("GPDI", "FPI"),
        "Change in private inventories": ("GPDI", "CBI"),
        "Net exports of goods and services": ("NETEXP", "NETEXP"),
        "Imports": ("NETEXP", "IMPGS"),
        "Exports": ("NETEXP", "EXPGS"),
    }

    # Quarters already stored, per table.
    stored = {}
    for table in tables:
        cursor.execute(f"SELECT times FROM {table}")
        stored[table] = {row[0] for row in cursor.fetchall()}

    # Group incoming values as table -> quarter -> column -> value.
    grouped = {table: {} for table in tables}
    for entry in data:
        period = entry["TimePeriod"]
        description = entry["LineDescription"]
        value = entry["DataValue"]
        route = routes.get(description)
        if route is None:
            continue
        # Only the "Services" row with LineNumber '6' is routed to PCES;
        # any other "Services" row is ignored.
        if description == "Services" and entry["LineNumber"] != '6':
            continue
        table, column = route
        grouped[table].setdefault(period, {})[column] = value

    # NOTE(review): DataValue is inserted exactly as received; if BEA formats it
    # with thousands separators (e.g. "19,424.8"), confirm the column types accept that.
    for table, (insert_sql, columns) in tables.items():
        already_there = stored[table]
        for period, values in grouped[table].items():
            if period in already_there:
                continue
            cursor.execute(insert_sql, (period, *(values.get(col) for col in columns)))
def run_job(first_run=False):
"""运行一次抓取和更新"""
print(f"[{datetime.now()}] 开始抓取 BEA 数据并更新数据库...")
try:
db = pymysql.connect(
host="127.0.0.1",
user="root",
password="2GS@bPYcgiMyL14A",
database="Macroeconomics",
port=4423
)
cursor = db.cursor()
years_to_fetch = YEARS if first_run else [YEARS[-1]] # 第一次抓全部年份,否则只抓最新年份
for year in years_to_fetch:
data = get_bea_data(year)
update_database(cursor, data)
db.commit()
print(f"[{datetime.now()}] {year} 数据更新完成")
except pymysql.MySQLError as e:
print(f"[{datetime.now()}] 数据库错误: {e}")
except Exception as e:
print(f"[{datetime.now()}] 其他错误: {e}")
finally:
if 'cursor' in locals():
cursor.close()
if 'db' in locals():
db.close()
print(f"[{datetime.now()}] 本次任务完成。\n")
if __name__ == "__main__":
    # Backfill on the first cycle, then poll incrementally every SLEEP_SECONDS.
    first_run = True
    while True:
        run_job(first_run)
        first_run = False  # later cycles only fetch the most recent data
        # Report the pause in hours derived from SLEEP_SECONDS. Interpolating the
        # raw seconds next to a literal "6小时" printed "216006小时".
        print(f"[{datetime.now()}] 休眠 {SLEEP_SECONDS / 3600:g}小时...\n")
        time.sleep(SLEEP_SECONDS)