coinbus-data/coinbus/btc_prices.py

141 lines
4.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import time
from datetime import datetime, timedelta, timezone

import pymysql
import requests
import ujson
# MySQL connection settings.
#
# Each value can be overridden through an environment variable so that
# credentials do not have to live in the source tree. The literals below are
# kept only as backward-compatible fallbacks.
# SECURITY: the fallback password and API key are committed in plain text;
# rotate them and drop the fallbacks once the environment is configured.
DB_CONFIG = {
    "host": os.environ.get("BTC_DB_HOST", "127.0.0.1"),
    "user": os.environ.get("BTC_DB_USER", "root"),
    "password": os.environ.get("BTC_DB_PASSWORD", "2GS@bPYcgiMyL14A"),
    "database": os.environ.get("BTC_DB_NAME", "btcdb"),
    # pymysql expects an integer port, so convert the environment string.
    "port": int(os.environ.get("BTC_DB_PORT", "4423")),
}

# Nasdaq Data Link API key (override with the NASDAQ_API_KEY env variable).
NASDAQ_API_KEY = os.environ.get("NASDAQ_API_KEY", "FZqXog4sR-b7cYnXcRVV")
def get_existing_timestamps():
    """Return the (timestamp, source) pairs already stored in ``btc_prices``.

    A fresh MySQL connection is opened from ``DB_CONFIG`` for the query and
    is always closed again, even if the query raises.

    Returns:
        set: one ``(timestamp, source)`` tuple per row of the table.
    """
    conn = pymysql.connect(**DB_CONFIG)
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT timestamp, source FROM btc_prices")
            seen = {(row[0], row[1]) for row in cur.fetchall()}
    finally:
        conn.close()
    return seen
def adjust_to_beijing_08am(timestamp):
    """Snap a Unix timestamp to 08:00 Beijing time on the same Beijing day.

    The calculation uses a timezone-aware datetime fixed at UTC+8, so the
    result does not depend on the timezone of the machine running the script.
    (Calling ``.timestamp()`` on a naive datetime, as this function used to,
    interprets the value in the host's local timezone and is therefore only
    correct on hosts configured for UTC.)

    Args:
        timestamp: Unix timestamp in seconds (int or float).

    Returns:
        int: Unix timestamp of 08:00 UTC+8 on the Beijing calendar date that
        contains ``timestamp``.
    """
    beijing = timezone(timedelta(hours=8))
    beijing_dt = datetime.fromtimestamp(timestamp, tz=beijing)
    snapped = beijing_dt.replace(hour=8, minute=0, second=0, microsecond=0)
    return int(snapped.timestamp())
def get_nasdaq_price():
    """Fetch historical BTC/USD prices from the Nasdaq QDL/BCHAIN datatable.

    Each returned row is read positionally: index 1 is parsed as a
    ``YYYY-MM-DD`` date string and index 2 as the price. Every date is keyed
    by the Unix timestamp of 08:00 Beijing time (UTC+8) on that date, computed
    with a timezone-aware datetime so the key is independent of the host's
    local timezone.

    Network failures and non-200 responses are reported on stdout and yield
    an empty result instead of raising, so the caller can carry on with the
    other data source.

    Returns:
        dict: ``{unix_timestamp (int): price (float)}``; empty on failure.
    """
    beijing = timezone(timedelta(hours=8))
    prices = {}
    url = f'https://data.nasdaq.com/api/v3/datatables/QDL/BCHAIN?code=MKPRU&api_key={NASDAQ_API_KEY}'
    try:
        # A timeout keeps a stalled connection from blocking the scheduler
        # loop forever.
        response = requests.get(url, timeout=30)
    except requests.RequestException as exc:
        print(f"Nasdaq 请求失败: {exc}")
        return prices
    if response.status_code == 200:
        data = ujson.loads(response.content)
        if "datatable" in data and "data" in data["datatable"]:
            for item in data["datatable"]["data"]:
                daystr = item[1]
                price = item[2]
                if price is None:
                    # Defensive: float(None) would abort the whole import.
                    continue
                day_08am = datetime.strptime(daystr, "%Y-%m-%d").replace(
                    hour=8, tzinfo=beijing
                )
                prices[int(day_08am.timestamp())] = float(price)
    else:
        print(f"Nasdaq 请求失败: {response.status_code}")
    print(f"Nasdaq 获取数据量: {len(prices)}")
    return prices
