| | |
| | | /* Millisecond timestamp, seeded with the current time when the field is initialised. NOTE(review): presumably refreshed whenever a message arrives; confirm against the code that writes it. */ private final AtomicReference<Long> lastMessageTime = new AtomicReference<>(System.currentTimeMillis()); |
| | | /* WebSocket endpoint and the channel names placed in the subscribe requests. */ |
| | | private static final String WS_URL = "wss://ws.okx.com:8443/ws/v5/public"; |
| | | private static final String CHANNEL = "mark-price"; /* NOTE(review): same value as CHANNEL_MARK_PRICE; looks like a leftover from a rename, confirm before removing. */ |
| | | private static final String CHANNEL_MARK_PRICE = "mark-price"; |
| | | private static final String CHANNEL_INDEX_TICKERS = "index-tickers"; |
| | | private static final String CHANNEL_OPEN_INTEREST = "open-interest"; |
| | | /* Instrument ids iterated when building the mark-price and index-tickers subscriptions. */ |
| | | private static final String[] INST_IDS = { |
| | | "BTC-USDT", "ETH-USDT", "XRP-USDT", "LTC-USDT", "BCH-USDT", "ETC-USDT" |
| | | }; |
| | | // Instrument ids iterated when building the open-interest subscription (SWAP-suffixed, e.g. BTC-USDT-SWAP). |
| | | private static final String[] INST_IDS_INTEREST = { |
| | | "BTC-USDT-SWAP", "ETH-USDT-SWAP", "XRP-USDT-SWAP", "LTC-USDT-SWAP", "BCH-USDT-SWAP", "ETC-USDT-SWAP" |
| | | }; |
| | | |
| | | // 心跳超时时间(秒),小于30秒 |
| | |
| | | log.info("OKX New Price WebSocket连接成功"); |
| | | resetHeartbeatTimer(); |
| | | subscribeChannels(); |
| | | subscribeIndexChannels(); |
| | | subscribeOpenInterestChannels(); |
| | | } |
| | | |
| | | @Override |
| | |
| | | JSONArray argsArray = new JSONArray(); |
| | | for (String instId : INST_IDS) { |
| | | JSONObject arg = new JSONObject(); |
| | | arg.put("channel", CHANNEL); |
| | | arg.put("channel", CHANNEL_MARK_PRICE); |
| | | arg.put("instId", instId); |
| | | argsArray.add(arg); |
| | | } |
| | | |
| | | subscribeMsg.put("args", argsArray); |
| | | webSocketClient.send(subscribeMsg.toJSONString()); |
| | | log.info("已发送价格订阅请求,订阅通道数: {}", argsArray.size()); |
| | | } |
| | | |
| | | /** |
| | | * Subscribes to the {@code index-tickers} channel for every instrument id in {@code INST_IDS}. |
| | | * Builds a single subscribe request carrying one argument per instrument id, |
| | | * sends it through {@code webSocketClient}, and logs how many arguments were sent. |
| | | */ |
| | | private void subscribeIndexChannels() { |
| | | JSONObject request = new JSONObject(); |
| | | request.put("op", "subscribe"); |
| | | |
| | | JSONArray channelArgs = new JSONArray(); |
| | | for (int idx = 0; idx < INST_IDS.length; idx++) { |
| | | JSONObject channelArg = new JSONObject(); |
| | | channelArg.put("channel", CHANNEL_INDEX_TICKERS); |
| | | channelArg.put("instId", INST_IDS[idx]); |
| | | channelArgs.add(channelArg); |
| | | } |
| | | request.put("args", channelArgs); |
| | | |
| | | String payload = request.toJSONString(); |
| | | webSocketClient.send(payload); |
| | | log.info("已发送价格订阅请求,订阅通道数: {}", channelArgs.size()); |
| | | } |
| | | /** |
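| | | * Subscribes to the open-interest channel for each instrument id in INST_IDS_INTEREST. |
| | | * Builds the subscribe request and sends it to the server. |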
| | | */ |
| | | private void subscribeOpenInterestChannels() { |
| | | JSONObject subscribeMsg = new JSONObject(); |
| | | subscribeMsg.put("op", "subscribe"); |
| | | |
| | | JSONArray argsArray = new JSONArray(); |
| | | for (String instId : INST_IDS_INTEREST) { |
| | | JSONObject arg = new JSONObject(); |
| | | arg.put("channel", CHANNEL_OPEN_INTEREST); |
| | | arg.put("instId", instId); |
| | | argsArray.add(arg); |
| | | } |
| | |
| | | * @param response 包含价格数据的 JSON 对象 |
| | | */ |
| | | private void processPushData(JSONObject response) { /* Handles one pushed message: reads arg.channel and the data array, then writes values taken from each data entry to Redis. NOTE(review): as shown, the body interleaves two variants of the logic (dataArray, i, priceData, instId, markPx and ts are declared at the top and again inside the mark-price branch, and a catch block is followed directly by else-if), so it does not compile as-is; reconcile before relying on it. */ |
| | | try { |
| | | JSONArray dataArray = response.getJSONArray("data"); |
| | | if (dataArray != null && !dataArray.isEmpty()) { |
| | | for (int i = 0; i < dataArray.size(); i++) { |
| | | try { |
| | | JSONObject priceData = dataArray.getJSONObject(i); |
| | | String instId = priceData.getString("instId"); |
| | | String markPx = priceData.getString("markPx"); |
| | | String ts = priceData.getString("ts"); |
| | | JSONObject arg = response.getJSONObject("arg"); |
| | | if (arg == null) { |
| | | log.warn("无效的推送数据,缺少 'arg' 字段 :{}",response); |
| | | return; |
| | | } |
| | | |
| | | String redisKey = buildRedisKey(instId); |
| | | redisUtils.set(redisKey, markPx); |
| | | String channel = arg.getString("channel"); |
| | | if (channel == null) { |
| | | log.warn("无效的推送数据,缺少 'channel' 字段{}",response); |
| | | return; |
| | | } |
| | | |
| | | String symbol = CoinTypeConvert.okxConvert(instId); |
| | | if (symbol != null) { |
| | | redisUtils.set(CoinTypeConvert.convertToKey(symbol), markPx); |
| | | websocketPriceService.comparePriceAsc(symbol, markPx); |
| | | websocketPriceService.comparePriceDesc(symbol, markPx); |
| | | if (CHANNEL_MARK_PRICE.equals(channel)) { |
| | | try { |
| | | JSONArray dataArray = response.getJSONArray("data"); |
| | | if (dataArray != null && !dataArray.isEmpty()) { |
| | | for (int i = 0; i < dataArray.size(); i++) { |
| | | try { |
| | | JSONObject priceData = dataArray.getJSONObject(i); |
| | | String instId = priceData.getString("instId"); |
| | | String markPx = priceData.getString("markPx"); |
| | | String ts = priceData.getString("ts"); |
| | | /* Store the mark price in Redis under the key built from the instrument id. */ |
| | | String redisKey = buildRedisKey(instId); |
| | | redisUtils.set(redisKey, markPx); |
| | | /* When the instrument id converts to a symbol, also store the price under the symbol key and call both price comparisons. */ |
| | | String symbol = CoinTypeConvert.okxConvert(instId); |
| | | if (symbol != null) { |
| | | redisUtils.set(CoinTypeConvert.convertToKey(symbol), markPx); |
| | | websocketPriceService.comparePriceAsc(symbol, markPx); |
| | | websocketPriceService.comparePriceDesc(symbol, markPx); |
| | | } |
| | | |
| | | log.debug("更新最新价格: {} = {}, 时间: {}", redisKey, markPx, ts); |
| | | } catch (Exception innerEx) { |
| | | log.warn("处理单条价格数据失败", innerEx); |
| | | } |
| | | |
| | | log.debug("更新最新价格: {} = {}, 时间: {}", redisKey, markPx, ts); |
| | | } catch (Exception innerEx) { |
| | | log.warn("处理单条价格数据失败", innerEx); |
| | | } |
| | | } |
| | | } catch (Exception e) { |
| | | log.error("处理价格推送数据失败", e); |
| | | } |
| | | } catch (Exception e) { |
| | | log.error("处理价格推送数据失败", e); |
| | | }else if (CHANNEL_INDEX_TICKERS.equals(channel)) { |
| | | try { |
| | | JSONArray dataArray = response.getJSONArray("data"); |
| | | if (dataArray != null && !dataArray.isEmpty()) { |
| | | for (int i = 0; i < dataArray.size(); i++) { |
| | | try { |
| | | JSONObject priceData = dataArray.getJSONObject(i); |
| | | String instId = priceData.getString("instId"); |
| | | String open24h = priceData.getString("open24h"); |
| | | String high24h = priceData.getString("high24h"); |
| | | String low24h = priceData.getString("low24h"); |
| | | String sodUtc0 = priceData.getString("sodUtc0"); |
| | | String sodUtc8 = priceData.getString("sodUtc8"); |
| | | String ts = priceData.getString("ts"); |
| | | /* Only open24h is stored below; high24h, low24h, sodUtc0 and sodUtc8 are read above but not used in this branch. */ |
| | | String redisKey = "open:" + buildRedisKey(instId); |
| | | redisUtils.set(redisKey, open24h); |
| | | |
| | | String symbol = CoinTypeConvert.okxConvert(instId); |
| | | if (symbol != null) { |
| | | redisUtils.set(CoinTypeConvert.convertToOpenKey(symbol), open24h); |
| | | } |
| | | |
| | | log.debug("更新开仓价格: {} = {}, 时间: {}", redisKey, open24h, ts); |
| | | } catch (Exception innerEx) { |
| | | log.warn("处理单条价格数据失败", innerEx); |
| | | } |
| | | } |
| | | } |
| | | } catch (Exception e) { |
| | | log.error("处理价格推送数据失败", e); |
| | | } |
| | | }else if (CHANNEL_OPEN_INTEREST.equals(channel)) { |
| | | try { |
| | | JSONArray dataArray = response.getJSONArray("data"); |
| | | if (dataArray != null && !dataArray.isEmpty()) { |
| | | for (int i = 0; i < dataArray.size(); i++) { |
| | | try { |
| | | JSONObject priceData = dataArray.getJSONObject(i); |
| | | String instId = priceData.getString("instId"); |
| | | String oiUsd = priceData.getString("oiUsd"); |
| | | String ts = priceData.getString("ts"); |
| | | /* Store oiUsd in Redis under the instrument key prefixed with "volume:". */ |
| | | String redisKey = "volume:" + buildRedisKey(instId); |
| | | redisUtils.set(redisKey, oiUsd); |
| | | |
| | | log.debug("更新持仓量: {} = {}, 时间: {}", redisKey, oiUsd, ts); |
| | | } catch (Exception innerEx) { |
| | | log.warn("处理单条价格数据失败", innerEx); |
| | | } |
| | | } |
| | | } |
| | | } catch (Exception e) { |
| | | log.error("处理价格推送数据失败", e); |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | } |
| | | |
| | | /** |