This commit is contained in:
yyb
2025-05-12 17:00:06 +08:00
parent 9c5b5eba7c
commit 9c93dc1e10
855 changed files with 163903 additions and 0 deletions

View File

@@ -0,0 +1,139 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>jxy-modules</artifactId>
<groupId>com.jxy</groupId>
<version>3.5.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>jxy-binance</artifactId>
<description>
jxy-binance 币安ws获取数据模块
</description>
<dependencies>
<!-- SpringCloud Alibaba Nacos -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- SpringCloud Alibaba Nacos Config -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<!-- SpringCloud Alibaba Sentinel -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
<!-- SpringBoot Actuator -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- Swagger UI -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>${swagger.fox.version}</version>
</dependency>
<!-- Mysql Connector -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!-- WebSocket-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.3.5</version>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok-maven-plugin</artifactId>
<scope>provided</scope>
</dependency>
<!-- JXY Common DataSource -->
<dependency>
<groupId>com.jxy</groupId>
<artifactId>common-datasource</artifactId>
</dependency>
<!-- JXY Common DataScope -->
<dependency>
<groupId>com.jxy</groupId>
<artifactId>common-datascope</artifactId>
</dependency>
<!-- Mybatis-Plus -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
</dependency>
<!-- JXY Common Log -->
<dependency>
<groupId>com.jxy</groupId>
<artifactId>common-log</artifactId>
</dependency>
<!-- JXY Common swagger -->
<dependency>
<groupId>com.jxy</groupId>
<artifactId>common-swagger</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>3.5.7</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.6.2</version>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,30 @@
package com.jxy.binance;
import com.jxy.common.security.annotation.EnableCustomConfig;
import com.jxy.common.security.annotation.EnableJXYFeignClients;
import com.jxy.common.swagger.annotation.EnableCustomSwagger2;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @Description TODO
* @Date 2023/2/5 1:31
* @Author 杜懿
*/
@EnableCustomConfig
@EnableCustomSwagger2
@EnableJXYFeignClients
@SpringBootApplication
@MapperScan("com.jxy.binance.mapper")
public class JXYBinanceApplication {
public static void main(String[] args) {
try {
SpringApplication.run(JXYBinanceApplication.class);
System.out.println("jxy-binance启动成功");
}catch (Exception e){
System.out.println(e);
}
}
}

View File

@@ -0,0 +1,20 @@
package com.jxy.binance.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* @Description websocket 配置
* @Date 2022/6/30 11:09
* @Author 杜懿
*/
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter()
{
return new ServerEndpointExporter();
}
}

View File

@@ -0,0 +1,67 @@
package com.jxy.binance.controller;
/**
* @Description TODO
* @Date 2022/6/27 10:28
* @Author 杜懿
*/
import com.jxy.binance.dto.InsertDBDto;
import com.jxy.binance.dto.InsertTimeDataToDBDto;
import com.jxy.binance.dto.TestVo;
import com.jxy.binance.service.impl.BiAnServiceImpl;
import com.jxy.common.core.web.Result.AjaxResult;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
/**
* @Description btc查询
* @Date 2022/6/20 17:26
* @Author 杜懿
*/
@RestController
@RequestMapping("/bian")
@Api(tags = "bian查询接口")
public class TestController {
@Autowired
private BiAnServiceImpl biAnService;
@PostMapping("/B/allToDB")
@ApiOperation(value = "币安 币本位 补全数据",response = List.class)
public AjaxResult allBToDB(@RequestBody InsertDBDto dto){
return biAnService.allBDataToDb(dto);
}
@PostMapping("/U/allToDB")
@ApiOperation(value = "币安 U本位 补全数据",response = List.class)
public AjaxResult allUToDB(@RequestBody InsertDBDto dto){
return biAnService.allUDataToDb(dto);
}
@PostMapping("/S/allToDB")
@ApiOperation(value = "币安 现货 补全数据",response = List.class)
public AjaxResult allStockToDB(@RequestBody InsertDBDto dto){
return biAnService.allStockDataToDb(dto);
}
@PostMapping("/totalClosePrice")
@ApiOperation(value = "币安 条件查询",response = List.class)
public AjaxResult totalClosePrice(@RequestBody TestVo vo){
return biAnService.getTotalClosePrice(vo.getNum());
}
}

View File

@@ -0,0 +1,45 @@
package com.jxy.binance.dto;
import lombok.Data;
import java.io.Serializable;
import java.math.BigDecimal;
import java.util.Date;
/**
* @Description 返回数据实体
* @Date 2022/5/30 11:17
* @Author 杜懿
*/
@Data
public class BiAnWSDto implements Serializable {
/** 时间 */
private Date date;
private Date openTime;
private Date closeTime;
private String symbol;
private BigDecimal closePrice;
private BigDecimal openPrice;
private BigDecimal highPrice;
private BigDecimal lowPrice;
private BigDecimal volume;
private BigDecimal quality;
private BigDecimal buyVolume;
private BigDecimal buyQuality;
private BigDecimal num;
private BigDecimal changeRate;
}

View File

@@ -0,0 +1,24 @@
package com.jxy.binance.dto;
import lombok.Data;
import java.util.Date;
/**
* @Description TODO
* @Date 2023/2/8 12:49
* @Author 杜懿
*/
@Data
public class InsertDBDto {
public String interval;
public String symbol;
public String table;
public String start;
public String end;
}

View File

@@ -0,0 +1,22 @@
package com.jxy.binance.dto;
import lombok.Data;
/**
* @Description TODO
* @Date 2023/2/8 12:49
* @Author 杜懿
*/
@Data
public class InsertTimeDataToDBDto {
public String interval;
public String symbol;
public String table;
public String start;
public String end;
}

View File

@@ -0,0 +1,14 @@
package com.jxy.binance.dto;
import lombok.Data;
/**
* @Description TODO
* @Date 2023/2/8 15:56
* @Author 杜懿
*/
@Data
public class TestVo {
public int num;
}

View File

@@ -0,0 +1,35 @@
package com.jxy.binance.mapper;
import com.jxy.binance.dto.BiAnWSDto;
import com.jxy.common.datasource.annotation.Master;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* @Description TODO
* @Date 2022/6/20 17:36
* @Author 杜懿
*/
@Master
public interface BiAnMapper2 {
public void inserthBtcusdtPERP(@Param("dto") BiAnWSDto biAnWSDto);
public void insertUSDPERP(@Param("dto") BiAnWSDto biAnWSDto, @Param("table") String table);
public void insertUSDT(@Param("dto") BiAnWSDto biAnWSDto,@Param("table") String table);
public void insertStock(@Param("dto") BiAnWSDto biAnWSDto,@Param("table") String table);
public void batchUSDPERP(@Param("list") List<BiAnWSDto> list, @Param("table") String table);
public void batchUSDT(@Param("list") List<BiAnWSDto> list,@Param("table") String table);
public void batchStock(@Param("list") List<BiAnWSDto> list,@Param("table") String table);
public List<BiAnWSDto> getTotalBTCClosePrice();
public List<BiAnWSDto> getTotalETHClosePrice();
}

View File

@@ -0,0 +1,23 @@
package com.jxy.binance.service;
import com.jxy.binance.dto.InsertDBDto;
import com.jxy.binance.dto.InsertTimeDataToDBDto;
import com.jxy.common.core.web.Result.AjaxResult;
/**
* @Description 币安服务类
* @Date 2022/6/20 17:40
* @Author 杜懿
*/
public interface BiAnService{
public AjaxResult allBDataToDb(InsertDBDto dto);
public AjaxResult allUDataToDb(InsertDBDto dto);
public AjaxResult allStockDataToDb(InsertDBDto dto);
public AjaxResult getTotalClosePrice(int num);
}

View File

