diff --git a/coinbus/btc_update.py b/coinbus/btc_update.py new file mode 100644 index 0000000..ce4e227 --- /dev/null +++ b/coinbus/btc_update.py @@ -0,0 +1,125 @@ +import pymysql +import json +import os +import time +from datetime import datetime, timedelta + +# 数据库配置 +DB_CONFIG = { + "host": "192.168.194.240", + "user": "root", + "password": "2GS@bPYcgiMyL14A", + "database": "btcdb", + "port": 4423, + "connect_timeout": 60, + "read_timeout": 60, + "write_timeout": 60, + "charset": "utf8mb4" +} + +# 数据文件路径 +DATA_FILE = "btc_historical_price.py" + +# 定时任务间隔(秒)—— 例如 3600 为每小时更新一次 +INTERVAL = 28800 + + +def get_new_prices(source, last_timestamp=None): + """ + 从数据库获取 source 数据源的最新价格 + 仅每天北京时间 08:00:00 的数据减 8 小时存入文件 + """ + conn = pymysql.connect(**DB_CONFIG) + prices = {} + try: + with conn.cursor() as cursor: + if last_timestamp: + sql = """ + SELECT timestamp, price + FROM btc_prices + WHERE source = %s AND timestamp > %s + ORDER BY timestamp + """ + cursor.execute(sql, (source, last_timestamp)) + else: + sql = """ + SELECT timestamp, price + FROM btc_prices + WHERE source = %s + ORDER BY timestamp + """ + cursor.execute(sql, (source,)) + rows = cursor.fetchall() + for timestamp, price in rows: + ts_int = int(timestamp) + # 转换为北京时间 + dt_beijing = datetime.utcfromtimestamp(ts_int) + timedelta(hours=8) + # 如果是每天 08:00:00 北京时间,则减 8 小时 + if dt_beijing.hour == 8 and dt_beijing.minute == 0 and dt_beijing.second == 0: + ts_int -= 8 * 3600 + prices[str(ts_int)] = float(price) + finally: + conn.close() + return prices + + +def load_existing_data(): + """加载历史价格数据""" + if not os.path.exists(DATA_FILE): + return {}, {} + + try: + with open(DATA_FILE, "r", encoding="utf-8") as f: + ns = {} + exec(f.read(), ns) + return ns.get("prices_temp", {}), ns.get("prices", {}) + except Exception: + return {}, {} + + +def save_prices(prices_temp, prices): + """保存价格数据到文件""" + with open(DATA_FILE, "w", encoding="utf-8") as f: + f.write("# 自动生成的BTC历史价格数据文件\n") + f.write(f"# 更新时间: {datetime.now()}\n\n") + f.write("prices_temp = ") + f.write(json.dumps(prices_temp, indent=4, ensure_ascii=False)) + f.write("\n\nprices = ") + f.write(json.dumps(prices, indent=4, ensure_ascii=False)) + f.write("\n") + + +def get_last_timestamp(price_dict): + """获取当前字典中最大的时间戳""" + if not price_dict: + return None + return max(int(ts) for ts in price_dict.keys()) + + +def update_once(): + """执行一次更新流程""" + prices_temp, prices = load_existing_data() + last_nasdaq_ts = get_last_timestamp(prices_temp) + last_crypto_ts = get_last_timestamp(prices) + + nasdaq_new = get_new_prices("Nasdaq", last_nasdaq_ts) + crypto_new = get_new_prices("CryptoCompare", last_crypto_ts) + + prices_temp.update(nasdaq_new) + prices.update(crypto_new) + + save_prices(prices_temp, prices) + + +def main(): + """主循环任务""" + while True: + try: + update_once() + except Exception: + pass + time.sleep(INTERVAL) + + +if __name__ == "__main__": + main() diff --git a/coinbus/btc_utxos_lyq2.py b/coinbus/btc_utxos_lyq2.py index aff4f61..ad94a59 100644 --- a/coinbus/btc_utxos_lyq2.py +++ b/coinbus/btc_utxos_lyq2.py @@ -203,9 +203,10 @@ class UtxosIf: addr = None addrtype = None ip = "127.0.0.1" - port = " 8332" + port = "8332" user = "user" password = "password" + timeout=100 if rpc is None: rpc = RPC(ip, port, user, password) tx = None @@ -1169,24 +1170,59 @@ class UtxosIf: # 对于MongoDB中的每个集合,该方法将数据插入到集合中,并打印插入的文档以进行验证。 # 此外,该方法还会计算和更新一些其他指标(如利润率、实现价格、短期和长期持有者的利润率、相对比率等),并将它们插入“utxosv2”表中。 # 总体而言,这种方法有助于将分析的数据存储到关系数据库 (MySQL) 和NoSQL数据库 (MongoDB) 中,从而便于检索和进一步分析。 - def get_history_price(self): - prices = {} - response_price = requests.get( - 'https://data.nasdaq.com/api/v3/datatables/QDL/BCHAIN?code=MKPRU;api_key=FZqXog4sR-b7cYnXcRVV') - if response_price.status_code == 200: - #print(response_price.content) - priceweb = ujson.loads(response_price.content) - if "datatable" in priceweb: - priceset = priceweb["datatable"] - if "data" in priceset: - pricedata = priceset["data"] - for price in pricedata: - daystr = price[1] - p = price[2] - dayutc = time.mktime(time.strptime(daystr, "%Y-%m-%d")) - prices[str(int(dayutc))] = float(p) - #print(price, int(dayutc), g_prices[str(int(dayutc))]) - return prices + def get_history_price(self, batch_size=5000): + """获取数据库中的 Nasdaq 数据,存入字典""" + db_config = { + "host": "192.168.194.240", + "user": "root", + "password": "2GS@bPYcgiMyL14A", + "database": "btcdb", + "port": 4423, + "connect_timeout": 60, + "read_timeout": 60, + "write_timeout": 60, + "charset": "utf8mb4" + } + + offset = 0 + self.pricedict = {} + + while True: + connection = pymysql.connect(**db_config) + try: + with connection.cursor() as cursor: + sql = "SELECT timestamp, price FROM btc_prices WHERE source = 'Nasdaq' ORDER BY timestamp LIMIT %s OFFSET %s" + cursor.execute(sql, (batch_size, offset)) + rows = cursor.fetchall() + if not rows: + break + for timestamp, price in rows: + self.pricedict[str(int(timestamp))] = float(price) + finally: + connection.close() + + offset += batch_size + if len(rows) < batch_size: + break # 最后一页读取完成 + + return self.pricedict + #prices = {} + #response_price = requests.get( + # 'https://data.nasdaq.com/api/v3/datatables/QDL/BCHAIN?code=MKPRU;api_key=FZqXog4sR-b7cYnXcRVV') + #if response_price.status_code == 200: + # #print(response_price.content) + # priceweb = ujson.loads(response_price.content) + # if "datatable" in priceweb: + # priceset = priceweb["datatable"] + # if "data" in priceset: + # pricedata = priceset["data"] + # for price in pricedata: + # daystr = price[1] + # p = price[2] + # dayutc = time.mktime(time.strptime(daystr, "%Y-%m-%d")) + # prices[str(int(dayutc))] = float(p) + # #print(price, int(dayutc), g_prices[str(int(dayutc))]) + #return prices # “get_history_price”方法似乎用于从WebAPI终结点检索历史价格数据。以下是它的作用的细分: # - 初始化一个空字典“prices”来存储历史价格数据。 # - 向指定的API端点发送HTTP GET请求,该端点可能提供历史比特币价格数据。 @@ -1200,54 +1236,95 @@ class UtxosIf: # - 使用'strptime'将日期字符串 ('daystr') 解析为Unix时间戳 ('dayutc')。 # - 将Unix 时间戳作为键存储在“prices”字典中,并将相应的price ('p') 作为值。 # 最后,它返回包含历史价格数据的“prices”字典,其中Unix时间戳作为键,价格作为值 - def get_history_price2(self, pricedict): + def get_history_price2(self, batch_size=5000): #pricedict = {} - dayt = time.gmtime() - daystr = time.strftime("%Y", dayt) - year = int(daystr) - end_year = year - while True: - url = "" - if end_year != year: - start_year = end_year - url = "https://data.messari.io/api/v1/assets/bitcoin/metrics/price/time-series?start=" - else: - url = "https://data.messari.io/api/v1/assets/bitcoin/metrics/price/time-series?after=" + str( - year) + "-01-01&order=descending&interval=1d" + """获取数据库中的 Messari 数据,存入字典""" + db_config = { + "host": "192.168.194.240", + "user": "root", + "password": "2GS@bPYcgiMyL14A", + "database": "btcdb", + "port": 4423, + "connect_timeout": 60, + "read_timeout": 60, + "write_timeout": 60, + "charset": "utf8mb4" + } - if end_year != year: - url = url + str(start_year) + "-01-01&end=" + str(end_year) + "-12-31&order=descending&interval=1d" - header_set = {} - header_set["x-messari-api-key"] = "aH2pyj5i4QGo1k1gLxXEbIJ5RJr+FYKLEWk6cRT6RuSc6lRY" - # header_set["Content-Type"] = "application/json" - print(header_set, url) - response_price = requests.get(url, headers=header_set) - # print(response_price) - if response_price.status_code == 200: - # print(response_price.content) - priceweb = ujson.loads(response_price.content) - if "data" in priceweb: - priceset = priceweb["data"] - if "values" in priceset: - valueset = priceset["values"] - if valueset is not None: - for supply in valueset: - dayutc = int(supply[0] / 1000) - s = supply[1] - ret_time = time.gmtime(dayutc) - ret_daystr = time.strftime("%d %b %Y", ret_time) - ret_dayutc = int(time.mktime(time.strptime(ret_daystr, "%d %b %Y"))) - pricedict[str(ret_dayutc)] = float(s) - # print(s, dayutc, pricedict[str(dayutc)]) - # break - else: - break - else: - break - end_year -= 1 - time.sleep(2) - print(pricedict) - return pricedict + offset = 0 + self.pricedict = {} + + while True: + connection = pymysql.connect(**db_config) + try: + with connection.cursor() as cursor: + sql = """ + SELECT timestamp, price + FROM btc_prices + WHERE source = 'CryptoCompare' + ORDER BY timestamp + LIMIT %s OFFSET %s + """ + cursor.execute(sql, (batch_size, offset)) + rows = cursor.fetchall() + if not rows: + break + for timestamp, price in rows: + self.pricedict[str(int(timestamp))] = float(price) + finally: + connection.close() + + offset += batch_size + if len(rows) < batch_size: + break # 数据已全部读取 + + return self.pricedict + #dayt = time.gmtime() + #daystr = time.strftime("%Y", dayt) + #year = int(daystr) + #end_year = year + #while True: + # url = "" + # if end_year != year: + # start_year = end_year + # url = "https://data.messari.io/api/v1/assets/bitcoin/metrics/price/time-series?start=" + # else: + # url = "https://data.messari.io/api/v1/assets/bitcoin/metrics/price/time-series?after=" + str( + # year) + "-01-01&order=descending&interval=1d" +# +# if end_year != year: +# url = url + str(start_year) + "-01-01&end=" + str(end_year) + "-12-31&order=descending&interval=1d" +# header_set = {} +# header_set["x-messari-api-key"] = "aH2pyj5i4QGo1k1gLxXEbIJ5RJr+FYKLEWk6cRT6RuSc6lRY" +# # header_set["Content-Type"] = "application/json" +# print(header_set, url) +# response_price = requests.get(url, headers=header_set) +# # print(response_price) +# if response_price.status_code == 200: +# # print(response_price.content) +# priceweb = ujson.loads(response_price.content) +# if "data" in priceweb: +# priceset = priceweb["data"] +# if "values" in priceset: +# valueset = priceset["values"] +# if valueset is not None: +# for supply in valueset: +# dayutc = int(supply[0] / 1000) +# s = supply[1] +# ret_time = time.gmtime(dayutc) +# ret_daystr = time.strftime("%d %b %Y", ret_time) +# ret_dayutc = int(time.mktime(time.strptime(ret_daystr, "%d %b %Y"))) +# pricedict[str(ret_dayutc)] = float(s) +# # print(s, dayutc, pricedict[str(dayutc)]) +# # break +# else: +# break +# else: +# break +# end_year -= 1 +# time.sleep(2) +# print(pricedict) +# return pricedict # 'get_history # 初始化与时间相关的变量,包括当前年份。 # 进入一个循环,从当前年份开始,向后循环访问年份。 @@ -1264,21 +1341,46 @@ class UtxosIf: # 最后,它返回“pricedict” def get_current_price(self): price = 0 + DB_CONFIG = { + "host": "192.168.194.240", + "user": "root", + "password": "2GS@bPYcgiMyL14A", + "database": "btcdb", + "port": 4423 + } + connection = pymysql.connect(**DB_CONFIG) try: - response_price = requests.get( - 'https://api.binance.com/api/v3/ticker/price?symbol=BTCUSDT') - prices = ujson.loads(response_price.text) - price = float(prices["price"]) - print(response_price.text, price) - response_price.close() - # print("price", price) - return price - except: - response_price = requests.get("https://api.coinpaprika.com/v1/tickers/btc-bitcoin") - prices = ujson.loads(response_price.text) - price = float(prices["quotes"]["USD"]["price"]) - response_price.close() - return price + with connection.cursor() as cursor: + for source in ("binance", "coinbase"): + cursor.execute(""" + SELECT price FROM btc_realtime_prices + WHERE source=%s + ORDER BY timestamp DESC + LIMIT 1 + """, (source,)) + row = cursor.fetchone() + if row: + price = float(row[0]) + break + finally: + connection.close() + return price + #price = 0 + #try: + # response_price = requests.get( + # 'https://api.binance.com/api/v3/ticker/price?symbol=BTCUSDT') + # prices = ujson.loads(response_price.text) + # price = float(prices["price"]) + # print(response_price.text, price) + # response_price.close() + # # print("price", price) + # return price + #except: + # response_price = requests.get("https://api.coinpaprika.com/v1/tickers/btc-bitcoin") + # prices = ujson.loads(response_price.text) + # price = float(prices["quotes"]["USD"]["price"]) + # response_price.close() + # return price # 该函数似乎是一种从特定 # API端点检索比特币当前价格的方法。以下是其功能的细分:get_current_price # 初始化默认值为0的变量。price @@ -1393,14 +1495,15 @@ class UtxosIf: password = "password" rpc = RPC(ip, port, user, password) +# total_height = rpc.blockchain.get_block_count() while True: try: total_height = rpc.blockchain.get_block_count() break except Exception as e: - print(f"❌ get_block_count 超时或异常:{e},重试中...") - time.sleep(3) - rpc = RPC(ip, port, user, password) # 重新建立连接 + print("rpctimeout") + time.sleep(10) + rpc = RPC(ip, port, user, password) if height >= total_height: return prev_height = None @@ -1488,14 +1591,15 @@ class UtxosIf: password = "password" rpc = RPC(ip, port, user, password) +# total_height = rpc.blockchain.get_block_count() while True: try: total_height = rpc.blockchain.get_block_count() break except Exception as e: - print(f"❌ get_block_count 超时或异常:{e},重试中...") + print("rpctimeout") time.sleep(10) - rpc = RPC(ip, port, user, password) # 重新建立连接 + rpc = RPC(ip, port, user, password) if height >= total_height: return #total_height = rpc.blockchain.get_block_count() @@ -1557,6 +1661,7 @@ class UtxosIf: #current_price = self.get_current_price() #if current_price == 0: # return + print("handle_utxos initiate") connin = sqlite3.connect("utxos.sqlite") cin = connin.cursor() cursorin = cin.execute("SELECT * from utxos") @@ -1581,12 +1686,13 @@ class UtxosIf: daystr = time.strftime("%d %b %Y", dt2) dayutc = int(time.mktime(time.strptime(daystr, "%d %b %Y"))) price = 0 - dayutc=dayutc-28800 - current_price=current_price-28800 - print("ceshi",str(dayutc)) - print("ceshi2", str(current_price)) - if str(dayutc) > str(check_dayutc): - continue + d_t = datetime.utcfromtimestamp(dayutc) + timedelta(hours=8) + d_t08am = datetime(d_t.year, d_t.month, d_t.day, 8, 0, 0) + dayutc=int((d_t08am - timedelta(hours=8)).timestamp())-28800 + d_t2 = datetime.utcfromtimestamp(check_dayutc) + timedelta(hours=8) + d_t208am = datetime(d_t2.year, d_t2.month, d_t2.day, 8, 0, 0) + check_dayutc=int((d_t208am - timedelta(hours=8)).timestamp())-28800 + print("ceshi",5) if str(dayutc) in prices: price = int(prices[str(dayutc)]) else: @@ -1617,7 +1723,9 @@ class UtxosIf: self.lth_mvrv = self.lth_mv / self.lth_rcap if self.sth_rcap > 0: self.sth_mvrv = self.sth_mv / self.sth_rcap + print("save_db initiate") self.save_db() + print("save_db ok") cin.close() #cout.close() @@ -1694,15 +1802,15 @@ if __name__ == '__main__': os.system("if [ -e utxos.dat ]; then rm utxos.dat; fi") os.system("if [ -e utxos.sqlite ]; then rm utxos.sqlite; fi") check_dayutc = int(time.mktime(time.strptime(check_dt, "%Y-%m-%d"))) - cmd = "~/bitcoin-29.0/bin/bitcoin-cli -rpcuser=user -rpcpassword=password invalidateblock " + daily_hash[str(check_dayutc)] + cmd = "~/bitcoin-29.0/bin/bitcoin-cli -rpcuser=user -rpcpassword=password -rpcclienttimeout=600 invalidateblock " + daily_hash[str(check_dayutc)] os.system(cmd) - print("select ok") + print("select ok",daily_hash[str(check_dayutc)]) time.sleep(60); - os.system("~/bitcoin-29.0/bin/bitcoin-cli -rpcuser=user -rpcpassword=password dumptxoutset ~/utxos.dat") + os.system("~/bitcoin-29.0/bin/bitcoin-cli -rpcuser=user -rpcpassword=password -rpcclienttimeout=1800 dumptxoutset ~/utxos.dat latest") print("dumptxoutset ok") time.sleep(60); - # os.system("./utxo_to_sqlite ./utxos.dat ./utxos.sqlite") - os.system("python3 ~/utxo_to_sqlite.py ~/utxos.dat ~/utxos.sqlite") + #os.system("./utxo_to_sqlite ./utxos.dat ./utxos.sqlite") + os.system("python3 utxo_to_sqlite.py ./utxos.dat ./utxos.sqlite") print("utxo_to_sqlite ok") time.sleep(60); stats.utxos(check_dayutc) @@ -1727,4 +1835,4 @@ if __name__ == '__main__': # - 它使用“utxos”方法处理指定日期的 UTXO。 # - 它使用比特币 CLI 命令“reconsiderblock”重新考虑之前失效的区块。 # 8. 每个步骤都附有指示脚本进度的打印语句。 -# 这个脚本似乎是一个更大的系统的一部分,该系统与比特币网络交互并执行与UTXO相关的任务。它严重依赖比特币核心软件和命令行界面(“bitcoin-cli”) \ No newline at end of file +# 这个脚本似乎是一个更大的系统的一部分,该系统与比特币网络交互并执行与UTXO相关的任务。它严重依赖比特币核心软件和命令行界面(“bitcoin-cli”)