Administrator
2 days ago d09c2a58f85aafc92d6bff7a1131fd0b376eee03
feat(price): 新增开盘价和持仓量数据处理功能

- 在CoinTypeConvert工具类中添加convertToOpenKey和convertToVolumeKey方法
- 扩展OkxNewPriceWebSocketClient以支持指数价格和持仓量订阅
- 新增subscribeIndexChannels和subscribeOpenInterestChannels方法
- 修改processPushData方法以区分处理不同类型的数据推送
- 更新SymbolsServiceImpl以获取并展示24小时开盘价和持仓量数据
- 为不同WebSocket通道定义独立的常量标识符
- 完善Redis键值存储逻辑以适应新增数据类型
3 files modified
225 ■■■■ changed files
src/main/java/com/xcong/excoin/modules/newPrice/OkxNewPriceWebSocketClient.java 170 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/symbols/service/impl/SymbolsServiceImpl.java 13 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/utils/CoinTypeConvert.java 42 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/newPrice/OkxNewPriceWebSocketClient.java
@@ -41,7 +41,9 @@
    private final AtomicReference<Long> lastMessageTime = new AtomicReference<>(System.currentTimeMillis());
    private static final String WS_URL = "wss://ws.okx.com:8443/ws/v5/public";
    private static final String CHANNEL = "mark-price";
    private static final String CHANNEL_MARK_PRICE = "mark-price";
    private static final String CHANNEL_INDEX_TICKERS = "index-tickers";
    private static final String CHANNEL_OPEN_INTEREST = "open-interest";
    private static final String[] INST_IDS = {
            "EOS-USDT","BTC-USDT", "ETH-USDT", "XRP-USDT", "LTC-USDT", "BCH-USDT", "ETC-USDT"
@@ -98,6 +100,8 @@
                    log.info("OKX New Price WebSocket连接成功");
                    resetHeartbeatTimer();
                    subscribeChannels();
                    subscribeIndexChannels();
                    subscribeOpenInterestChannels();
                }
                @Override
@@ -146,7 +150,48 @@
        JSONArray argsArray = new JSONArray();
        for (String instId : INST_IDS) {
            JSONObject arg = new JSONObject();
            arg.put("channel", CHANNEL);
            arg.put("channel", CHANNEL_MARK_PRICE);
            arg.put("instId", instId);
            argsArray.add(arg);
        }
        subscribeMsg.put("args", argsArray);
        webSocketClient.send(subscribeMsg.toJSONString());
        log.info("已发送价格订阅请求,订阅通道数: {}", argsArray.size());
    }
    /**
     * 订阅指定交易对的价格通道。
     * 构造订阅请求并发送给服务端。
     */
    private void subscribeIndexChannels() {
        JSONObject subscribeMsg = new JSONObject();
        subscribeMsg.put("op", "subscribe");
        JSONArray argsArray = new JSONArray();
        for (String instId : INST_IDS) {
            JSONObject arg = new JSONObject();
            arg.put("channel", CHANNEL_INDEX_TICKERS);
            arg.put("instId", instId);
            argsArray.add(arg);
        }
        subscribeMsg.put("args", argsArray);
        webSocketClient.send(subscribeMsg.toJSONString());
        log.info("已发送价格订阅请求,订阅通道数: {}", argsArray.size());
    }
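    /**
     * Subscribes to the open-interest channel for every instrument in INST_IDS.
     * Builds the subscribe request and sends it to the server.
     */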
    private void subscribeOpenInterestChannels() {
        JSONObject subscribeMsg = new JSONObject();
        subscribeMsg.put("op", "subscribe");
        JSONArray argsArray = new JSONArray();
        for (String instId : INST_IDS) {
            JSONObject arg = new JSONObject();
            arg.put("channel", CHANNEL_OPEN_INTEREST);
            arg.put("instId", instId);
            argsArray.add(arg);
        }
@@ -190,35 +235,112 @@
     * @param response 包含价格数据的 JSON 对象
     */
    private void processPushData(JSONObject response) {
        // NOTE(review): this listing appears to come from a diff view whose +/- markers were lost,
        // so lines of the previous implementation seem to be interleaved with the new one
        // (the statements indented deeper than their neighbours below). Verify against the
        // real file before relying on the exact statement order shown here.
        try {
            JSONArray dataArray = response.getJSONArray("data");
            if (dataArray != null && !dataArray.isEmpty()) {
                for (int i = 0; i < dataArray.size(); i++) {
                    try {
                        JSONObject priceData = dataArray.getJSONObject(i);
                        String instId = priceData.getString("instId");
                        String markPx = priceData.getString("markPx");
                        String ts = priceData.getString("ts");
        // A push without an 'arg' object is logged and dropped.
        JSONObject arg = response.getJSONObject("arg");
        if (arg == null) {
            log.warn("无效的推送数据,缺少 'arg' 字段 :{}",response);
            return;
        }
                        String redisKey = buildRedisKey(instId);
                        redisUtils.set(redisKey, markPx);
        // The channel name inside 'arg' selects which branch below handles the payload;
        // a push without it is logged and dropped.
        String channel = arg.getString("channel");
        if (channel == null) {
            log.warn("无效的推送数据,缺少 'channel' 字段{}",response);
            return;
        }
                        String symbol = CoinTypeConvert.okxConvert(instId);
                        if (symbol != null) {
                            redisUtils.set(CoinTypeConvert.convertToKey(symbol), markPx);
                            websocketPriceService.comparePriceAsc(symbol, markPx);
                            websocketPriceService.comparePriceDesc(symbol, markPx);
        if (CHANNEL_MARK_PRICE.equals(channel)) {
            // mark-price: store markPx under buildRedisKey(instId); when okxConvert maps the
            // instrument to a symbol, also store it under convertToKey(symbol) and run both
            // price comparisons.
            try {
                JSONArray dataArray = response.getJSONArray("data");
                if (dataArray != null && !dataArray.isEmpty()) {
                    for (int i = 0; i < dataArray.size(); i++) {
                        try {
                            JSONObject priceData = dataArray.getJSONObject(i);
                            String instId = priceData.getString("instId");
                            String markPx = priceData.getString("markPx");
                            String ts = priceData.getString("ts");
                            String redisKey = buildRedisKey(instId);
                            redisUtils.set(redisKey, markPx);
                            String symbol = CoinTypeConvert.okxConvert(instId);
                            if (symbol != null) {
                                redisUtils.set(CoinTypeConvert.convertToKey(symbol), markPx);
                                websocketPriceService.comparePriceAsc(symbol, markPx);
                                websocketPriceService.comparePriceDesc(symbol, markPx);
                            }
                            log.debug("更新最新价格: {} = {}, 时间: {}", redisKey, markPx, ts);
                        } catch (Exception innerEx) {
                            // A failure on one entry is logged; the loop moves on to the next entry.
                            log.warn("处理单条价格数据失败", innerEx);
                        }
                        log.debug("更新最新价格: {} = {}, 时间: {}", redisKey, markPx, ts);
                    } catch (Exception innerEx) {
                        log.warn("处理单条价格数据失败", innerEx);
                    }
                }
            } catch (Exception e) {
                log.error("处理价格推送数据失败", e);
            }
        } catch (Exception e) {
            log.error("处理价格推送数据失败", e);
        }else if (CHANNEL_INDEX_TICKERS.equals(channel)) {
            // index-tickers: only open24h is persisted ("open:" + buildRedisKey(instId), plus
            // convertToOpenKey(symbol) for mapped symbols). high24h, low24h, sodUtc0 and sodUtc8
            // are read but not used in this block.
            try {
                JSONArray dataArray = response.getJSONArray("data");
                if (dataArray != null && !dataArray.isEmpty()) {
                    for (int i = 0; i < dataArray.size(); i++) {
                        try {
                            JSONObject priceData = dataArray.getJSONObject(i);
                            String instId = priceData.getString("instId");
                            String open24h = priceData.getString("open24h");
                            String high24h = priceData.getString("high24h");
                            String low24h = priceData.getString("low24h");
                            String sodUtc0 = priceData.getString("sodUtc0");
                            String sodUtc8 = priceData.getString("sodUtc8");
                            String ts = priceData.getString("ts");
                            String redisKey = "open:" + buildRedisKey(instId);
                            redisUtils.set(redisKey, open24h);
                            String symbol = CoinTypeConvert.okxConvert(instId);
                            if (symbol != null) {
                                redisUtils.set(CoinTypeConvert.convertToOpenKey(symbol), open24h);
                            }
                            log.debug("更新开仓价格: {} = {}, 时间: {}", redisKey, open24h, ts);
                        } catch (Exception innerEx) {
                            // A failure on one entry is logged; the loop moves on to the next entry.
                            log.warn("处理单条价格数据失败", innerEx);
                        }
                    }
                }
            } catch (Exception e) {
                log.error("处理价格推送数据失败", e);
            }
        }else if (CHANNEL_OPEN_INTEREST.equals(channel)) {
            // open-interest: store oiUsd under "volume:" + buildRedisKey(instId) and, for mapped
            // symbols, under convertToVolumeKey(symbol).
            try {
                JSONArray dataArray = response.getJSONArray("data");
                if (dataArray != null && !dataArray.isEmpty()) {
                    for (int i = 0; i < dataArray.size(); i++) {
                        try {
                            JSONObject priceData = dataArray.getJSONObject(i);
                            String instId = priceData.getString("instId");
                            String oiUsd = priceData.getString("oiUsd");
                            String ts = priceData.getString("ts");
                            String redisKey = "volume:" + buildRedisKey(instId);
                            redisUtils.set(redisKey, oiUsd);
                            String symbol = CoinTypeConvert.okxConvert(instId);
                            if (symbol != null) {
                                redisUtils.set(CoinTypeConvert.convertToVolumeKey(symbol), oiUsd);
                            }
                            log.debug("更新持仓量: {} = {}, 时间: {}", redisKey, oiUsd, ts);
                        } catch (Exception innerEx) {
                            // A failure on one entry is logged; the loop moves on to the next entry.
                            log.warn("处理单条价格数据失败", innerEx);
                        }
                    }
                }
            } catch (Exception e) {
                log.error("处理价格推送数据失败", e);
            }
        }
        // Pushes on any other channel fall through without being handled.
    }
    /**
src/main/java/com/xcong/excoin/modules/symbols/service/impl/SymbolsServiceImpl.java
@@ -126,20 +126,17 @@
    public HomeSymbolsVo getSymbolReturnData(String symbol) {
        PlatformCnyUsdtExchangeEntity cnyUsdtExchange = platformCnyUsdtExchangeDao.getCNYAndUSDTOne();
        // 获取当日k线数据
//        Candlestick symbolObject = (Candlestick) redisUtils.get(symbol);
        // 获取当前币种最新价
        BigDecimal newestPrice = new BigDecimal(redisUtils.getString(CoinTypeConvert.convertToKey(symbol)));
        // 获取当日k线的开盘价
//        BigDecimal openPrice = symbolObject.getOpen();
//        BigDecimal upOrDown = newestPrice.subtract(openPrice).divide(openPrice, 8, BigDecimal.ROUND_HALF_UP);
        BigDecimal openPrice = new BigDecimal(redisUtils.getString(CoinTypeConvert.convertToOpenKey(symbol)));
        BigDecimal volume = new BigDecimal(redisUtils.getString(CoinTypeConvert.convertToVolumeKey(symbol)));
        BigDecimal upOrDown = newestPrice.subtract(openPrice).divide(openPrice, 8, BigDecimal.ROUND_HALF_UP);
        HomeSymbolsVo homeSymbolsVo = new HomeSymbolsVo();
        homeSymbolsVo.setSymbol(symbol);
        homeSymbolsVo.setCurrentPrice(newestPrice);
//        homeSymbolsVo.setUpOrDown(upOrDown);
//        homeSymbolsVo.setVolume(symbolObject.getAmount());
        homeSymbolsVo.setUpOrDown(upOrDown);
        homeSymbolsVo.setVolume(volume);
        if (cnyUsdtExchange != null) {
            BigDecimal cnyPrice = newestPrice.multiply(cnyUsdtExchange.getValue()).setScale(2, BigDecimal.ROUND_HALF_UP);
            homeSymbolsVo.setCnyPrice(cnyPrice);
src/main/java/com/xcong/excoin/utils/CoinTypeConvert.java
@@ -53,6 +53,48 @@
        }
    }
    public static String convertToOpenKey(String symbol) {
        switch (symbol) {
            case "BTC/USDT":
                return "open:BTC_NEW_PRICE";
            case "ETH/USDT":
                return "open:ETH_NEW_PRICE";
            case "XRP/USDT":
                return "open:XRP_NEW_PRICE";
            case "LTC/USDT":
                return "open:LTC_NEW_PRICE";
            case "BCH/USDT":
                return "open:BCH_NEW_PRICE";
            case "ETC/USDT":
                return "open:ETC_NEW_PRICE";
            default:
                return null;
        }
    }
    public static String convertToVolumeKey(String symbol) {
        switch (symbol) {
            case "BTC/USDT":
                return "volume:BTC_NEW_PRICE";
            case "ETH/USDT":
                return "volume:ETH_NEW_PRICE";
            case "XRP/USDT":
                return "volume:XRP_NEW_PRICE";
            case "LTC/USDT":
                return "volume:LTC_NEW_PRICE";
            case "BCH/USDT":
                return "volume:BCH_NEW_PRICE";
            case "ETC/USDT":
                return "volume:ETC_NEW_PRICE";
            default:
                return null;
        }
    }
    public static String convertContractTypeToCoin(String symbol) {
        if (symbol.indexOf("/") > 0) {
            return symbol.substring(0, symbol.indexOf("/"));