@@ -0,0 +1,144 @@
package com.jxy.binance.service.impl;
import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.shaded.org.checkerframework.checker.units.qual.A;
import com.jxy.binance.dto.BiAnWSDto;
import com.jxy.binance.dto.InsertDBDto;
import com.jxy.binance.dto.InsertTimeDataToDBDto;
import com.jxy.binance.mapper.BiAnMapper2;
import com.jxy.binance.service.BiAnService;
import com.jxy.binance.thread.BstreamHistoryThread;
import com.jxy.binance.thread.SstreamHistoryThread;
import com.jxy.binance.thread.UstreamHistoryThread;
import com.jxy.common.core.text.Convert;
import com.jxy.common.core.utils.DateUtils;
import com.jxy.common.core.utils.StringUtils;
import com.jxy.common.core.web.Result.AjaxResult;
import org.apache.ibatis.annotations.Param;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
/**
* @Description BiAn查询业务处理
* @Date 2022/6/2017:51
* @Author 杜懿
*/
@Service("biAnService")
public class BiAnServiceImpl implements BiAnService {
@Autowired
private BiAnMapper2 biAnMapper2;
public void insertUSDPerp(BiAnWSDto dto, String table){
biAnMapper2.insertUSDPERP(dto,table);
}
public void insertUSDT(BiAnWSDto dto, String table){
biAnMapper2.insertUSDT(dto,table);
}
public void insertStock(BiAnWSDto dto, String table){
biAnMapper2.insertStock(dto,table);
}
public void batchUSDPerp(List<BiAnWSDto> list, String table){
biAnMapper2.batchUSDPERP(list,table);
}
public void batchUSDT( List<BiAnWSDto> list, String table){
biAnMapper2.batchUSDT(list,table);
}
public void batchStock(List<BiAnWSDto> list, String table){
biAnMapper2.batchStock(list,table);
}
@Override
public AjaxResult
allBDataToDb(InsertDBDto dto) {
String symbol = dto.getSymbol().toLowerCase();
long endTime = DateUtils.parseDate(dto.getEnd()).getTime() - 1 ;
long start = DateUtils.parseDate(dto.getStart()).getTime() - 1 ;
Thread thread= new Thread(new BstreamHistoryThread(symbol,start,endTime,dto.getTable(),dto.getInterval()),symbol+"历史数据获取线程");
thread.start();
return AjaxResult.success();
}
@Override
public AjaxResult allUDataToDb(InsertDBDto dto) {
String symbol = dto.getSymbol().toLowerCase();
long endTime = DateUtils.parseDate(dto.getEnd()).getTime() - 1 ;
long start = DateUtils.parseDate(dto.getStart()).getTime() - 1 ;
Thread thread= new Thread(new UstreamHistoryThread(symbol,start,endTime,dto.getTable(),dto.getInterval()),symbol+"历史数据获取线程");
thread.start();
return AjaxResult.success();
}
@Override
public AjaxResult allStockDataToDb(InsertDBDto dto) {
String symbol = dto.getSymbol().toLowerCase();
long endTime = DateUtils.parseDate(dto.getEnd()).getTime() - 1 ;
long start = DateUtils.parseDate(dto.getStart()).getTime() - 1 ;
Thread thread= new Thread(new SstreamHistoryThread(symbol,start,endTime,dto.getTable(),dto.getInterval()),symbol+"历史数据获取线程");
thread.start();
return AjaxResult.success();
}
@Override
public AjaxResult getTotalClosePrice(int num) {
List<BiAnWSDto> list1 = biAnMapper2.getTotalBTCClosePrice();
List<BiAnWSDto> list2 = biAnMapper2.getTotalETHClosePrice();
BigDecimal total1 = BigDecimal.ZERO;
BigDecimal total2 = BigDecimal.ZERO;
for(int i=0 ;i < num;i++){
total1 = total1.add(list1.get(i).getClosePrice());
}
for(int i=0 ;i < num;i++){
total2 = total2.add(list2.get(i).getClosePrice());
}
String msg = "BTC:"+total1+"\nETH:"+total2;
return AjaxResult.success(msg,list1);
}
}

View File

