coinbus-data/coinbus/btc_price_fetcher.py

85 lines
2.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import requests
import pymysql
from datetime import datetime
# MySQL配置
DB_CONFIG = {
"host": "127.0.0.1",
"user": "root",
"password": "2GS@bPYcgiMyL14A",
"database": "btcdb",
"port": 4423
}
# 获取当前时间戳
def get_current_timestamp():
return int(time.time())
# 获取API1的BTC价格示例币安
def get_binance_price():
url = "https://api.binance.com/api/v3/ticker/price?symbol=BTCUSDT"
resp = requests.get(url, timeout=5)
resp.raise_for_status()
data = resp.json()
return float(data["price"])
# 获取API2的BTC价格示例Coinbase
def get_coinbase_price():
url = "https://api.coinbase.com/v2/prices/spot?currency=USD"
resp = requests.get(url, timeout=5)
resp.raise_for_status()
data = resp.json()
return float(data["data"]["amount"])
# 更新或插入价格
def upsert_price(source, price, timestamp):
connection = pymysql.connect(**DB_CONFIG)
try:
with connection.cursor() as cursor:
# 先判断该 source 是否已存在
sql_check = "SELECT id FROM btc_realtime_prices WHERE source = %s"
cursor.execute(sql_check, (source,))
result = cursor.fetchone()
if result:
# 已存在,执行更新
sql_update = """
UPDATE btc_realtime_prices
SET price = %s, timestamp = %s
WHERE source = %s
"""
cursor.execute(sql_update, (price, timestamp, source))
else:
# 不存在,执行插入
sql_insert = """
INSERT INTO btc_realtime_prices (timestamp, source, price)
VALUES (%s, %s, %s)
"""
cursor.execute(sql_insert, (timestamp, source, price))
connection.commit()
finally:
connection.close()
def main():
while True:
now_ts = get_current_timestamp()
try:
binance_price = get_binance_price()
print(f"Binance BTC Price: {binance_price}")
upsert_price("binance", binance_price, now_ts)
except Exception as e:
print(f"获取Binance价格失败: {e}")
try:
coinbase_price = get_coinbase_price()
print(f"Coinbase BTC Price: {coinbase_price}")
upsert_price("coinbase", coinbase_price, now_ts)
except Exception as e:
print(f"获取Coinbase价格失败: {e}")
time.sleep(60) # 每分钟执行一次
if __name__ == "__main__":
main()