"""Periodically sync quarterly GDP components from the BEA API into MySQL.

Each run downloads NIPA table T10105 (quarterly) from the BEA data API and
inserts quarters that are not yet present into the PCE, GPDI and NETEXP
tables of the configured MySQL database, then sleeps and repeats.
"""
import os
import time
from contextlib import closing
from datetime import datetime

import pymysql
import requests

# NOTE(review): these credentials used to be hard-coded only. They can now be
# overridden via environment variables; the literal fallbacks are kept solely
# so existing deployments keep working -- move them out of source and rotate.
BEA_USER_ID = os.environ.get("BEA_USER_ID", "146B5757-D9E3-442C-B6AC-ADE9E6B71114")
YEARS = ["2023", "2024", "2025"]  # years fetched on the first (full) run
SLEEP_SECONDS = 21600  # pause between runs (6 hours)
REQUEST_TIMEOUT = 60  # seconds; without a timeout requests.get may block forever

BEA_API_URL = "https://apps.bea.gov/api/data"

DB_CONFIG = {
    "host": os.environ.get("MACRO_DB_HOST", "127.0.0.1"),
    "user": os.environ.get("MACRO_DB_USER", "root"),
    "password": os.environ.get("MACRO_DB_PASSWORD", "2GS@bPYcgiMyL14A"),
    "database": os.environ.get("MACRO_DB_NAME", "Macroeconomics"),
    "port": int(os.environ.get("MACRO_DB_PORT", "4423")),
}

# Target tables and their value columns, in insert order. Table/column names
# are fixed constants (never external input), so interpolating them into SQL
# is safe; row values are still passed as query parameters.
_TABLE_COLUMNS = {
    "PCE": ("PCE", "PCEDG", "PCEND", "PCES"),
    "GPDI": ("GPDI", "FPI", "CBI"),
    "NETEXP": ("NETEXP", "IMPGS", "EXPGS"),
}

# BEA LineDescription -> (table, column). "Services" is handled separately
# because it is only mapped when LineNumber == "6".
_LINE_TARGETS = {
    "Personal consumption expenditures": ("PCE", "PCE"),
    "Durable goods": ("PCE", "PCEDG"),
    "Nondurable goods": ("PCE", "PCEND"),
    "Gross private domestic investment": ("GPDI", "GPDI"),
    "Fixed investment": ("GPDI", "FPI"),
    "Change in private inventories": ("GPDI", "CBI"),
    "Net exports of goods and services": ("NETEXP", "NETEXP"),
    "Imports": ("NETEXP", "IMPGS"),
    "Exports": ("NETEXP", "EXPGS"),
}


def _log(message):
    """Print *message* prefixed with the current local timestamp."""
    print(f"[{datetime.now()}] {message}")