@@ -0,0 +1,310 @@
package com.jxy.binance.task;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.jxy.binance.dto.BiAnWSDto;
import com.jxy.binance.service.impl.BiAnServiceImpl;
import com.jxy.common.core.utils.SpringUtils;
import com.jxy.common.core.utils.StringUtils;
import org.java_websocket.WebSocket;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.framing.Framedata;
import org.java_websocket.handshake.ServerHandshake;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.math.BigDecimal;
import java.net.URI;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @Description B本位WS
* @Date 2023/1/31 17:23
* @Author 杜懿
*/
/**
* baseurl为: wss://dstream.binance.com
* 订阅单一stream格式为 /ws/<streamName>
* 组合streams的URL格式为 /stream?streams=<streamName1>/<streamName2>/<streamName3>
* 订阅组合streams时,事件payload会以这样的格式封装 {"stream":"<streamName>","data":<rawPayload>}
* stream名称中所有交易对,标的交易对,合约类型均为小写
* 每个链接有效期不超过24小时请妥善处理断线重连。
* 服务端每5分钟会发送ping帧客户端应当在15分钟内回复pong帧否则服务端会主动断开链接。允许客户端发送不成对的pong帧(即客户端可以以高于15分钟每次的频率发送pong帧保持链接)。
* Websocket服务器每秒最多接受10个订阅消息。
* 如果用户发送的消息超过限制连接会被断开连接。反复被断开连接的IP有可能被服务器屏蔽。
* 单个连接最多可以订阅 200 个Streams。
*/
/** 1m 3m 5m 15m 30m 1h 2h 4h 6h 8h 12h 1d 3d 1w 1M */
/**
* Stream Name:
* <symbol>@kline_<interval>
*/
@Component
@Order(value = 1)
public class BstreamTask implements ApplicationRunner {
private static BiAnServiceImpl biAnService;
/** 入库数据 */
private static Map<String,List<BiAnWSDto>> dbMap = new HashMap<>();
/** 推送数据 */
private static Map<String,BiAnWSDto> map = new HashMap<>();
private static Map<String,String> tableNameMap = new HashMap<>();
static {
tableNameMap.put("\"btcusdperp1m\"","BTCUSDPERP");
tableNameMap.put("\"btcusdperp3m\"","3mBTCUSDPERP");
tableNameMap.put("\"btcusdperp5m\"","5mBTCUSDPERP");
tableNameMap.put("\"btcusdperp15m\"","15mBTCUSDPERP");
tableNameMap.put("\"btcusdperp30m\"","30mBTCUSDPERP");
tableNameMap.put("\"btcusdperp1h\"","1hBTCUSDPERP");
tableNameMap.put("\"btcusdperp2h\"","2hBTCUSDPERP");
tableNameMap.put("\"btcusdperp4h\"","4hBTCUSDPERP");
tableNameMap.put("\"btcusdperp6h\"","6hBTCUSDPERP");
tableNameMap.put("\"btcusdperp8h\"","8hBTCUSDPERP");
tableNameMap.put("\"btcusdperp12h\"","12hBTCUSDPERP");
tableNameMap.put("\"btcusdperp1d\"","1dBTCUSDPERP");
tableNameMap.put("\"btcusdperp3d\"","3dBTCUSDPERP");
tableNameMap.put("\"btcusdperp1w\"","1wBTCUSDPERP");
tableNameMap.put("\"btcusdperp1M\"","1MBTCUSDPERP");
tableNameMap.put("\"ethusdperp1m\"","ETHUSDPERP");
tableNameMap.put("\"ethusdperp3m\"","3mETHUSDPERP");
tableNameMap.put("\"ethusdperp5m\"","5mETHUSDPERP");
tableNameMap.put("\"ethusdperp15m\"","15mETHUSDPERP");
tableNameMap.put("\"ethusdperp30m\"","30mETHUSDPERP");
tableNameMap.put("\"ethusdperp1h\"","1hETHUSDPERP");
tableNameMap.put("\"ethusdperp2h\"","2hETHUSDPERP");
tableNameMap.put("\"ethusdperp4h\"","4hETHUSDPERP");
tableNameMap.put("\"ethusdperp6h\"","6hETHUSDPERP");
tableNameMap.put("\"ethusdperp8h\"","8hETHUSDPERP");
tableNameMap.put("\"ethusdperp12h\"","12hETHUSDPERP");
tableNameMap.put("\"ethusdperp1d\"","1dETHUSDPERP");
tableNameMap.put("\"ethusdperp3d\"","3dETHUSDPERP");
tableNameMap.put("\"ethusdperp1w\"","1wETHUSDPERP");
tableNameMap.put("\"ethusdperp1M\"","1METHUSDPERP");
}
@Override
public void run(ApplicationArguments args) throws Exception {
System.out.println("币本位连接");
String url = "wss://dstream.binance.com/stream?streams=btcusd_perp@kline_1m/btcusd_perp@kline_3m/btcusd_perp@kline_5m/btcusd_perp@kline_15m/btcusd_perp@kline_30m/" +
"btcusd_perp@kline_1h/btcusd_perp@kline_2h/btcusd_perp@kline_4h/btcusd_perp@kline_6h/btcusd_perp@kline_8h/btcusd_perp@kline_12h/" +
"btcusd_perp@kline_1d/btcusd_perp@kline_3d/btcusd_perp@kline_1w/btcusd_perp@kline_1M/" +
"ethusd_perp@kline_1m/ethusd_perp@kline_3m/ethusd_perp@kline_5m/ethusd_perp@kline_15m/ethusd_perp@kline_30m/" +
"ethusd_perp@kline_1h/ethusd_perp@kline_2h/ethusd_perp@kline_4h/ethusd_perp@kline_6h/ethusd_perp@kline_8h/ethusd_perp@kline_12h/" +
"ethusd_perp@kline_1d/ethusd_perp@kline_3d/ethusd_perp@kline_1w/ethusd_perp@kline_1M";
ScheduledExecutorService ses1 = Executors.newSingleThreadScheduledExecutor();
ses1.scheduleAtFixedRate(()->{
WebSocketClient ws = getWS(url);
ScheduledExecutorService ses2 = Executors.newSingleThreadScheduledExecutor();
ses2.scheduleAtFixedRate(()->{
ws.sendPing();
if(!ws.isConnecting()){
//如果连接断开重连
ws.connect();
}},0,3, TimeUnit.MINUTES);
ScheduledExecutorService ses3 = Executors.newSingleThreadScheduledExecutor();
ses3.schedule(()->{ws.close();},18, TimeUnit.HOURS);
},0,18,TimeUnit.HOURS);
//WebSocketClient ws = getWS(url);
//String url = "wss://dstream.binance.com/ws/btcusd_perp@kline_1h";
}
public static WebSocketClient getWS(String url){
try{
WebSocketClient wbc = new WebSocketClient(new URI(url)) {
@Override
public void onOpen(ServerHandshake handshakedata) {
}
@Override
public void onWebsocketPing(WebSocket conn, Framedata f) {
conn.sendFrame(f);
}
@Override
public void onMessage(String message) {
JSONObject jsonObject = JSON.parseObject(message);
String stream = JSONObject.toJSONString(jsonObject.get("stream"));
String key = stream.replace("_", "").replace("@kline", "");
String dataStr = JSONObject.toJSONString(jsonObject.get("data"));
String dateStr = JSONObject.toJSONString(JSON.parseObject(dataStr).get("E"));
Date date = JSON.parseObject(dateStr, Date.class);
String kStr = JSONObject.toJSONString(JSON.parseObject(dataStr).get("k"));
String openTimeStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("t"));
Date openTime = JSON.parseObject(openTimeStr, Date.class);
Calendar openCa = Calendar.getInstance();
openCa.setTime(openTime);
openCa.add(Calendar.HOUR, +8);
openTime = openCa.getTime();
String closeTimeStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("T"));
Date closeTime = JSON.parseObject(closeTimeStr, Date.class);
Calendar closeCa = Calendar.getInstance();
closeCa.setTime(closeTime);
closeCa.add(Calendar.HOUR, +8);
closeTime = closeCa.getTime();
String openPriceStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("o"));
BigDecimal openPrice = JSON.parseObject(openPriceStr, BigDecimal.class);
String highPriceStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("h"));
BigDecimal highPrice = JSON.parseObject(highPriceStr, BigDecimal.class);
String lowPriceStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("l"));
BigDecimal lowPrice = JSON.parseObject(lowPriceStr, BigDecimal.class);
String closePriceStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("c"));
BigDecimal closePrice = JSON.parseObject(closePriceStr, BigDecimal.class);
String volumeStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("v"));
BigDecimal volume = JSON.parseObject(volumeStr, BigDecimal.class);
String qualityStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("q"));
BigDecimal quality = JSON.parseObject(qualityStr, BigDecimal.class);
String buyVolumeStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("V"));
BigDecimal buyVolume = JSON.parseObject(buyVolumeStr, BigDecimal.class);
String buyQualityStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("Q"));
BigDecimal buyQuality = JSON.parseObject(buyQualityStr, BigDecimal.class);
String numStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("n"));
BigDecimal num = JSON.parseObject(numStr, BigDecimal.class);
String symbolStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("s"));
String symbol = JSON.parseObject(symbolStr, String.class);
//赋值
BiAnWSDto dto = new BiAnWSDto();
dto.setDate(date);
dto.setOpenTime(openTime);
dto.setCloseTime(closeTime);
dto.setOpenPrice(openPrice);
dto.setHighPrice(highPrice);
dto.setLowPrice(lowPrice);
dto.setClosePrice(closePrice);
dto.setVolume(volume);
dto.setBuyVolume(buyVolume);
dto.setQuality(quality);
dto.setBuyQuality(buyQuality);
dto.setNum(num);
dto.setSymbol(symbol);
//取值对比
if(StringUtils.isNotNull(map.get(key))){
//stream不为空 说明map中已有key对应的数据
BiAnWSDto oldData = map.get(key);
if((oldData.getOpenTime().compareTo(dto.getOpenTime())) != 0){
//map中的数据和新传过来的数据开盘时间不一致 则把新数据放入dbMap中
if(StringUtils.isNull(dbMap.get(key))){
//dbMap中没数据
List<BiAnWSDto> list = new ArrayList<>();
list.add(oldData);
dbMap.put(key,list);
}else{
List<BiAnWSDto> list = dbMap.get(key);
list.add(oldData);
dbMap.put(key,list);
}
//存入数据库
insertDataToDb(oldData,key);
}
}
map.put(key,dto);
}
@Override
public void onClose(int code, String reason, boolean remote) {
System.out.println("连接已断开");
}
@Override
public void onError(Exception ex) {
System.out.println("连接因为异常断开:");
ex.printStackTrace();
}
};
wbc.connect();
return wbc;
}
catch (Exception e){
e.printStackTrace();
}
return null;
}
public static boolean insertDataToDb(BiAnWSDto dto,String key){
String tableName = tableNameMap.get(key);
if(StringUtils.isNotNull(tableName)){
biAnService = SpringUtils.getBean("biAnService");
biAnService.insertUSDPerp(dto,tableName);
}
return true;
}
public static BiAnWSDto getRealDataBykey(String key){
return map.get(key);
}
public static List<BiAnWSDto> getDbDataByKey(String key){
return dbMap.get(key);
}
public static Map<String,BiAnWSDto> getAllRealData(){
return map;
}
public static Map<String, List<BiAnWSDto>> getAllDbData(){
return dbMap;
}
}

View File

