Files
coinbus-data/coinbus/btc_update.py
2025-10-17 16:15:17 +08:00

126 lines
3.5 KiB
Python

import json
import logging
import os
import time
from datetime import datetime, timedelta

import pymysql
# Database connection settings, passed as keyword arguments to pymysql.connect().
# The host, user, password, database and port can be overridden through the
# BTC_DB_* environment variables; the literals below are only fallbacks that
# keep existing deployments working unchanged.
# NOTE(review): a real password is committed in this file. Rotate it and remove
# the fallback once BTC_DB_PASSWORD is provisioned everywhere.
DB_CONFIG = {
    "host": os.environ.get("BTC_DB_HOST", "192.168.194.240"),
    "user": os.environ.get("BTC_DB_USER", "root"),
    "password": os.environ.get("BTC_DB_PASSWORD", "2GS@bPYcgiMyL14A"),
    "database": os.environ.get("BTC_DB_NAME", "btcdb"),
    "port": int(os.environ.get("BTC_DB_PORT", "4423")),
    "connect_timeout": 60,
    "read_timeout": 60,
    "write_timeout": 60,
    "charset": "utf8mb4",
}
# Path of the generated Python file that caches the price dictionaries.
DATA_FILE = "btc_historical_price.py"

# Seconds to sleep between update runs: 8 hours (3600 would mean hourly).
INTERVAL = 8 * 60 * 60
def get_new_prices(source, last_timestamp=None):
    """Fetch price rows for one data source from the ``btc_prices`` table.

    Only rows stamped exactly 08:00:00 Beijing time (UTC+8, i.e. 00:00:00
    UTC) are kept, and each one is stored under its timestamp shifted back
    by 8 hours.

    Args:
        source: Value matched against the ``source`` column.
        last_timestamp: When not ``None``, only rows whose ``timestamp`` is
            strictly greater than this value are fetched; ``None`` fetches
            every row of the source.

    Returns:
        dict mapping the shifted Unix timestamp (as a string) to the price
        (as a float).
    """
    seconds_per_day = 24 * 3600
    beijing_offset = 8 * 3600

    conn = pymysql.connect(**DB_CONFIG)
    prices = {}
    try:
        with conn.cursor() as cursor:
            # Compare against None explicitly so that a timestamp of 0 still
            # produces a filtered query instead of a full-table fetch.
            if last_timestamp is not None:
                sql = """
                    SELECT timestamp, price
                    FROM btc_prices
                    WHERE source = %s AND timestamp > %s
                    ORDER BY timestamp
                """
                cursor.execute(sql, (source, last_timestamp))
            else:
                sql = """
                    SELECT timestamp, price
                    FROM btc_prices
                    WHERE source = %s
                    ORDER BY timestamp
                """
                cursor.execute(sql, (source,))

            for timestamp, price in cursor.fetchall():
                ts_int = int(timestamp)
                # Seconds elapsed since Beijing midnight. A value of exactly
                # 8 hours means the row is stamped 08:00:00 Beijing time.
                # Integer arithmetic replaces datetime.utcfromtimestamp(),
                # which is deprecated since Python 3.12.
                if (ts_int + beijing_offset) % seconds_per_day != beijing_offset:
                    continue
                # NOTE(review): the reviewed copy of this file had lost its
                # indentation, so it is ambiguous whether this assignment sat
                # inside the 08:00 check (only those rows stored, as the
                # original docstring says) or after it (every row stored).
                # Confirm against the canonical file.
                prices[str(ts_int - beijing_offset)] = float(price)
    finally:
        conn.close()
    return prices
def load_existing_data(path=None):
    """Load the cached price dictionaries from the generated data file.

    Args:
        path: File to read. Defaults to ``DATA_FILE`` when ``None``.

    Returns:
        Tuple ``(prices_temp, prices)`` read from the module-level names of
        the same spelling in the file; a name that is missing yields ``{}``.
        ``({}, {})`` is returned when the file does not exist or cannot be
        read or executed.
    """
    data_path = DATA_FILE if path is None else path
    if not os.path.exists(data_path):
        return {}, {}
    try:
        with open(data_path, "r", encoding="utf-8") as f:
            source = f.read()
        namespace = {}
        # NOTE(review): exec() runs whatever Python the data file contains.
        # That is only acceptable while the file is written exclusively by
        # this script; consider a plain-data format if that ever changes.
        exec(source, namespace)
    except Exception:
        # Best-effort by design: fall back to empty data, but record why
        # instead of hiding the failure.
        logging.getLogger(__name__).exception(
            "Could not load %s; starting from empty price data", data_path
        )
        return {}, {}
    return namespace.get("prices_temp", {}), namespace.get("prices", {})
def save_prices(prices_temp, prices, path=None):
    """Write both price dictionaries to the data file as Python assignments.

    The output is a two-line comment header followed by ``prices_temp = ...``
    and ``prices = ...``, each value rendered with ``json.dumps``.

    The content is written to a sibling ``<path>.tmp`` file and then moved
    over the target with ``os.replace``, so an interrupted run cannot leave a
    truncated data file behind.

    Args:
        prices_temp: Dictionary written under the name ``prices_temp``.
        prices: Dictionary written under the name ``prices``.
        path: Target file. Defaults to ``DATA_FILE`` when ``None``.

    Raises:
        Any error from serialisation or file I/O is propagated; the temporary
        file is removed first.
    """
    target = DATA_FILE if path is None else path
    # Serialise before touching the disk so a serialisation error leaves the
    # existing file untouched.
    content = "".join(
        [
            "# 自动生成的BTC历史价格数据文件\n",
            f"# 更新时间: {datetime.now()}\n\n",
            "prices_temp = ",
            json.dumps(prices_temp, indent=4, ensure_ascii=False),
            "\n\nprices = ",
            json.dumps(prices, indent=4, ensure_ascii=False),
            "\n",
        ]
    )
    tmp_path = f"{target}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            f.write(content)
        os.replace(tmp_path, target)
    except Exception:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
def get_last_timestamp(price_dict):
    """Return the largest key of ``price_dict`` as an int.

    Keys are converted with ``int`` before comparison, so string keys are
    ordered numerically. Returns ``None`` when the dictionary is empty or
    falsy.
    """
    if price_dict:
        return max(map(int, price_dict))
    return None
def update_once():
    """Run one update pass: load the cached data, merge new rows, save.

    The ``prices_temp`` dictionary is refreshed from the "Nasdaq" source and
    the ``prices`` dictionary from the "CryptoCompare" source. Each query
    starts after the largest timestamp already cached for that dictionary.
    """
    nasdaq_cache, crypto_cache = load_existing_data()
    for source_name, cache in (
        ("Nasdaq", nasdaq_cache),
        ("CryptoCompare", crypto_cache),
    ):
        newest_cached = get_last_timestamp(cache)
        cache.update(get_new_prices(source_name, newest_cached))
    save_prices(nasdaq_cache, crypto_cache)
def main():
    """Run the update job forever, sleeping ``INTERVAL`` seconds between runs.

    A failed run is logged with its traceback and does not stop the loop;
    the next run is attempted after the usual sleep.
    """
    log = logging.getLogger(__name__)
    while True:
        try:
            update_once()
        except Exception:
            # Best-effort scheduler: stay alive, but record the failure
            # rather than discarding it silently.
            log.exception(
                "BTC price update failed; next attempt in %s seconds", INTERVAL
            )
        time.sleep(INTERVAL)
# Start the endless update loop only when this file is executed as a script.
if __name__ == "__main__":
    main()