Files
coinbus-data/coinbus/nochain_update_lyq.py

87 lines
5.4 KiB
Python
Raw Permalink Normal View History

2025-10-11 18:06:48 +08:00
# coding=utf-8
import ujson
from binance.websocket.spot.websocket_client import SpotWebsocketClient as WebsocketClient
import time
import requests
import datetime
import pymysql
import math
from stock_indicators import indicators
from stock_indicators.indicators.common.quote import Quote
class NochainDbIf:
def __init__(self, host="172.17.0.1", port=4423, user="root", password="2GS@bPYcgiMyL14A", dbname="btcdb"):
self.conn = pymysql.connect(host=host, port=port, user=user, password=password, database=dbname, cursorclass=pymysql.cursors.DictCursor)
print("init nochain db suceess!")
# 可能用于与数据库交互。该方法是构造函数它使用库设置与MySQL数据库的连接。
# 以下是该方法的作用的细分__init__
# 它需要几个参数这些参数用于连接到MySQL数据库。如果未提供这些参数则使用默认值。
# 在该方法中它使用提供的参数建立与MySQL数据库的连接。
# 该参数指定结果应作为字典而不是元组返回这样可以更方便地访问数据。cursorclass = pymysql.cursors.DictCursor
# 最后,它打印一条消息,指示数据库连接的初始化成功。
def get_ssr(self):
with self.conn.cursor() as cursor:
sql_cmd = "SELECT unixdt,ssr FROM nochainv3b order by unixdt"
cursor.execute(sql_cmd)
self.conn.commit()
return cursor.fetchall()
# 该方法似乎旨在从名为nochainv2b的MySQL数据库表中检索数据。下面是此方法的细分get_ssr
# 它使用上下文管理器 自动处理游标并确保在执行查询后正确关闭连接。with
# 在上下文管理器中它准备一个SQL命令来从nochainv2b表中选择数据。该命令选择unixdt和ssr列并按unixdt对结果进行排序。
# 它使用光标的方法执行SQL命令。execute
# 执行命令后,它使用.如果对数据库进行了任何需要永久保存的更改则此步骤是必需的。self.conn.commit()
# 最后,它返回查询的结果 它以字典列表的形式检索查询返回的所有行其中每个字典表示一行其中列名作为键和相应的值。cursor.fetchall()
def update_ssr(self, dt, ssrosc):
with self.conn.cursor() as cursor:
sql_update = "UPDATE nochainv3b SET unixdt='%s', ssrosc='%s' WHERE unixdt='%s'" %(dt, ssrosc, dt)
print(sql_update)
cursor.execute(sql_update)
self.conn.commit()
# 该方法将更新MySQL数据库nochainv2b表中的数据。下面是此方法的细分update_ssr
# 它需要两个参数dt可能表示时间戳和ssrosc可能表示与SSR相关的某个值
# 在该方法中它准备一个SQLUPDATE命令来更新nochainv2b表中特定列的列。
# 该字符串的格式为dt和ssrosc的值以动态构造SQL命令。这是潜在的风险因为它开启了SQL注入攻击的可能性。改用参数化查询更安全。
# 然后该方法打印出构造的SQL命令以进行调试。
# 它使用光标的方法执行SQL命令。execute
# 执行命令后它会提交事务使更改永久化。self.conn.commit()
# 但是SQL命令中存在一个潜在问题在设置新值和WHERE子句的条件时使用相同的值。这可能不按预期运行因为它将更新与给定.
# 如果打算更新特定行则可能需要相应地调整WHERE子句。
def rollback(self):
self.conn.rollback()
# 它对当前数据库连接执行回滚操作。以下是它的作用rollback
# 它在数据库连接 上调用该方法。rollback self.conn
# 此操作将当前事务期间所做的任何未提交的更改还原到以前的状态。
# 当事务期间出现错误时,通常使用回滚,允许您还原在错误发生之前所做的任何更改。
def nochain():
try:
dbif = NochainDbIf()
ssr_ma = dbif.get_ssr()
#print(ssr_ma)
quotes_list = []
for item in ssr_ma:
print(item["unixdt"])
quotes_list.append(Quote(item["unixdt"],0,0,0,item["ssr"],0))
#print(quotes_list)
ssr_osc = indicators.get_bollinger_bands(quotes_list, 200, 2)
for item in ssr_osc:
if item.z_score is not None:
#print(item.date, item.sma, item.percent_b, item.z_score, item.width)
#dbif.update_ssr(item.date, item.z_score)
break
print("ok")
except Exception as e:
#dbif.rollback();
print(e)
finally:
print("end")
nochain()
# 可能使用该nochain类计算从 MySQL 数据库获取的 SSR大概是卖空比率数据上的布林带。NochainDbIf
# 以下是该函数功能的细分:
# 它首先创建类的实例以建立与数据库的连接。NochainDbIf
# 它使用实例的方法从数据库中检索 SSR 数据。get_ssr NochainDbIf
# 它使用检索到的 SSR 数据构造对象列表。Quote
# 它使用模块中的函数计算布林带。get_bollinger_bandsindicators
# 它遍历计算出的布林带,并打印有关每个带的一些信息。
# 目前,它似乎已注释掉了使用计算出的布林带更新数据库中 SSR 数据的部分。
# 如果在此过程中发生任何异常,它将捕获异常,打印它,然后继续。
# 最后,无论该过程是成功还是遇到错误,它都会打印“结束”