src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxNewPriceWebSocketClient.java
@@ -4,12 +4,16 @@ import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.modules.okxNewPrice.celue.CaoZuoService; import com.xcong.excoin.modules.okxNewPrice.okxWs.AccountWs; import com.xcong.excoin.modules.okxNewPrice.okxWs.PositionsWs; import com.xcong.excoin.modules.okxNewPrice.okxWs.TradeOrderWs; import com.xcong.excoin.modules.okxNewPrice.okxWs.enums.CoinEnums; import com.xcong.excoin.modules.okxNewPrice.okxWs.enums.OrderParamEnums; import com.xcong.excoin.modules.okxNewPrice.okxWs.param.TradeRequestParam; import com.xcong.excoin.modules.okxNewPrice.okxWs.wanggeList.WangGeListEnum; import com.xcong.excoin.modules.okxNewPrice.okxWs.wanggeList.WangGeListService; import com.xcong.excoin.modules.okxNewPrice.utils.SSLConfig; import com.xcong.excoin.modules.okxNewPrice.utils.WsMapBuild; import com.xcong.excoin.utils.RedisUtils; import java.math.BigDecimal; import lombok.extern.slf4j.Slf4j; @@ -313,6 +317,43 @@ String accountName = client.getAccountName(); if (accountName != null) { log.info("当前价格{}属于网格: {}-{}({}-{})", markPx, gridByPriceNew.getName(),gridByPriceNew.getFang_xiang(), gridByPriceNew.getJiage_xiaxian(), gridByPriceNew.getJiage_shangxian()); //处理历史网格信息 String fangXiang = gridByPriceNew.getFang_xiang(); String posSideOld = null; if (CoinEnums.POSSIDE_LONG.getCode().equals(fangXiang)) { posSideOld = CoinEnums.POSSIDE_SHORT.getCode(); } if (CoinEnums.POSSIDE_SHORT.getCode().equals(fangXiang)) { posSideOld = CoinEnums.POSSIDE_LONG.getCode(); } String positionAccountName = PositionsWs.initAccountName(accountName, posSideOld); // 判断是否保证金超标 if ( PositionsWs.getAccountMap(positionAccountName).get("pos") != null && PositionsWs.getAccountMap(positionAccountName).get("pos").compareTo(BigDecimal.ZERO) > 0 ){ BigDecimal avgPx = PositionsWs.getAccountMap(positionAccountName).get("avgPx"); WangGeListEnum gridByPriceOld = WangGeListEnum.getGridByPrice(avgPx); if (gridByPriceOld != null){ String zhiSunDian = gridByPriceOld.getZhi_sun_dian(); if (CoinEnums.POSSIDE_SHORT.getCode().equals(posSideOld)) { if (new BigDecimal(markPx).compareTo(new BigDecimal(zhiSunDian)) > 0){ TradeRequestParam tradeRequestParam = caoZuoService.caoZuoZhiSunEvent(accountName, markPx, posSideOld); TradeOrderWs.orderEvent(client.getWebSocketClient(), tradeRequestParam); } } if (CoinEnums.POSSIDE_LONG.getCode().equals(posSideOld)) { if (new BigDecimal(markPx).compareTo(new BigDecimal(zhiSunDian)) < 0){ TradeRequestParam tradeRequestParam = caoZuoService.caoZuoZhiSunEvent(accountName, markPx, posSideOld); TradeOrderWs.orderEvent(client.getWebSocketClient(), tradeRequestParam); } } } } //当前下单 wangGeListService.initWangGe(markPx); TradeRequestParam tradeRequestParam = caoZuoService.caoZuoHandler(accountName, markPx, gridByPriceNew.getFang_xiang()); TradeOrderWs.orderEvent(client.getWebSocketClient(), tradeRequestParam); src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxQuantWebSocketClient.java
@@ -120,22 +120,6 @@ * 销毁方法,在 Spring Bean 销毁前执行。 * 关闭 WebSocket 连接、停止心跳定时器及相关的线程资源。 */ // @PreDestroy // public void destroy() { // if (webSocketClient != null && webSocketClient.isOpen()) { // subscribeAccountChannel(UNSUBSCRIBE); // subscribePositionChannel(UNSUBSCRIBE); // subscribeOrderInfoChannel(UNSUBSCRIBE); // webSocketClient.close(); // } // shutdownExecutorGracefully(heartbeatExecutor); // if (pongTimeoutFuture != null) { // pongTimeoutFuture.cancel(true); // } // shutdownExecutorGracefully(sharedExecutor); // // // 移除了 reconnectScheduler 的关闭操作 // } @PreDestroy public void destroy() { log.info("开始销毁OkxQuantWebSocketClient"); @@ -395,8 +379,8 @@ // 这会导致多账号之间的数据冲突。需要进一步修改这些类的设计,让数据存储与特定账号关联 if (OrderInfoWs.ORDERINFOWS_CHANNEL.equals(channel)) { List<TradeRequestParam> tradeRequestParams = OrderInfoWs.handleEvent(response, redisUtils, account.name()); TradeOrderWs.orderZhiYingEvent(webSocketClient, tradeRequestParams); TradeRequestParam tradeRequestParam = OrderInfoWs.handleEvent(response, redisUtils, account.name()); TradeOrderWs.orderZhiYingEvent(webSocketClient, tradeRequestParam); }else if (AccountWs.ACCOUNTWS_CHANNEL.equals(channel)) { AccountWs.handleEvent(response, account.name()); src/main/java/com/xcong/excoin/modules/okxNewPrice/okxWs/AccountWs.java
@@ -55,14 +55,12 @@ String connId = MallUtils.getOrderNum(ACCOUNTWS_CHANNEL); JSONObject jsonObject = WsParamBuild.buildJsonObject(connId, option, argsArray); webSocketClient.send(jsonObject.toJSONString()); // log.info("发送账户频道:{}", option); } catch (Exception e) { log.error("订阅账户频道构建失败", e); } } public static void initEvent(JSONObject response, String accountName) { // log.info("订阅成功: {}", response.getJSONObject("arg")); JSONObject arg = response.getJSONObject("arg"); initParam(arg, accountName); } @@ -76,7 +74,6 @@ public static void handleEvent(JSONObject response, String accountName) { // log.info("开始执行AccountWs......{}",ACCOUNTWS_CHANNEL); try { JSONArray dataArray = response.getJSONArray("data"); if (dataArray == null || dataArray.isEmpty()) { src/main/java/com/xcong/excoin/modules/okxNewPrice/okxWs/OrderInfoWs.java
@@ -71,7 +71,7 @@ private static final String STATE_KEY = "state"; private static final String FILLFEE_KEY = "fillFee"; private static final String POSSIDE_KEY = "posSide"; public static List<TradeRequestParam> handleEvent(JSONObject response, RedisUtils redisUtils, String accountName) { public static TradeRequestParam handleEvent(JSONObject response, RedisUtils redisUtils, String accountName) { log.info("开始执行OrderInfoWs......"); try { @@ -142,8 +142,6 @@ log.info("{}: 订单详情已完成: {}, 自定义编号: {}", accountName, CoinEnums.HE_YUE.getCode(), clOrdId); List<TradeRequestParam> tradeRequestParamList = new ArrayList<>(); TradeRequestParam tradeRequestParam = new TradeRequestParam(); tradeRequestParam.setAccountName(accountName); BigDecimal zhiYingPx = getZhiYingPx( @@ -178,9 +176,8 @@ tradeRequestParam.setSide(CoinEnums.POSSIDE_LONG.getCode().equals(posSide) ? CoinEnums.SIDE_SELL.getCode() : CoinEnums.SIDE_BUY.getCode()); tradeRequestParam.setClOrdId(WsParamBuild.getOrderNum(side)); tradeRequestParam.setSz(accFillSz); tradeRequestParamList.add(tradeRequestParam); return tradeRequestParamList; return tradeRequestParam; } return null; src/main/java/com/xcong/excoin/modules/okxNewPrice/okxWs/TradeOrderWs.java
@@ -130,84 +130,82 @@ } } public static void orderZhiYingEvent(WebSocketClient webSocketClient, List<TradeRequestParam> tradeRequestParams) { public static void orderZhiYingEvent(WebSocketClient webSocketClient, TradeRequestParam tradeRequestParam) { log.info("开始执行限价{}......",JSONUtil.parse(tradeRequestParams)); if (tradeRequestParams == null){ log.info("开始执行限价{}......",JSONUtil.parse(tradeRequestParam)); if (tradeRequestParam == null){ log.warn("限价下单参数缺失,取消发送"); return; } for (TradeRequestParam tradeRequestParam : tradeRequestParams){ String accountName = tradeRequestParam.getAccountName(); String markPx = tradeRequestParam.getMarkPx(); String instId = tradeRequestParam.getInstId(); String tdMode = tradeRequestParam.getTdMode(); String posSide = tradeRequestParam.getPosSide(); String ordType = tradeRequestParam.getOrdType(); String accountName = tradeRequestParam.getAccountName(); String markPx = tradeRequestParam.getMarkPx(); String instId = tradeRequestParam.getInstId(); String tdMode = tradeRequestParam.getTdMode(); String posSide = tradeRequestParam.getPosSide(); String ordType = tradeRequestParam.getOrdType(); String tradeType = tradeRequestParam.getTradeType(); String tradeType = tradeRequestParam.getTradeType(); String clOrdId = tradeRequestParam.getClOrdId(); String side = tradeRequestParam.getSide(); String sz = tradeRequestParam.getSz(); /** * 校验必要参数 * 验证下单参数是否存在空值 */ if ( StrUtil.isBlank(accountName) || StrUtil.isBlank(instId) || StrUtil.isBlank(tdMode) || StrUtil.isBlank(posSide) || StrUtil.isBlank(ordType) || StrUtil.isBlank(clOrdId) || StrUtil.isBlank(side) || StrUtil.isBlank(sz) || StrUtil.isBlank(markPx) String clOrdId = tradeRequestParam.getClOrdId(); String side = tradeRequestParam.getSide(); String sz = tradeRequestParam.getSz(); /** * 校验必要参数 * 验证下单参数是否存在空值 */ if ( StrUtil.isBlank(accountName) || StrUtil.isBlank(instId) || StrUtil.isBlank(tdMode) || StrUtil.isBlank(posSide) || StrUtil.isBlank(ordType) || StrUtil.isBlank(clOrdId) || StrUtil.isBlank(side) || StrUtil.isBlank(sz) || StrUtil.isBlank(markPx) ){ log.warn("下单参数缺失,取消发送"); return; } log.info("账户:{},触发价格:{},币种:{},方向:{},买卖:{},数量:{},是否允许下单:{},编号:{},", accountName, markPx, instId, posSide,side, sz, tradeType, clOrdId); //验证是否允许下单 if (StrUtil.isNotEmpty(tradeType) && OrderParamEnums.TRADE_NO.getValue().equals(tradeType)) { log.warn("账户{}不允许下单,取消发送", accountName); return; } ){ log.warn("下单参数缺失,取消发送"); return; } log.info("账户:{},触发价格:{},币种:{},方向:{},买卖:{},数量:{},是否允许下单:{},编号:{},", accountName, markPx, instId, posSide,side, sz, tradeType, clOrdId); //验证是否允许下单 if (StrUtil.isNotEmpty(tradeType) && OrderParamEnums.TRADE_NO.getValue().equals(tradeType)) { log.warn("账户{}不允许下单,取消发送", accountName); return; } /** * 检验账户和仓位是否准备就绪 * 开多:买入开多(side 填写 buy; posSide 填写 long ) * 开空:卖出开空(side 填写 sell; posSide 填写 short ) 需要检验账户通道是否准备就绪 * 平多:卖出平多(side 填写 sell;posSide 填写 long ) * 平空:买入平空(side 填写 buy; posSide 填写 short ) 需要检验仓位通道是否准备就绪 */ /** * 检验账户和仓位是否准备就绪 * 开多:买入开多(side 填写 buy; posSide 填写 long ) * 开空:卖出开空(side 填写 sell; posSide 填写 short ) 需要检验账户通道是否准备就绪 * 平多:卖出平多(side 填写 sell;posSide 填写 long ) * 平空:买入平空(side 填写 buy; posSide 填写 short ) 需要检验仓位通道是否准备就绪 */ try { JSONArray argsArray = new JSONArray(); JSONObject args = new JSONObject(); args.put("instId", instId); args.put("tdMode", tdMode); args.put("clOrdId", clOrdId); args.put("side", side); try { JSONArray argsArray = new JSONArray(); JSONObject args = new JSONObject(); args.put("instId", instId); args.put("tdMode", tdMode); args.put("clOrdId", clOrdId); args.put("side", side); args.put("posSide", posSide); args.put("ordType", ordType); args.put("sz", sz); args.put("px", markPx); argsArray.add(args); args.put("posSide", posSide); args.put("ordType", ordType); args.put("sz", sz); args.put("px", markPx); argsArray.add(args); String connId = WsParamBuild.getOrderNum(ORDERWS_CHANNEL); JSONObject jsonObject = WsParamBuild.buildJsonObject(connId, ORDERWS_CHANNEL, argsArray); webSocketClient.send(jsonObject.toJSONString()); log.info("发送下单频道:{},数量:{}", side, sz); String connId = WsParamBuild.getOrderNum(ORDERWS_CHANNEL); JSONObject jsonObject = WsParamBuild.buildJsonObject(connId, ORDERWS_CHANNEL, argsArray); webSocketClient.send(jsonObject.toJSONString()); log.info("发送下单频道:{},数量:{}", side, sz); } catch (Exception e) { log.error("下单构建失败", e); } } catch (Exception e) { log.error("下单构建失败", e); } }