@@ -0,0 +1,310 @@
package com.jxy.binance.task;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.jxy.binance.dto.BiAnWSDto;
import com.jxy.binance.service.impl.BiAnServiceImpl;
import com.jxy.common.core.utils.SpringUtils;
import com.jxy.common.core.utils.StringUtils;
import org.java_websocket.WebSocket;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.exceptions.InvalidDataException;
import org.java_websocket.framing.Framedata;
import org.java_websocket.framing.FramedataImpl1;
import org.java_websocket.handshake.ServerHandshake;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.math.BigDecimal;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @Description B本位WS
* @Date 2023/1/31 17:23
* @Author 杜懿
*/
/**
* 本篇所列出的所有wss接口的baseurl为: wss://stream.binance.com:9443 或者 wss://stream.binance.com:443
* Streams有单一原始 stream 或组合 stream
* 单一原始 streams 格式为 /ws/<streamName>
* 组合streams的URL格式为 /stream?streams=<streamName1>/<streamName2>/<streamName3>
* 订阅组合streams时事件payload会以这样的格式封装: {"stream":"<streamName>","data":<rawPayload>}
* stream名称中所有交易对均为 小写
* 每个到 stream.binance.com 的链接有效期不超过24小时请妥善处理断线重连。
* 每3分钟服务端会发送ping帧客户端应当在10分钟内回复pong帧否则服务端会主动断开链接。允许客户端发送不成对的pong帧(即客户端可以以高于10分钟每次的频率发送pong帧保持链接)。
* wss://data-stream.binance.com 可以用来订阅市场信息的数据流。 用户信息无法从此URL获得。
*/
/** 1m 3m 5m 15m 30m 1h 2h 4h 6h 8h 12h 1d 3d 1w 1M */
/**
* Stream Name:
* <symbol>@kline_<interval>
*/
@Component
@Order(value = 3)
public class SstreamTask implements ApplicationRunner {
private static BiAnServiceImpl biAnService;
/** 入库数据 */
private static Map<String,List<BiAnWSDto>> dbMap = new HashMap<>();
/** 推送数据 */
private static Map<String,BiAnWSDto> map = new HashMap<>();
private static Map<String,String> tableNameMap = new HashMap<>();
static {
tableNameMap.put("\"btcusdt1m\"","BTCUSDT");
tableNameMap.put("\"btcusdt3m\"","3mBTCUSDT");
tableNameMap.put("\"btcusdt5m\"","5mBTCUSDT");
tableNameMap.put("\"btcusdt15m\"","15mBTCUSDT");
tableNameMap.put("\"btcusdt30m\"","30mBTCUSDT");
tableNameMap.put("\"btcusdt1h\"","1hBTCUSDT");
tableNameMap.put("\"btcusdt2h\"","2hBTCUSDT");
tableNameMap.put("\"btcusdt4h\"","4hBTCUSDT");
tableNameMap.put("\"btcusdt6h\"","6hBTCUSDT");
tableNameMap.put("\"btcusdt8h\"","8hBTCUSDT");
tableNameMap.put("\"btcusdt12h\"","12hBTCUSDT");
tableNameMap.put("\"btcusdt1d\"","1dBTCUSDT");
tableNameMap.put("\"btcusdt3d\"","3dBTCUSDT");
tableNameMap.put("\"btcusdt1w\"","1wBTCUSDT");
tableNameMap.put("\"btcusdt1M\"","1MBTCUSDT");
tableNameMap.put("\"ethusdt1m\"","ETHUSDT");
tableNameMap.put("\"ethusdt3m\"","3mETHUSDT");
tableNameMap.put("\"ethusdt5m\"","5mETHUSDT");
tableNameMap.put("\"ethusdt15m\"","15mETHUSDT");
tableNameMap.put("\"ethusdt30m\"","30mETHUSDT");
tableNameMap.put("\"ethusdt1h\"","1hETHUSDT");
tableNameMap.put("\"ethusdt2h\"","2hETHUSDT");
tableNameMap.put("\"ethusdt4h\"","4hETHUSDT");
tableNameMap.put("\"ethusdt6h\"","6hETHUSDT");
tableNameMap.put("\"ethusdt8h\"","8hETHUSDT");
tableNameMap.put("\"ethusdt12h\"","12hETHUSDT");
tableNameMap.put("\"ethusdt1d\"","1dETHUSDT");
tableNameMap.put("\"ethusdt3d\"","3dETHUSDT");
tableNameMap.put("\"ethusdt1w\"","1wETHUSDT");
tableNameMap.put("\"ethusdt1M\"","1METHUSDT");
}
@Override
public void run(ApplicationArguments args) throws Exception {
String url = "wss://stream.binance.com:9443/stream?streams=btcusdt@kline_1m/btcusdt@kline_3m/btcusdt@kline_5m/btcusdt@kline_15m/btcusdt@kline_30m/" +
"btcusdt@kline_1h/btcusdt@kline_2h/btcusdt@kline_4h/btcusdt@kline_6h/btcusdt@kline_8h/btcusdt@kline_12h/" +
"btcusdt@kline_1d/btcusdt@kline_3d/btcusdt@kline_1w/btcusdt@kline_1M/" +
"ethusdt@kline_1m/ethusdt@kline_3m/ethusdt@kline_5m/ethusdt@kline_15m/ethusdt@kline_30m/" +
"ethusdt@kline_1h/ethusdt@kline_2h/ethusdt@kline_4h/ethusdt@kline_6h/ethusdt@kline_8h/ethusdt@kline_12h/" +
"ethusdt@kline_1d/ethusdt@kline_3d/ethusdt@kline_1w/ethusdt@kline_1M";
ScheduledExecutorService ses1 = Executors.newSingleThreadScheduledExecutor();
ses1.scheduleAtFixedRate(()->{
WebSocketClient ws = getWS(url);
ScheduledExecutorService ses2 = Executors.newSingleThreadScheduledExecutor();
ses2.scheduleAtFixedRate(()->{
ws.sendPing();
if(!ws.isConnecting()){
//如果连接断开重连
ws.connect();
}},0,3, TimeUnit.MINUTES);
ScheduledExecutorService ses3 = Executors.newSingleThreadScheduledExecutor();
ses3.schedule(()->{ws.close();},18, TimeUnit.HOURS);
},0,18,TimeUnit.HOURS);
//WebSocketClient ws = getWS(url);
//String url = "wss://dstream.binance.com/ws/btcusd_perp@kline_1h";
}
public static WebSocketClient getWS(String url){
try{
WebSocketClient wbc = new WebSocketClient(new URI(url)) {
@Override
public void onOpen(ServerHandshake handshakedata) {
}
@Override
public void onWebsocketPing(WebSocket conn, Framedata f) {
conn.sendFrame(f);
}
@Override
public void onMessage(String message) {
JSONObject jsonObject = JSON.parseObject(message);
String stream = JSONObject.toJSONString(jsonObject.get("stream"));
String key = stream.replace("_", "").replace("@kline", "");
String dataStr = JSONObject.toJSONString(jsonObject.get("data"));
String dateStr = JSONObject.toJSONString(JSON.parseObject(dataStr).get("E"));
Date date = JSON.parseObject(dateStr, Date.class);
String kStr = JSONObject.toJSONString(JSON.parseObject(dataStr).get("k"));
String openTimeStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("t"));
Date openTime = JSON.parseObject(openTimeStr, Date.class);
Calendar openCa = Calendar.getInstance();
openCa.setTime(openTime);
openCa.add(Calendar.HOUR, +8);
openTime = openCa.getTime();
String closeTimeStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("T"));
Date closeTime = JSON.parseObject(closeTimeStr, Date.class);
Calendar closeCa = Calendar.getInstance();
closeCa.setTime(closeTime);
closeCa.add(Calendar.HOUR, +8);
closeTime = closeCa.getTime();
String openPriceStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("o"));
BigDecimal openPrice = JSON.parseObject(openPriceStr, BigDecimal.class);
String highPriceStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("h"));
BigDecimal highPrice = JSON.parseObject(highPriceStr, BigDecimal.class);
String lowPriceStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("l"));
BigDecimal lowPrice = JSON.parseObject(lowPriceStr, BigDecimal.class);
String closePriceStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("c"));
BigDecimal closePrice = JSON.parseObject(closePriceStr, BigDecimal.class);
String volumeStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("v"));
BigDecimal volume = JSON.parseObject(volumeStr, BigDecimal.class);
String qualityStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("q"));
BigDecimal quality = JSON.parseObject(qualityStr, BigDecimal.class);
String buyVolumeStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("V"));
BigDecimal buyVolume = JSON.parseObject(buyVolumeStr, BigDecimal.class);
String buyQualityStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("Q"));
BigDecimal buyQuality = JSON.parseObject(buyQualityStr, BigDecimal.class);
String numStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("n"));
BigDecimal num = JSON.parseObject(numStr, BigDecimal.class);
String symbolStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("s"));
String symbol = JSON.parseObject(symbolStr, String.class);
//赋值
BiAnWSDto dto = new BiAnWSDto();
dto.setDate(date);
dto.setOpenTime(openTime);
dto.setCloseTime(closeTime);
dto.setOpenPrice(openPrice);
dto.setHighPrice(highPrice);
dto.setLowPrice(lowPrice);
dto.setClosePrice(closePrice);
dto.setVolume(volume);
dto.setBuyVolume(buyVolume);
dto.setQuality(quality);
dto.setBuyQuality(buyQuality);
dto.setNum(num);
dto.setSymbol(symbol);
//取值对比
if(StringUtils.isNotNull(map.get(key))){
//stream不为空 说明map中已有key对应的数据
BiAnWSDto oldData = map.get(key);
if((oldData.getOpenTime().compareTo(dto.getOpenTime())) != 0){
//map中的数据和新传过来的数据开盘时间不一致 则把新数据放入dbMap中
if(StringUtils.isNull(dbMap.get(key))){
//dbMap中没数据
List<BiAnWSDto> list = new ArrayList<>();
list.add(oldData);
dbMap.put(key,list);
}else{
List<BiAnWSDto> list = dbMap.get(key);
list.add(oldData);
dbMap.put(key,list);
}
//存入数据库
insertDataToDb(oldData,key);
}
}
map.put(key,dto);
}
@Override
public void onClose(int code, String reason, boolean remote) {
System.out.println("连接已断开");
}
@Override
public void onError(Exception ex) {
System.out.println("连接因为异常断开:");
ex.printStackTrace();
}
};
wbc.connect();
return wbc;
}
catch (Exception e){
e.printStackTrace();
}
return null;
}
public static boolean insertDataToDb(BiAnWSDto dto,String key){
String tableName = tableNameMap.get(key);
if(StringUtils.isNotNull(tableName)){
biAnService = SpringUtils.getBean("biAnService");
biAnService.insertStock(dto,tableName);
}
return true;
}
public static BiAnWSDto getRealDataBykey(String key){
return map.get(key);
}
public static List<BiAnWSDto> getDbDataByKey(String key){
return dbMap.get(key);
}
public static Map<String,BiAnWSDto> getAllRealData(){
return map;
}
public static Map<String, List<BiAnWSDto>> getAllDbData(){
return dbMap;
}
}

