package com.xcong.excoin.modules.newPrice; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.rabbit.pricequeue.WebsocketPriceService; import com.xcong.excoin.utils.CoinTypeConvert; import com.xcong.excoin.utils.RedisUtils; import lombok.extern.slf4j.Slf4j; import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.annotation.Resource; import java.net.URI; import java.net.URISyntaxException; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; /** * OKX 新价格 WebSocket 客户端类,用于连接 OKX 的 WebSocket 接口, * 实时获取并处理标记价格(mark price)数据,并将价格信息存储到 Redis 中。 * 同时支持心跳检测、自动重连以及异常恢复机制。 * @author Administrator */ @Slf4j @Component @ConditionalOnProperty(prefix = "app", name = "websocket", havingValue = "true") public class OkxNewPriceWebSocketClient { // @Resource // private WebsocketPriceService websocketPriceService; // @Resource // private RedisUtils redisUtils; // // private WebSocketClient webSocketClient; // private ScheduledExecutorService heartbeatExecutor; // private volatile ScheduledFuture pongTimeoutFuture; // private final AtomicReference lastMessageTime = new AtomicReference<>(System.currentTimeMillis()); // // private static final String WS_URL = "wss://ws.okx.com:8443/ws/v5/public"; // private static final String CHANNEL_MARK_PRICE = "mark-price"; // private static final String CHANNEL_INDEX_TICKERS = "index-tickers"; // private static final String CHANNEL_OPEN_INTEREST = "open-interest"; // // private static final String[] INST_IDS = { // "BTC-USDT", "ETH-USDT", "XRP-USDT", "LTC-USDT", "BCH-USDT", "ETC-USDT" // }; // //BTC-USDT-SWAP // private static final String[] INST_IDS_INTEREST = { // "BTC-USDT-SWAP", "ETH-USDT-SWAP", "XRP-USDT-SWAP", "LTC-USDT-SWAP", "BCH-USDT-SWAP", "ETC-USDT-SWAP" // }; // // // 心跳超时时间(秒),小于30秒 // private static final int HEARTBEAT_TIMEOUT = 10; // // // 共享线程池用于重连等异步任务 // private final ExecutorService sharedExecutor = Executors.newCachedThreadPool(r -> { // Thread t = new Thread(r, "okx-ws-shared-worker"); // t.setDaemon(true); // return t; // }); // // /** // * 初始化方法,在 Spring Bean 构造完成后执行。 // * 负责建立 WebSocket 连接并启动心跳检测任务。 // */ // @PostConstruct // public void init() { // connect(); // startHeartbeat(); // } // // /** // * 销毁方法,在 Spring Bean 销毁前执行。 // * 关闭 WebSocket 连接、停止心跳定时器及相关的线程资源。 // */ // @PreDestroy // public void destroy() { // if (webSocketClient != null && webSocketClient.isOpen()) { // webSocketClient.close(); // } // if (heartbeatExecutor != null) { // heartbeatExecutor.shutdownNow(); // } // if (pongTimeoutFuture != null) { // pongTimeoutFuture.cancel(true); // } // sharedExecutor.shutdownNow(); // } // // /** // * 建立与 OKX WebSocket 服务器的连接。 // * 设置回调函数以监听连接打开、接收消息、关闭和错误事件。 // */ // private void connect() { // try { // URI uri = new URI(WS_URL); // webSocketClient = new WebSocketClient(uri) { // @Override // public void onOpen(ServerHandshake handshake) { // log.info("OKX New Price WebSocket连接成功"); // resetHeartbeatTimer(); // subscribeChannels(); // subscribeIndexChannels(); // subscribeOpenInterestChannels(); // } // // @Override // public void onMessage(String message) { // lastMessageTime.set(System.currentTimeMillis()); // handleWebSocketMessage(message); // resetHeartbeatTimer(); // } // // @Override // public void onClose(int code, String reason, boolean remote) { // log.warn("OKX New Price WebSocket连接关闭: code={}, reason={}", code, reason); // cancelPongTimeout(); // // sharedExecutor.execute(() -> { // try { // reconnectWithBackoff(); // } catch (InterruptedException ignored) { // Thread.currentThread().interrupt(); // } catch (Exception e) { // log.error("重连失败", e); // } // }); // } // // @Override // public void onError(Exception ex) { // log.error("OKX New Price WebSocket发生错误", ex); // } // }; // // webSocketClient.connect(); // } catch (URISyntaxException e) { // log.error("WebSocket URI格式错误", e); // } // } // // /** // * 订阅指定交易对的价格通道。 // * 构造订阅请求并发送给服务端。 // */ // private void subscribeChannels() { // JSONObject subscribeMsg = new JSONObject(); // subscribeMsg.put("op", "subscribe"); // // JSONArray argsArray = new JSONArray(); // for (String instId : INST_IDS) { // JSONObject arg = new JSONObject(); // arg.put("channel", CHANNEL_MARK_PRICE); // arg.put("instId", instId); // argsArray.add(arg); // } // // subscribeMsg.put("args", argsArray); // webSocketClient.send(subscribeMsg.toJSONString()); // log.info("已发送价格订阅请求,订阅通道数: {}", argsArray.size()); // } // // /** // * 订阅指定交易对的价格通道。 // * 构造订阅请求并发送给服务端。 // */ // private void subscribeIndexChannels() { // JSONObject subscribeMsg = new JSONObject(); // subscribeMsg.put("op", "subscribe"); // // JSONArray argsArray = new JSONArray(); // for (String instId : INST_IDS) { // JSONObject arg = new JSONObject(); // arg.put("channel", CHANNEL_INDEX_TICKERS); // arg.put("instId", instId); // argsArray.add(arg); // } // // subscribeMsg.put("args", argsArray); // webSocketClient.send(subscribeMsg.toJSONString()); // log.info("已发送价格订阅请求,订阅通道数: {}", argsArray.size()); // } // /** // * 订阅指定交易对的价格通道。 // * 构造订阅请求并发送给服务端。 // */ // private void subscribeOpenInterestChannels() { // JSONObject subscribeMsg = new JSONObject(); // subscribeMsg.put("op", "subscribe"); // // JSONArray argsArray = new JSONArray(); // for (String instId : INST_IDS_INTEREST) { // JSONObject arg = new JSONObject(); // arg.put("channel", CHANNEL_OPEN_INTEREST); // arg.put("instId", instId); // argsArray.add(arg); // } // // subscribeMsg.put("args", argsArray); // webSocketClient.send(subscribeMsg.toJSONString()); // log.info("已发送价格订阅请求,订阅通道数: {}", argsArray.size()); // } // // /** // * 处理从 WebSocket 收到的消息。 // * 包括订阅确认、错误响应、心跳响应以及实际的数据推送。 // * // * @param message 来自 WebSocket 的原始字符串消息 // */ // private void handleWebSocketMessage(String message) { // try { // // if ("pong".equals(message)) { // log.debug("{}: 收到心跳响应"); // cancelPongTimeout(); // return; // } // JSONObject response = JSON.parseObject(message); // String event = response.getString("event"); // // if ("subscribe".equals(event)) { // log.info("价格订阅成功: {}", response.getJSONObject("arg")); // } else if ("error".equals(event)) { // log.error("价格订阅错误: code={}, msg={}", // response.getString("code"), response.getString("msg")); // } else { // processPushData(response); // } // } catch (Exception e) { // log.error("处理WebSocket消息失败: {}", message, e); // } // } // // /** // * 解析并处理价格推送数据。 // * 将最新的标记价格存入 Redis 并触发后续业务逻辑比较处理。 // * // * @param response 包含价格数据的 JSON 对象 // */ // private void processPushData(JSONObject response) { // JSONObject arg = response.getJSONObject("arg"); // if (arg == null) { // log.warn("无效的推送数据,缺少 'arg' 字段 :{}",response); // return; // } // // String channel = arg.getString("channel"); // if (channel == null) { // log.warn("无效的推送数据,缺少 'channel' 字段{}",response); // return; // } // // if (CHANNEL_MARK_PRICE.equals(channel)) { // try { // JSONArray dataArray = response.getJSONArray("data"); // if (dataArray != null && !dataArray.isEmpty()) { // for (int i = 0; i < dataArray.size(); i++) { // try { // JSONObject priceData = dataArray.getJSONObject(i); // String instId = priceData.getString("instId"); // String markPx = priceData.getString("markPx"); // String ts = priceData.getString("ts"); // // String redisKey = buildRedisKey(instId); // redisUtils.set(redisKey, markPx); // // String symbol = CoinTypeConvert.okxConvert(instId); // if (symbol != null) { // redisUtils.set(CoinTypeConvert.convertToKey(symbol), markPx); // websocketPriceService.comparePriceAsc(symbol, markPx); // websocketPriceService.comparePriceDesc(symbol, markPx); // } // // log.debug("更新最新价格: {} = {}, 时间: {}", redisKey, markPx, ts); // } catch (Exception innerEx) { // log.warn("处理单条价格数据失败", innerEx); // } // } // } // } catch (Exception e) { // log.error("处理价格推送数据失败", e); // } // }else if (CHANNEL_INDEX_TICKERS.equals(channel)) { // try { // JSONArray dataArray = response.getJSONArray("data"); // if (dataArray != null && !dataArray.isEmpty()) { // for (int i = 0; i < dataArray.size(); i++) { // try { // JSONObject priceData = dataArray.getJSONObject(i); // String instId = priceData.getString("instId"); // String open24h = priceData.getString("open24h"); // String high24h = priceData.getString("high24h"); // String low24h = priceData.getString("low24h"); // String sodUtc0 = priceData.getString("sodUtc0"); // String sodUtc8 = priceData.getString("sodUtc8"); // String ts = priceData.getString("ts"); // // String redisKey = "open:" + buildRedisKey(instId); // redisUtils.set(redisKey, open24h); // // String symbol = CoinTypeConvert.okxConvert(instId); // if (symbol != null) { // redisUtils.set(CoinTypeConvert.convertToOpenKey(symbol), open24h); // } // // log.debug("更新开仓价格: {} = {}, 时间: {}", redisKey, open24h, ts); // } catch (Exception innerEx) { // log.warn("处理单条价格数据失败", innerEx); // } // } // } // } catch (Exception e) { // log.error("处理价格推送数据失败", e); // } // }else if (CHANNEL_OPEN_INTEREST.equals(channel)) { // try { // JSONArray dataArray = response.getJSONArray("data"); // if (dataArray != null && !dataArray.isEmpty()) { // for (int i = 0; i < dataArray.size(); i++) { // try { // JSONObject priceData = dataArray.getJSONObject(i); // String instId = priceData.getString("instId"); // String oiUsd = priceData.getString("oiUsd"); // String ts = priceData.getString("ts"); // // String redisKey = "volume:" + buildRedisKey(instId); // redisUtils.set(redisKey, oiUsd); // // log.debug("更新持仓量: {} = {}, 时间: {}", redisKey, oiUsd, ts); // } catch (Exception innerEx) { // log.warn("处理单条价格数据失败", innerEx); // } // } // } // } catch (Exception e) { // log.error("处理价格推送数据失败", e); // } // } // // // // } // // /** // * 构建 Redis Key // */ // private String buildRedisKey(String instId) { // return "PRICE_" + instId.replace("-", ""); // } // // /** // * 启动心跳检测任务。 // * 使用 ScheduledExecutorService 定期检查是否需要发送 ping 请求来维持连接。 // */ // private void startHeartbeat() { // if (heartbeatExecutor != null) { // heartbeatExecutor.shutdownNow(); // } // // heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(r -> { // Thread t = new Thread(r, "okx-newprice-heartbeat"); // t.setDaemon(true); // return t; // }); // // heartbeatExecutor.scheduleWithFixedDelay(this::checkHeartbeatTimeout, 25, 25, TimeUnit.SECONDS); // } // // /** // * 重置心跳计时器。 // * 当收到新消息或发送 ping 后取消当前超时任务并重新安排下一次超时检查。 // */ // private synchronized void resetHeartbeatTimer() { // cancelPongTimeout(); // // if (heartbeatExecutor != null) { // pongTimeoutFuture = heartbeatExecutor.schedule(this::checkHeartbeatTimeout, // HEARTBEAT_TIMEOUT, TimeUnit.SECONDS); // } // } // // /** // * 检查心跳超时情况。 // * 若长时间未收到任何消息则主动发送 ping 请求保持连接活跃。 // */ // private void checkHeartbeatTimeout() { // long currentTime = System.currentTimeMillis(); // long lastTime = lastMessageTime.get(); // // if (currentTime - lastTime >= HEARTBEAT_TIMEOUT * 1000L) { // sendPing(); // } // } // // /** // * 发送 ping 请求至 WebSocket 服务端。 // * 用于维持长连接有效性。 // */ // private void sendPing() { // try { // if (webSocketClient != null && webSocketClient.isOpen()) { // webSocketClient.send("ping"); // log.debug("发送ping请求"); // } // } catch (Exception e) { // log.warn("发送ping失败", e); // } // } // // /** // * 取消当前的心跳超时任务。 // * 在收到 pong 或其他有效消息时调用此方法避免不必要的断开重连。 // */ // private synchronized void cancelPongTimeout() { // if (pongTimeoutFuture != null && !pongTimeoutFuture.isDone()) { // pongTimeoutFuture.cancel(true); // } // } // // /** // * 执行 WebSocket 重连操作。 // * 在连接意外中断后尝试重新建立连接。 // */ // private void reconnectWithBackoff() throws InterruptedException { // int attempt = 0; // int maxAttempts = 5; // long delayMs = 5000; // // while (attempt < maxAttempts) { // try { // Thread.sleep(delayMs); // connect(); // return; // } catch (Exception e) { // log.warn("第{}次重连失败", attempt + 1, e); // delayMs *= 2; // attempt++; // } // } // // log.error("超过最大重试次数({})仍未连接成功", maxAttempts); // } }