| | |
| | | import com.alibaba.fastjson.JSON; |
| | | import com.alibaba.fastjson.JSONArray; |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import com.xcong.excoin.modules.okxNewPrice.okxWs.enums.CoinEnums; |
| | | import com.xcong.excoin.modules.okxNewPrice.utils.SSLConfig; |
| | | import com.xcong.excoin.rabbit.pricequeue.WebsocketPriceService; |
| | | import com.xcong.excoin.utils.CoinTypeConvert; |
| | | import com.xcong.excoin.utils.RedisUtils; |
| | |
| | | */ |
| | | @Slf4j |
| | | @Component |
| | | @ConditionalOnProperty(prefix = "app", name = "websocket", havingValue = "true") |
| | | @ConditionalOnProperty(prefix = "app", name = "quant", havingValue = "true") |
| | | public class OkxNewPriceWebSocketClient { |
| | | @Resource |
| | | private WebsocketPriceService websocketPriceService; | // Injected service; the price handler calls comparePriceAsc and comparePriceDesc on it when the instrument id converts to a non-null symbol
| | |
| | | private volatile ScheduledFuture<?> pongTimeoutFuture; | // Handle to a scheduled task; presumably the pending pong-timeout check - TODO confirm where it is scheduled and cancelled
| | | private final AtomicReference<Long> lastMessageTime = new AtomicReference<>(System.currentTimeMillis()); | // Millisecond timestamp initialised when this object is constructed; presumably refreshed on each incoming message - TODO confirm
| | | |
| | | private static final String WS_URL = "wss://ws.okx.com:8443/ws/v5/public"; | // NOTE(review): connect() declares a local variable with this same name that shadows this constant; no other use is visible here - confirm whether this field is still needed
| | | private static final String CHANNEL = "mark-price"; | // Value sent as the "channel" field of every subscription argument
| | | |
| | | private static final String[] INST_IDS = { | // Instrument ids; the subscribe request adds one argument per entry
| | | "BTC-USDT", "ETH-USDT", "XRP-USDT", "LTC-USDT", "BCH-USDT", "ETC-USDT" |
| | | }; |
| | | |
| | | // Heartbeat timeout in seconds; kept below 30 seconds. |
| | | private static final int HEARTBEAT_TIMEOUT = 10; |
| | |
| | | } |
| | | sharedExecutor.shutdownNow(); |
| | | } |
| | | private static final String WS_URL_MONIPAN = "wss://wspap.okx.com:8443/ws/v5/public"; | // connect() starts from this URL and keeps it when isAccountType is false; presumably the OKX demo-trading endpoint - TODO confirm
| | | private static final String WS_URL_SHIPAN = "wss://ws.okx.com:8443/ws/v5/public"; | // connect() switches to this URL when isAccountType is true; presumably the OKX live-trading endpoint - TODO confirm
| | | private static final boolean isAccountType = true; | // Hard-coded endpoint selector read by connect(): true -> WS_URL_SHIPAN, false -> WS_URL_MONIPAN
| | | |
| | | /** |
| | | * Establishes the connection to the OKX WebSocket server. |
| | |
| | | */ |
| | | private void connect() { |
| | | try { |
| | | SSLConfig.configureSSL(); |
| | | System.setProperty("https.protocols", "TLSv1.2,TLSv1.3"); |
| | | String WS_URL = WS_URL_MONIPAN; |
| | | if (isAccountType){ |
| | | WS_URL = WS_URL_SHIPAN; |
| | | } |
| | | URI uri = new URI(WS_URL); |
| | | webSocketClient = new WebSocketClient(uri) { |
| | | @Override |
| | |
| | | subscribeMsg.put("op", "subscribe"); |
| | | |
| | | JSONArray argsArray = new JSONArray(); |
| | | for (String instId : INST_IDS) { |
| | | JSONObject arg = new JSONObject(); |
| | | arg.put("channel", CHANNEL); |
| | | arg.put("instId", instId); |
| | | argsArray.add(arg); |
| | | } |
| | | JSONObject arg = new JSONObject(); |
| | | arg.put("channel", CHANNEL); |
| | | arg.put("instId", CoinEnums.HE_YUE.getCode()); |
| | | argsArray.add(arg); |
| | | |
| | | subscribeMsg.put("args", argsArray); |
| | | webSocketClient.send(subscribeMsg.toJSONString()); |
| | |
| | | String markPx = priceData.getString("markPx"); |
| | | String ts = priceData.getString("ts"); |
| | | |
| | | String redisKey = buildRedisKey(instId); |
| | | redisUtils.set(redisKey, markPx); |
| | | redisUtils.set(CoinEnums.HE_YUE.getCode(), markPx); |
| | | |
| | | String symbol = CoinTypeConvert.okxConvert(instId); |
| | | if (symbol != null) { |
| | | redisUtils.set(CoinTypeConvert.convertToKey(symbol), markPx); |
| | | websocketPriceService.comparePriceAsc(symbol, markPx); |
| | | websocketPriceService.comparePriceDesc(symbol, markPx); |
| | | } |
| | | |
| | | log.debug("更新最新价格: {} = {}, 时间: {}", redisKey, markPx, ts); |
| | | log.debug("更新最新价格: {} = {}, 币种: {}", CoinEnums.HE_YUE.getCode(), markPx, instId); |
| | | } catch (Exception innerEx) { |
| | | log.warn("处理单条价格数据失败", innerEx); |
| | | } |