From 5964a7fb8bc92e49ad50f5d164fbab425c51d78c Mon Sep 17 00:00:00 2001 From: fengche <1158629543@qq.com> Date: Fri, 22 Aug 2025 15:18:56 +0800 Subject: [PATCH] =?UTF-8?q?coinbus=E4=BB=A3=E7=A0=81=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- coinbus/Binance_fapi.py | 119 +++++ coinbus/CoinmarketCap.py | 115 ++++ coinbus/arh999_lyq.py | 1005 +++++++++++++++++++++++++++++++++++ coinbus/arh999eth_lyq.py | 504 ++++++++++++++++++ coinbus/check_order_lyq.py | 184 +++++++ coinbus/check_zone_lyq.py | 146 +++++ coinbus/exchangeRate_lyq.py | 117 ++++ 7 files changed, 2190 insertions(+) create mode 100644 coinbus/Binance_fapi.py create mode 100644 coinbus/CoinmarketCap.py create mode 100644 coinbus/arh999_lyq.py create mode 100644 coinbus/arh999eth_lyq.py create mode 100644 coinbus/check_order_lyq.py create mode 100644 coinbus/check_zone_lyq.py create mode 100644 coinbus/exchangeRate_lyq.py diff --git a/coinbus/Binance_fapi.py b/coinbus/Binance_fapi.py new file mode 100644 index 0000000..f947594 --- /dev/null +++ b/coinbus/Binance_fapi.py @@ -0,0 +1,119 @@ +import pymysql +import requests +import time +import schedule +from datetime import datetime + +# MySQL连接函数 +def connect_to_db(): + return pymysql.connect( + host="127.0.0.1", # 数据库主机 + user="root", # 数据库用户名 + password="2GS@bPYcgiMyL14A", # 数据库密码 + database="binance_api", # 数据库名称 + port=4423 # 数据库端口 + ) + +# 执行SQL查询的函数 +def execute_query(query, params=None): + conn = connect_to_db() # 连接数据库 + with conn.cursor() as cursor: + cursor.execute(query, params) # 执行SQL语句 + conn.commit() # 提交事务 + conn.close() # 关闭数据库连接 + +# 北京时间转换函数 +def bj_time(timestamp): + # 将时间戳转换为北京时间 + return datetime.utcfromtimestamp(timestamp / 1000).strftime('%Y-%m-%d %H:%M:%S') + +# Binance API客户端 +class BinanceAPI: + base_url = "https://fapi.binance.com" # Binance的基础API URL + + @staticmethod + def get(endpoint, params=None): + # 发送GET请求到Binance API + response = requests.get(f"{BinanceAPI.base_url}{endpoint}", params=params) + return response.json() # 返回JSON格式的响应数据 + +# 任务1:获取资金费率并插入数据库 +def funding_rate(): + # 获取BTC和ETH的资金费率数据 + btc_data = BinanceAPI.get("/fapi/v1/fundingRate", {"symbol": "BTCUSDT"}) + eth_data = BinanceAPI.get("/fapi/v1/fundingRate", {"symbol": "ETHUSDT"}) + + # 准备SQL插入语句 + btc_sql = """INSERT INTO fundingrate(symbol, ts, fundingRate) + VALUES ("BTCUSDT", %s, %s)""" + eth_sql = """INSERT INTO fundingrate(symbol, ts, fundingRate) + VALUES ("ETHUSDT", %s, %s)""" + + # 执行SQL插入操作 + execute_query(btc_sql, (btc_data[-1]['fundingTime'], btc_data[-1]['fundingRate'])) + execute_query(eth_sql, (eth_data[-1]['fundingTime'], eth_data[-1]['fundingRate'])) + +# 任务2:获取未平仓合约数并插入数据库 +def open_interest(): + # 获取BTC和ETH的未平仓合约数数据 + btc_data = BinanceAPI.get("/fapi/v1/openInterest", {"symbol": "BTCUSDT"}) + eth_data = BinanceAPI.get("/fapi/v1/openInterest", {"symbol": "ETHUSDT"}) + + # 准备SQL插入语句 + btc_sql = """INSERT INTO openInterest(symbol, ts, openInterest) + VALUES ("BTCUSDT", %s, %s)""" + eth_sql = """INSERT INTO openInterest(symbol, ts, openInterest) + VALUES ("ETHUSDT", %s, %s)""" + + # 执行SQL插入操作 + execute_query(btc_sql, (btc_data['time'], btc_data['openInterest'])) + execute_query(eth_sql, (eth_data['time'], eth_data['openInterest'])) + +# 任务3:获取长短比并插入数据库 +def long_short_ratio(interval): + # 获取BTC和ETH的长短比数据 + btc_data = BinanceAPI.get("/futures/data/takerlongshortRatio", { + "symbol": "BTCUSDT", "period": interval + }) + eth_data = BinanceAPI.get("/futures/data/takerlongshortRatio", { + "symbol": "ETHUSDT", "period": interval + }) + + # 准备SQL插入语句 + btc_sql = f"""INSERT INTO longshortratio{interval}(symbol, ts, buyVol, sellVol, buySellRatio) + VALUES ("BTCUSDT", %s, %s, %s, %s)""" + eth_sql = f"""INSERT INTO longshortratio{interval}(symbol, ts, buyVol, sellVol, buySellRatio) + VALUES ("ETHUSDT", %s, %s, %s, %s)""" + + # 执行SQL插入操作 + execute_query(btc_sql, (btc_data[-1]['timestamp'], btc_data[-1]['buyVol'], btc_data[-1]['sellVol'], btc_data[-1]['buySellRatio'])) + execute_query(eth_sql, (eth_data[-1]['timestamp'], eth_data[-1]['buyVol'], eth_data[-1]['sellVol'], eth_data[-1]['buySellRatio'])) + +# 定时任务调度 +def schedule_jobs(): + # 每天0点、8点和16点1分执行资金费率任务 + schedule.every().day.at("00:01").do(funding_rate) + schedule.every().day.at("08:01").do(funding_rate) + schedule.every().day.at("16:01").do(funding_rate) + + # 每分钟的15秒执行未平仓合约数任务 + schedule.every().minute.at(":15").do(open_interest) + schedule.every().minute.at(":25").do(open_interest) + schedule.every().minute.at(":35").do(open_interest) + schedule.every().minute.at(":45").do(open_interest) + schedule.every().minute.at(":55").do(open_interest) + + # 每分钟的15秒执行长短比任务,周期为5m, 15m, 30m等 + intervals = ["5m", "15m", "30m", "1h", "2h", "4h", "6h", "12h", "1d"] + for interval in intervals: + schedule.every().minute.at(":15").do(long_short_ratio, interval=interval) + +# 启动任务调度 +def run(): + schedule_jobs() # 设置定时任务 + while True: + schedule.run_pending() # 执行所有待处理任务 + time.sleep(1) # 每秒检查一次任务是否到期 + +if __name__ == "__main__": + run() # 运行定时任务调度 \ No newline at end of file diff --git a/coinbus/CoinmarketCap.py b/coinbus/CoinmarketCap.py new file mode 100644 index 0000000..4c22afd --- /dev/null +++ b/coinbus/CoinmarketCap.py @@ -0,0 +1,115 @@ +import requests +import pymysql +import time +from apscheduler.schedulers.blocking import BlockingScheduler +from datetime import datetime, timedelta + +# API 密钥和请求头 +API_KEY = "83bf85c1-1bd8-426a-a043-6b67dad8bda5" +headers = { "X-CMC_PRO_API_KEY": API_KEY } +base_url = "https://pro-api.coinmarketcap.com" +url = f"{base_url}/v1/cryptocurrency/listings/latest" + +# MySQL 数据库连接配置 +db_config = { + 'host': '127.0.0.1', # 数据库主机地址 + 'user': 'root', # 数据库用户名 + 'password': '2GS@bPYcgiMyL14A', # 数据库密码 + 'database': 'coinmarketcap', # 数据库名称 + 'port': 4423 # 数据库端口 +} + +# 创建数据库表格(如果不存在) +def create_table(): + connection = pymysql.connect(**db_config) # 连接到数据库 + cursor = connection.cursor() # 创建游标对象 + + # 创建表格的 SQL 语句 + create_table_query = """ + CREATE TABLE IF NOT EXISTS marketInfo ( + id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, # 自增ID + update_time DATETIME NOT NULL, # 更新时间 + symbol CHAR(15) NOT NULL, # 币种符号 + ranks INT NOT NULL, # 排名 + price DOUBLE NOT NULL, # 当前价格 + market_cap DOUBLE NOT NULL, # 市值 + volume_24h DOUBLE NOT NULL, # 24小时交易量 + volume_change_24h DOUBLE NOT NULL, # 24小时交易量变化 + percent_change_1h DOUBLE NOT NULL, # 1小时价格变化 + percent_change_24h DOUBLE NOT NULL, # 24小时价格变化 + percent_change_7d DOUBLE NOT NULL, # 7天价格变化 + percent_change_30d DOUBLE NOT NULL, # 30天价格变化 + percent_change_60d DOUBLE NOT NULL, # 60天价格变化 + percent_change_90d DOUBLE NOT NULL # 90天价格变化 + ); + """ + cursor.execute(create_table_query) # 执行创建表格的 SQL 语句 + connection.commit() # 提交事务 + cursor.close() # 关闭游标 + connection.close() # 关闭数据库连接 + +# 将 UTC 时间转换为北京时间 +def bj_time(utc_time): + """ 将 UTC 时间转换为北京时间 """ + utc_time = datetime.strptime(utc_time, '%Y-%m-%dT%H:%M:%S.%fZ') # 将 UTC 时间字符串转换为 datetime 对象 + beijing_time = utc_time + timedelta(hours=8) # 北京时间比 UTC 时间快 8 小时 + return beijing_time.strftime('%Y-%m-%d %H:%M:%S') # 格式化成字符串 + +# 获取市场数据并插入到数据库 +def marketcap(): + try: + # 向 CoinMarketCap API 发送请求,获取加密货币的市场数据 + response = requests.get(url, headers=headers, params={"limit": 200}) + response.raise_for_status() # 如果请求失败,抛出异常 + except requests.RequestException: + time.sleep(60) # 等待 1 分钟后重试 + response = requests.get(url, headers=headers, params={"limit": 200}) + + data = response.json() # 将返回的 JSON 数据转换为 Python 字典 + for item in data['data']: # 遍历获取的数据 + quote = item['quote']['USD'] # 获取 USD 相关的市场数据 + update_time = bj_time(quote['last_updated']) # 转换更新时间为北京时间 + symbol = item['symbol'] # 获取币种符号 + ranks = item['cmc_rank'] # 获取排名 + price = quote['price'] # 获取价格 + market_cap = quote['market_cap'] # 获取市值 + volume_24h = quote['volume_24h'] # 获取 24 小时交易量 + volume_change_24h = quote['volume_change_24h'] # 获取 24 小时交易量变化 + percent_change_1h = quote['percent_change_1h'] # 获取 1 小时价格变化 + percent_change_24h = quote['percent_change_24h'] # 获取 24 小时价格变化 + percent_change_7d = quote['percent_change_7d'] # 获取 7 天价格变化 + percent_change_30d = quote['percent_change_30d'] # 获取 30 天价格变化 + percent_change_60d = quote['percent_change_60d'] # 获取 60 天价格变化 + percent_change_90d = quote['percent_change_90d'] # 获取 90 天价格变化 + + # 将数据插入到 MySQL 数据库 + connection = pymysql.connect(**db_config) # 连接到数据库 + cursor = connection.cursor() # 创建游标对象 + insert_query = """ + INSERT INTO marketInfo ( + update_time, symbol, ranks, price, market_cap, volume_24h, + volume_change_24h, percent_change_1h, percent_change_24h, + percent_change_7d, percent_change_30d, percent_change_60d, + percent_change_90d + ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s); + """ + # 执行插入数据的 SQL 语句 + cursor.execute(insert_query, ( + update_time, symbol, ranks, price, market_cap, volume_24h, + volume_change_24h, percent_change_1h, percent_change_24h, + percent_change_7d, percent_change_30d, percent_change_60d, + percent_change_90d + )) + connection.commit() # 提交事务 + cursor.close() # 关闭游标 + connection.close() # 关闭数据库连接 + +# 定时任务:每 5 分钟执行一次 marketcap 函数 +def schedule_job(): + scheduler = BlockingScheduler() # 创建一个阻塞式调度器 + scheduler.add_job(marketcap, 'cron', minute='0,5,10,15,20,25,30,35,40,45,50,55') # 设置每 5 分钟执行一次 + scheduler.start() # 启动调度器 + +if __name__ == "__main__": + create_table() # 程序启动时,先创建数据库表格(如果不存在) + schedule_job() # 启动定时任务,开始定时抓取数据并插入数据库 \ No newline at end of file diff --git a/coinbus/arh999_lyq.py b/coinbus/arh999_lyq.py new file mode 100644 index 0000000..350c6f1 --- /dev/null +++ b/coinbus/arh999_lyq.py @@ -0,0 +1,1005 @@ +# coding=utf-8 +import ujson +from binance.websocket.spot.websocket_client import SpotWebsocketClient as WebsocketClient +import time +import requests +from loguru import logger +import datetime +import pymysql +import math + +g_prices = {} +g_dbif = None +g_lastts = 0 +# 正在设置一个与币安的现货 Websocket API 交互以获取加密货币价格数据的环境。它还包括用于处理日志记录、发出 HTTP 请求以及与 MySQL 数据库交互的模块。以下是导入和初始化的细分: +# ujson:快速的JSON编码器和解码器。 +# SpotWebsocketClient:币安用于现货交易的 WebSocket 客户端。 +# time:用于时间相关功能的模块。 +# requests:用于发出请求的 HTTP 库。 +# logger:Python 的日志记录库。 +# datetime:用于操作日期和时间的模块。 +# pymysql:MySQL 客户端库。 +# math:提供数学函数的模块。 +# 该脚本初始化全局变量 、 和 。 似乎用于存储价格数据、与 MySQL 数据库交互以及存储上次操作的时间戳。g_prices g_dbif g_lastts g_pricesg_dbifg_lastts +def get_day60_rise(day, prices): + total = 0 + cnt = 0 + for i in range(60): + if str(day) in prices: + cur_price = prices[str(day)] + day = str(day - 3600 * 24) + if day in prices: + prev_price = prices[day] + try: + #print(((cur_price-prev_price)/prev_price), day, cur_price, prev_price) + total += (((cur_price-prev_price)/prev_price)) + cnt += 1 + except: + pass + # print(day, total, cnt) + day = int(day) + return total +# 此函数“get_day60_rise”根据提供的价格数据计算过去 60 天的价格上涨百分比。以下是其工作原理的细分: +# -**参数**: +# - 'day':需要计算价格上涨的当天。 +# - 'prices':包含价格数据的字典,其中键是时间戳,值是相应的价格。 +# -**功能性**: +# - 它初始化变量“total”和“cnt”,以跟踪总价格上涨和考虑的天数。 +# - 它从给定的“日期”向后迭代 60 天。 +# - 对于范围内的每一天,它会检查当天的价格数据是否在“价格”字典中可用。 +# - 如果当天和前一天的价格数据可用,它会计算价格上涨的百分比并将其添加到“总计”中。 +# - 它为每个有效的价格数据条目递增“cnt”变量。 +# - 最后,它返回过去 60 天的总百分比涨幅。 +# 此函数似乎可用于分析特定时期的价格趋势 +def get_days_rise(day, maxdays, prices): + total = 0 + cnt = 0 + for i in range(maxdays): + if str(day) in prices: + cur_price = prices[str(day)] + day = str(day - 3600 * 24) + if day in prices: + prev_price = prices[day] + try: + #print(((cur_price-prev_price)/prev_price), day, cur_price, prev_price) + total += (((cur_price-prev_price)/prev_price)) + cnt += 1 + except: + pass + # print(day, total, cnt) + day = int(day) + print("get_days_rise", day, maxdays,cnt, total) + return total +# “get_days_rise”函数是先前“get_day60_rise”函数的扩展。其工作原理如下: +# -**参数**: +# - 'day':需要计算价格上涨的当天。 +# - 'maxdays':计算价格上涨时要考虑的最大天数。 +# - 'prices':包含价格数据的字典,其中键是时间戳,值是相应的价格。 +# -**功能性**: +# - 它初始化变量“total”和“cnt”,以跟踪总价格上涨和考虑的天数。 +# - 它从给定的“day”向后循环到“maxdays”的天数范围。 +# - 对于范围内的每一天,它会检查当天的价格数据是否在“价格”字典中可用。 +# - 如果当天和前一天的价格数据可用,它会计算价格上涨的百分比并将其添加到“总计”中。 +# - 它为每个有效的价格数据条目递增“cnt”变量。 +# - 最后,它返回指定天数内的总百分比增长。 +# 此函数通过允许计算由“maxdays”参数指定的自定义天数范围内的价格上涨来提供灵活性。 +def append_jzr_day60(dbif, day, price, day60_rise, day7_rise, day30_rise, day90_rise): + dbif.append_jzr60(day, price, day60_rise, day7_rise, day30_rise, day90_rise) +# “append_jzr_day60”功能似乎是将数据附加到数据库的更大系统的一部分。以下是它的作用的细分: +# -**参数**: +# - 'dbif':与数据库交互的接口。 +# - 'day':当前日期或时间戳。 +# - 'price':与当天对应的价格值。 +# - “day60_rise”:过去 60 天内价格上涨的百分比。 +# - “day7_rise”:过去 7 天内价格上涨的百分比。 +# - “day30_rise”:过去 30 天内价格上涨的百分比。 +# - “day90_rise”:过去 90 天内价格上涨的百分比。 +# -**功能性**: +# - 它在 'dbif' 对象上调用方法 'append_jzr60',传递提供的参数。 +# - 此方法可能负责将数据追加或存储在数据库表或集合中。 +# 该功能似乎是管理和分析价格数据系统的一部分,专门计算和存储不同时间间隔(60天、7天、30天、90天)的价格上涨百分比,并将其保存到数据库中以供进一步分析或报告 +def sync_jzr_day60(dbif, prices): + for day in prices: + print(day, prices[day]) + day60_rise = get_days_rise(int(day), 60, prices) + day7_rise = get_days_rise(int(day), 7, prices) + day30_rise = get_days_rise(int(day), 30, prices) + day90_rise = get_days_rise(int(day), 90, prices) + print(day, day60_rise) + append_jzr_day60(dbif, day, prices[day], day60_rise, day7_rise, day30_rise, day90_rise) +# “sync_jzr_day60”功能似乎通过计算给定价格数据集中每天在不同时间间隔(60 天、7 天、30 天、90 天)内的价格上涨百分比来同步数据。以下是它的作用的细分: +# -**参数**: +# - 'dbif':与数据库交互的接口。 +# - 'prices':包含每天价格数据的字典。 +# -**功能性**: +# - 它在“价格”字典中每天都在迭代。 +# - 对于每一天,它使用“get_days_rise”函数计算过去 60、7、30 和 90 天的价格上涨百分比。 +# - 然后,它调用“append_jzr_day60”函数将此数据以及相应的日期和价格附加到数据库中。 +# 总体而言,此功能负责通过计算和存储不同时间间隔内价格上涨的百分比来与数据库同步价格数据 +def check_jzr60_sync(dbif): + return dbif.check_jzr60_sync() +# “check_jzr60_sync”功能,用于检查数据库中 jzr60 表的数据同步是否完成。以下是它的作用: +# -**参数**: +# - 'dbif':与数据库交互的接口。 +# -**功能性**: +# - 它可能会查询数据库以检查“jzr60”表的数据同步是否完成。 +# - 此函数的具体实现将取决于数据库接口 ('dbif') 的详细信息,但通常它涉及查询数据库以确定是否已同步所有必要的数据。 +# 此函数可用于验证“jzr60”表的数据同步过程是否完成,这对于确保应用程序使用的数据的完整性非常重要 +def append_jzr60day(dbif, day, price, day60_rise, day7_rise, day30_rise, day90_rise): + dbif.append_jzr60_day(day, price, day60_rise, day7_rise, day30_rise, day90_rise) +# “append_jzr60day”函数似乎将特定日期的数据附加到数据库中的“jzr60”表中。以下是它的作用: +# -**参数**: +# - 'dbif':与数据库交互的接口。 +# - 'day':追加数据的特定日期。 +# - 'price':与当天相关的价格。 +# - “day60_rise”:过去 60 天内的价格上涨。 +# - “day7_rise”:过去 7 天的价格上涨。 +# - “day30_rise”:过去 30 天的价格上涨。 +# - “day90_rise”:过去 90 天内的价格上涨。 +# -**功能性**: +# - 它可能会在数据库的“jzr60”表中插入一个新行,其中包含提供的数据。 +# - 该函数将使用“dbif”接口执行 SQL 查询以将数据插入数据库。 +# - 这允许存储不同时间间隔(60 天、7 天、30 天和 90 天)的历史价格上涨数据以及相应的日期和价格。 +# 此功能对于维护不同时间段内价格上涨数据的历史记录至关重要 +def append_jzr60(dbif, dayutc, price, day60_rise, day7_rise, day30_rise, day90_rise): + dbif.append_jzr60(dayutc, price, day60_rise, day7_rise, day30_rise, day90_rise) +# “append_jzr60”函数似乎将特定日期的数据附加到数据库中可能名为“jzr60”的表中。以下是其目的和功能的细分: +# -**参数**: +# - 'dbif':与数据库交互的接口。 +# - 'dayutc':附加数据的特定日期的时间戳(以 UTC 为单位)。 +# - 'price':与当天相关的价格。 +# - “day60_rise”:过去 60 天内的价格上涨。 +# - “day7_rise”:过去 7 天的价格上涨。 +# - “day30_rise”:过去 30 天的价格上涨。 +# - “day90_rise”:过去 90 天内的价格上涨。 +# -**功能性**: +# - 它可能会在数据库的“jzr60”表中插入一个新行,其中包含提供的数据。 +# - 该函数将使用“dbif”接口执行 SQL 查询以将数据插入数据库。 +# - 这允许存储不同时间间隔(60 天、7 天、30 天和 90 天)的历史价格上涨数据以及相应的时间戳和价格。 +# 此功能对于维护不同时间段内价格上涨数据的历史记录至关重要 +def clean_jzr60day(dbif, clean_day): + dbif.clean_jzr60_day(clean_day) +# “clean_jzr60day”函数可能用于从数据库的“jzr60”表中清理或删除与特定日期相关的数据。以下是它的作用: +# - ** 参数 **: +# - 'dbif':与数据库交互的接口。 +# - “clean_day”:要从“jzr60”表中清除或删除数据的日期的时间戳(以UTC为单位)。 +# - ** 功能性 **: +# - 它可能通过'dbif'接口执行SQL查询,以从'jzr60'表中删除与提供的'clean_day'关联的数据。 +# - 此操作通过删除过时或不相关的记录来帮助维护数据的完整性。 +# 出于各种原因,可能需要清理特定日期的数据,例如更正错误或删除重复条目。此功能可确保数据库保持最新和准确 +def handle_jzr_day60(dbif, day, dayutc, price, prices): + day60_rise = get_days_rise(dayutc, 60, prices) + day7_rise = get_days_rise(dayutc, 7, prices) + day30_rise = get_days_rise(dayutc, 30, prices) + day90_rise = get_days_rise(dayutc, 90, prices) + print(dayutc, price, day, day60_rise) + append_jzr60day(dbif, day, price, day60_rise, day7_rise, day30_rise, day90_rise) + append_jzr60(dbif, dayutc, price, day60_rise, day7_rise, day30_rise, day90_rise) + clean_day = dayutc - 3600 * 24 * 2 + clean_jzr60day(dbif, clean_day) +# “handle_jzr_day60”功能似乎用于管理与“jzr60”指标相关的数据的处理和存储,这可能表示 60 天的价格上涨。以下是它的作用的细分: +# -**参数**: +# - 'dbif':与数据库交互的接口。 +# - 'day':处理数据的日期的时间戳。 +# - 'dayutc':处理数据的当天的 UTC 时间戳。 +# - 'price':与当天相关的价格。 +# - 'prices':包含历史价格数据的字典。 +# -**功能性**: +# 1. 使用“get_days_rise”函数计算 60 天、7 天、30 天和 90 天的价格上涨。 +# 2. 打印 UTC 时间戳、价格和 60 天价格上涨,以便进行调试或监控。 +# 3. 使用“append_jzr60day”功能将计算出的指标('day60_rise'、'day7_rise'、'day30_rise'、'day90_rise')附加到数据库。 +# 4. 使用“append_jzr60”函数将相同的指标附加到数据库,但带有 UTC 时间戳 ('dayutc')。 +# 5. 使用“clean_jzr60day”功能清理当天 2 天(“clean_day”)的数据。 +# 总体而言,此函数可确保计算出的价格上涨指标存储在特定日期和相应的 UTC 时间戳的数据库中。此外,它还会清理数据以保持数据库干净 +class Arh99DbIf: + def __init__(self, host="172.17.0.1", port=4423, user="root", password="2GS@bPYcgiMyL14A", dbname="btcdb"): + self.conn = pymysql.connect(host=host, port=port, user=user, password=password, database=dbname, cursorclass=pymysql.cursors.DictCursor) + print("init arh99 db suceess!") + # “Arh99DbIf”类似乎是与数据库交互的接口。以下是其结构的细分: + # - ** 属性 **: + # - 'conn':此属性表示使用“pymysql.connect”方法与数据库的连接。它存储连接详细信息,例如主机、端口、用户名、密码、数据库名称和游标类。 + # - ** 方法 **: + # - '__init__':这是初始化 + # 'Arh99DbIf' + # 类的构造函数方法。它采用多个参数来获取数据库连接详细信息(主机、端口、用户、密码、dbname),并使用“pymysql.connect”建立与数据库的连接。连接成功后,它会打印一条消息,指示初始化成功。 + # 总的来说,这个类充当MySQL数据库连接的包装器,允许代码库的其他部分轻松地与数据库进行交互 + def check_sync(self): + synced = False + with self.conn.cursor() as cursor: + sql_query = "SELECT COUNT(id) FROM `arh99v3a`" + cursor.execute(sql_query) + result = cursor.fetchone() + print(result) + if result is not None: + if "COUNT(id)" in result: + if result["COUNT(id)"] > 0: + synced = True + self.conn.commit() + #print("synced", synced) + return synced + # “check_sync”方法检查数据库表“arh99v2a”是否已同步。以下是其功能的细分: + # - ** 方法签名 **: + # - 'check_sync(self)':此方法除“self”外不接受任何参数,表明它是“Arh99DbIf”类的实例方法。 + # - ** 功能性 **: + # - 它将'synced'变量初始化为'False',表示尚未确认同步。 + # - 在“with”语句中,它会打开一个光标来执行数据库操作。 + # - 它执行SQL查询,使用'COUNT(id)'函数计算'arh99v2a'表中的行数。 + # - 获取SQL查询的结果。 + # - 如果结果不是“None”,它会检查结果字典中是否存在键“COUNT(id)”,以及其值是否大于0。如果是这样,它会将“synced”设置为“True”。 + # - 它提交事务以应用游标所做的任何更改。 + # - 最后,它返回'synced'的值,指示同步是否已确认 ('True') 或未 ('False')。 + # 此方法提供了一种检查数据库表“arh99v2a”的同步状态的方法 + def append(self, day, price, arh99, arh99x): + with self.conn.cursor() as cursor: + sql_query = "SELECT COUNT(id) FROM `arh99v3a` WHERE unixdt=FROM_UNIXTIME(%s)" + cursor.execute(sql_query, (int(day),)) + result = cursor.fetchone() + #print(dt_utc) + #print(result) + if result is not None: + if "COUNT(id)" in result: + if result["COUNT(id)"] > 0: + sql_update = 'UPDATE arh99v3a SET `arh99`=%s, `arh99x`=%s, `price`=%s, `unixdt`=FROM_UNIXTIME(%s) WHERE unixdt=FROM_UNIXTIME(%s)' + print(sql_update) + cursor.execute(sql_update, (arh99, arh99x, price, int(day), int(day))) + else: + sql_insert = "INSERT INTO `arh99v3a` (`unixdt`, `price`, `arh99`, `arh99x`) VALUES (FROM_UNIXTIME(%s), %s, %s, %s)" + print(sql_insert) + cursor.execute(sql_insert, (day, price, arh99, arh99x)) + self.conn.commit() + # “Arh99DbIf”类的“append”方法负责根据提供的参数添加或更新“arh99v2a”表中的记录。以下是其工作原理的解释: + # - ** 方法签名 **: + # - 'append(self, day, price, arh99, arh99x)':此方法采用四个参数: + # - 'day':Unix格式的当天时间戳。 + # - 'price':价格值。 + # - 'arh99':arh99的值。 + # - 'arh99x':arh99x的值。 + # - 'self':此参数是指'Arh99DbIf'类本身的实例。 + # - ** 功能性 **: + # - 在“with”语句中,它会打开一个光标来执行数据库操作。 + # - 它执行SQL查询以计算'arh99v2a'表中的行数,其中'unixdt' (Unix时间戳) 与提供的'day'匹配。 + # - 获取SQL查询的结果。 + # - 如果给定的“日期”有现有记录: + # - 它构造一个SQL'UPDATE'语句,使用新值'arh99'、'arh99x'、'price'和'unixdt'更新现有记录。 + # - 'UPDATE'语句更新与提供的'day'匹配的记录的'arh99'、'arh99x'、'price'和'unixdt'字段。 + # - 如果给定的“日期”没有现有记录: + # - 它构造一个SQL'INSERT'语句,将提供值为'day'、'price'、'arh99'和'arh99x'的新记录插入到'arh99v2a'表中。 + # - 'INSERT'语句在表中插入具有指定值的新记录。 + # - 执行SQL语句后,它会提交事务以应用游标所做的任何更改。 + # 此方法提供了根据提供的参数在“arh99v2a”表中添加新记录或更新现有记录的功能 + def append_day(self, day, price, arh99, arh99x): + with self.conn.cursor() as cursor: + sql_insert = "INSERT INTO `arh99v3aday` (`unixdt`, `price`, `arh99`, `arh99x`) VALUES (FROM_UNIXTIME(%s), %s, %s, %s)" + print(sql_insert) + cursor.execute(sql_insert, (day, price, arh99, arh99x)) + self.conn.commit() + # “Arh99DbIf”类的“append_day”方法似乎与“append”方法具有类似的用途,但它专门在“arh99v2aday”表中插入了一条新记录。其工作原理如下: + # - ** 方法签名 **: + # - 'append_day(self, day, price, arh99, arh99x)':此方法采用四个参数: + # - 'day':Unix格式的当天时间戳。 + # - 'price':价格值。 + # - 'arh99':arh99的值。 + # - 'arh99x':arh99x的值。 + # - 'self':此参数是指'Arh99DbIf'类本身的实例。 + # - ** 功能性 **: + # - 在“with”语句中,它会打开一个光标来执行数据库操作。 + # - 它构造一个SQL'INSERT'语句,将提供值为'day'、'price'、'arh99'和'arh99x'的新记录插入到'arh99v2aday'表中。 + # - 'INSERT'语句在表中插入具有指定值的新记录。 + # - 执行SQL语句后,它会提交事务以应用游标所做的任何更改。 + # 此方法可用于专门将记录插入到“arh99v2aday”表中 + def clean_day(self, day): + with self.conn.cursor() as cursor: + sql_clean = "DELETE from arh99v3aday where unixdt 0: + synced = True + self.conn.commit() + #print("synced", synced) + return synced + # “Arh99DbIf”类中的“check_jzr60_sync”方法旨在确定“jzr60v2a”表是否已同步。以下是其功能的细分: + # - ** 方法签名 **: + # - 'check_jzr60_sync(self)':此方法不接受任何参数。 + # - ** 功能性 **: + # - 它将布尔变量'synced'初始化为'False',表示同步状态最初假定为未同步。 + # - 在'with'语句中,它会打开一个光标来执行SQL查询。 + # - 该方法构造一个SQL'SELECT'查询来计算'jzr60v2a'表中的记录数。 + # - 执行查询后,它使用'cursor.fetchone()'获取结果。 + # - 如果结果不是“None”,而是包含键“COUNT(id)”,则检查计数是否大于“0”。如果是这样,它会将“synced”设置为“True”,表示表已同步。 + # - 最后,它提交光标所做的任何更改。 + # 此方法的目的是提供有关“jzr60v2a”表是否已同步的信息 + def append_jzr60(self, day, price, jzr60, jzr7, jzr30, jzr90): + with self.conn.cursor() as cursor: + sql_query = "SELECT COUNT(id) FROM `jzr60v3a` WHERE unixdt=FROM_UNIXTIME(%s)" + cursor.execute(sql_query, (int(day),)) + result = cursor.fetchone() + #print(dt_utc) + #print(result) + if result is not None: + if "COUNT(id)" in result: + if result["COUNT(id)"] > 0: + sql_update = 'UPDATE jzr60v3a SET `jzr60`=%s,`jzr7`=%s,`jzr30`=%s,`jzr90`=%s,`price`=%s, `unixdt`=FROM_UNIXTIME(%s) WHERE unixdt=FROM_UNIXTIME(%s)' + print(sql_update) + cursor.execute(sql_update, (jzr60, jzr7, jzr30, jzr90, price, int(day), int(day))) + else: + sql_insert = "INSERT INTO `jzr60v3a` (`unixdt`, `price`, `jzr60`, `jzr7`, `jzr30`, `jzr90`) VALUES (FROM_UNIXTIME(%s), %s, %s, %s, %s, %s)" + print(sql_insert) + cursor.execute(sql_insert, (day, price, jzr60, jzr7, jzr30, jzr90)) + self.conn.commit() + # “Arh99DbIf”类中的“append_jzr60”方法负责附加与“jzr60v2a”表相关的数据。以下是对其功能的分析: + # - ** 方法签名 **: + # - 'append_jzr60(self, day, price, jzr60, jzr7, jzr30, jzr90)':此方法采用以下参数: + # - 'day':表示日期的UNIX时间戳。 + # - 'price':与当天相关的价格。 + # - 'jzr60':JZR(60天上涨)值。 + # - 'jzr7':JZR(7天上涨)值。 + # - 'jzr30':JZR(30天上涨)值。 + # - 'jzr90':JZR(90天上涨)值。 + # - ** 功能性 **: + # - 该方法首先在“with”语句中初始化游标以执行 SQL 查询。 + # - 它构造一个SQL'SELECT'查询来计算指定日期的'jzr60v2a'表中的记录数。 + # - 执行查询后,它使用'cursor.fetchone()'获取结果。 + # - 如果结果不是“None”,而是包含键“COUNT(id)”,则检查计数是否大于“0”。如果为true,则使用SQL“UPDATE”查询使用提供的值更新现有记录。否则,它将使用SQL“INSERT”查询插入具有所提供值的新记录。 + # - 最后,它提交光标所做的任何更改。 + # 此方法允许根据提供的参数附加或更新与数据库中的“jzr60v2a”表相关的数据 + def append_jzr60_day(self, day, price, jzr60, jzr7, jzr30, jzr90): + with self.conn.cursor() as cursor: + sql_insert = "INSERT INTO `jzr60v3aday` (`unixdt`, `price`, `jzr60`, `jzr7`, `jzr30`, `jzr90`) VALUES (FROM_UNIXTIME(%s), %s, %s, %s, %s, %s)" + print(sql_insert) + cursor.execute(sql_insert, (day, price, jzr60, jzr7, jzr30, jzr90)) + self.conn.commit() + # “Arh99DbIf”类中的“append_jzr60_day”方法负责附加与“jzr60v2aday”表相关的数据。以下是对其功能的分析: + # - ** 方法签名 **: + # - 'append_jzr60_day(self, day, price, jzr60, jzr7, jzr30, jzr90)':此方法采用以下参数: + # - 'day':表示日期的UNIX时间戳。 + # - 'price':与当天相关的价格。 + # - 'jzr60':JZR(60天上涨)值。 + # - 'jzr7':JZR(7天上涨)值。 + # - 'jzr30':JZR(30天上涨)值。 + # - 'jzr90':JZR(90天上涨)值。 + # - ** 功能性 **: + # - 该方法首先在“with”语句中初始化游标以执行 SQL 查询。 + # - 它构造一个SQL'INSERT'查询,以使用提供的值将新记录插入到'jzr60v2aday'表中。 + # - 构造查询后,它会打印SQL查询以进行调试。 + # - 然后,它使用提供的参数执行SQL“INSERT”查询。 + # - 最后,它提交光标所做的任何更改。 + # 此方法允许根据提供的参数附加与数据库中的“jzr60v2aday”表相关的新数据 + def clean_jzr60_day(self, day): + with self.conn.cursor() as cursor: + sql_clean = "DELETE from jzr60v3aday where unixdt 0: + synced = True + self.conn.commit() + #print("synced", synced) + return synced + # “Arh99DbIf”类中的“check_ma730_sync”方法负责检查“ma730v2a”表是否同步。以下是其功能的细分: + # - ** 方法签名 **: + # - 'check_ma730_sync(self)':此方法不接受任何参数。 + # - ** 功能性 **: + # - 它首先将布尔变量“synced”初始化为“False”。 + # - 在'with'语句中,它创建一个游标来执行SQL查询。 + # - 它构造一个SQL'SELECT'查询来计算'ma730v2a'表中的行数。 + # - 执行查询后,它使用'cursor.fetchone()'获取结果。 + # - 如果结果不是“None”,它会检查结果字典中是否存在键“COUNT(id)”,以及其值是否大于“0”。如果是这样,它会将“synced”设置为“True”。 + # - 它提交光标所做的任何更改。 + # - 最后,它返回'synced'的值,指示表是否同步。 + # 此方法提供了一种方法,通过检查“ma730v2a”表是否包含任何记录来确定它是否同步。如果是这样,则认为该表是同步的 + def append_ma730(self, day, price, ma730, ma365, ma200): + with self.conn.cursor() as cursor: + sql_query = "SELECT COUNT(id) FROM `ma730v3a` WHERE unixdt=FROM_UNIXTIME(%s)" + cursor.execute(sql_query, (int(day),)) + result = cursor.fetchone() + #print(dt_utc) + #print(result) + if result is not None: + if "COUNT(id)" in result: + ma730x5 = ma730*5 + if result["COUNT(id)"] > 0: + sql_update = 'UPDATE ma730v3a SET `ma730`=%s, `ma730x5`=%s, `ma365`=%s, `ma200`=%s, `price`=%s, `unixdt`=FROM_UNIXTIME(%s) WHERE unixdt=FROM_UNIXTIME(%s)' + print(sql_update) + cursor.execute(sql_update, (ma730, ma730x5, ma365, ma200, price, int(day), int(day))) + else: + sql_insert = "INSERT INTO `ma730v3a` (`unixdt`, `price`, `ma730`, `ma730x5`, `ma365`, `ma200`) VALUES (FROM_UNIXTIME(%s), %s, %s, %s, %s, %s)" + print(sql_insert) + cursor.execute(sql_insert, (day, price, ma730, ma730x5, ma365, ma200)) + self.conn.commit() + # “Arh99DbIf”类中的“append_ma730”方法负责将与730天移动平均线相关的数据附加到数据库。以下是其功能的细分: + # - ** 方法签名 **: + # - 'append_ma730(self, day, price, ma730, ma365, ma200)':此方法采用表示日期(UNIX时间戳格式)、价格、730天移动平均线、365天移动平均线和200天移动平均线的参数。 + # - ** 功能性 **: + # - 它首先创建一个游标,以便在“with”语句中执行 SQL 查询。 + # - 该方法构造一个SQL“SELECT”查询,以计算“ma730v2a”表中“unixdt”列与给定日期匹配的行数。 + # - 执行查询后,它使用'cursor.fetchone()'获取结果。 + # - 如果结果不是“None”,则检查结果字典中是否存在键“COUNT(id)”。 + # - 它将“ma730x5”计算为“ma730”值的五倍。 + # - 如果记录计数大于'0',则构造SQL'UPDATE'语句,使用提供的数据更新现有记录。 + # - 如果给定日期没有记录,它将构造一个SQL'INSERT'语句,以插入包含所提供数据的新记录。 + # - 更改将提交到数据库,并且方法完成。 + # 此方法允许将与730天移动平均线相关的数据添加到数据库中的“ma730v2a”表中。如果给定日期的记录已存在,则会更新现有记录; + # 否则,它会插入一个新的记录 + def append_ma730_day(self, day, price, ma730, ma365, ma200): + with self.conn.cursor() as cursor: + ma730x5 = ma730*5 + sql_insert = "INSERT INTO `ma730v3aday` (`unixdt`, `price`, `ma730`, `ma730x5`, `ma365`, `ma200`) VALUES (FROM_UNIXTIME(%s), %s, %s, %s, %s, %s)" + print(sql_insert) + cursor.execute(sql_insert, (day, price, ma730, ma730x5, ma365, ma200)) + self.conn.commit() + # “Arh99DbIf”类中的“append_ma730_day”方法负责将与730天移动平均线相关的每日数据附加到数据库。以下是其功能的细分: + # - ** 方法签名 **: + # - 'append_ma730_day(self, day, price, ma730, ma365, ma200)':此方法采用表示日期(UNIX时间戳格式)、价格、730天移动平均线、365天移动平均线和200天移动平均线的参数。 + # - ** 功能性 **: + # - 它首先创建一个游标,以便在“with”语句中执行 SQL 查询。 + # - 它将“ma730x5”计算为“ma730”值的五倍。 + # - 它构造一个SQL'INSERT'语句,将新记录插入到包含所提供数据的'ma730v2aday'表中。 + # - 打印SQL查询以进行调试。 + # - 该方法使用'cursor.execute()'使用提供的数据执行SQL'INSERT'语句。 + # - 更改将提交到数据库,并且方法完成。 + # 此方法允许将与730天移动平均线相关的每日数据添加到数据库中的“ma730v2aday”表中。每次调用此方法都会添加一条新记录,表示当天的数据 + def clean_ma730_day(self, day): + with self.conn.cursor() as cursor: + sql_clean = "DELETE from ma730v3aday where unixdt 0: + return total/cnt + return 0 +# 函数“cal_day200_price(prices, day)”计算比特币在 200 天内的平均价格。 +# - **功能签名**: +# - 'cal_day200_price(prices, day)':此函数采用两个参数: +# - 'prices':包含比特币历史价格的字典。 +# - 'day':需要计算平均价格的当天。 +# -**功能性**: +# - 它初始化变量“total”和“cnt”,分别存储价格总和和天数。 +# - 它在 200 天的范围内进行迭代,在每次迭代中将“天”减少一天。 +# - 对于每一天,如果“价格”字典中提供了当天的价格,则会将其添加到“总计”中并递增“cnt”。 +# - 循环后,它通过将“总计”除以“cnt”来计算平均价格。 +# - 如果 200 天内的任何一天没有可用的价格,则返回 0 作为平均价格。 +# - 否则,它将返回计算出的平均价格 +def cal_arh99(prices, day, price): + day200 = cal_day200_price(prices, day) + #print("day200", day200) + days = get_coin_days(day) + #print("days", days) + exp = get_coin_exp(days) + #print("exp", exp, price) + try: + arh99 = (float(price)/day200)*(float(price)/exp) + arh99x = (day200/float(price))*(exp/float(price))*3 + except: + arh99 = 0 + arh99x = 0 + #print("arh99", arh99) + print("cal_arh99", day, price, arh99, arh99x) + return arh99, arh99x +# 函数“cal_arh99(prices, day, price)”计算比特币给定日期和价格的 ARH99 和 ARH99x 值。 +# - **功能签名**: +# - 'cal_arh99(prices, day, price)':此函数采用三个参数: +# - 'prices':包含比特币历史价格的字典。 +# - 'day':需要计算 ARH99 和 ARH99x 值的当前日期。 +# - 'price':比特币在给定日期的价格。 +# -**功能性**: +# - 它首先使用“cal_day200_price()”函数计算比特币的 200 天平均价格。 +# - 然后,它使用“get_coin_days()”函数计算比特币诞生以来的天数。 +# - 接下来,它使用“get_coin_exp()”函数计算比特币的预期价格。 +# - 然后,它使用提供的公式计算 ARH99 和 ARH99x 值: +# - 'arh99 = (价格 / day200) * (价格 / exp)' +# - 'arh99x = (day200 / 价格) * (exp / 价格) * 3' +# - 如果在计算过程中出现任何错误(例如,除以零),则将“arh99”和“arh99x”都设置为0。 +# - 最后,它打印计算值并返回 'arh99' 和 'arh99x' +def check_sync(dbif): + return dbif.check_sync() +# “check_sync(dbif)”函数检查与提供的“dbif”对象关联的数据库表是否同步。以下是其功能的细分: +# - **功能签名**: +# - 'check_sync(dbif)':此函数采用单个参数: +# - 'dbif':表示数据库接口的对象。 +# -**功能性**: +# - 它调用提供的 'dbif' 对象的 'check_sync()' 方法,该方法可能检查数据库中的特定表是否同步。 +# - 该方法返回一个布尔值,指示同步是否完成 ('True') 或未完成 ('False')。 +# - 最后,该函数返回此布尔值。 +# 在“dbif”对象中实际实现“check_sync()”方法将涉及查询数据库以检查同步指示器,例如是否存在某些表示同步过程完成的记录或标志 +def append_arh99(dbif, day, price, arh99, arh99x): + dbif.append(day, price, arh99, arh99x) +# 'append_arh99(dbif, day, price, arh99, arh99x)' 函数用于通过提供的 'dbif' 对象将与 ARH99 和 ARH99X 值相关的数据附加到数据库中。以下是其功能的细分: +# - **功能签名**: +# - 'append_arh99(dbif, day, price, arh99, arh99x)':此函数采用五个参数: +# - 'dbif':表示数据库接口的对象。 +# - 'day':计算 ARH99 和 ARH99X 值的日期。 +# - 'price':与所提供日期相关的价格。 +# - 'arh99':计算出的 ARH99 值。 +# - 'arh99x':计算出的 ARH99X 值。 +# -**功能性**: +# - 它调用提供的 'dbif' 对象的 'append()' 方法,在数据库中插入或更新 ARH99 和 ARH99X 值。 +# - 'append()' 方法可能会执行 SQL 查询,以便在数据库中不存在指定日期的数据时插入新记录,或者更新现有记录(如果存在)。 +# - 传递给 'append()' 方法的参数包括 day、price、ARH99 和 ARH99X 值。 +# - 将数据追加到数据库后,将提交更改。 +# 在“dbif”对象中实际实现“append()”方法将涉及执行 SQL 查询以与数据库交互并相应地插入或更新 ARH99 和 ARH99X 值。 +def sync_arh99(dbif, prices): + for day in prices: + print(day, prices[day]) + arh99, arh99x = cal_arh99(prices, int(day), prices[day]) + print(day, arh99, arh99x) + append_arh99(dbif, day, prices[day], arh99, arh99x) +# “sync_arh99(dbif, prices)”功能似乎将 ARH99 和 ARH99X 数据与提供的价格数据同步。其工作原理如下: +# - **功能签名**: +# - 'sync_arh99(dbif, prices)':此函数采用两个参数: +# - 'dbif':表示数据库接口的对象。 +# - 'prices':包含不同日期价格数据的字典。 +# -**功能性**: +# - 它在“价格”字典中每天都会迭代。 +# - 对于每天,它使用“cal_arh99()”函数计算 ARH99 和 ARH99X 值。 +# - 'cal_arh99()' 函数根据特定日期的价格数据计算这些值。 +# - 计算完成后,它会调用 'append_arh99()' 函数,使用提供的 'dbif' 对象将 day、price、ARH99 和 ARH99X 值附加到数据库中。 +# - 在附加之前,它会打印出日期和相关价格,以及计算出的 ARH99 和 ARH99X 值,以便进行调试或监控。 +# 此功能实质上是将数据库中的 ARH99 和 ARH99X 数据与提供的价格数据同步,确保数据库反映最新的计算值 +def append_arh99day(dbif, day, price, arh99, arh99x): + dbif.append_day(day, price, arh99, arh99x) +# 'append_arh99day(dbif, day, price, arh99, arh99x)' 函数似乎将特定日期的 ARH99 和 ARH99X 数据附加到数据库中。其工作原理如下: +# - **功能签名**: +# - 'append_arh99day(dbif, day, price, arh99, arh99x)':此函数采用五个参数: +# - 'dbif':表示数据库接口的对象。 +# - 'day':追加 ARH99 和 ARH99X 数据的日期。 +# - 'price':指定日期的价格。 +# - 'arh99':指定日期的计算 ARH99 值。 +# - 'arh99x':指定日期的计算值 ARH99X。 +# -**功能性**: +# - 它调用 'dbif' 对象的 'append_day()' 方法,传递 'day'、'price'、'arh99' 和 'arh99x' 值作为参数。 +# - “append_day()”方法应处理将数据插入到与 ARH99 值关联的数据库表中。 +# - 此函数实质上是将指定日期的 ARH99 和 ARH99X 数据附加到数据库接口对象的任务委托给数据库接口对象。 +# 此函数可用于每天或特定时间间隔将 ARH99 和 ARH99X 数据附加到数据库 +def clean_arh99day(dbif, day): + dbif.clean_day(day) +# “clean_arh99day(dbif, day)”功能似乎从数据库中清除了特定日期的 ARH99 和 ARH99X 数据。其工作原理如下: +# - **功能签名**: +# - 'clean_arh99day(dbif, day)':此函数采用两个参数: +# - 'dbif':表示数据库接口的对象。 +# - 'day':清理 ARH99 和 ARH99X 数据的日期。 +# -**功能性**: +# - 它调用 'dbif' 对象的 'clean_day()' 方法,将 'day' 值作为参数传递。 +# - “clean_day()”方法应处理从数据库表中删除指定日期的 ARH99 和 ARH99X 数据。 +# - 该函数实质上是将清理指定日期的 ARH99 和 ARH99X 数据的任务委托给数据库接口对象。 +# 此功能可用于从数据库中删除过时或不必要的 ARH99 和 ARH99X 数据,以保持数据存储的高效和最新。 +def arh99_handler(message): + global g_prices + global g_dbif + global g_lastts + coin_data = message["data"] + #coin_symbol = coin_data["s"] + coin_ts = int(coin_data["E"]) + coin_price = float(coin_data["c"]) + #print((coin_ts/1000), int((coin_ts/1000)%60)) + if int((coin_ts/1000)%60) == 0: + #if coin_ts / 1000 / 60 != g_lastts: + if coin_ts/1000 - g_lastts >= 15: + #print(coin_ts, coin_price) + coin_ts2 = time.gmtime(coin_ts/1000) + daystr = time.strftime("%d %b %Y", coin_ts2) + print(daystr) + dayutc = int(time.mktime(time.strptime(daystr, "%d %b %Y"))) + g_prices[str(dayutc)] = coin_price + arh99, arh99x = cal_arh99(g_prices, dayutc, coin_price) + print(dayutc, coin_price, arh99, arh99x) + append_arh99day(g_dbif, coin_ts/1000, coin_price, arh99, arh99x) + append_arh99(g_dbif, dayutc, coin_price, arh99, arh99x) + clean_day = dayutc - 3600*24*2 + clean_arh99day(g_dbif, clean_day) + + handle_jzr_day60(g_dbif, coin_ts/1000, dayutc, coin_price, g_prices) + handle_ma_day730(g_dbif, coin_ts / 1000, dayutc, coin_price, g_prices) + g_lastts = coin_ts/1000 +# “arh99_handler(message)”函数似乎用于处理与 ARH99 计算相关的传入消息。以下是其功能的细分: +# - **功能签名**: +# - 'arh99_handler(message)':此函数采用单个参数 'message',该参数应为包含与 ARH99 计算相关的数据的字典。 +# - **全局变量**: +# - “g_prices”:存储一段时间内的价格数据。 +# - 'g_dbif':表示数据库接口对象。 +# - “g_lastts”:跟踪上次处理的消息的时间戳。 +# -**功能性**: +# - 从传入消息中提取相关数据,例如时间戳 ('coin_ts') 和价格 ('coin_price')。 +# - 检查当前时间戳是否与新小时的开始相对应。 +# - 如果是新小时的开始(当时间戳的秒部分为 0 时): +# - 使用相应日期的新价格更新“g_prices”。 +# - 使用“cal_arh99()”函数计算 ARH99 和 ARH99X 值。 +# - 使用“append_arh99day()”和“append_arh99()”函数将ARH99和ARH99X数据附加到数据库。 +# - 使用“clean_arh99day()”函数清理过时的 ARH99 和 ARH99X 数据。 +# - 调用 'handle_jzr_day60()' 和 'handle_马_day730()' 函数来处理相关计算。 +# - 使用当前时间戳更新“g_lastts”。 +# 此函数是处理 ARH99 相关消息、执行必要计算和相应更新数据库的主要入口点。 +def start_arh99(dbif, prices): + ws_client = WebsocketClient() + ws_client.start() + ws_client.instant_subscribe( + stream=['btcusdt@miniTicker'], + callback=arh99_handler, + ) +# “start_arh99”功能似乎启动了监控 ARH99 计算的实时数据的过程。下面是一个细分: +# - **功能签名**: +# - 'start_arh99(dbif, prices)':此函数采用两个参数: +# - 'dbif':用于数据存储的数据库接口对象。 +# - 'prices':存储历史价格数据的字典。 +# -**功能性**: +# - 启动 WebSocket 客户端 ('ws_client') 以连接到数据源。 +# - 启动 WebSocket 客户端。 +# - 订阅“btcusdt@miniTicker”流,该流可能会提供比特币兑 USDT 的实时价格更新。 +# - 指定 'arh99_handler' 作为回调函数,用于处理来自订阅流的传入消息。 +# 通过启动 WebSocket 客户端并订阅相关流,此功能可以持续监控比特币的实时价格数据,并触发“arh99_handler”功能来处理传入的消息并相应地更新数据库 +def arh99(): + global g_dbif + g_dbif = Arh99DbIf() + prices = get_history_price2(g_dbif) + # print(prices) + # list1 = [] + # list2 = [] + # for key, value in prices.items(): + # old_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(key))) + # new_time = old_time[0:10] + " 08:00:00" + # new_key = int(time.mktime(time.strptime(new_time, "%Y-%m-%d %H:%M:%S"))) + # list1.append(str(new_key)) + # list2.append(value) + # prices = dict(zip(list1, list2)) + #if not check_sync(g_dbif): + if True: + sync_arh99(g_dbif, prices) + #if not check_jzr60_sync(g_dbif): + if True: + sync_jzr_day60(g_dbif, prices) + #if not check_ma730_sync(g_dbif): + if True: + sync_ma_day730(g_dbif, prices) + start_arh99(g_dbif, prices) +# “arh99”功能似乎是启动 ARH99 监控过程的入口点。下面是一个细分: +# - **功能签名**: +# - 'arh99()':此函数不接受任何参数。 +# -**功能性**: +# - 使用“Arh99DbIf”实例初始化全局“g_dbif”变量,表示 ARH99 数据存储的数据库接口。 +# - 使用“get_history_price2”功能检索历史价格数据。 +# - 打印检索到的历史价格数据。 +# - 通过调用“sync_arh99”函数启动 ARH99 数据与数据库的同步。 +# - 通过调用“sync_jzr_day60”函数启动 JZR60 数据与数据库的同步。 +# - 通过调用“sync_马_day730”函数启动MA730数据与数据库的同步。 +# - 通过格数据的“start_a调用具有数据库接口和历史价rh99”函数来启动 ARH99 监控过程 +#2-year ma multiplier +def get_day730_rise(day, prices): + total = 0 + cnt = 0 + for i in range(730): + if str(day) in prices: + cur_price = prices[str(day)] + total += cur_price + cnt += 1 + day = str(day - 3600 * 24) + day = int(day) + if cnt > 0: + return total/cnt + print("get_day730_rise", day, total, cnt) + return 0 +# “get_day730_rise”函数计算 730 天内的平均价格上涨。 +# -**参数**: +# - 'day':计算平均价格上涨的起始日期。 +# - 'prices':包含历史价格数据的字典,其中键是时间戳,值是相应的价格。 +# -**功能性**: +# - 在给定“日”之前的 730 天范围内进行迭代。 +# - 检查“prices”字典中是否存在当天的价格。 +# - 如果价格存在,它会将价格添加到“总计”中,并递增“cnt”变量,该变量计算可用价格的天数。 +# - 在所有天数遍历后,它通过将价格总和除以天数 ('cnt') 来计算平均价格上涨。 +# - 如果没有可用的价格 ('cnt == 0'),则返回 0。 +# - 打印调试信息,包括日期、价格总和以及可用价格的天数 +def get_day365_rise(day, maxdays, prices): + total = 0 + cnt = 0 + for i in range(maxdays): + if str(day) in prices: + cur_price = prices[str(day)] + total += cur_price + cnt += 1 + day = str(day - 3600 * 24) + day = int(day) + if cnt > 0: + return total/cnt + print("get_day365_rise", total, cnt) + return 0 +# “get_day365_rise”功能计算指定天数(最多 365 天)的平均价格上涨。 +# -**参数**: +# - 'day':计算平均价格上涨的起始日期。 +# - “maxdays”:计算时要考虑的最大天数。 +# - 'prices':包含历史价格数据的字典,其中键是时间戳,值是相应的价格。 +# -**功能性**: +# - 遍历“maxdays”范围,表示给定“day”之前的天数。 +# - 检查“prices”字典中是否存在当天的价格。 +# - 如果价格存在,它会将价格添加到“总计”中,并递增“cnt”变量,该变量计算可用价格的天数。 +# - 在所有天数遍历后,它通过将价格总和除以天数 ('cnt') 来计算平均价格上涨。 +# - 如果没有可用的价格 ('cnt == 0'),则返回 0。 +# - 打印调试信息,包括价格总和以及可用价格的天数。 +def append_ma_day730(dbif, day, price, day730_rise, day365_rise, day200_rise): + dbif.append_ma730(day, price, day730_rise, day365_rise, day200_rise) +# “append_马_day730”函数似乎将与 730 天移动平均线相关的数据附加到数据库。 +# -**参数**: +# - 'dbif':数据库接口对象的实例。 +# - 'day':追加数据的日期的时间戳。 +# - 'price':与当天相关的价格值。 +# - “day730_rise”:计算出过去 730 天内的平均价格涨幅。 +# - “day365_rise”:过去 365 天内计算出的平均价格涨幅。 +# - “day200_rise”:过去 200 天内计算出的平均价格涨幅。 +# -**功能性**: +# - 使用“dbif”对象将提供的数据追加到与 730 天移动平均线相关的数据库表中。 +# - 附加数据包括当天的时间戳(“天”)、相应的价格(“价格”)以及计算出的平均价格在 730、365 和 200 天内的上涨。 +# 此函数看起来很简单,并且应该按预期工作,假设数据库接口 ('dbif') 具有处理数据追加的方法。 +def sync_ma_day730(dbif, prices): + for day in prices: + print(day, prices[day]) + day730_rise = get_day730_rise(int(day), prices) + day365_rise = get_day365_rise(int(day), 365, prices) + day200_rise = get_day365_rise(int(day), 200, prices) + print(day, day730_rise) + append_ma_day730(dbif, day, prices[day], day730_rise, day365_rise, day200_rise) +# “sync_ma_day730”函数似乎将与 730 天移动平均线相关的数据与数据库同步。 +# -**参数**: +# - 'dbif':数据库接口对象的实例。 +# - 'prices':包含历史价格数据的字典,其中键是时间戳,值是相应的价格值。 +# -**功能性**: +# - 在提供的“价格”字典中遍历每一天。 +# - 使用辅助函数计算过去 730 天 ('day730_rise')、365 天 ('day365_rise') 和 200 天 ('day200_rise') 的平均价格上涨。 +# - 使用“append_ma_day730”函数将计算数据追加到数据库。 +# 此功能应有效地将 730 天移动平均线数据与历史价格数据中每天的数据库同步。 +def check_ma730_sync(dbif): + return dbif.check_ma730_sync() +# “check_ma730_sync”功能显示,用于检查与730天移动平均线相关的数据是否与数据库同步。 +# -**参数**: +# - 'dbif':数据库接口对象的实例。 +# -**功能性**: +# - 查询数据库接口对象,查看730日移动平均线相关数据是否同步。 +# - 返回一个布尔值,指示同步检查是否成功。 +# 此函数允许您验证与 730 天移动平均线相关的数据是否是最新的并与数据库同步 +def append_ma730day(dbif, day, price, day730_rise, day365_rise, day200_rise): + dbif.append_ma730_day(day, price, day730_rise, day365_rise, day200_rise) +# “append_ma730day”函数似乎将与 730 天移动平均线相关的数据附加到数据库表中。 +# -**参数**: +# - 'dbif':数据库接口对象的实例。 +# - 'day':表示日期的 Unix 时间戳。 +# - 'price':价格值。 +# - “day730_rise”:过去 730 天的价格上涨。 +# - “day365_rise”:过去 365 天内的价格上涨。 +# - “day200_rise”:过去 200 天的价格上涨。 +# -**功能性**: +# - 使用提供的数据在数据库表中插入 730 天移动平均线的新记录。 +# 该功能负责将与 730 天移动平均线相关的每日数据附加到数据库中,确保存储历史价格信息以供分析和将来参考 +def append_ma730(dbif, dayutc, price, day730_rise, day365_rise, day200_rise): + dbif.append_ma730(dayutc, price, day730_rise, day365_rise, day200_rise) +# “append_ma730”函数似乎将与 730 天移动平均线相关的数据附加到数据库表中。 +# -**参数**: +# - 'dbif':数据库接口对象的实例。 +# - 'dayutc':表示日期的 Unix 时间戳。 +# - 'price':价格值。 +# - “day730_rise”:过去 730 天的价格上涨。 +# - “day365_rise”:过去 365 天内的价格上涨。 +# - “day200_rise”:过去 200 天的价格上涨。 +# -**功能性**: +# - 使用提供的数据在数据库表中插入 730 天移动平均线的新记录。 +# 此功能似乎是跟踪和分析不同时间段内价格走势的系统的一部分,特别关注 730 天的走势 +def clean_ma730day(dbif, clean_day): + dbif.clean_ma730_day(clean_day) +# “clean_ma730day”功能负责从数据库中清理与特定日期的 730 天移动平均线相关的数据。 +# -**参数**: +# - 'dbif':数据库接口对象的实例。 +# - 'clean_day':表示需要清理数据的日期的 Unix 时间戳。 +# -**功能性**: +# - 从数据库表中删除早于指定“clean_day”的 730 天移动平均线的记录。这有助于通过删除过时或不必要的数据来维护数据库 +def handle_ma_day730(dbif, day, dayutc, price, prices): + day730_rise = get_day730_rise(dayutc, prices) + day365_rise = get_day365_rise(dayutc, 365, prices) + day200_rise = get_day365_rise(dayutc, 200, prices) + print(dayutc, price, day, day730_rise) + append_ma730day(dbif, day, price, day730_rise, day365_rise, day200_rise) + append_ma730(dbif, dayutc, price, day730_rise, day365_rise, day200_rise) + clean_day = dayutc - 3600 * 24 * 2 + clean_ma730day(dbif, clean_day) +# “handle_ma_day730”函数计算并处理与特定日期的 730 天移动平均线相关的数据。 +# -**参数**: +# - 'dbif':数据库接口对象的实例。 +# - 'day':表示日期的 Unix 时间戳。 +# - 'dayutc':表示 UTC 日期的 Unix 时间戳。 +# - 'price':当天的价格值。 +# - 'prices':包含不同日期价格数据的字典。 +# -**功能性**: +# - 使用“get_day730_rise”和“get_day365_rise”函数计算给定“dayUTC”的 730 天移动平均线、365 天移动平均线和 200 天移动平均线。 +# - 使用“append_ma730day”和“append_ma730”函数将计算出的移动平均数据附加到相应的数据库表中。 +# - 使用“clean_ma730day”功能清理数据库中的过时数据,以确保数据的完整性和效率 +arh99() +# 此函数似乎处理与 ARH99(从比特币价格派生的指标)相关的数据同步、计算和附加到数据库的过程。arh99() +# 下面是该函数功能的简要概述:arh99() +# 它初始化数据库接口。 +# 它使用该函数检索历史价格数据。get_history_price2() +# 它检查数据库中的 ARH99 数据是否需要同步。 +# 如果需要同步,它会使用该功能同步 ARH99 数据。sync_arh99() +# 它检查数据库中 60 天的 JZR (Juzhen) 数据是否需要同步。 +# 如果需要同步,它将使用该函数同步 60 天的 JZR 数据。sync_jzr_day60() +# 它检查数据库中的 730 天移动平均线 (MA730) 数据是否需要同步。 +# 如果需要同步,它会使用该功能同步 MA730 数据。sync_ma_day730() +# 它启动 ARH99 websocket 客户端以侦听实时数据更新并使用该函数进行处理。arh99_handler() +# 该功能似乎协调了管理和更新与比特币价格及其衍生品相关的各种指标和数据点的整个过程 + diff --git a/coinbus/arh999eth_lyq.py b/coinbus/arh999eth_lyq.py new file mode 100644 index 0000000..b0fc2f1 --- /dev/null +++ b/coinbus/arh999eth_lyq.py @@ -0,0 +1,504 @@ +# coding=utf-8 +import ujson +from binance.websocket.spot.websocket_client import SpotWebsocketClient as WebsocketClient +import time +import requests +from loguru import logger +import datetime +import pymysql +import math +import csv + +g_prices = {} +g_dbif = None +g_lastts = 0 +def get_day60_rise(day, prices): + total = 0 + cnt = 0 + for i in range(60): + if str(day) in prices: + cur_price = prices[str(day)] + day = str(day - 3600 * 24) + if day in prices: + prev_price = prices[day] + try: + #print(((cur_price-prev_price)/prev_price), day, cur_price, prev_price) + total += (((cur_price-prev_price)/prev_price)) + cnt += 1 + except: + pass + # print(day, total, cnt) + day = int(day) + return total + +def get_days_rise(day, maxdays, prices): + total = 0 + cnt = 0 + for i in range(maxdays): + if str(day) in prices: + cur_price = prices[str(day)] + day = str(day - 3600 * 24) + if day in prices: + prev_price = prices[day] + try: + #print(((cur_price-prev_price)/prev_price), day, cur_price, prev_price) + total += (((cur_price-prev_price)/prev_price)) + cnt += 1 + except: + pass + # print(day, total, cnt) + day = int(day) + return total + +def append_jzr_day60(dbif, day, price, day60_rise, day7_rise, day30_rise, day90_rise): + dbif.append_jzr60(day, price, day60_rise, day7_rise, day30_rise, day90_rise) + +def sync_jzr_day60(dbif, prices): + for day in prices: + print(day, prices[day]) + day60_rise = get_days_rise(int(day), 60, prices) + day7_rise = get_days_rise(int(day), 7, prices) + day30_rise = get_days_rise(int(day), 30, prices) + day90_rise = get_days_rise(int(day), 90, prices) + print(day, day60_rise) + append_jzr_day60(dbif, day, prices[day], day60_rise, day7_rise, day30_rise, day90_rise) + +def check_jzr60_sync(dbif): + return dbif.check_jzr60_sync() + +def append_jzr60day(dbif, day, price, day60_rise, day7_rise, day30_rise, day90_rise): + dbif.append_jzr60_day(day, price, day60_rise, day7_rise, day30_rise, day90_rise) + +def append_jzr60(dbif, dayutc, price, day60_rise, day7_rise, day30_rise, day90_rise): + dbif.append_jzr60(dayutc, price, day60_rise, day7_rise, day30_rise, day90_rise) + +def clean_jzr60day(dbif, clean_day): + dbif.clean_jzr60_day(clean_day) + +def handle_jzr_day60(dbif, day, dayutc, price, prices): + day60_rise = get_days_rise(dayutc, 60, prices) + day7_rise = get_days_rise(dayutc, 7, prices) + day30_rise = get_days_rise(dayutc, 30, prices) + day90_rise = get_days_rise(dayutc, 90, prices) + print(dayutc, price, day, day60_rise) + append_jzr60day(dbif, day, price, day60_rise, day7_rise, day30_rise, day90_rise) + append_jzr60(dbif, dayutc, price, day60_rise, day7_rise, day30_rise, day90_rise) + clean_day = dayutc - 3600 * 24 * 2 + clean_jzr60day(dbif, clean_day) + +class Arh99DbIf: + def __init__(self, host="172.17.0.1", port=4423, user="root", password="2GS@bPYcgiMyL14A", dbname="ethdb"): + self.conn = pymysql.connect(host=host, port=port, user=user, password=password, database=dbname, cursorclass=pymysql.cursors.DictCursor) + print("init arh99 db suceess!") + + def check_sync(self): + synced = False + with self.conn.cursor() as cursor: + sql_query = "SELECT COUNT(id) FROM `arh99v3a`" + cursor.execute(sql_query) + result = cursor.fetchone() + print(result) + if result is not None: + if "COUNT(id)" in result: + if result["COUNT(id)"] > 0: + synced = True + self.conn.commit() + #print("synced", synced) + return synced + + def append(self, day, price, arh99, arh99x): + with self.conn.cursor() as cursor: + sql_query = "SELECT COUNT(id) FROM `arh99v3a` WHERE unixdt=FROM_UNIXTIME(%s)" + cursor.execute(sql_query, (int(day),)) + result = cursor.fetchone() + #print(dt_utc) + #print(result) + if result is not None: + if "COUNT(id)" in result: + if result["COUNT(id)"] > 0: + sql_update = 'UPDATE arh99v3a SET `arh99`=%s, `arh99x`=%s, `price`=%s, `unixdt`=FROM_UNIXTIME(%s) WHERE unixdt=FROM_UNIXTIME(%s)' + print(sql_update) + cursor.execute(sql_update, (arh99, arh99x, price, int(day), int(day))) + else: + sql_insert = "INSERT INTO `arh99v3a` (`unixdt`, `price`, `arh99`, `arh99x`) VALUES (FROM_UNIXTIME(%s), %s, %s, %s)" + print(sql_insert) + cursor.execute(sql_insert, (day, price, arh99, arh99x)) + self.conn.commit() + + def append_day(self, day, price, arh99, arh99x): + with self.conn.cursor() as cursor: + sql_insert = "INSERT INTO `arh99v3aday` (`unixdt`, `price`, `arh99`, `arh99x`) VALUES (FROM_UNIXTIME(%s), %s, %s, %s)" + print(sql_insert) + cursor.execute(sql_insert, (day, price, arh99, arh99x)) + self.conn.commit() + + def clean_day(self, day): + with self.conn.cursor() as cursor: + sql_clean = "DELETE from arh99v3aday where unixdt 0: + synced = True + self.conn.commit() + #print("synced", synced) + return synced + + def append_jzr60(self, day, price, jzr60, jzr7, jzr30, jzr90): + with self.conn.cursor() as cursor: + sql_query = "SELECT COUNT(id) FROM `jzr60v3a` WHERE unixdt=FROM_UNIXTIME(%s)" + cursor.execute(sql_query, (int(day),)) + result = cursor.fetchone() + #print(dt_utc) + #print(result) + if result is not None: + if "COUNT(id)" in result: + if result["COUNT(id)"] > 0: + sql_update = 'UPDATE jzr60v3a SET `jzr60`=%s,`jzr7`=%s,`jzr30`=%s,`jzr90`=%s,`price`=%s, `unixdt`=FROM_UNIXTIME(%s) WHERE unixdt=FROM_UNIXTIME(%s)' + print(sql_update) + cursor.execute(sql_update, (jzr60, jzr7, jzr30, jzr90, price, int(day), int(day))) + else: + sql_insert = "INSERT INTO `jzr60v3a` (`unixdt`, `price`, `jzr60`, `jzr7`, `jzr30`, `jzr90`) VALUES (FROM_UNIXTIME(%s), %s, %s, %s, %s, %s)" + print(sql_insert) + cursor.execute(sql_insert, (day, price, jzr60, jzr7, jzr30, jzr90)) + self.conn.commit() + + def append_jzr60_day(self, day, price, jzr60, jzr7, jzr30, jzr90): + with self.conn.cursor() as cursor: + sql_insert = "INSERT INTO `jzr60v3aday` (`unixdt`, `price`, `jzr60`, `jzr7`, `jzr30`, `jzr90`) VALUES (FROM_UNIXTIME(%s), %s, %s, %s, %s, %s)" + print(sql_insert) + cursor.execute(sql_insert, (day, price, jzr60, jzr7, jzr30, jzr90)) + self.conn.commit() + + def clean_jzr60_day(self, day): + with self.conn.cursor() as cursor: + sql_clean = "DELETE from jzr60v3aday where unixdt 0: + synced = True + self.conn.commit() + #print("synced", synced) + return synced + + def append_ma730(self, day, price, ma730, ma365, ma200): + with self.conn.cursor() as cursor: + sql_query = "SELECT COUNT(id) FROM `ma730v3a` WHERE unixdt=FROM_UNIXTIME(%s)" + cursor.execute(sql_query, (int(day),)) + result = cursor.fetchone() + #print(dt_utc) + #print(result) + if result is not None: + if "COUNT(id)" in result: + ma730x5 = ma730*5 + if result["COUNT(id)"] > 0: + sql_update = 'UPDATE ma730v3a SET `ma730`=%s, `ma730x5`=%s, `ma365`=%s, `ma200`=%s, `price`=%s, `unixdt`=FROM_UNIXTIME(%s) WHERE unixdt=FROM_UNIXTIME(%s)' + print(sql_update) + cursor.execute(sql_update, (ma730, ma730x5, ma365, ma200, price, int(day), int(day))) + else: + sql_insert = "INSERT INTO `ma730v3a` (`unixdt`, `price`, `ma730`, `ma730x5`, `ma365`, `ma200`) VALUES (FROM_UNIXTIME(%s), %s, %s, %s, %s, %s)" + print(sql_insert) + cursor.execute(sql_insert, (day, price, ma730, ma730x5, ma365, ma200)) + self.conn.commit() + + def append_ma730_day(self, day, price, ma730, ma365, ma200): + with self.conn.cursor() as cursor: + ma730x5 = ma730*5 + sql_insert = "INSERT INTO `ma730v3aday` (`unixdt`, `price`, `ma730`, `ma730x5`, `ma365`, `ma200`) VALUES (FROM_UNIXTIME(%s), %s, %s, %s, %s, %s)" + print(sql_insert) + cursor.execute(sql_insert, (day, price, ma730, ma730x5, ma365, ma200)) + self.conn.commit() + + def clean_ma730_day(self, day): + with self.conn.cursor() as cursor: + sql_clean = "DELETE from ma730v3aday where unixdt 0: + return total/cnt + return 0 + +def cal_arh99(prices, day, price): + day200 = cal_day200_price(prices, day) + #print("day200", day200) + days = get_coin_days(day) + #print("days", days) + exp = get_coin_exp(days) + #print("exp", exp, price) + try: + arh99 = (float(price)/day200)*(float(price)/exp) + arh99x = (day200/float(price))*(exp/float(price))*3 + except: + arh99 = 0 + arh99x = 0 + #print("arh99", arh99) + + return arh99, arh99x + +def check_sync(dbif): + return dbif.check_sync() + +def append_arh99(dbif, day, price, arh99, arh99x): + dbif.append(day, price, arh99, arh99x) + +def sync_arh99(dbif, prices): + for day in prices: + print(day, prices[day]) + arh99, arh99x = cal_arh99(prices, int(day), prices[day]) + print(day, arh99, arh99x) + append_arh99(dbif, day, prices[day], arh99, arh99x) + +def append_arh99day(dbif, day, price, arh99, arh99x): + dbif.append_day(day, price, arh99, arh99x) + +def clean_arh99day(dbif, day): + dbif.clean_day(day) + +def arh99_handler(message): + global g_prices + global g_dbif + global g_lastts + coin_data = message["data"] + #coin_symbol = coin_data["s"] + coin_ts = int(coin_data["E"]) + coin_price = float(coin_data["c"]) + #print((coin_ts/1000), int((coin_ts/1000)%60)) + if int((coin_ts/1000)%60) == 0: + #if coin_ts/1000/60 != g_lastts: + if coin_ts/1000 - g_lastts >= 15: + #print(coin_ts, coin_price) + coin_ts2 = time.gmtime(coin_ts/1000) + daystr = time.strftime("%d %b %Y", coin_ts2) + print(daystr) + dayutc = int(time.mktime(time.strptime(daystr, "%d %b %Y"))) + g_prices[str(dayutc)] = coin_price + arh99, arh99x = cal_arh99(g_prices, dayutc, coin_price) + print(dayutc, coin_price, arh99, arh99x) + + append_arh99day(g_dbif, coin_ts/1000, coin_price, arh99, arh99x) + append_arh99(g_dbif, dayutc, coin_price, arh99, arh99x) + clean_day = dayutc - 3600*24*2 + clean_arh99day(g_dbif, clean_day) + + handle_jzr_day60(g_dbif, coin_ts/1000, dayutc, coin_price, g_prices) + handle_ma_day730(g_dbif, coin_ts / 1000, dayutc, coin_price, g_prices) + g_lastts = coin_ts/1000 + +def start_arh99(dbif, prices): + ws_client = WebsocketClient() + ws_client.start() + ws_client.instant_subscribe( + stream=['ethusdt@miniTicker'], + callback=arh99_handler, + ) + +def arh99(): + global g_dbif + g_dbif = Arh99DbIf() + prices = get_history_price2(g_dbif) + #if not check_sync(g_dbif): + if True: + sync_arh99(g_dbif, prices) + #if not check_jzr60_sync(g_dbif): + if True: + sync_jzr_day60(g_dbif, prices) + #if not check_ma730_sync(g_dbif): + if True: + sync_ma_day730(g_dbif, prices) + start_arh99(g_dbif, prices) + +#2-year ma multiplier +def get_day730_rise(day, prices): + total = 0 + cnt = 0 + for i in range(730): + if str(day) in prices: + cur_price = prices[str(day)] + total += cur_price + cnt += 1 + day = str(day - 3600 * 24) + day = int(day) + if cnt > 0: + return total/cnt + return 0 + +def get_day365_rise(day, maxdays, prices): + total = 0 + cnt = 0 + for i in range(maxdays): + if str(day) in prices: + cur_price = prices[str(day)] + total += cur_price + cnt += 1 + day = str(day - 3600 * 24) + day = int(day) + if cnt > 0: + return total/cnt + return 0 + +def append_ma_day730(dbif, day, price, day730_rise, day365_rise, day200_rise): + dbif.append_ma730(day, price, day730_rise, day365_rise, day200_rise) + +def sync_ma_day730(dbif, prices): + for day in prices: + print(day, prices[day]) + day730_rise = get_day730_rise(int(day), prices) + day365_rise = get_day365_rise(int(day), 365, prices) + day200_rise = get_day365_rise(int(day), 200, prices) + print(day, day730_rise) + append_ma_day730(dbif, day, prices[day], day730_rise, day365_rise, day200_rise) + +def check_ma730_sync(dbif): + return dbif.check_ma730_sync() + +def append_ma730day(dbif, day, price, day730_rise, day365_rise, day200_rise): + dbif.append_ma730_day(day, price, day730_rise, day365_rise, day200_rise) + +def append_ma730(dbif, dayutc, price, day730_rise, day365_rise, day200_rise): + dbif.append_ma730(dayutc, price, day730_rise, day365_rise, day200_rise) + +def clean_ma730day(dbif, clean_day): + dbif.clean_ma730_day(clean_day) + +def handle_ma_day730(dbif, day, dayutc, price, prices): + day730_rise = get_day730_rise(dayutc, prices) + day365_rise = get_day365_rise(dayutc, 365, prices) + day200_rise = get_day365_rise(dayutc, 200, prices) + print(dayutc, price, day, day730_rise) + append_ma730day(dbif, day, price, day730_rise, day365_rise, day200_rise) + append_ma730(dbif, dayutc, price, day730_rise, day365_rise, day200_rise) + clean_day = dayutc - 3600 * 24 * 2 + clean_ma730day(dbif, clean_day) + +arh99() + diff --git a/coinbus/check_order_lyq.py b/coinbus/check_order_lyq.py new file mode 100644 index 0000000..6bde9fe --- /dev/null +++ b/coinbus/check_order_lyq.py @@ -0,0 +1,184 @@ +# coding=utf-8 +import ujson +from binance.websocket.spot.websocket_client import SpotWebsocketClient as WebsocketClient +from binance.spot import Spot +import time +import requests +import datetime +import pymysql +import math +import pymongo + +g_spot_client = Spot() + + +class Pair: + def __init__(self): + pass + + depth_u = 0 + depth_U = 0 + depth_ts = 0 + bids = {} + asks = {} + + +g_btcusdt = None +g_btcusdt = None + + +def init_db(): + mc = pymongo.MongoClient("mongodb://127.0.0.1:27020/") + mdb = mc["border2"] + return mc, mdb + + +def get_depth(client, pair): + new_pair = Pair() + d = client.depth(pair, limit=5000) + new_pair.bids = d["bids"] + new_pair.asks = d["asks"] + new_pair.depth_u = d["lastUpdateId"] + print(pair, ": get_depth: init", new_pair.depth_u) + #print(new_pair.bids) + return new_pair + + +def dict2number(dict_in): + dict_out = {} + #print("dict2number", dict_in) + for id in dict_in: + #print("dict2number", id) + #price = (int(float(id[0])) / 100) * 100 + #price = float(id[0]) + quantity = float(id[1]) + #pricestr = str(price) + dict_out[id[0]] = quantity + return dict_out + + +def dict2save(mdb, pair, dict_in, ts): + mdbc = mdb[pair] + s_append = {} + s_append["unixdt"] = int(ts / 1000) + #cnt = 0 + for id in dict_in: + # print(cnt, id) + #if cnt >= 50: + #break + # bids_append[id] = top_bids[id] + s_append[id[0]] = id[1] + #cnt += 1 + print("dict2save", s_append) + mdbc.insert_one(s_append) + +def classify_order(dict_in): + dict_out = {} + for id in dict_in: + price = int(int(float(id))/100)*100 + pricestr = str(price) + if pricestr in dict_out: + dict_out[pricestr] = dict_out[pricestr]+dict_in[id] + else: + dict_out[pricestr] = dict_in[id] + return dict_out + +def stat_order(pair, bids_in, asks_in, ts, old_ts): + print(pair, ": stat_order cmp", ts, old_ts) + if ts - old_ts < 1000 * 60 * 5: + return False + bids = dict2number(bids_in) + asks = dict2number(asks_in) + + bids_classify = classify_order(bids) + asks_classify = classify_order(asks) + print("bids_classify", bids_classify) + top_bids = sorted(bids_classify.items(), key=lambda x: x[1], reverse=False) + top_asks = sorted(asks_classify.items(), key=lambda x: x[1], reverse=False) + print("top_bids", top_bids) + mc, mdb = init_db() + + dict2save(mdb, pair + "_bids", top_bids, ts) + dict2save(mdb, pair + "_asks", top_asks, ts) + print(pair, ": stat_order OK at", ts) + return True + + +def merge_order(dst, src): + new_dst = [] + for dst_item in dst: + found = False + for src_item in src: + #print("dst", dst_item, "src", src_item) + if dst_item[0] == src_item[0]: + new_dst.append(src_item) + found = True + break + if found is False: + #print("merge_order dst copy", dst_item) + new_dst.append(dst_item) + return new_dst + + +def handler_order(pair, pair_name, msg_in, client): + ts = msg_in["E"] + dU = msg_in["U"] + du = msg_in["u"] + need_reinit = False + if pair is not None: + if (dU == pair.depth_u + 1) or ( + (du > pair.depth_u) and (pair.depth_ts == 0) and (pair.depth_u != 0)): + bids = msg_in["b"] + asks = msg_in["a"] + #print("merge_order dst", pair.bids) + #print("merge_order src", bids) + #print("handle", pair_name, ts, dU, du, pair.depth_u) + pair.bids = merge_order(pair.bids, bids) + pair.asks = merge_order(pair.asks, asks) + pair.depth_U = dU + pair.depth_u = du + if stat_order(pair_name, pair.bids, pair.asks, ts, pair.depth_ts): + pair.depth_ts = ts + print(pair_name, ": append", du) + else: + if (dU != pair.depth_u + 1) and (pair.depth_u != 0): + need_reinit = True + else: + pass + if need_reinit: + pair = get_depth(client, pair_name) + print(pair_name, ": reinit", pair.depth_u, dU, pair.depth_ts) + return pair + + +def order_handler(message): + #print(message) + global g_spot_client + global g_btcusdt + global g_ethusdt + if message["stream"] == "btcusdt@depth": + ddata = message["data"] + if ddata["e"] == "depthUpdate": + g_btcusdt = handler_order(g_btcusdt, "BTCUSDT", ddata, g_spot_client) + elif message["stream"] == "ethusdt@depth": + ddata = message["data"] + if ddata["e"] == "depthUpdate": + g_ethusdt = handler_order(g_ethusdt, "ETHUSDT", ddata, g_spot_client) + else: + pass + +def check_order(): + global g_spot_client + global g_btcusdt + global g_ethusdt + ws_client = WebsocketClient() + ws_client.start() + ws_client.instant_subscribe( + stream=['btcusdt@depth', 'ethusdt@depth'], + callback=order_handler, + ) + g_btcusdt = get_depth(g_spot_client, "BTCUSDT") + g_ethusdt = get_depth(g_spot_client, "ETHUSDT") + + +check_order() diff --git a/coinbus/check_zone_lyq.py b/coinbus/check_zone_lyq.py new file mode 100644 index 0000000..3843ec6 --- /dev/null +++ b/coinbus/check_zone_lyq.py @@ -0,0 +1,146 @@ +# coding=utf-8 +import ujson +#from binance.websocket.spot.websocket_client import SpotWebsocketClient as WebsocketClient +from binance.spot import Spot +import time +import requests +import datetime +import pymysql +import math +#import pymongo + +g_btcusdt_prices = {} +g_ethusdt_prices = {} + +class ZoneDbIf: + def __init__(self, host="172.17.0.1", port=4423, user="root", password="2GS@bPYcgiMyL14A", dbname="btcdb"): + self.conn = pymysql.connect(host=host, port=port, user=user, password=password, database=dbname, cursorclass=pymysql.cursors.DictCursor) + print("init zone db suceess!") + + def save_zone_change(self, dayutc, change_us, change_asia, change_eu): + with self.conn.cursor() as cursor: + print( + dayutc, change_us, change_asia, change_eu) + sql_insert = "REPLACE INTO btczonechange3 (unixdt, change_us, change_asia, change_eu" + sql_insert = sql_insert + ") VALUES (FROM_UNIXTIME(%s), %s, %s, %s)" + cursor.execute(sql_insert, ( + dayutc, change_us, change_asia, change_eu)) + self.conn.commit() + +class EthZoneDbIf: + def __init__(self, host="172.17.0.1", port=4423, user="root", password="2GS@bPYcgiMyL14A", dbname="ethdb"): + self.conn = pymysql.connect(host=host, port=port, user=user, password=password, database=dbname, cursorclass=pymysql.cursors.DictCursor) + print("init zone db suceess!") + + def save_zone_change(self, dayutc, change_us, change_asia, change_eu): + with self.conn.cursor() as cursor: + print( + dayutc, change_us, change_asia, change_eu) + sql_insert = "REPLACE INTO ethzonechange3 (unixdt, change_us, change_asia, change_eu" + sql_insert = sql_insert + ") VALUES (FROM_UNIXTIME(%s), %s, %s, %s)" + cursor.execute(sql_insert, ( + dayutc, change_us, change_asia, change_eu)) + self.conn.commit() + +def get_history_price(spot_client, pair_name): + result = spot_client.klines(pair_name, "1h", limit=1000) + prices_open = {} + prices_close = {} + for price in result: + prices_open[str(price[0])] = float(price[1]) + prices_close[str(price[0])] = float(price[4]) + open_out = sorted(prices_open.items(), reverse=True) + close_out = sorted(prices_close.items(), reverse=True) + return open_out, close_out, prices_open, prices_close + +def get_last_price(spot_client, pair_name, cache_open, cache_close): + result = spot_client.klines(pair_name, "1h", limit=1) + for price in result: + cache_open[str(price[0])] = float(price[1]) + cache_close[str(price[0])] = float(price[4]) + open_out = sorted(cache_open.items(), reverse=True) + close_out = sorted(cache_close.items(), reverse=True) + return open_out, close_out, cache_open, cache_close + +def calc_zone(prices_open, price_close, zone_start, zone_end): + zone_total = 30*24 + zone_hours = 0 + zones = {} + price_start = 0 + price_end = 0 + dt_start = None + item_idx = 0 + for dt in prices_open: + tobj = time.gmtime(int(dt[0]) / 1000) + if tobj.tm_hour == zone_start: + price_start = dt[1] + dt_start = tobj + if zone_hours == 0 and tobj.tm_hour < zone_end: + zone_total = zone_total + tobj.tm_hour + 1 + close_list = price_close[item_idx] + price_end = close_list[1] + else: + if tobj.tm_hour == zone_end: + close_list = price_close[item_idx] + price_end = close_list[1] + if price_start > 0 and price_end > 0: + #zones[dt_end] = (price_end-price_start)/price_start + daystr = time.strftime("%d %b %Y", dt_start) + dayutc = int(time.mktime(time.strptime(daystr, "%d %b %Y"))) + zones[str(dayutc)] = price_end - price_start + price_start = 0 + price_end = 0 + item_idx = item_idx + 1 + zone_hours = zone_hours + 1 + if zone_hours >= zone_total: + break + return zones + + + +def check_zone(): + dbif = ZoneDbIf() + ethdbif = EthZoneDbIf() + spot_client = Spot() + prices_open, prices_close, cache_open, cache_close = get_history_price(spot_client, "BTCUSDT") + prices_open_eth, prices_close_eth, cache_open_eth, cache_close_eth = get_history_price(spot_client, "ETHUSDT") + prev_tm = time.gmtime(time.time()) + print("update", prev_tm.tm_hour) + while True: + zone_asia = calc_zone(prices_open, prices_close, 0, 12) + zone_eu = calc_zone(prices_open, prices_close, 6, 18) + zone_us = calc_zone(prices_open, prices_close, 12, 0) + zone_asia_eth = calc_zone(prices_open_eth, prices_close_eth, 0, 12) + zone_eu_eth = calc_zone(prices_open_eth, prices_close_eth, 6, 18) + zone_us_eth = calc_zone(prices_open_eth, prices_close_eth, 12, 0) + #print(zone_asia) + #print(zone_eu) + #print(zone_us) + for dt in zone_asia: + change_us = 0 + change_eu = 0 + if dt in zone_us: + change_us = zone_us[dt] + if dt in zone_eu: + change_eu = zone_eu[dt] + dbif.save_zone_change(dt, change_us, zone_asia[dt], change_eu) + change_us_eth = 0 + change_eu_eth = 0 + if dt in zone_us_eth: + change_us_eth = zone_us_eth[dt] + if dt in zone_eu_eth: + change_eu_eth = zone_eu_eth[dt] + ethdbif.save_zone_change(dt, change_us_eth, zone_asia_eth[dt], change_eu_eth) + while True: + time.sleep(60) + cur_tm = time.gmtime(time.time()) + if cur_tm.tm_hour != prev_tm.tm_hour: + prev_tm = cur_tm + time.sleep(60) + prices_open, prices_close, cache_open, cache_close = get_last_price(spot_client, "BTCUSDT", cache_open, cache_close) + prices_open_eth, prices_close_eth, cache_open_eth, cache_close_eth = get_last_price(spot_client, "ETHUSDT", cache_open_eth, + cache_close_eth) + print("update", cur_tm.tm_hour) + break + +check_zone() diff --git a/coinbus/exchangeRate_lyq.py b/coinbus/exchangeRate_lyq.py new file mode 100644 index 0000000..3d0ab48 --- /dev/null +++ b/coinbus/exchangeRate_lyq.py @@ -0,0 +1,117 @@ +import requests +import pymysql +import time +from datetime import datetime + +# 目标币种列表(RUB 仍写在这里,方便统一逻辑) +symbols = ["EUR", "GBP", "JPY", "CAD", "SEK", "CHF", "CNY", "RUB"] + +# 数据库配置 +db_config = { + "host": "127.0.0.1", + "user": "root", + "password": "2GS@bPYcgiMyL14A", + "database": "Macroeconomics", + "port": 4423 +} + +def fetch_rates_frankfurter(): + base = "USD" + url = f"https://api.frankfurter.app/latest?from={base}&to=" + ",".join([s for s in symbols if s != "RUB"]) + retries = 5 + while retries > 0: + try: + response = requests.get(url, timeout=10) + response.raise_for_status() + data = response.json() + rates = data.get("rates", {}) + if not rates: + raise ValueError("接口返回空数据") + return rates + except Exception as e: + retries -= 1 + print(f"Frankfurter 请求失败,重试中... 剩余次数: {retries}, 错误: {e}") + time.sleep(1) + print("Frankfurter 多次重试后失败,返回空数据") + return {} + +def fetch_rub(): + try: + url = "https://open.er-api.com/v6/latest/USD" + response = requests.get(url, timeout=10) + data = response.json() + if data.get("result") == "success": + rub_rate = data["rates"].get("RUB") + if rub_rate: + return rub_rate + except Exception as e: + print(f"获取 RUB 失败: {e}") + return None + +def calc_dxy(rates): + weights = { + "EUR": 0.576, + "JPY": 0.136, + "GBP": 0.119, + "CAD": 0.091, + "SEK": 0.042, + "CHF": 0.036 + } + weighted_sum = 0 + weight_total = 0 + for ccy, w in weights.items(): + rate = rates.get(ccy) + if rate: + weighted_sum += rate * w + weight_total += w + if weight_total > 0: + return weighted_sum / weight_total + return None + +def save_to_db(rates, dxy): + current_time = datetime.now().replace(second=0, microsecond=0) + data = {} + + for ccy in symbols: + rate = rates.get(ccy) + if rate is not None: + data[f"USD{ccy}"] = round(rate, 5) + if dxy is not None: + data["DXY"] = round(dxy, 5) + + connection = pymysql.connect(**db_config) + try: + with connection.cursor() as cursor: + for symbol, value in data.items(): + query = """INSERT INTO exchangeRate (date, symbol, _value) VALUES (%s, %s, %s)""" + cursor.execute(query, (current_time, symbol, value)) + connection.commit() + print(f"{current_time} 数据写入数据库成功") + except Exception as e: + print(f"写入数据库失败: {e}") + finally: + connection.close() + +if __name__ == "__main__": + while True: + rates = fetch_rates_frankfurter() + + # 获取 RUB 汇率 + rub = fetch_rub() + if rub: + rates["RUB"] = rub + else: + print("未获取到 RUB 汇率") + + if rates: + dxy = calc_dxy(rates) + print(f"汇率数据: {rates}") + if dxy: + print(f"美元指数近似值: {dxy:.5f}") + else: + print("美元指数近似值 无法计算") + save_to_db(rates, dxy) + else: + print("未获取到汇率数据") + + time.sleep(1800) # 每30分钟执行一次 \ No newline at end of file