import requests import pymysql import time from apscheduler.schedulers.blocking import BlockingScheduler from datetime import datetime, timedelta # API 密钥和请求头 API_KEY = "83bf85c1-1bd8-426a-a043-6b67dad8bda5" headers = { "X-CMC_PRO_API_KEY": API_KEY } base_url = "https://pro-api.coinmarketcap.com" url = f"{base_url}/v1/cryptocurrency/listings/latest" # MySQL 数据库连接配置 db_config = { 'host': '127.0.0.1', # 数据库主机地址 'user': 'root', # 数据库用户名 'password': '2GS@bPYcgiMyL14A', # 数据库密码 'database': 'coinmarketcap', # 数据库名称 'port': 4423 # 数据库端口 } # 创建数据库表格(如果不存在) def create_table(): connection = pymysql.connect(**db_config) # 连接到数据库 cursor = connection.cursor() # 创建游标对象 # 创建表格的 SQL 语句 create_table_query = """ CREATE TABLE IF NOT EXISTS marketInfo ( id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, # 自增ID update_time DATETIME NOT NULL, # 更新时间 symbol CHAR(15) NOT NULL, # 币种符号 ranks INT NOT NULL, # 排名 price DOUBLE NOT NULL, # 当前价格 market_cap DOUBLE NOT NULL, # 市值 volume_24h DOUBLE NOT NULL, # 24小时交易量 volume_change_24h DOUBLE NOT NULL, # 24小时交易量变化 percent_change_1h DOUBLE NOT NULL, # 1小时价格变化 percent_change_24h DOUBLE NOT NULL, # 24小时价格变化 percent_change_7d DOUBLE NOT NULL, # 7天价格变化 percent_change_30d DOUBLE NOT NULL, # 30天价格变化 percent_change_60d DOUBLE NOT NULL, # 60天价格变化 percent_change_90d DOUBLE NOT NULL # 90天价格变化 ); """ cursor.execute(create_table_query) # 执行创建表格的 SQL 语句 connection.commit() # 提交事务 cursor.close() # 关闭游标 connection.close() # 关闭数据库连接 # 将 UTC 时间转换为北京时间 def bj_time(utc_time): """ 将 UTC 时间转换为北京时间 """ utc_time = datetime.strptime(utc_time, '%Y-%m-%dT%H:%M:%S.%fZ') # 将 UTC 时间字符串转换为 datetime 对象 beijing_time = utc_time + timedelta(hours=8) # 北京时间比 UTC 时间快 8 小时 return beijing_time.strftime('%Y-%m-%d %H:%M:%S') # 格式化成字符串 # 获取市场数据并插入到数据库 def marketcap(): try: # 向 CoinMarketCap API 发送请求,获取加密货币的市场数据 response = requests.get(url, headers=headers, params={"limit": 200}) response.raise_for_status() # 如果请求失败,抛出异常 except requests.RequestException: time.sleep(60) # 等待 1 分钟后重试 response = requests.get(url, headers=headers, params={"limit": 200}) data = response.json() # 将返回的 JSON 数据转换为 Python 字典 for item in data['data']: # 遍历获取的数据 quote = item['quote']['USD'] # 获取 USD 相关的市场数据 update_time = bj_time(quote['last_updated']) # 转换更新时间为北京时间 symbol = item['symbol'] # 获取币种符号 ranks = item['cmc_rank'] # 获取排名 price = quote['price'] # 获取价格 market_cap = quote['market_cap'] # 获取市值 volume_24h = quote['volume_24h'] # 获取 24 小时交易量 volume_change_24h = quote['volume_change_24h'] # 获取 24 小时交易量变化 percent_change_1h = quote['percent_change_1h'] # 获取 1 小时价格变化 percent_change_24h = quote['percent_change_24h'] # 获取 24 小时价格变化 percent_change_7d = quote['percent_change_7d'] # 获取 7 天价格变化 percent_change_30d = quote['percent_change_30d'] # 获取 30 天价格变化 percent_change_60d = quote['percent_change_60d'] # 获取 60 天价格变化 percent_change_90d = quote['percent_change_90d'] # 获取 90 天价格变化 # 将数据插入到 MySQL 数据库 connection = pymysql.connect(**db_config) # 连接到数据库 cursor = connection.cursor() # 创建游标对象 insert_query = """ INSERT INTO marketInfo ( update_time, symbol, ranks, price, market_cap, volume_24h, volume_change_24h, percent_change_1h, percent_change_24h, percent_change_7d, percent_change_30d, percent_change_60d, percent_change_90d ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s); """ # 执行插入数据的 SQL 语句 cursor.execute(insert_query, ( update_time, symbol, ranks, price, market_cap, volume_24h, volume_change_24h, percent_change_1h, percent_change_24h, percent_change_7d, percent_change_30d, percent_change_60d, percent_change_90d )) connection.commit() # 提交事务 cursor.close() # 关闭游标 connection.close() # 关闭数据库连接 # 定时任务:每 5 分钟执行一次 marketcap 函数 def schedule_job(): scheduler = BlockingScheduler() # 创建一个阻塞式调度器 scheduler.add_job(marketcap, 'cron', minute='0,5,10,15,20,25,30,35,40,45,50,55') # 设置每 5 分钟执行一次 scheduler.start() # 启动调度器 if __name__ == "__main__": create_table() # 程序启动时,先创建数据库表格(如果不存在) schedule_job() # 启动定时任务,开始定时抓取数据并插入数据库