View File

@@ -0,0 +1,316 @@
package com.jxy.binance.task;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.jxy.binance.dto.BiAnWSDto;
import com.jxy.binance.service.impl.BiAnServiceImpl;
import com.jxy.common.core.utils.SpringUtils;
import com.jxy.common.core.utils.StringUtils;
import org.java_websocket.WebSocket;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.framing.Framedata;
import org.java_websocket.handshake.ServerHandshake;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.math.BigDecimal;
import java.net.URI;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @Description U本位WS
* @Date 2023/1/31 17:23
* @Author 杜懿
*/
/**
* 连接方式:
* Base Urlwss://fstream.binance.com
* 订阅单一stream格式为 /ws/<streamName>
* 组合streams的URL格式为 /stream?streams=<streamName1>/<streamName2>/<streamName3>
* 连接样例:
* wss://fstream.binance.com/ws/bnbusdt@aggTrade
* wss://fstream.binance.com/stream?streams=bnbusdt@aggTrade/btcusdt@markPrice
* 每个链接有效期不超过24小时请妥善处理断线重连。
*
* 服务端每5分钟会发送ping帧客户端应当在15分钟内回复pong帧否则服务端会主动断开链接。允许客户端发送不成对的pong帧(即客户端可以以高于15分钟每次的频率发送pong帧保持链接)。
*
* Websocket服务器每秒最多接受10个订阅消息。
*
* 如果用户发送的消息超过限制连接会被断开连接。反复被断开连接的IP有可能被服务器屏蔽。
*
* 单个连接最多可以订阅 200 个Streams。
*/
@Component
@Order(value = 2)
public class UstreamTask implements ApplicationRunner {
private static BiAnServiceImpl biAnService;
/** 入库数据 */
private static Map<String,List<BiAnWSDto>> dbMap = new HashMap<>();
/** 推送数据 */
private static Map<String,BiAnWSDto> map = new HashMap<>();
private static Map<String,String> tableNameMap = new HashMap<>();
static {
tableNameMap.put("\"btcusdt1m\"","BTCUSDT");
tableNameMap.put("\"btcusdt3m\"","3mBTCUSDT");
tableNameMap.put("\"btcusdt5m\"","5mBTCUSDT");
tableNameMap.put("\"btcusdt15m\"","15mBTCUSDT");
tableNameMap.put("\"btcusdt30m\"","30mBTCUSDT");
tableNameMap.put("\"btcusdt1h\"","1hBTCUSDT");
tableNameMap.put("\"btcusdt2h\"","2hBTCUSDT");
tableNameMap.put("\"btcusdt4h\"","4hBTCUSDT");
tableNameMap.put("\"btcusdt6h\"","6hBTCUSDT");
tableNameMap.put("\"btcusdt8h\"","8hBTCUSDT");
tableNameMap.put("\"btcusdt12h\"","12hBTCUSDT");
tableNameMap.put("\"btcusdt1d\"","1dBTCUSDT");
tableNameMap.put("\"btcusdt3d\"","3dBTCUSDT");
tableNameMap.put("\"btcusdt1w\"","1wBTCUSDT");
tableNameMap.put("\"btcusdt1M\"","1MBTCUSDT");
tableNameMap.put("\"ethusdt1m\"","ETHUSDT");
tableNameMap.put("\"ethusdt3m\"","3mETHUSDT");
tableNameMap.put("\"ethusdt5m\"","5mETHUSDT");
tableNameMap.put("\"ethusdt15m\"","15mETHUSDT");
tableNameMap.put("\"ethusdt30m\"","30mETHUSDT");
tableNameMap.put("\"ethusdt1h\"","1hETHUSDT");
tableNameMap.put("\"ethusdt2h\"","2hETHUSDT");
tableNameMap.put("\"ethusdt4h\"","4hETHUSDT");
tableNameMap.put("\"ethusdt6h\"","6hETHUSDT");
tableNameMap.put("\"ethusdt8h\"","8hETHUSDT");
tableNameMap.put("\"ethusdt12h\"","12hETHUSDT");
tableNameMap.put("\"ethusdt1d\"","1dETHUSDT");
tableNameMap.put("\"ethusdt3d\"","3dETHUSDT");
tableNameMap.put("\"ethusdt1w\"","1wETHUSDT");
tableNameMap.put("\"ethusdt1M\"","1METHUSDT");
}
@Override
public void run(ApplicationArguments args) throws Exception {
String url = "wss://fstream.binance.com/stream?streams=btcusdt@kline_1m/btcusdt@kline_3m/btcusdt@kline_5m/btcusdt@kline_15m/btcusdt@kline_30m/" +
"btcusdt@kline_1h/btcusdt@kline_2h/btcusdt@kline_4h/btcusdt@kline_6h/btcusdt@kline_8h/btcusdt@kline_12h/" +
"btcusdt@kline_1d/btcusdt@kline_3d/btcusdt@kline_1w/btcusdt@kline_1M/" +
"ethusdt@kline_1m/ethusdt@kline_3m/ethusdt@kline_5m/ethusdt@kline_15m/ethusdt@kline_30m/" +
"ethusdt@kline_1h/ethusdt@kline_2h/ethusdt@kline_4h/ethusdt@kline_6h/ethusdt@kline_8h/ethusdt@kline_12h/" +
"ethusdt@kline_1d/ethusdt@kline_3d/ethusdt@kline_1w/ethusdt@kline_1M";
ScheduledExecutorService ses1 = Executors.newSingleThreadScheduledExecutor();
ses1.scheduleAtFixedRate(()->{
WebSocketClient ws = getWS(url);
ScheduledExecutorService ses2 = Executors.newSingleThreadScheduledExecutor();
ses2.scheduleAtFixedRate(()->{
ws.sendPing();
if(!ws.isConnecting()){
//如果连接断开重连
ws.connect();
}},0,3, TimeUnit.MINUTES);
ScheduledExecutorService ses3 = Executors.newSingleThreadScheduledExecutor();
ses3.schedule(()->{ws.close();},18, TimeUnit.HOURS);
},0,18,TimeUnit.HOURS);
//WebSocketClient ws = getWS(url);
//String url = "wss://dstream.binance.com/ws/btcusd_perp@kline_1h";
}
public static WebSocketClient getWS(String url){
try{
WebSocketClient wbc = new WebSocketClient(new URI(url)) {
@Override
public void onOpen(ServerHandshake handshakedata) {
}
@Override
public void onWebsocketPing(WebSocket conn, Framedata f) {
conn.sendFrame(f);
}
@Override
public void onMessage(String message) {
JSONObject jsonObject = JSON.parseObject(message);
String stream = JSONObject.toJSONString(jsonObject.get("stream"));
//stream:"ethusdt@kline_1m"
String key = stream.replace("_", "").replace("@kline", "");
String dataStr = JSONObject.toJSONString(jsonObject.get("data"));
String dateStr = JSONObject.toJSONString(JSON.parseObject(dataStr).get("E"));
Date date = JSON.parseObject(dateStr, Date.class);
String kStr = JSONObject.toJSONString(JSON.parseObject(dataStr).get("k"));
String openTimeStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("t"));
Date openTime = JSON.parseObject(openTimeStr, Date.class);
Calendar openCa = Calendar.getInstance();
openCa.setTime(openTime);
openCa.add(Calendar.HOUR, +8);
openTime = openCa.getTime();
String closeTimeStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("T"));
Date closeTime = JSON.parseObject(closeTimeStr, Date.class);
Calendar closeCa = Calendar.getInstance();
closeCa.setTime(closeTime);
closeCa.add(Calendar.HOUR, +8);
closeTime = closeCa.getTime();
String openPriceStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("o"));
BigDecimal openPrice = JSON.parseObject(openPriceStr, BigDecimal.class);
String highPriceStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("h"));
BigDecimal highPrice = JSON.parseObject(highPriceStr, BigDecimal.class);
String lowPriceStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("l"));
BigDecimal lowPrice = JSON.parseObject(lowPriceStr, BigDecimal.class);
String closePriceStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("c"));
BigDecimal closePrice = JSON.parseObject(closePriceStr, BigDecimal.class);
String volumeStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("v"));
BigDecimal volume = JSON.parseObject(volumeStr, BigDecimal.class);
String qualityStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("q"));
BigDecimal quality = JSON.parseObject(qualityStr, BigDecimal.class);
String buyVolumeStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("V"));
BigDecimal buyVolume = JSON.parseObject(buyVolumeStr, BigDecimal.class);
String buyQualityStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("Q"));
BigDecimal buyQuality = JSON.parseObject(buyQualityStr, BigDecimal.class);
String numStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("n"));
BigDecimal num = JSON.parseObject(numStr, BigDecimal.class);
String symbolStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("s"));
String symbol = JSON.parseObject(symbolStr, String.class);
String flagStr = JSONObject.toJSONString(JSON.parseObject(kStr).get("x"));
boolean flag = JSON.parseObject(flagStr, Boolean.class);
//赋值
BiAnWSDto dto = new BiAnWSDto();
dto.setDate(date);
dto.setOpenTime(openTime);
dto.setCloseTime(closeTime);
dto.setOpenPrice(openPrice);
dto.setHighPrice(highPrice);
dto.setLowPrice(lowPrice);
dto.setClosePrice(closePrice);
dto.setVolume(volume);
dto.setBuyVolume(buyVolume);
dto.setQuality(quality);
dto.setBuyQuality(buyQuality);
dto.setNum(num);
dto.setSymbol(symbol);
//取值对比
if(flag){
//是K线最后一条数据 即不再变化的数据
//存入数据库
insertDataToDb(dto,key);
}
if(StringUtils.isNotNull(map.get(key))){
//stream不为空 说明map中已有key对应的数据
BiAnWSDto oldData = map.get(key);
if((oldData.getOpenTime().compareTo(dto.getOpenTime())) != 0){
//map中的数据和新传过来的数据开盘时间不一致 则把新数据放入dbMap中
if(StringUtils.isNull(dbMap.get(key))){
//dbMap中没数据
List<BiAnWSDto> list = new ArrayList<>();
list.add(oldData);
dbMap.put(key,list);
}else{
List<BiAnWSDto> list = dbMap.get(key);
list.add(oldData);
dbMap.put(key,list);
}
//存入数据库
insertDataToDb(oldData,key);
}
}
map.put(key,dto);
}
@Override
public void onClose(int code, String reason, boolean remote) {
System.out.println("连接已断开");
}
@Override
public void onError(Exception ex) {
System.out.println("连接因为异常断开:");
ex.printStackTrace();
}
};
wbc.connect();
return wbc;
}
catch (Exception e){
e.printStackTrace();
}
return null;
}
public static boolean insertDataToDb(BiAnWSDto dto,String key){
String tableName = tableNameMap.get(key);
if(StringUtils.isNotNull(tableName)){
biAnService = SpringUtils.getBean("biAnService");
biAnService.insertUSDT(dto,tableName);
}
return true;
}
public static BiAnWSDto getRealDataBykey(String key){
return map.get(key);
}
public static List<BiAnWSDto> getDbDataByKey(String key){
return dbMap.get(key);
}
public static Map<String,BiAnWSDto> getAllRealData(){
return map;
}
public static Map<String, List<BiAnWSDto>> getAllDbData(){
return dbMap;
}
}

