97 lines
3.4 KiB
Python
97 lines
3.4 KiB
Python
|
import pymysql
|
||
|
import time
|
||
|
from selenium import webdriver
|
||
|
from selenium.webdriver.chrome.options import Options
|
||
|
from selenium.webdriver.chrome.service import Service
|
||
|
from w3lib.html import remove_tags
|
||
|
from bs4 import BeautifulSoup
|
||
|
from datetime import datetime
|
||
|
|
||
|
# Selenium configuration: one shared Chrome instance, created at import time
# and reused by every scrape.
chrome_options = Options()
for _chrome_flag in (
    "--headless",
    '--no-sandbox',
    '--disable-gpu',
    '--disable-dev-shm-usage',
    'blink-settings=imagesEnabled=false',
):
    chrome_options.add_argument(_chrome_flag)

# With Selenium 3 the options object can be handed straight to the driver.
# NOTE(review): `executable_path` is the Selenium 3 calling convention; confirm
# the installed Selenium version still accepts it before upgrading.
browser = webdriver.Chrome(
    executable_path="chromedriver",
    options=chrome_options,
)
|
||
|
# Maps each English month abbreviation to its "/<month number>/" form, so a
# header such as "2024Jun10" can be rewritten to "2024/6/10" before parsing.
MONTH_MAPPING = {
    abbreviation: "/{}/".format(number)
    for number, abbreviation in enumerate(
        (
            "Jan", "Feb", "Mar", "Apr",
            "May", "Jun", "Jul", "Aug",
            "Sep", "Oct", "Nov", "Dec",
        ),
        start=1,
    )
}
|
||
|
|
||
|
# Database connection settings, unpacked as keyword arguments into
# pymysql.connect().
# NOTE(review): credentials are hard-coded in source; consider loading them
# from the environment or a secrets store.
DB_CONFIG = dict(
    host="127.0.0.1",
    user="root",
    password="2GS@bPYcgiMyL14A",
    database="Macroeconomics",
    port=4423,
)
|
||
|
|
||
|
def fetch_web_data():
    """Scrape the Federal Reserve H.15 page for its latest date and rate cells.

    Loads the page in the shared module-level ``browser`` and parses the
    rendered HTML.

    Returns:
        tuple: ``(date, data)``. ``date`` is a ``datetime`` parsed from the
        last ``<th>`` whose class is ``colhead sticky sticky-row-cell``.
        ``data`` is a list holding the stripped text of every
        ``<td class="data">`` cell, in document order.

    Raises:
        IndexError: if the page contains no matching header cell.
        ValueError: if the header text cannot be parsed as ``%Y/%m/%d`` after
            the month abbreviation has been substituted.
    """
    browser.get("https://www.federalreserve.gov/releases/h15/")
    soup = BeautifulSoup(browser.page_source, 'html.parser')

    # The last matching header cell is taken as the most recent column.
    header_cells = soup.find_all('th', class_="colhead sticky sticky-row-cell")
    if not header_cells:
        # Same exception type as the bare [-1] lookup would raise, but with a
        # message that says what actually went wrong.
        raise IndexError(
            "no date header cell found on the H.15 page; "
            "the page may not have loaded or its layout may have changed"
        )
    date_text = header_cells[-1].get_text(strip=True)

    # Rewrite the month abbreviation as "/<n>/" so strptime can parse the date
    # with a purely numeric format.
    for month, replacement in MONTH_MAPPING.items():
        date_text = date_text.replace(month, replacement)
    # Any '*' characters in the header text are dropped before parsing.
    date = datetime.strptime(date_text.replace('*', ''), '%Y/%m/%d')

    # Collect the text of every data cell; callers pick values by position.
    data = [
        remove_tags(str(td)).strip()
        for td in soup.find_all('td', class_="data")
    ]
    return date, data
|
||
|
|
||
|
def get_latest_db_date():
    """Return the newest ``date`` stored in ``USTreasuriesYields``.

    Returns:
        The value of ``MAX(date)`` as delivered by the driver, or ``None``
        when that value is falsy (for example when the query yields NULL).
    """
    query = "SELECT MAX(date) FROM USTreasuriesYields"
    with pymysql.connect(**DB_CONFIG) as connection:
        with connection.cursor() as cur:
            cur.execute(query)
            row = cur.fetchone()
    latest = row[0]
    if not latest:
        return None
    return latest