def get_cryptocompare_price():
    """Fetch daily BTC/USD closing prices from CryptoCompare's histoday API.

    The history is paged backwards in time: each request asks for up to
    ``limit`` entries ending at ``to_ts``, then ``to_ts`` is moved to just
    before the earliest entry received. Paging stops when a page is empty,
    when the earliest entry reaches the cutoff (roughly July 2010), or on the
    first failed request. Every entry's time is normalised with
    ``adjust_to_beijing_08am`` so all keys fall on 08:00 Beijing time.

    Any failure (network error, non-200 status, API-level error) is printed
    and ends the paging; whatever was collected so far is still returned.

    Returns:
        dict: ``{unix_timestamp (int): close_price}``.
    """
    url = "https://min-api.cryptocompare.com/data/v2/histoday"
    limit = 2000
    oldest_wanted_ts = 1279300000  # roughly July 2010
    to_ts = int(time.time())
    prices = {}
    while True:
        params = {
            "fsym": "BTC",
            "tsym": "USD",
            "limit": limit,
            "toTs": to_ts,
        }
        print(f"请求 CryptoCompare: {params}")
        try:
            # A timeout keeps a stalled connection from blocking the
            # scheduler loop forever.
            response = requests.get(url, params=params, timeout=30)
        except requests.RequestException as exc:
            print("请求失败:", exc)
            break
        if response.status_code != 200:
            print("请求失败:", response.status_code)
            break
        data = ujson.loads(response.content)
        # .get() so an unexpected payload is reported rather than raising
        # KeyError.
        if data.get("Response") != "Success":
            print("API 返回错误:", data.get("Message"))
            break
        entries = (data.get("Data") or {}).get("Data")
        if not entries:
            break
        for entry in entries:
            prices[adjust_to_beijing_08am(entry["time"])] = entry["close"]
        # The first entry is used as the oldest one in the page.
        earliest = entries[0]["time"]
        if earliest <= oldest_wanted_ts:
            break
        to_ts = earliest - 1
        time.sleep(1)  # pause between pages
    print(f"CryptoCompare 获取数据量: {len(prices)}")
    return prices
def save_to_database(data, source):
    """Insert price points that are not yet stored for ``source``.

    Rows whose ``(timestamp, source)`` pair is already returned by
    ``get_existing_timestamps`` are skipped. Inserts are best-effort: a row
    that fails with a MySQL error is reported and skipped, and the remaining
    rows are still written and committed.

    No database connection is opened when ``data`` is empty, and no write
    connection is opened when every row is already stored.

    Args:
        data: mapping of Unix timestamp -> price.
        source: label stored in the ``source`` column (e.g. "Nasdaq").
    """
    new_data_count = 0
    if data:
        existing_timestamps = get_existing_timestamps()
        new_rows = [
            (timestamp, price, source)
            for timestamp, price in data.items()
            if (timestamp, source) not in existing_timestamps
        ]
        if new_rows:
            sql = """
            INSERT INTO btc_prices (timestamp, price, source)
            VALUES (%s, %s, %s)
            """
            connection = pymysql.connect(**DB_CONFIG)
            try:
                with connection.cursor() as cursor:
                    for row in new_rows:
                        try:
                            cursor.execute(sql, row)
                            new_data_count += 1
                        except pymysql.MySQLError as e:
                            # Keep going: one bad row must not block the rest.
                            print(f"插入错误: {e}")
                connection.commit()
            finally:
                connection.close()
    print(f"成功存入 {new_data_count} 条新数据({source})")
def fetch_and_store_data():
    """Run one collection cycle: fetch each price source and store it.

    Sources are processed in order (Nasdaq, then CryptoCompare); each
    source's prices are saved before the next source is fetched.
    """
    print("========== 开始获取比特币价格数据 ==========")
    pipeline = (
        (get_nasdaq_price, "Nasdaq"),
        (get_cryptocompare_price, "CryptoCompare"),
    )
    for fetch_prices, source_label in pipeline:
        save_to_database(fetch_prices(), source_label)
    print("========== 数据存储完成 ==========")
if __name__ == "__main__":
    # Minimal scheduler: run a collection cycle, then sleep four hours,
    # forever.
    poll_interval_seconds = 4 * 60 * 60
    while True:
        fetch_and_store_data()
        time.sleep(poll_interval_seconds)