coinbus-data/coinbus/CoinmarketCap.py

115 lines
5.4 KiB
Python

import requests
import pymysql
import time
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime, timedelta
# API 密钥和请求头
API_KEY = "83bf85c1-1bd8-426a-a043-6b67dad8bda5"
headers = { "X-CMC_PRO_API_KEY": API_KEY }
base_url = "https://pro-api.coinmarketcap.com"
url = f"{base_url}/v1/cryptocurrency/listings/latest"
# MySQL 数据库连接配置
db_config = {
'host': '127.0.0.1', # 数据库主机地址
'user': 'root', # 数据库用户名
'password': '2GS@bPYcgiMyL14A', # 数据库密码
'database': 'coinmarketcap', # 数据库名称
'port': 4423 # 数据库端口
}
# 创建数据库表格(如果不存在)
def create_table():
connection = pymysql.connect(**db_config) # 连接到数据库
cursor = connection.cursor() # 创建游标对象
# 创建表格的 SQL 语句
create_table_query = """
CREATE TABLE IF NOT EXISTS marketInfo (
id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, # 自增ID
update_time DATETIME NOT NULL, # 更新时间
symbol CHAR(15) NOT NULL, # 币种符号
ranks INT NOT NULL, # 排名
price DOUBLE NOT NULL, # 当前价格
market_cap DOUBLE NOT NULL, # 市值
volume_24h DOUBLE NOT NULL, # 24小时交易量
volume_change_24h DOUBLE NOT NULL, # 24小时交易量变化
percent_change_1h DOUBLE NOT NULL, # 1小时价格变化
percent_change_24h DOUBLE NOT NULL, # 24小时价格变化
percent_change_7d DOUBLE NOT NULL, # 7天价格变化
percent_change_30d DOUBLE NOT NULL, # 30天价格变化
percent_change_60d DOUBLE NOT NULL, # 60天价格变化
percent_change_90d DOUBLE NOT NULL # 90天价格变化
);
"""
cursor.execute(create_table_query) # 执行创建表格的 SQL 语句
connection.commit() # 提交事务
cursor.close() # 关闭游标
connection.close() # 关闭数据库连接
# 将 UTC 时间转换为北京时间
def bj_time(utc_time):
""" 将 UTC 时间转换为北京时间 """
utc_time = datetime.strptime(utc_time, '%Y-%m-%dT%H:%M:%S.%fZ') # 将 UTC 时间字符串转换为 datetime 对象
beijing_time = utc_time + timedelta(hours=8) # 北京时间比 UTC 时间快 8 小时
return beijing_time.strftime('%Y-%m-%d %H:%M:%S') # 格式化成字符串
# 获取市场数据并插入到数据库
def marketcap():
try:
# 向 CoinMarketCap API 发送请求,获取加密货币的市场数据
response = requests.get(url, headers=headers, params={"limit": 200})
response.raise_for_status() # 如果请求失败,抛出异常
except requests.RequestException:
time.sleep(60) # 等待 1 分钟后重试
response = requests.get(url, headers=headers, params={"limit": 200})
data = response.json() # 将返回的 JSON 数据转换为 Python 字典
for item in data['data']: # 遍历获取的数据
quote = item['quote']['USD'] # 获取 USD 相关的市场数据
update_time = bj_time(quote['last_updated']) # 转换更新时间为北京时间
symbol = item['symbol'] # 获取币种符号
ranks = item['cmc_rank'] # 获取排名
price = quote['price'] # 获取价格
market_cap = quote['market_cap'] # 获取市值
volume_24h = quote['volume_24h'] # 获取 24 小时交易量
volume_change_24h = quote['volume_change_24h'] # 获取 24 小时交易量变化
percent_change_1h = quote['percent_change_1h'] # 获取 1 小时价格变化
percent_change_24h = quote['percent_change_24h'] # 获取 24 小时价格变化
percent_change_7d = quote['percent_change_7d'] # 获取 7 天价格变化
percent_change_30d = quote['percent_change_30d'] # 获取 30 天价格变化
percent_change_60d = quote['percent_change_60d'] # 获取 60 天价格变化
percent_change_90d = quote['percent_change_90d'] # 获取 90 天价格变化
# 将数据插入到 MySQL 数据库
connection = pymysql.connect(**db_config) # 连接到数据库
cursor = connection.cursor() # 创建游标对象
insert_query = """
INSERT INTO marketInfo (
update_time, symbol, ranks, price, market_cap, volume_24h,
volume_change_24h, percent_change_1h, percent_change_24h,
percent_change_7d, percent_change_30d, percent_change_60d,
percent_change_90d
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
"""
# 执行插入数据的 SQL 语句
cursor.execute(insert_query, (
update_time, symbol, ranks, price, market_cap, volume_24h,
volume_change_24h, percent_change_1h, percent_change_24h,
percent_change_7d, percent_change_30d, percent_change_60d,
percent_change_90d
))
connection.commit() # 提交事务
cursor.close() # 关闭游标
connection.close() # 关闭数据库连接
# 定时任务:每 5 分钟执行一次 marketcap 函数
def schedule_job():
scheduler = BlockingScheduler() # 创建一个阻塞式调度器
scheduler.add_job(marketcap, 'cron', minute='0,5,10,15,20,25,30,35,40,45,50,55') # 设置每 5 分钟执行一次
scheduler.start() # 启动调度器
if __name__ == "__main__":
create_table() # 程序启动时,先创建数据库表格(如果不存在)
schedule_job() # 启动定时任务,开始定时抓取数据并插入数据库