| 2025-12-17 | Administrator | ![]() |
| 2025-12-17 | Administrator | ![]() |
| 2025-12-17 | Administrator | ![]() |
| 2025-12-17 | Administrator | ![]() |
| 2025-12-17 | Administrator | ![]() |
| 2025-12-17 | Administrator | ![]() |
| 2025-12-17 | Administrator | ![]() |
| 2025-12-17 | Administrator | ![]() |
| 2025-12-17 | Administrator | ![]() |
| 2025-12-17 | Administrator | ![]() |
| 2025-12-17 | Administrator | ![]() |
| 2025-12-17 | Administrator | ![]() |
| 2025-12-17 | Administrator | ![]() |
| 2025-12-17 | Administrator | ![]() |
src/main/java/com/xcong/excoin/modules/newPrice/OkxNewPriceWebSocketClient.java
@@ -83,7 +83,7 @@ } private static final String WS_URL_MONIPAN = "wss://wspap.okx.com:8443/ws/v5/public"; private static final String WS_URL_SHIPAN = "wss://ws.okx.com:8443/ws/v5/public"; private static final boolean isAccountType = true; private static final boolean isAccountType = false; /** * 建立与 OKX WebSocket 服务器的连接。 src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxNewPriceWebSocketClient.java
New file @@ -0,0 +1,453 @@ package com.xcong.excoin.modules.okxNewPrice; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.modules.okxNewPrice.celue.CaoZuoService; import com.xcong.excoin.modules.okxNewPrice.OkxWebSocketClientManager; import com.xcong.excoin.modules.okxNewPrice.okxWs.TradeOrderWs; import com.xcong.excoin.modules.okxNewPrice.okxWs.enums.CoinEnums; import com.xcong.excoin.modules.okxNewPrice.utils.SSLConfig; import com.xcong.excoin.rabbit.pricequeue.WebsocketPriceService; import com.xcong.excoin.utils.CoinTypeConvert; import com.xcong.excoin.utils.RedisUtils; import lombok.extern.slf4j.Slf4j; import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; import java.net.URI; import java.net.URISyntaxException; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; /** * OKX 新价格 WebSocket 客户端类,用于连接 OKX 的 WebSocket 接口, * 实时获取并处理标记价格(mark price)数据,并将价格信息存储到 Redis 中。 * 同时支持心跳检测、自动重连以及异常恢复机制。 * @author Administrator */ @Slf4j public class OkxNewPriceWebSocketClient { private final RedisUtils redisUtils; private final CaoZuoService caoZuoService; private final OkxWebSocketClientManager clientManager; private WebSocketClient webSocketClient; private ScheduledExecutorService heartbeatExecutor; private volatile ScheduledFuture<?> pongTimeoutFuture; private final AtomicReference<Long> lastMessageTime = new AtomicReference<>(System.currentTimeMillis()); // 连接状态标志 private final AtomicBoolean isConnected = new AtomicBoolean(false); private final AtomicBoolean isConnecting = new AtomicBoolean(false); private final AtomicBoolean isInitialized = new AtomicBoolean(false); private static final String CHANNEL = "mark-price"; // 心跳超时时间(秒),小于30秒 private static final int HEARTBEAT_TIMEOUT = 10; // 共享线程池用于重连等异步任务 private final ExecutorService sharedExecutor = Executors.newCachedThreadPool(r -> { Thread t = new Thread(r, "okx-ws-shared-worker"); t.setDaemon(true); return t; }); public OkxNewPriceWebSocketClient(RedisUtils redisUtils, CaoZuoService caoZuoService, OkxWebSocketClientManager clientManager) { this.redisUtils = redisUtils; this.caoZuoService = caoZuoService; this.clientManager = clientManager; } /** * 初始化方法,创建并初始化WebSocket客户端实例 */ public void init() { if (!isInitialized.compareAndSet(false, true)) { log.warn("OkxNewPriceWebSocketClient 已经初始化过,跳过重复初始化"); return; } connect(); startHeartbeat(); } /** * 销毁方法,关闭WebSocket连接和相关资源 */ public void destroy() { log.info("开始销毁OkxNewPriceWebSocketClient"); // 设置关闭标志,避免重连 if (sharedExecutor != null && !sharedExecutor.isShutdown()) { sharedExecutor.shutdown(); } if (webSocketClient != null && webSocketClient.isOpen()) { try { webSocketClient.closeBlocking(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.warn("关闭WebSocket连接时被中断"); } } shutdownExecutorGracefully(heartbeatExecutor); if (pongTimeoutFuture != null) { pongTimeoutFuture.cancel(true); } shutdownExecutorGracefully(sharedExecutor); log.info("OkxNewPriceWebSocketClient销毁完成"); } private static final String WS_URL_MONIPAN = "wss://wspap.okx.com:8443/ws/v5/public"; private static final String WS_URL_SHIPAN = "wss://ws.okx.com:8443/ws/v5/public"; private static final boolean isAccountType = false; /** * 建立与 OKX WebSocket 服务器的连接。 * 设置回调函数以监听连接打开、接收消息、关闭和错误事件。 */ private void connect() { // 避免重复连接 if (isConnecting.get()) { log.info("连接已在进行中,跳过重复连接请求"); return; } if (!isConnecting.compareAndSet(false, true)) { log.info("连接已在进行中,跳过重复连接请求"); return; } try { SSLConfig.configureSSL(); System.setProperty("https.protocols", "TLSv1.2,TLSv1.3"); String WS_URL = WS_URL_MONIPAN; if (isAccountType){ WS_URL = WS_URL_SHIPAN; } URI uri = new URI(WS_URL); // 关闭之前的连接(如果存在) if (webSocketClient != null) { try { webSocketClient.closeBlocking(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.warn("关闭之前连接时被中断"); } } webSocketClient = new WebSocketClient(uri) { @Override public void onOpen(ServerHandshake handshake) { log.info("OKX New Price WebSocket连接成功"); isConnected.set(true); isConnecting.set(false); // 检查应用是否正在关闭 if (sharedExecutor != null && !sharedExecutor.isShutdown()) { resetHeartbeatTimer(); subscribeChannels(); } else { log.warn("应用正在关闭,忽略WebSocket连接成功回调"); } } @Override public void onMessage(String message) { lastMessageTime.set(System.currentTimeMillis()); handleWebSocketMessage(message); resetHeartbeatTimer(); } @Override public void onClose(int code, String reason, boolean remote) { log.warn("OKX New Price WebSocket连接关闭: code={}, reason={}", code, reason); isConnected.set(false); isConnecting.set(false); cancelPongTimeout(); if (sharedExecutor != null && !sharedExecutor.isShutdown() && !sharedExecutor.isTerminated()) { sharedExecutor.execute(() -> { try { reconnectWithBackoff(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("重连线程被中断", e); } catch (Exception e) { log.error("重连失败", e); } }); } else { log.warn("共享线程池已关闭,无法执行重连任务"); } } @Override public void onError(Exception ex) { log.error("OKX New Price WebSocket发生错误", ex); isConnected.set(false); } }; webSocketClient.connect(); } catch (URISyntaxException e) { log.error("WebSocket URI格式错误", e); isConnecting.set(false); } } /** * 订阅指定交易对的价格通道。 * 构造订阅请求并发送给服务端。 */ private void subscribeChannels() { JSONObject subscribeMsg = new JSONObject(); subscribeMsg.put("op", "subscribe"); JSONArray argsArray = new JSONArray(); JSONObject arg = new JSONObject(); arg.put("channel", CHANNEL); arg.put("instId", CoinEnums.HE_YUE.getCode()); argsArray.add(arg); subscribeMsg.put("args", argsArray); webSocketClient.send(subscribeMsg.toJSONString()); log.info("已发送价格订阅请求,订阅通道数: {}", argsArray.size()); } /** * 处理从 WebSocket 收到的消息。 * 包括订阅确认、错误响应、心跳响应以及实际的数据推送。 * * @param message 来自 WebSocket 的原始字符串消息 */ private void handleWebSocketMessage(String message) { try { JSONObject response = JSON.parseObject(message); String event = response.getString("event"); if ("subscribe".equals(event)) { log.info("价格订阅成功: {}", response.getJSONObject("arg")); } else if ("error".equals(event)) { log.error("价格订阅错误: code={}, msg={}", response.getString("code"), response.getString("msg")); } else if ("pong".equals(event)) { log.debug("收到pong响应"); cancelPongTimeout(); } else { processPushData(response); } } catch (Exception e) { log.error("处理WebSocket消息失败: {}", message, e); } } /** * 解析并处理价格推送数据。 * 将最新的标记价格存入 Redis 并触发后续业务逻辑比较处理。 * 当价格变化时,调用CaoZuoService的caoZuo方法,触发所有账号的量化操作 * * @param response 包含价格数据的 JSON 对象 */ private void processPushData(JSONObject response) { try { JSONArray dataArray = response.getJSONArray("data"); if (dataArray != null && !dataArray.isEmpty()) { for (int i = 0; i < dataArray.size(); i++) { try { JSONObject priceData = dataArray.getJSONObject(i); String instId = priceData.getString("instId"); String markPx = priceData.getString("markPx"); // 保存价格到Redis redisUtils.set(CoinEnums.HE_YUE.getCode(), markPx); log.debug("更新最新价格: {} = {}, 币种: {}", CoinEnums.HE_YUE.getCode(), markPx, instId); // 价格变化时,触发所有账号的量化操作 triggerQuantOperations(markPx); } catch (Exception innerEx) { log.warn("处理单条价格数据失败", innerEx); } } } } catch (Exception e) { log.error("处理价格推送数据失败", e); } } /** * 触发所有账号的量化操作 * @param markPx 当前标记价格 */ private void triggerQuantOperations(String markPx) { try { // 获取所有OkxQuantWebSocketClient实例 for (OkxQuantWebSocketClient client : clientManager.getAllClients()) { // 由于OkxQuantWebSocketClient没有直接暴露账号名称的方法,我们需要从clientManager中获取 // 这里可以通过遍历clientMap的方式获取账号名称 // 或者修改OkxQuantWebSocketClient,添加getAccountName方法 // 暂时使用这种方式获取账号名称 String accountName = getAccountNameFromClient(client); if (accountName != null) { // 调用CaoZuoService的caoZuo方法,触发量化操作 String side = caoZuoService.caoZuo(accountName); TradeOrderWs.orderEvent(client.getWebSocketClient(), side, accountName); log.info("价格变化触发量化操作: 账号={}, 价格={}, 操作方向={}", accountName, markPx, side); } } } catch (Exception e) { log.error("触发量化操作失败", e); } } /** * 从OkxQuantWebSocketClient实例中获取账号名称 * 由于OkxQuantWebSocketClient没有直接暴露账号名称的方法,这里需要通过反射获取 * 更好的方式是修改OkxQuantWebSocketClient,添加getAccountName方法 */ private String getAccountNameFromClient(OkxQuantWebSocketClient client) { try { // 通过反射获取account字段的值 java.lang.reflect.Field accountField = OkxQuantWebSocketClient.class.getDeclaredField("account"); accountField.setAccessible(true); Object account = accountField.get(client); // 调用account的name()方法获取账号名称 java.lang.reflect.Method nameMethod = account.getClass().getMethod("name"); return (String) nameMethod.invoke(account); } catch (Exception e) { log.error("获取账号名称失败", e); return null; } } /** * 构建 Redis Key */ private String buildRedisKey(String instId) { return "PRICE_" + instId.replace("-", ""); } /** * 启动心跳检测任务。 * 使用 ScheduledExecutorService 定期检查是否需要发送 ping 请求来维持连接。 */ private void startHeartbeat() { if (heartbeatExecutor != null && !heartbeatExecutor.isTerminated()) { heartbeatExecutor.shutdownNow(); } heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(r -> { Thread t = new Thread(r, "okx-newprice-heartbeat"); t.setDaemon(true); return t; }); heartbeatExecutor.scheduleWithFixedDelay(this::checkHeartbeatTimeout, 25, 25, TimeUnit.SECONDS); } /** * 重置心跳计时器。 * 当收到新消息或发送 ping 后取消当前超时任务并重新安排下一次超时检查。 */ private synchronized void resetHeartbeatTimer() { cancelPongTimeout(); if (heartbeatExecutor != null && !heartbeatExecutor.isShutdown()) { pongTimeoutFuture = heartbeatExecutor.schedule(this::checkHeartbeatTimeout, HEARTBEAT_TIMEOUT, TimeUnit.SECONDS); } } /** * 检查心跳超时情况。 * 若长时间未收到任何消息则主动发送 ping 请求保持连接活跃。 */ private void checkHeartbeatTimeout() { // 只有在连接状态下才检查心跳 if (!isConnected.get()) { return; } long currentTime = System.currentTimeMillis(); long lastTime = lastMessageTime.get(); if (currentTime - lastTime >= HEARTBEAT_TIMEOUT * 1000L) { sendPing(); } } /** * 发送 ping 请求至 WebSocket 服务端。 * 用于维持长连接有效性。 */ private void sendPing() { try { if (webSocketClient != null && webSocketClient.isOpen()) { JSONObject ping = new JSONObject(); ping.put("op", "ping"); webSocketClient.send(ping.toJSONString()); log.debug("发送ping请求"); } } catch (Exception e) { log.warn("发送ping失败", e); } } /** * 取消当前的心跳超时任务。 * 在收到 pong 或其他有效消息时调用此方法避免不必要的断开重连。 */ private synchronized void cancelPongTimeout() { if (pongTimeoutFuture != null && !pongTimeoutFuture.isDone()) { pongTimeoutFuture.cancel(true); } } /** * 执行 WebSocket 重连操作。 * 在连接意外中断后尝试重新建立连接。 */ private void reconnectWithBackoff() throws InterruptedException { int attempt = 0; int maxAttempts = 5; long delayMs = 5000; while (attempt < maxAttempts) { try { Thread.sleep(delayMs); connect(); return; } catch (Exception e) { log.warn("第{}次重连失败", attempt + 1, e); delayMs *= 2; attempt++; } } log.error("超过最大重试次数({})仍未连接成功", maxAttempts); } /** * 优雅关闭线程池 */ private void shutdownExecutorGracefully(ExecutorService executor) { if (executor == null || executor.isTerminated()) { return; } try { executor.shutdown(); if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { executor.shutdownNow(); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); executor.shutdownNow(); } } } src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxQuantWebSocketClient.java
@@ -31,8 +31,6 @@ */ @Slf4j public class OkxQuantWebSocketClient { private final WangGeService wangGeService; private final CaoZuoService caoZuoService; private final RedisUtils redisUtils; private final ExchangeInfoEnum account; @@ -45,11 +43,17 @@ private final AtomicBoolean isConnected = new AtomicBoolean(false); private final AtomicBoolean isConnecting = new AtomicBoolean(false); public OkxQuantWebSocketClient(ExchangeInfoEnum account, WangGeService wangGeService, CaoZuoService caoZuoService, RedisUtils redisUtils) { /** * 获取WebSocketClient实例 * @return WebSocketClient实例 */ public WebSocketClient getWebSocketClient() { return webSocketClient; } public OkxQuantWebSocketClient(ExchangeInfoEnum account, RedisUtils redisUtils) { this.account = account; this.wangGeService = wangGeService; this.caoZuoService = caoZuoService; this.redisUtils = redisUtils; } @@ -372,8 +376,8 @@ OrderInfoWs.handleEvent(response, redisUtils, account.name()); }else if (AccountWs.ACCOUNTWS_CHANNEL.equals(channel)) { AccountWs.handleEvent(response, account.name()); String side = caoZuoService.caoZuo(account.name()); TradeOrderWs.orderEvent(webSocketClient, side, account.name()); // String side = caoZuoService.caoZuo(account.name()); // TradeOrderWs.orderEvent(webSocketClient, side, account.name()); } else if (PositionsWs.POSITIONSWS_CHANNEL.equals(channel)) { PositionsWs.handleEvent(response, account.name()); } else if (BalanceAndPositionWs.CHANNEL_NAME.equals(channel)) { src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxWebSocketClientManager.java
@@ -3,6 +3,7 @@ import com.xcong.excoin.modules.okxNewPrice.celue.CaoZuoService; import com.xcong.excoin.modules.okxNewPrice.okxWs.enums.ExchangeInfoEnum; import com.xcong.excoin.modules.okxNewPrice.wangge.WangGeService; import com.xcong.excoin.rabbit.pricequeue.WebsocketPriceService; import com.xcong.excoin.utils.RedisUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -22,14 +23,15 @@ @ConditionalOnProperty(prefix = "app", name = "quant", havingValue = "true") public class OkxWebSocketClientManager { @Autowired private WangGeService wangGeService; @Autowired private CaoZuoService caoZuoService; @Autowired private RedisUtils redisUtils; // 存储所有WebSocket客户端实例,key为账号类型名称 private final Map<String, OkxQuantWebSocketClient> clientMap = new ConcurrentHashMap<>(); // 存储所有OkxQuantWebSocketClient实例,key为账号类型名称 private final Map<String, OkxQuantWebSocketClient> quantClientMap = new ConcurrentHashMap<>(); // 存储OkxNewPriceWebSocketClient实例 private OkxNewPriceWebSocketClient newPriceClient; /** * 初始化方法,在Spring Bean构造完成后执行 @@ -39,14 +41,23 @@ public void init() { log.info("开始初始化OkxWebSocketClientManager"); // 初始化价格WebSocket客户端 try { newPriceClient = new OkxNewPriceWebSocketClient(redisUtils, caoZuoService, this); newPriceClient.init(); log.info("已初始化OkxNewPriceWebSocketClient"); } catch (Exception e) { log.error("初始化OkxNewPriceWebSocketClient失败", e); } // 获取所有ExchangeInfoEnum枚举值 ExchangeInfoEnum[] accounts = ExchangeInfoEnum.values(); // 为每个账号创建一个WebSocket客户端实例 for (ExchangeInfoEnum account : accounts) { try { OkxQuantWebSocketClient client = new OkxQuantWebSocketClient(account, wangGeService, caoZuoService, redisUtils); clientMap.put(account.name(), client); OkxQuantWebSocketClient client = new OkxQuantWebSocketClient(account, redisUtils); quantClientMap.put(account.name(), client); client.init(); log.info("已初始化账号 {} 的WebSocket客户端", account.name()); } catch (Exception e) { @@ -65,8 +76,18 @@ public void destroy() { log.info("开始销毁OkxWebSocketClientManager"); // 关闭所有客户端实例 for (Map.Entry<String, OkxQuantWebSocketClient> entry : clientMap.entrySet()) { // 关闭价格WebSocket客户端 if (newPriceClient != null) { try { newPriceClient.destroy(); log.info("已销毁OkxNewPriceWebSocketClient"); } catch (Exception e) { log.error("销毁OkxNewPriceWebSocketClient失败", e); } } // 关闭所有量化交易WebSocket客户端实例 for (Map.Entry<String, OkxQuantWebSocketClient> entry : quantClientMap.entrySet()) { try { OkxQuantWebSocketClient client = entry.getValue(); client.destroy(); @@ -77,25 +98,33 @@ } // 清空客户端映射 clientMap.clear(); quantClientMap.clear(); log.info("OkxWebSocketClientManager销毁完成"); } /** * 获取指定账号的WebSocket客户端实例 * 获取指定账号的OkxQuantWebSocketClient实例 * @param accountName 账号类型名称 * @return WebSocket客户端实例 */ public OkxQuantWebSocketClient getClient(String accountName) { return clientMap.get(accountName); return quantClientMap.get(accountName); } /** * 获取所有WebSocket客户端实例 * 获取所有OkxQuantWebSocketClient实例 * @return 所有客户端实例的集合 */ public Collection<OkxQuantWebSocketClient> getAllClients() { return clientMap.values(); return quantClientMap.values(); } /** * 获取OkxNewPriceWebSocketClient实例 * @return 价格WebSocket客户端实例 */ public OkxNewPriceWebSocketClient getNewPriceClient() { return newPriceClient; } } src/main/java/com/xcong/excoin/modules/okxNewPrice/celue/CaoZuoService.java
@@ -7,7 +7,7 @@ String caoZuo(String accountName); String caoZuoLong(String accountName); String caoZuoLong(String accountName,String markPx); String caoZuoShort(String accountName); String caoZuoShort(String accountName,String markPx); } src/main/java/com/xcong/excoin/modules/okxNewPrice/celue/CaoZuoServiceImpl.java
@@ -9,8 +9,6 @@ import com.xcong.excoin.modules.okxNewPrice.okxWs.wanggeList.WangGeListQueue; import com.xcong.excoin.modules.okxNewPrice.okxWs.wanggeList.WangGeListService; import com.xcong.excoin.modules.okxNewPrice.utils.WsMapBuild; import com.xcong.excoin.modules.okxNewPrice.wangge.WangGeQueue; import com.xcong.excoin.modules.okxNewPrice.wangge.WangGeService; import com.xcong.excoin.rabbit.pricequeue.AscBigDecimal; import com.xcong.excoin.rabbit.pricequeue.DescBigDecimal; import com.xcong.excoin.utils.RedisUtils; @@ -34,7 +32,6 @@ @RequiredArgsConstructor public class CaoZuoServiceImpl implements CaoZuoService { private final WangGeService wangGeService; private final WangGeListService wangGeListService; private final RedisUtils redisUtils; @@ -52,17 +49,25 @@ return null; } String markPx = ObjectUtil.isEmpty(redisUtils.getString(CoinEnums.HE_YUE.getCode())) ? "0" : redisUtils.getString(CoinEnums.HE_YUE.getCode()); log.info("当前价格: {}", markPx); WangGeListEnum gridByPrice = WangGeListEnum.getGridByPrice(new BigDecimal(markPx)); if (gridByPrice == null){ log.error("没有获取到网格参数......"); return null; } log.info("当前网格: {}", gridByPrice.name()); Map<String, String> accountMap = InstrumentsWs.getAccountMap(accountName); String wanggeName = accountMap.get(CoinEnums.WANG_GE_OLD.name()); PriorityBlockingQueue<AscBigDecimal> ascBigDecimals = wangGeListService.initWangGe(markPx); if (ascBigDecimals == null){ log.error("没有获取到网格队列......"); return null; } /** * 如果下单的网格不属于同一个网格,则先止损掉老的网格的仓位 */ Map<String, String> accountMap = InstrumentsWs.getAccountMap(accountName); String wanggeName = accountMap.get(CoinEnums.WANG_GE_OLD.name()); if (StrUtil.isNotEmpty(wanggeName) && !wanggeName.equals(gridByPrice.name())){ log.error("正在止损老的网格仓位......"); WangGeListEnum oldWangge = WangGeListEnum.getByName(wanggeName); @@ -79,7 +84,29 @@ ? BigDecimal.ZERO : PositionsWs.getAccountMap(positionAccountName).get(CoinEnums.READY_STATE.name()); if (WsMapBuild.parseBigDecimalSafe(CoinEnums.READY_STATE_YES.getCode()).compareTo(positionsReadyState) != 0) { log.info("仓位{}通道未就绪,取消发送",positionAccountName); return null; // 判断是否保证金超标 if (PositionsWs.getAccountMap(positionAccountName).get("imr") == null){ log.error("没有获取到持仓信息,等待初始化......"); return null; } BigDecimal ordFrozImr = PositionsWs.getAccountMap(positionAccountName).get("imr"); BigDecimal totalOrderUsdt = WsMapBuild.parseBigDecimalSafe(AccountWs.getAccountMap(accountName).get(CoinEnums.TOTAL_ORDER_USDT.name())); if (ordFrozImr.compareTo(totalOrderUsdt) >= 0){ log.error("已满仓......"); return OrderParamEnums.HOLDING.getValue(); } if (PositionsWs.getAccountMap(positionAccountName).get("pos") == null){ log.error("没有获取到持仓信息,等待初始化......"); return null; } BigDecimal pos = PositionsWs.getAccountMap(positionAccountName).get("pos"); if (BigDecimal.ZERO.compareTo( pos) >= 0) { log.error("持仓数量为零,进行初始化订单"); return OrderParamEnums.INIT.getValue(); }else{ log.error("仓位有持仓,等待持仓更新"); return null; } } // 系统设置的开关,等于冷静中,则代表不开仓 String outStr = InstrumentsWs.getAccountMap(accountName).get(CoinEnums.OUT.name()); @@ -121,16 +148,6 @@ return OrderParamEnums.HOLDING.getValue(); } } if (PositionsWs.getAccountMap(positionAccountName).get("pos") == null){ log.error("没有获取到持仓信息,等待初始化......"); return null; } BigDecimal pos = PositionsWs.getAccountMap(positionAccountName).get("pos"); if (BigDecimal.ZERO.compareTo( pos) >= 0) { log.error("持仓数量为零,进行初始化订单"); return OrderParamEnums.INIT.getValue(); } // 判断是否保证金超标 if (PositionsWs.getAccountMap(positionAccountName).get("imr") == null){ log.error("没有获取到持仓信息,等待初始化......"); @@ -143,15 +160,19 @@ return OrderParamEnums.HOLDING.getValue(); } PriorityBlockingQueue<AscBigDecimal> ascBigDecimals = wangGeListService.initWangGe(markPx); if (ascBigDecimals == null){ log.error("没有获取到网格队列......"); if (PositionsWs.getAccountMap(positionAccountName).get("pos") == null){ log.error("没有获取到持仓信息,等待初始化......"); return null; } BigDecimal pos = PositionsWs.getAccountMap(positionAccountName).get("pos"); if (BigDecimal.ZERO.compareTo( pos) >= 0) { log.error("持仓数量为零,进行初始化订单"); return OrderParamEnums.INIT.getValue(); } if (CoinEnums.POSSIDE_LONG.getCode().equals(posSide)){ return caoZuoLong(accountName); return caoZuoLong(accountName,markPx); }else if (CoinEnums.POSSIDE_SHORT.getCode().equals(posSide)){ return caoZuoShort(accountName); return caoZuoShort(accountName,markPx); }else{ log.error("账户未设置持仓方向......"); return null; @@ -159,13 +180,14 @@ } @Override public String caoZuoLong(String accountName) { public String caoZuoLong(String accountName,String markPxStr) { log.info("开始看涨执行操作CaoZuoServiceImpl......"); try { String positionAccountName = PositionsWs.initAccountName(accountName, CoinEnums.POSSIDE_LONG.getCode()); // 获取标记价格和平均持仓价格 BigDecimal markPx = PositionsWs.getAccountMap(positionAccountName).get("markPx"); // BigDecimal markPx = PositionsWs.getAccountMap(positionAccountName).get("markPx"); BigDecimal markPx = new BigDecimal(markPxStr); BigDecimal avgPx = PositionsWs.getAccountMap(positionAccountName).get("avgPx"); log.info("开仓价格: {}, 当前价格:{},匹配队列中......", avgPx, markPx); @@ -176,16 +198,18 @@ // 处理订单价格在队列中的情况 String orderPrice = OrderInfoWs.getAccountMap(accountName).get("orderPrice"); log.info("订单价格: {}", orderPrice); handleOrderPriceInQueues(orderPrice, queueKaiCang, queuePingCang); // 判断是加仓还是减仓 if (avgPx.compareTo(markPx) > 0) { log.info("开始加仓..."); if (queueKaiCang.isEmpty()) { // 队列为空 // log.info("开始加仓,但是超出了网格设置..."); log.info("开始加仓,但是超出了网格设置..."); return OrderParamEnums.HOLDING.getValue(); } DescBigDecimal kaiCang = queueKaiCang.peek(); log.info("下限队列价格{}", kaiCang.getValue()); if (kaiCang != null && markPx.compareTo(kaiCang.getValue()) <= 0 && avgPx.compareTo(kaiCang.getValue()) >= 0) { log.info("开始加仓...下限队列价格大于当前价格{}>{}", kaiCang.getValue(), markPx); WsMapBuild.saveStringToMap(OrderInfoWs.getAccountMap(accountName), "orderPrice", String.valueOf(markPx)); @@ -209,10 +233,11 @@ return OrderParamEnums.HOLDING.getValue(); } AscBigDecimal pingCang = queuePingCang.peek(); log.info("上限队列价格:{}", pingCang.getValue()); if (pingCang != null && avgPx.compareTo(pingCang.getValue()) < 0) { log.info("开始减仓...上限队列价格大于开仓价格{}>{}", pingCang.getValue(), avgPx); // 手续费 BigDecimal feeValue = PositionsWs.getAccountMap(positionAccountName).get("fee").multiply(new BigDecimal("2")); BigDecimal feeValue = PositionsWs.getAccountMap(positionAccountName).get("fee"); //未实现收益 BigDecimal uplValue = PositionsWs.getAccountMap(positionAccountName).get("upl"); //已实现收益 @@ -258,14 +283,15 @@ } @Override public String caoZuoShort(String accountName) { public String caoZuoShort(String accountName,String markPxStr) { log.info("开始看空执行操作CaoZuoServiceImpl......"); try { String positionAccountName = PositionsWs.initAccountName(accountName, CoinEnums.POSSIDE_SHORT.getCode()); // 获取标记价格和平均持仓价格 BigDecimal markPx = PositionsWs.getAccountMap(positionAccountName).get("markPx"); // BigDecimal markPx = PositionsWs.getAccountMap(positionAccountName).get("markPx"); BigDecimal markPx = new BigDecimal(markPxStr); BigDecimal avgPx = PositionsWs.getAccountMap(positionAccountName).get("avgPx"); log.info("开仓价格: {}, 当前价格:{},匹配队列中......", avgPx, markPx); @@ -276,21 +302,23 @@ // 处理订单价格在队列中的情况 String orderPrice = OrderInfoWs.getAccountMap(accountName).get("orderPrice"); log.info("订单价格:{}", orderPrice); handleOrderPriceInQueues(orderPrice, queueKaiCang, queuePingCang); // 判断是加仓还是减仓 if (avgPx.compareTo(markPx) > 0) { log.info("开始减仓..."); if (queueKaiCang.isEmpty()) { // 队列为空 // log.info("开始加仓,但是超出了网格设置..."); log.info("开始减仓,但是超出了网格设置..."); return OrderParamEnums.HOLDING.getValue(); } DescBigDecimal kaiCang = queueKaiCang.peek(); log.info("下限队列价格{}", kaiCang.getValue()); if (kaiCang != null && avgPx.compareTo(kaiCang.getValue()) >= 0) { log.info("开始减仓...下限队列价格小于开仓价格{}<{}", kaiCang.getValue(), avgPx); // 手续费 BigDecimal feeValue = PositionsWs.getAccountMap(positionAccountName).get("fee").multiply(new BigDecimal("2")); BigDecimal feeValue = PositionsWs.getAccountMap(positionAccountName).get("fee"); //未实现收益 BigDecimal uplValue = PositionsWs.getAccountMap(positionAccountName).get("upl"); //已实现收益 @@ -334,6 +362,7 @@ return OrderParamEnums.HOLDING.getValue(); } AscBigDecimal pingCang = queuePingCang.peek(); log.info("上限队列价格: {}", pingCang.getValue()); if (pingCang != null && markPx.compareTo(pingCang.getValue()) >= 0 && avgPx.compareTo(pingCang.getValue()) < 0) { log.info("开始加仓...上限队列价格小于当前价格{}<={}", pingCang.getValue(), markPx); WsMapBuild.saveStringToMap(OrderInfoWs.getAccountMap(accountName), "orderPrice", String.valueOf(markPx)); @@ -361,8 +390,11 @@ private boolean buyCntTimeLongEvent(String accountName, BigDecimal avgPx, BigDecimal markPx){ //判断当前价格和开仓价格直接间隔除以间距,取整,获取的数量是否大于等于0,如果大于0,则下单基础张数*倍数 String buyCntTime = InstrumentsWs.getAccountMap(accountName).get(CoinEnums.BUY_CNT_TIME.name()); log.info("倍数次数间隔{}", buyCntTime); BigDecimal subtract = avgPx.subtract(markPx); log.info("倍数价格差距{}", subtract); BigDecimal divide = subtract.divide(new BigDecimal(buyCntTime), 0, RoundingMode.DOWN).add(BigDecimal.ONE); log.info("倍数次数{}", divide); if (divide.compareTo(BigDecimal.ZERO) <= 0){ log.warn("加仓次数间隔时间小于0,不加仓"); return false; @@ -373,8 +405,11 @@ private boolean buyCntTimeShortEvent(String accountName, BigDecimal avgPx, BigDecimal markPx){ //判断当前价格和开仓价格直接间隔除以间距,取整,获取的数量是否大于等于0,如果大于0,则下单基础张数*倍数 String buyCntTime = InstrumentsWs.getAccountMap(accountName).get(CoinEnums.BUY_CNT_TIME.name()); log.info("倍数次数间隔{}", buyCntTime); BigDecimal subtract = markPx.subtract(avgPx); log.info("倍数价格差距{}", subtract); BigDecimal divide = subtract.divide(new BigDecimal(buyCntTime), 0, RoundingMode.DOWN).add(BigDecimal.ONE); log.info("倍数次数{}", divide); if (divide.compareTo(BigDecimal.ZERO) <= 0){ log.warn("加仓次数间隔时间小于0,不加仓"); return false; @@ -410,34 +445,34 @@ queueKaiCang.removeIf(item -> item.getValue().compareTo(priceDecimal) >= 0); // 打印开仓队列 // StringBuilder kaiCangStr = new StringBuilder(); // kaiCangStr.append("开仓队列: ["); // boolean first = true; // for (DescBigDecimal item : queueKaiCang) { // if (!first) { // kaiCangStr.append(", "); // } // kaiCangStr.append(item.getValue()); // first = false; // } // kaiCangStr.append("]"); // log.info(kaiCangStr.toString()); StringBuilder kaiCangStr = new StringBuilder(); kaiCangStr.append("下限队列: ["); boolean first = true; for (DescBigDecimal item : queueKaiCang) { if (!first) { kaiCangStr.append(", "); } kaiCangStr.append(item.getValue()); first = false; } kaiCangStr.append("]"); log.info(kaiCangStr.toString()); // 删除比该价格小的数据 queuePingCang.removeIf(item -> item.getValue().compareTo(priceDecimal) <= 0); // 打印平仓队列 // StringBuilder pingCangStr = new StringBuilder(); // pingCangStr.append("平仓队列: ["); // first = true; // for (AscBigDecimal item : queuePingCang) { // if (!first) { // pingCangStr.append(", "); // } // pingCangStr.append(item.getValue()); // first = false; // } // pingCangStr.append("]"); // log.info(pingCangStr.toString()); StringBuilder pingCangStr = new StringBuilder(); pingCangStr.append("上限队列: ["); first = true; for (AscBigDecimal item : queuePingCang) { if (!first) { pingCangStr.append(", "); } pingCangStr.append(item.getValue()); first = false; } pingCangStr.append("]"); log.info(pingCangStr.toString()); } } src/main/java/com/xcong/excoin/modules/okxNewPrice/okxWs/AccountWs.java
@@ -96,6 +96,9 @@ for (int j = 0; j < detailsArray.size(); j++) { JSONObject detail = detailsArray.getJSONObject(j); initParam(detail, accountName); Map<String, String> accountMap = getAccountMap(accountName); WsMapBuild.saveStringToMap(accountMap, CoinEnums.READY_STATE.name(), CoinEnums.READY_STATE_YES.getCode()); } } catch (Exception innerEx) { log.warn("处理账户频道数据失败", innerEx); src/main/java/com/xcong/excoin/modules/okxNewPrice/okxWs/OrderInfoWs.java
@@ -109,6 +109,7 @@ Map<String, String> accountMap = getAccountMap(accountName); //记录成交均价 if (accountMap.get("orderPrice") == null){ log.info("成交均价: {}", avgPx); WsMapBuild.saveStringToMap(accountMap, "orderPrice",avgPx); } WsMapBuild.saveStringToMap(TradeOrderWs.getAccountMap(accountName), "state", CoinEnums.ORDER_LIVE.getCode()); @@ -116,6 +117,7 @@ //保存上一个网格信息 WangGeListEnum gridByPrice = WangGeListEnum.getGridByPrice(new BigDecimal(avgPx)); if (gridByPrice != null){ log.info("保存上一个网格: {}", gridByPrice.name()); Map<String, String> instrumentsMap = InstrumentsWs.getAccountMap(accountName); WsMapBuild.saveStringToMap(instrumentsMap, CoinEnums.WANG_GE_OLD.name(), gridByPrice.name()); } src/main/java/com/xcong/excoin/modules/okxNewPrice/okxWs/PositionsWs.java
@@ -108,6 +108,10 @@ markPx,fee,fundingFee ); initParam(posData, accountName,posSide); String accountNamePositons = initAccountName(accountName, posSide); Map<String, BigDecimal> accountMap = getAccountMap(accountNamePositons); WsMapBuild.saveBigDecimalToMap(accountMap, CoinEnums.READY_STATE.name(), WsMapBuild.parseBigDecimalSafe(CoinEnums.READY_STATE_YES.getCode())); } } } catch (Exception e) { @@ -128,7 +132,5 @@ WsMapBuild.saveBigDecimalToMap(accountMap, "realizedPnl", WsMapBuild.parseBigDecimalSafe(posData.getString("realizedPnl"))); WsMapBuild.saveBigDecimalToMap(accountMap, "fee", WsMapBuild.parseBigDecimalSafe(posData.getString("fee"))); WsMapBuild.saveBigDecimalToMap(accountMap, "fundingFee", WsMapBuild.parseBigDecimalSafe(posData.getString("fundingFee"))); WsMapBuild.saveBigDecimalToMap(accountMap, CoinEnums.READY_STATE.name(), WsMapBuild.parseBigDecimalSafe(CoinEnums.READY_STATE_YES.getCode())); } } src/main/java/com/xcong/excoin/modules/okxNewPrice/okxWs/wanggeList/WangGeListEnum.java
@@ -11,10 +11,10 @@ */ @Getter public enum WangGeListEnum { UP("上层做空", "2", "2950", "2940", "2", "short", "2950"), CENTER("中间指定一个方向", "2", "2940", "2930", "2", "long", "2930"), DOWN("下层做多", "2", "2930", "2920", "2", "long", "2920"), DOWN_ONE("下层做空", "2", "2920", "2910", "2", "short", "2920"); UP("上层做空", "2", "3100", "3000", "2", "short", "3100"), CENTER("中间指定一个方向", "2", "3000", "2950", "2", "long", "2950"), DOWN("下层做空", "2", "2950", "2920", "2", "short", "2950"), DOWN_ONE("下层做多", "2", "2920", "2900", "2", "long", "2900"); private String name; private String xiaoshu_weishu; src/main/java/com/xcong/excoin/modules/okxNewPrice/okxWs/wanggeList/WangGeListServiceImpl.java
@@ -34,15 +34,20 @@ //获取WangGeListEnum全部网格参数 WangGeListEnum gridByPrice = WangGeListEnum.getGridByPrice(new BigDecimal(markPx)); log.info("获取的网格参数: {}", gridByPrice); if (gridByPrice == null){ log.error("没有获取到网格参数......"); return null; } String shangxianValue = gridByPrice.getJiage_shangxian(); log.info("价格上限: {}", shangxianValue); String xiaxianValue = gridByPrice.getJiage_xiaxian(); log.info("价格下限: {}", xiaxianValue); String jianjuValue = gridByPrice.getJian_ju(); log.info("价格间隔: {}", jianjuValue); String weishuValueStr = gridByPrice.getXiaoshu_weishu(); log.info("价格位数: {}", weishuValueStr); try { BigDecimal shangxian = new BigDecimal(shangxianValue); @@ -104,7 +109,7 @@ } } StringBuilder kaiCangStr = new StringBuilder(); kaiCangStr.append("下限队列: ["); kaiCangStr.append("初始化下限队列: ["); boolean first = true; for (DescBigDecimal item : queueKaiCang) { if (!first) { @@ -140,7 +145,7 @@ } StringBuilder pingCangStr = new StringBuilder(); pingCangStr.append("上限队列: ["); pingCangStr.append("初始化上限队列: ["); boolean first = true; for (AscBigDecimal item : queuePingCang) { if (!first) {