View File

@@ -0,0 +1,111 @@
package com.jxy.binance.thread;
import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson.JSONArray;
import com.jxy.binance.dto.BiAnWSDto;
import com.jxy.binance.service.impl.BiAnServiceImpl;
import com.jxy.common.core.text.Convert;
import com.jxy.common.core.utils.SpringUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
* @Description TODO
* @Date 2023/2/7 17:15
* @Author 杜懿
*/
public class BstreamHistoryThread implements Runnable {
private static BiAnServiceImpl biAnService;
private static long start;
private static long endtime;
private static String table ;
private static String symbol ;
private static String interval;
public BstreamHistoryThread(String symbol,Long start,Long endtime,String table,String interval) {
this.table = table;
this.endtime = endtime;
this.symbol = symbol;
this.start = start;
this.interval = interval;
}
@Override
public void run() {
biAnService = SpringUtils.getBean("biAnService");
System.out.println("线程开始执行");
try {
while (!Thread.currentThread().isInterrupted()){
List<BiAnWSDto> list = getData(symbol,interval,endtime);
if(list.size() > 0){
//入库
biAnService.batchUSDPerp(list,table);
endtime = list.get(0).getOpenTime().getTime() - 28800000 - 1;
if(start > endtime){
Date startDate = new Date(start);
System.out.println("开始时间:"+startDate);
Date endDate = new Date(endtime);
System.out.println("结束时间"+endDate);
System.out.println("数据获取完成 停止线程");
break;
}
}else{
System.out.println("数据获取完成 停止线程");
Thread.currentThread();
break;
}
Thread.sleep(1000);
}
}catch (InterruptedException e){
e.printStackTrace();
Thread curr = Thread.currentThread();
//再次调用interrupt方法中断自己将中断状态设置为“中断”
try {
Thread.sleep(1000);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
curr.interrupt();
}
}
public List<BiAnWSDto> getData(String symbol,String interval,Long endTime){
String url = "https://dapi.binance.com/dapi/v1/klines?symbol="+symbol+"&interval="+interval+"&endTime="+endTime+"&limit="+500;
String result = HttpUtil.get(url);
List<String> list = JSONArray.parseArray(result,String.class);
List<BiAnWSDto> returnList = new ArrayList<>();
for(int i = 0 ; i < list.size();i++){
Object[] objects = JSONArray.parseArray(list.get(i)).stream().toArray();
BiAnWSDto biAnWSDto = new BiAnWSDto();
biAnWSDto.setOpenTime(new Date(Convert.toLong(objects[0])+28800000));
biAnWSDto.setOpenPrice(Convert.toBigDecimal(objects[1]));
biAnWSDto.setHighPrice(Convert.toBigDecimal(objects[2]));
biAnWSDto.setLowPrice(Convert.toBigDecimal(objects[3]));
biAnWSDto.setClosePrice(Convert.toBigDecimal(objects[4]));
biAnWSDto.setVolume(Convert.toBigDecimal(objects[5]));
biAnWSDto.setCloseTime(new Date(Convert.toLong(objects[6])+28800000+1));
biAnWSDto.setQuality(Convert.toBigDecimal(objects[7]));
biAnWSDto.setNum(Convert.toBigDecimal(objects[8]));
biAnWSDto.setBuyVolume(Convert.toBigDecimal(objects[9]));
biAnWSDto.setBuyQuality(Convert.toBigDecimal(objects[10]));
biAnWSDto.setSymbol(symbol.toUpperCase());
returnList.add(biAnWSDto);
}
return returnList;
}
}

View File

@@ -0,0 +1,111 @@
package com.jxy.binance.thread;
import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson.JSONArray;
import com.jxy.binance.dto.BiAnWSDto;
import com.jxy.binance.service.impl.BiAnServiceImpl;
import com.jxy.common.core.text.Convert;
import com.jxy.common.core.utils.SpringUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
* @Description TODO
* @Date 2023/2/7 17:15
* @Author 杜懿
*/
public class SstreamHistoryThread implements Runnable {
private static BiAnServiceImpl biAnService;
private static long start;
private static long endtime;
private static String table ;
private static String symbol;
private static String interval;
public SstreamHistoryThread(String symbol,Long start,Long endtime,String table,String interval) {
this.table = table;
this.endtime = endtime;
this.symbol = symbol;
this.interval = interval;
this.start = start;
}
@Override
public void run() {
biAnService = SpringUtils.getBean("biAnService");
System.out.println("线程开始执行");
try {
while (!Thread.currentThread().isInterrupted()){
List<BiAnWSDto> list = getData(symbol,interval,endtime);
if(list.size() > 0){
//入库
biAnService.batchStock(list,table);
endtime = list.get(0).getOpenTime().getTime() - 1;
if(start > endtime){
Date startDate = new Date(start);
System.out.println("开始时间:"+startDate);
Date endDate = new Date(endtime);
System.out.println("结束时间"+endDate);
System.out.println("数据获取完成 停止线程");
break;
}
}else{
System.out.println("数据获取完成 停止线程");
Thread.currentThread();
break;
}
Thread.sleep(1000);
}
}catch (InterruptedException e){
e.printStackTrace();
Thread curr = Thread.currentThread();
//再次调用interrupt方法中断自己将中断状态设置为“中断”
try {
Thread.sleep(1000);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
curr.interrupt();
}
}
public List<BiAnWSDto> getData(String symbol,String interval,Long endTime){
String url = "https://data.binance.com/api/v3/klines?symbol="+symbol+"&interval="+interval+"&endTime="+endTime+"&limit="+500;
String result = HttpUtil.get(url);
List<String> list = JSONArray.parseArray(result,String.class);
List<BiAnWSDto> returnList = new ArrayList<>();
for(int i = 0 ; i < list.size();i++){
Object[] objects = JSONArray.parseArray(list.get(i)).stream().toArray();
BiAnWSDto biAnWSDto = new BiAnWSDto();
biAnWSDto.setOpenTime(new Date(Convert.toLong(objects[0])+28800000));
biAnWSDto.setOpenPrice(Convert.toBigDecimal(objects[1]));
biAnWSDto.setHighPrice(Convert.toBigDecimal(objects[2]));
biAnWSDto.setLowPrice(Convert.toBigDecimal(objects[3]));
biAnWSDto.setClosePrice(Convert.toBigDecimal(objects[4]));
biAnWSDto.setVolume(Convert.toBigDecimal(objects[5]));
biAnWSDto.setCloseTime(new Date(Convert.toLong(objects[6])+28800000+1));
biAnWSDto.setQuality(Convert.toBigDecimal(objects[7]));
biAnWSDto.setNum(Convert.toBigDecimal(objects[8]));
biAnWSDto.setBuyVolume(Convert.toBigDecimal(objects[9]));
biAnWSDto.setBuyQuality(Convert.toBigDecimal(objects[10]));
biAnWSDto.setSymbol(symbol.toUpperCase());
returnList.add(biAnWSDto);
}
return returnList;
}
}

View File

@@ -0,0 +1,113 @@
package com.jxy.binance.thread;
import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson.JSONArray;
import com.jxy.binance.dto.BiAnWSDto;
import com.jxy.binance.service.impl.BiAnServiceImpl;
import com.jxy.common.core.text.Convert;
import com.jxy.common.core.utils.SpringUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
* @Description TODO
* @Date 2023/2/7 17:15
* @Author 杜懿
*/
public class UstreamHistoryThread implements Runnable {
private static BiAnServiceImpl biAnService;
private static long start;
private static long endtime;
private static String table ;
private static String symbol;
private static String interval;
public UstreamHistoryThread(String symbol,Long start,Long endtime,String table,String interval) {
this.table = table;
this.endtime = endtime;
this.symbol = symbol;
this.start = start;
this.interval = interval;
}
@Override
public void run() {
biAnService = SpringUtils.getBean("biAnService");
System.out.println("线程开始执行");
System.out.println("往数据表"+table+"中插入"+start+""+endtime+"的数据,间隔"+interval);
try {
while (!Thread.currentThread().isInterrupted()){
List<BiAnWSDto> list = getData(symbol,interval,endtime);
if(list.size() > 0){
//入库
biAnService.batchUSDT(list,table);
//开始时间-时区修正值
endtime = list.get(0).getOpenTime().getTime() - 28800000 - 1;
if(start > endtime){
Date startDate = new Date(start);
System.out.println("开始时间:"+startDate);
Date endDate = new Date(endtime);
System.out.println("结束时间"+endDate);
System.out.println("数据获取完成 停止线程");
break;
}
}else{
System.out.println("数据获取完成 停止线程");
Thread.currentThread();
break;
}
Thread.sleep(250);
}
}catch (InterruptedException e){
e.printStackTrace();
Thread curr = Thread.currentThread();
//再次调用interrupt方法中断自己将中断状态设置为“中断”
try {
Thread.sleep(1000);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
curr.interrupt();
}
}
public List<BiAnWSDto> getData(String symbol,String interval,Long endTime){
String url = "https://fapi.binance.com/fapi/v1/klines?symbol="+symbol+"&interval="+interval+"&endTime="+endTime+"&limit="+500;
String result = HttpUtil.get(url);
List<String> list = JSONArray.parseArray(result,String.class);
List<BiAnWSDto> returnList = new ArrayList<>();
for(int i = 0 ; i < list.size();i++){
Object[] objects = JSONArray.parseArray(list.get(i)).stream().toArray();
BiAnWSDto biAnWSDto = new BiAnWSDto();
biAnWSDto.setOpenTime(new Date(Convert.toLong(objects[0])+28800000));
biAnWSDto.setOpenPrice(Convert.toBigDecimal(objects[1]));
biAnWSDto.setHighPrice(Convert.toBigDecimal(objects[2]));
biAnWSDto.setLowPrice(Convert.toBigDecimal(objects[3]));
biAnWSDto.setClosePrice(Convert.toBigDecimal(objects[4]));
biAnWSDto.setVolume(Convert.toBigDecimal(objects[5]));
biAnWSDto.setCloseTime(new Date(Convert.toLong(objects[6])+28800000+1));
biAnWSDto.setQuality(Convert.toBigDecimal(objects[7]));
biAnWSDto.setNum(Convert.toBigDecimal(objects[8]));
biAnWSDto.setBuyVolume(Convert.toBigDecimal(objects[9]));
biAnWSDto.setBuyQuality(Convert.toBigDecimal(objects[10]));
biAnWSDto.setSymbol(symbol.toUpperCase());
returnList.add(biAnWSDto);
}
return returnList;
}
}

View File

@@ -0,0 +1,25 @@
# Tomcat
server:
port: 9205
# Spring
spring:
application:
# 应用名称
name: jxy-binance
profiles:
# 环境配置
active: dev
cloud:
nacos:
discovery:
# 服务注册地址
server-addr: 127.0.0.1:8848
config:
# 配置中心地址
server-addr: 127.0.0.1:8848
# 配置文件格式
file-extension: yml
# 共享配置
shared-configs:
- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}

View File

@@ -0,0 +1,74 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds" debug="false">
<!-- 日志存放路径 -->
<property name="log.path" value="logs/jxy-binance" />
<!-- 日志输出格式 -->
<property name="log.pattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{20} - [%method,%line] - %msg%n" />
<!-- 控制台输出 -->
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>${log.pattern}</pattern>
</encoder>
</appender>
<!-- 系统日志输出 -->
<appender name="file_info" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.path}/info.log</file>
<!-- 循环政策:基于时间创建日志文件 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 日志文件名格式 -->
<fileNamePattern>${log.path}/info.%d{yyyy-MM-dd}.log</fileNamePattern>
<!-- 日志最大的历史 60天 -->
<maxHistory>60</maxHistory>
</rollingPolicy>
<encoder>
<pattern>${log.pattern}</pattern>
</encoder>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<!-- 过滤的级别 -->
<level>INFO</level>
<!-- 匹配时的操作:接收(记录) -->
<onMatch>ACCEPT</onMatch>
<!-- 不匹配时的操作:拒绝(不记录) -->
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<appender name="file_error" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.path}/error.log</file>
<!-- 循环政策:基于时间创建日志文件 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 日志文件名格式 -->
<fileNamePattern>${log.path}/error.%d{yyyy-MM-dd}.log</fileNamePattern>
<!-- 日志最大的历史 60天 -->
<maxHistory>60</maxHistory>
</rollingPolicy>
<encoder>
<pattern>${log.pattern}</pattern>
</encoder>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<!-- 过滤的级别 -->
<level>ERROR</level>
<!-- 匹配时的操作:接收(记录) -->
<onMatch>ACCEPT</onMatch>
<!-- 不匹配时的操作:拒绝(不记录) -->
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!-- 系统模块日志级别控制 -->
<logger name="com.jxy" level="info" />
<!-- Spring日志级别控制 -->
<logger name="org.springframework" level="warn" />
<root level="info">
<appender-ref ref="console" />
</root>
<!--系统操作日志-->
<root level="info">
<appender-ref ref="file_info" />
<appender-ref ref="file_error" />
</root>
</configuration>