def get_bea_data(year):
    """Fetch the quarterly T10105 rows for *year* from the BEA API.

    Args:
        year: Year to request (str or int).

    Returns:
        The list found at ``BEAAPI.Results.Data`` in the JSON response.

    Raises:
        requests.RequestException: on network errors, timeouts or HTTP errors.
        RuntimeError: when the response carries no ``Data`` element (BEA
            reports problems such as a bad key inside the JSON body).
    """
    params = {
        "UserID": BEA_USER_ID,
        "method": "GetData",
        "datasetname": "NIPA",
        "TableName": "T10105",
        "Frequency": "Q",
        "Year": year,
        "ResultFormat": "JSON",
    }
    response = requests.get(BEA_API_URL, params=params, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    payload = response.json().get("BEAAPI", {})
    results = payload.get("Results", {})
    if not isinstance(results, dict) or "Data" not in results:
        raise RuntimeError(f"BEA API returned no data for {year}: {payload}")
    return results["Data"]


def _clean_value(raw):
    """Strip thousands separators from a BEA DataValue ("27,956.0" -> "27956.0")."""
    return raw.replace(",", "") if isinstance(raw, str) else raw


def _classify(entry):
    """Return the (table, column) an API row feeds, or None if it is not stored."""
    description = entry["LineDescription"]
    if description == "Services":
        # Only line 6 is mapped to PCE services; other "Services" lines in the
        # table (presumably under exports/imports) are ignored.
        return ("PCE", "PCES") if entry["LineNumber"] == "6" else None
    return _LINE_TARGETS.get(description)


def _group_by_table(data):
    """Group API rows into {table: {time_period: {column: value}}}."""
    grouped = {table: {} for table in _TABLE_COLUMNS}
    for entry in data:
        target = _classify(entry)
        if target is None:
            continue
        table, column = target
        period = entry["TimePeriod"]
        grouped[table].setdefault(period, {})[column] = _clean_value(entry["DataValue"])
    return grouped


def update_database(cursor, data):
    """Insert every quarter in *data* that is missing from PCE, GPDI or NETEXP.

    Quarters already present (matched on the ``times`` column) are left
    untouched, so later BEA revisions of existing quarters are not applied.
    Missing components of a quarter are inserted as NULL. The caller owns
    the transaction (commit/rollback).

    NOTE(review): assumes ``times`` stores the BEA TimePeriod string as-is
    (e.g. "2024Q1") -- verify against the table schema.
    """
    existing = {}
    for table in _TABLE_COLUMNS:
        cursor.execute(f"SELECT times FROM {table}")
        existing[table] = {row[0] for row in cursor.fetchall()}

    grouped = _group_by_table(data)
    for table, columns in _TABLE_COLUMNS.items():
        placeholders = ",".join(["%s"] * (len(columns) + 1))
        sql = f"INSERT INTO {table} (times, {', '.join(columns)}) VALUES ({placeholders})"
        for period, values in grouped[table].items():
            if period not in existing[table]:
                cursor.execute(sql, (period, *(values.get(col) for col in columns)))


def _years_to_fetch(first_run, today=None):
    """Return the years (as strings) a run should request.

    The first run fetches every configured year in ``YEARS`` plus any later
    year up to the current one. Later runs fetch the latest year and the one
    before it: BEA typically publishes a year's Q4 estimate early in the
    following year, and only quarters missing from the DB get inserted.
    """
    current_year = (today or datetime.now()).year
    configured_last = int(YEARS[-1])
    if first_run:
        later = range(configured_last + 1, current_year + 1)
        return list(YEARS) + [str(year) for year in later]
    latest = max(configured_last, current_year)
    return [str(latest - 1), str(latest)]


def run_job(first_run=False):
    """Run one fetch-and-update cycle.

    Each year is fetched and committed independently, so one failing year
    (network error, BEA error, DB error) is rolled back and logged without
    discarding the others.

    Returns:
        True if every year was fetched and stored successfully, else False.
    """
    _log("开始抓取 BEA 数据并更新数据库...")
    ok = True
    try:
        with closing(pymysql.connect(**DB_CONFIG)) as db, db.cursor() as cursor:
            for year in _years_to_fetch(first_run):
                try:
                    data = get_bea_data(year)
                    update_database(cursor, data)
                    db.commit()
                except Exception as e:  # job boundary: log, roll back, go on
                    ok = False
                    label = "数据库错误" if isinstance(e, pymysql.MySQLError) else "其他错误"
                    _log(f"{label}: {e}")
                    # If the connection itself is gone this raises and the
                    # outer handler logs it, skipping the remaining years.
                    db.rollback()
                else:
                    _log(f"{year} 数据更新完成")
    except pymysql.MySQLError as e:
        # Covers connect failures and errors raised while closing/rolling back.
        ok = False
        _log(f"数据库错误: {e}")
    except Exception as e:
        ok = False
        _log(f"其他错误: {e}")
    finally:
        _log("本次任务完成。\n")
    return ok


def main():
    """Run the sync forever, sleeping SLEEP_SECONDS between runs."""
    first_run = True
    while True:
        # Stay in full-history mode until one run has fully succeeded;
        # afterwards only the most recent years are refreshed.
        if run_job(first_run):
            first_run = False
        _log(f"休眠 {SLEEP_SECONDS} 秒({SLEEP_SECONDS / 3600:g}小时)...\n")
        time.sleep(SLEEP_SECONDS)


if __name__ == "__main__":
    main()