"""Fetch historical BTC/USD prices and store them in the ``btc_prices`` table.

Two sources are polled (Nasdaq Data Link and CryptoCompare). Every sample is
keyed by the Unix timestamp of 08:00 Beijing time on its calendar day, and rows
whose ``(timestamp, source)`` pair is already stored are skipped.
"""

import time
from datetime import datetime, timedelta, timezone

import pymysql
import requests
import ujson

# MySQL connection settings.
# NOTE(review): credentials are hard-coded in source control; move them to the
# environment or a secrets store and rotate the values.
DB_CONFIG = {
    "host": "127.0.0.1",
    "user": "root",
    "password": "2GS@bPYcgiMyL14A",
    "database": "btcdb",
    "port": 4423,
}

# Nasdaq API key.
# NOTE(review): hard-coded secret; same concern as DB_CONFIG above.
NASDAQ_API_KEY = "FZqXog4sR-b7cYnXcRVV"

# Fixed UTC+8 offset used for every "Beijing 08:00" calculation, so results do
# not depend on the timezone configured on the host.
_BEIJING_TZ = timezone(timedelta(hours=8))

# Seconds to wait for an HTTP response before giving up; without a timeout a
# stalled connection would block the scheduler loop forever.
_REQUEST_TIMEOUT = 30

# CryptoCompare paging stops once a page reaches this timestamp
# (roughly July 2010).
_CRYPTOCOMPARE_EARLIEST_TS = 1279300000

# Pause between two fetch cycles: 4 hours.
_POLL_INTERVAL_SECONDS = 14400


def _get_json(url, params=None):
    """GET *url* and return the decoded JSON body, or ``None`` on failure.

    Network errors, non-200 status codes and undecodable bodies are reported
    on stdout and turned into ``None`` so callers can stop gracefully.
    """
    try:
        response = requests.get(url, params=params, timeout=_REQUEST_TIMEOUT)
    except requests.RequestException as e:
        print("请求失败:", e)
        return None
    if response.status_code != 200:
        print("请求失败:", response.status_code)
        return None
    try:
        return ujson.loads(response.content)
    except ValueError as e:
        print("响应解析失败:", e)
        return None


def get_existing_timestamps():
    """Return the set of ``(timestamp, source)`` pairs stored in ``btc_prices``."""
    connection = pymysql.connect(**DB_CONFIG)
    try:
        with connection.cursor() as cursor:
            cursor.execute("SELECT timestamp, source FROM btc_prices")
            return {(row[0], row[1]) for row in cursor.fetchall()}
    finally:
        connection.close()


def adjust_to_beijing_08am(timestamp):
    """Snap a Unix timestamp to 08:00 Beijing time of the same Beijing day.

    Args:
        timestamp: Unix timestamp in seconds.

    Returns:
        The integer Unix timestamp of 08:00 (UTC+8) on the Beijing calendar
        day that contains *timestamp*.
    """
    # Timezone-aware arithmetic: calling .timestamp() on a naive datetime
    # would interpret it in the host's local timezone.
    local = datetime.fromtimestamp(timestamp, tz=_BEIJING_TZ)
    return int(local.replace(hour=8, minute=0, second=0, microsecond=0).timestamp())


def get_nasdaq_price():
    """Fetch historical BTC/USD prices from the Nasdaq datatable API.

    Returns:
        A dict mapping the Unix timestamp of 08:00 Beijing time on each date
        to the price as a float. Empty when the request fails or the payload
        has no ``datatable.data`` section.
    """
    prices = {}
    url = "https://data.nasdaq.com/api/v3/datatables/QDL/BCHAIN"
    data = _get_json(url, {"code": "MKPRU", "api_key": NASDAQ_API_KEY})
    if data is not None and "datatable" in data and "data" in data["datatable"]:
        for item in data["datatable"]["data"]:
            daystr = item[1]
            price = item[2]
            if price is None:
                # Defensive: float(None) would abort the whole import.
                continue
            day = datetime.strptime(daystr, "%Y-%m-%d")
            day_08am_bj = day.replace(hour=8, tzinfo=_BEIJING_TZ)
            prices[int(day_08am_bj.timestamp())] = float(price)
    print(f"Nasdaq 获取数据量: {len(prices)} 条")
    return prices


def get_cryptocompare_price():
    """Fetch BTC/USD daily closing prices from CryptoCompare.

    Pages backwards from the current time, one request per second, until an
    empty page, an error, or a page reaching the early cutoff timestamp.
    Every sample time is normalised to 08:00 Beijing time of its day.

    Returns:
        A dict mapping the adjusted Unix timestamp to the closing price.
        Contains whatever was collected before a failure, if one occurred.
    """
    url = "https://min-api.cryptocompare.com/data/v2/histoday"
    limit = 2000
    to_ts = int(time.time())
    prices = {}

    while True:
        params = {
            "fsym": "BTC",
            "tsym": "USD",
            "limit": limit,
            "toTs": to_ts,
        }
        print(f"请求 CryptoCompare: {params}")
        data = _get_json(url, params)
        if data is None:
            break

        if data.get("Response") != "Success":
            print("API 返回错误:", data.get("Message"))
            break

        entries = (data.get("Data") or {}).get("Data")
        if not entries:
            break

        for entry in entries:
            prices[adjust_to_beijing_08am(entry["time"])] = entry["close"]

        # The first entry of the page is used as the oldest sample; the next
        # request ends just before it.
        earliest = entries[0]["time"]
        if earliest <= _CRYPTOCOMPARE_EARLIEST_TS:
            break

        to_ts = earliest - 1
        time.sleep(1)

    print(f"CryptoCompare 获取数据量: {len(prices)} 条")
    return prices


def save_to_database(data, source):
    """Insert the samples of *data* that are not yet stored for *source*.

    Args:
        data: Mapping of Unix timestamp to price.
        source: Name of the data source written to the ``source`` column.

    A failing row is reported and skipped; the remaining rows are still
    inserted and committed together at the end.
    """
    existing_timestamps = get_existing_timestamps()
    connection = pymysql.connect(**DB_CONFIG)
    new_data_count = 0

    try:
        with connection.cursor() as cursor:
            sql = """
                INSERT INTO btc_prices (timestamp, price, source)
                VALUES (%s, %s, %s)
            """
            for timestamp, price in data.items():
                if (timestamp, source) in existing_timestamps:
                    continue
                try:
                    cursor.execute(sql, (timestamp, price, source))
                except pymysql.MySQLError as e:
                    print(f"插入错误: {e}")
                    continue
                new_data_count += 1

        connection.commit()
        print(f"成功存入 {new_data_count} 条新数据({source})")
    finally:
        connection.close()


def fetch_and_store_data():
    """Run one cycle: fetch from every source and store the new samples."""
    print("========== 开始获取比特币价格数据 ==========")

    # Nasdaq
    nasdaq_prices = get_nasdaq_price()
    save_to_database(nasdaq_prices, "Nasdaq")

    # CryptoCompare
    cc_prices = get_cryptocompare_price()
    save_to_database(cc_prices, "CryptoCompare")

    print("========== 数据存储完成 ==========")


if __name__ == "__main__":
    while True:
        try:
            fetch_and_store_data()
        except Exception as e:
            # Top-level boundary: a failed cycle (e.g. database unreachable)
            # must not end the scheduler; report it and retry next cycle.
            print(f"本轮任务失败: {e}")
        time.sleep(_POLL_INTERVAL_SECONDS)  # run every 4 hours