View File

@@ -0,0 +1,86 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.jxy.binance.mapper.BiAnMapper2">
<insert id="inserthBtcusdtPERP" parameterType="com.jxy.binance.dto.BiAnWSDto">
insert into binance_usd.1hBTCUSDPERP(open_time,close_time,open_price,high_price,
low_price,close_price,volume,quality,buy_volume,buy_quality,symbol,num )
values (#{dto.openTime}, #{dto.closeTime}, #{dto.openPrice},
#{dto.highPrice}, #{dto.lowPrice}, #{dto.closePrice},
#{dto.volume}, #{dto.quality}, #{dto.buyVolume}, #{dto.buyQuality}, #{dto.symbol}, #{dto.num})
</insert>
<insert id="insertUSDPERP" parameterType="com.jxy.binance.dto.BiAnWSDto">
insert into binance_usd.${table}(open_time,close_time,open_price,high_price,
low_price,close_price,volume,quality,buy_volume,buy_quality,symbol,num )
values (#{dto.openTime}, #{dto.closeTime}, #{dto.openPrice},
#{dto.highPrice}, #{dto.lowPrice}, #{dto.closePrice},
#{dto.volume}, #{dto.quality}, #{dto.buyVolume}, #{dto.buyQuality}, #{dto.symbol}, #{dto.num})
ON duplicate key update open_time=open_time
</insert>
<insert id="insertStock">
insert into binance_stocks.${table}(open_time,close_time,open_price,high_price,
low_price,close_price,volume,quality,buy_volume,buy_quality,symbol,num )
values (#{dto.openTime}, #{dto.closeTime}, #{dto.openPrice},
#{dto.highPrice}, #{dto.lowPrice}, #{dto.closePrice},
#{dto.volume}, #{dto.quality}, #{dto.buyVolume}, #{dto.buyQuality}, #{dto.symbol}, #{dto.num})
ON duplicate key update open_time=open_time
</insert>
<insert id="insertUSDT">
insert into binance_usdt.${table}(open_time,close_time,open_price,high_price,
low_price,close_price,volume,quality,buy_volume,buy_quality,symbol,num,now_date)
values (#{dto.openTime}, #{dto.closeTime}, #{dto.openPrice},
#{dto.highPrice}, #{dto.lowPrice}, #{dto.closePrice},
#{dto.volume}, #{dto.quality}, #{dto.buyVolume}, #{dto.buyQuality}, #{dto.symbol}, #{dto.num},sysdate())
ON duplicate key update open_time=open_time,open_price = #{dto.openPrice},high_price = #{dto.highPrice},
low_price = #{dto.lowPrice},close_price = #{dto.closePrice},volume = #{dto.volume},quality = #{dto.quality},buy_volume = #{dto.buyVolume}, buy_quality = #{dto.buyQuality},symbol = #{dto.symbol},num =#{dto.num}
</insert>
<insert id="batchUSDPERP">
insert into binance_usd.${table}(open_time,close_time,open_price,high_price,
low_price,close_price,volume,quality,buy_volume,buy_quality,symbol,num ) values
<foreach collection="list" separator="," item="dto" index="index">
(#{dto.openTime}, #{dto.closeTime}, #{dto.openPrice},
#{dto.highPrice}, #{dto.lowPrice}, #{dto.closePrice},
#{dto.volume}, #{dto.quality}, #{dto.buyVolume}, #{dto.buyQuality}, #{dto.symbol}, #{dto.num})
</foreach>
ON duplicate key update open_time=open_time
</insert>
<insert id="batchUSDT" parameterType="java.util.List">
insert into binance_usdt.${table}(open_time,close_time,open_price,high_price,
low_price,close_price,volume,quality,buy_volume,buy_quality,symbol,num ) values
<foreach collection="list" separator="," item="dto" index="index">
(#{dto.openTime}, #{dto.closeTime}, #{dto.openPrice},
#{dto.highPrice}, #{dto.lowPrice}, #{dto.closePrice},
#{dto.volume}, #{dto.quality}, #{dto.buyVolume}, #{dto.buyQuality}, #{dto.symbol}, #{dto.num})
</foreach>
ON duplicate key update open_time=open_time
</insert>
<insert id="batchStock">
insert into binance_stocks.${table}(open_time,close_time,open_price,high_price,
low_price,close_price,volume,quality,buy_volume,buy_quality,symbol,num ) values
<foreach collection="list" separator="," item="dto" index="index">
(#{dto.openTime}, #{dto.closeTime}, #{dto.openPrice},
#{dto.highPrice}, #{dto.lowPrice}, #{dto.closePrice},
#{dto.volume}, #{dto.quality}, #{dto.buyVolume}, #{dto.buyQuality}, #{dto.symbol}, #{dto.num})
</foreach>
ON duplicate key update open_time=open_time
</insert>
<select id="getTotalBTCClosePrice" resultType="com.jxy.binance.dto.BiAnWSDto">
select * from binance_usd.1hBTCUSDPERP order by open_time desc;
</select>
<select id="getTotalETHClosePrice" resultType="com.jxy.binance.dto.BiAnWSDto">
select * from binance_usd.1hETHUSDPERP order by open_time desc;
</select>
</mapper>