"""Periodically sync daily BTC prices from MySQL into a local data file.

Two price series are maintained, one per data source ("Nasdaq" and
"CryptoCompare").  Each run fetches the rows newer than what is already
stored locally, merges them in, and rewrites the data file.
"""

import json
import logging
import os
import time
from datetime import datetime, timedelta, timezone

import pymysql

logger = logging.getLogger(__name__)

# Database configuration.
# NOTE(review): credentials are hard-coded in source; consider loading them
# from the environment or a secrets store instead.
DB_CONFIG = {
    "host": "192.168.194.240",
    "user": "root",
    "password": "2GS@bPYcgiMyL14A",
    "database": "btcdb",
    "port": 4423,
    "connect_timeout": 60,
    "read_timeout": 60,
    "write_timeout": 60,
    "charset": "utf8mb4",
}

# Data file path.  The file is Python source containing two dict
# assignments: ``prices_temp`` and ``prices``.
DATA_FILE = "btc_historical_price.py"

# Interval between update runs, in seconds (3600 would be hourly;
# 28800 is every 8 hours).
INTERVAL = 28800

# Fixed UTC+8 offset used to evaluate the "08:00:00 Beijing time" rule.
_BEIJING_TZ = timezone(timedelta(hours=8))
_EIGHT_HOURS = 8 * 3600


def get_new_prices(source, last_timestamp=None):
    """Fetch prices for *source* from the database.

    Only rows whose timestamp falls exactly on 08:00:00 Beijing time are
    kept; each kept row is stored under its timestamp minus 8 hours.

    Args:
        source: Value matched against the ``source`` column of ``btc_prices``.
        last_timestamp: If given, only rows with ``timestamp`` strictly
            greater than this value are fetched; otherwise all rows for the
            source are fetched.

    Returns:
        A dict mapping the adjusted Unix timestamp (as a string) to the
        price as a float.

    Raises:
        pymysql.MySQLError: If connecting or querying fails.
    """
    sql = "SELECT timestamp, price FROM btc_prices WHERE source = %s"
    params = [source]
    # ``is not None`` so that a legitimate watermark of 0 is still applied.
    if last_timestamp is not None:
        sql += " AND timestamp > %s"
        params.append(last_timestamp)
    sql += " ORDER BY timestamp"

    prices = {}
    conn = pymysql.connect(**DB_CONFIG)
    try:
        with conn.cursor() as cursor:
            cursor.execute(sql, tuple(params))
            rows = cursor.fetchall()

        for timestamp, price in rows:
            ts_int = int(timestamp)
            # Timezone-aware conversion straight to Beijing wall-clock time.
            dt_beijing = datetime.fromtimestamp(ts_int, tz=_BEIJING_TZ)
            # NOTE(review): follows the documented intent that only the
            # daily 08:00:00 rows are stored -- confirm against the
            # original (indentation was lost in this file).
            if (dt_beijing.hour, dt_beijing.minute, dt_beijing.second) == (8, 0, 0):
                prices[str(ts_int - _EIGHT_HOURS)] = float(price)
    finally:
        conn.close()
    return prices


def load_existing_data():
    """Load the previously saved price data.

    Returns:
        A ``(prices_temp, prices)`` tuple of dicts.  Both are empty when the
        data file does not exist or cannot be read/executed.
    """
    if not os.path.exists(DATA_FILE):
        return {}, {}
    try:
        with open(DATA_FILE, "r", encoding="utf-8") as f:
            source_text = f.read()
        namespace = {}
        # NOTE(review): exec() runs whatever code is in DATA_FILE.  This is
        # only safe while this script is the sole writer of that file;
        # consider a plain JSON data file instead.
        exec(source_text, namespace)
        return namespace.get("prices_temp", {}), namespace.get("prices", {})
    except Exception:
        # Best effort: fall back to empty data, but leave a trace so a
        # corrupt file does not go unnoticed.
        logger.warning(
            "Could not load %s; starting from empty data", DATA_FILE, exc_info=True
        )
        return {}, {}


def save_prices(prices_temp, prices):
    """Write both price dicts to the data file.

    The content is written to a temporary file first and then moved over
    the data file, so an interrupted write cannot leave a truncated file.

    Args:
        prices_temp: Dict written as the ``prices_temp`` assignment.
        prices: Dict written as the ``prices`` assignment.
    """
    content = "".join(
        (
            "# 自动生成的BTC历史价格数据文件\n",
            f"# 更新时间: {datetime.now()}\n\n",
            "prices_temp = ",
            json.dumps(prices_temp, indent=4, ensure_ascii=False),
            "\n\nprices = ",
            json.dumps(prices, indent=4, ensure_ascii=False),
            "\n",
        )
    )
    tmp_path = f"{DATA_FILE}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        f.write(content)
    os.replace(tmp_path, DATA_FILE)


def get_last_timestamp(price_dict):
    """Return the largest timestamp key in *price_dict* as an int.

    Returns ``None`` when the dict is empty.
    """
    if not price_dict:
        return None
    return max(map(int, price_dict))


def update_once():
    """Run one update cycle: load, fetch new rows, merge, and save."""
    prices_temp, prices = load_existing_data()

    last_nasdaq_ts = get_last_timestamp(prices_temp)
    last_crypto_ts = get_last_timestamp(prices)

    nasdaq_new = get_new_prices("Nasdaq", last_nasdaq_ts)
    crypto_new = get_new_prices("CryptoCompare", last_crypto_ts)

    prices_temp.update(nasdaq_new)
    prices.update(crypto_new)

    save_prices(prices_temp, prices)


def main():
    """Run ``update_once`` forever, sleeping ``INTERVAL`` seconds between runs."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    while True:
        try:
            update_once()
        except Exception:
            # Keep the loop alive, but record the failure with its traceback
            # instead of discarding it.
            logger.exception("Price update failed; retrying in %s seconds", INTERVAL)
        time.sleep(INTERVAL)


if __name__ == "__main__":
    main()