|
||
|
|
||
|
def _sql_value(value):
    """Map the placeholder strings 'NULL', 'ND' and '' to None; pass others through."""
    if isinstance(value, str) and value.strip() in ('', 'NULL', 'ND'):
        return None
    return value


def insert_data(date, rates, mprime, dpcredit):
    """Insert one day's scraped rates into the database.

    Writes one row to ``USTreasuriesYields`` and two rows to ``InterestRate``
    (named ``'BPL'`` and ``'DWPC'``), then commits once after all three
    statements.

    Args:
        date: Value for the ``date`` column of every inserted row.
        rates: Iterable of nine values, in the order ``1_Mo, 3_Mo, 6_Mo,
            1_Yr, 2_Yr, 5_Yr, 10_Yr, 20_Yr, 30_Yr``.
        mprime: Value stored in ``InterestRate`` under the name ``'BPL'``.
        dpcredit: Value stored in ``InterestRate`` under the name ``'DWPC'``.

    Missing values may be passed as ``None`` or as the placeholder strings
    ``'NULL'`` / ``'ND'``; all of them are stored as SQL NULL.
    """
    sql_treasuries = """
        INSERT INTO USTreasuriesYields (date, 1_Mo, 3_Mo, 6_Mo, 1_Yr, 2_Yr, 5_Yr, 10_Yr, 20_Yr, 30_Yr)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    sql_interest_rate = """
        INSERT INTO InterestRate (date, name, _value)
        VALUES (%s, %s, %s)
    """

    # Bound parameters are escaped as literals, so the string 'NULL' would be
    # stored as text rather than SQL NULL. Only Python None becomes NULL.
    treasury_row = [date, *(_sql_value(rate) for rate in rates)]

    with pymysql.connect(**DB_CONFIG) as conn:
        with conn.cursor() as cursor:
            cursor.execute(sql_treasuries, treasury_row)
            cursor.execute(sql_interest_rate, (date, 'BPL', _sql_value(mprime)))
            cursor.execute(sql_interest_rate, (date, 'DWPC', _sql_value(dpcredit)))
        # Single commit after all three statements.
        conn.commit()
|
||
|
|
||
|
# Seconds to wait before polling again when the page has nothing newer (6 hours).
_POLL_INTERVAL_SECONDS = 21600

# Positions in the list returned by fetch_web_data() that feed the nine
# treasury-yield columns, in the order insert_data() expects them.
# NOTE(review): these depend on the current page layout; presumably each is the
# last cell of a table row. Confirm against the live page.
_TREASURY_CELL_INDEXES = (104, 109, 114, 119, 124, 134, 144, 149, 154)
# Positions of the values passed to insert_data() as `mprime` and `dpcredit`.
_MPRIME_CELL_INDEX = 54
_DPCREDIT_CELL_INDEX = 59


def _cell_or_none(cells, index):
    """Return ``cells[index]``, or None if it is absent, empty, or marked 'ND'."""
    if index >= len(cells):
        return None
    text = cells[index].strip()
    if not text or 'ND' in text:
        return None
    return text


def main():
    """Poll the H.15 page forever and store each newly published day of rates.

    Each iteration scrapes the page and compares its date with the newest
    date in the database. If the page is not newer, it sleeps six hours and
    polls again. Otherwise it extracts the rates and inserts them.

    Missing cells and cells marked ``'ND'`` are passed on as ``None`` so they
    are stored as SQL NULL rather than as the text ``'NULL'``.
    """
    while True:
        new_date, data = fetch_web_data()
        old_date = get_latest_db_date()

        # Nothing newer than what is already stored: wait, then poll again.
        if old_date and new_date <= old_date:
            time.sleep(_POLL_INTERVAL_SECONDS)
            continue

        rates = [_cell_or_none(data, index) for index in _TREASURY_CELL_INDEXES]
        mprime = _cell_or_none(data, _MPRIME_CELL_INDEX)
        dpcredit = _cell_or_none(data, _DPCREDIT_CELL_INDEX)

        # No sleep after a successful insert: the loop polls again immediately
        # and is expected to take the sleeping branch on the next pass.
        insert_data(new_date, rates, mprime, dpcredit)
|
||
|
|
||
|
if __name__ == "__main__":
    try:
        main()
    finally:
        # main() only exits through an exception (including Ctrl-C). Always
        # quit the shared browser so the headless Chrome and chromedriver
        # processes are not left running.
        browser.quit()
|