zainali5120
2020-09-14 74ca5bc0f40e3b91464c8972392271d24dd5f066
撮合交易代码提交
9 files modified
100 ■■■■ changed files
src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinsDao.java 4 ●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java 8 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/exchange/service/impl/HandleKlineServiceImpl.java 9 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java 10 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java 17 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java 12 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/trade/CoinTrader.java 10 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java 12 ●●●●● patch | view | raw | blame | history
src/main/resources/mapper/walletCoinOrder/OrderCoinsDao.xml 18 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinsDao.java
@@ -15,5 +15,7 @@
    /**
     * Presumably looks up a coin order by its order number.
     * NOTE(review): the mapper SQL behind this method is not visible here, so the lookup
     * semantics are inferred from the method name only — confirm against the mapper XML.
     *
     * @param orderNo value exposed to the mapper under the name {@code orderNo} via {@code @Param}
     * @return an {@code OrderCoinsEntity}; what is returned when nothing matches is not shown here
     */
    OrderCoinsEntity findWalletCoinOrderByOrderNo(@Param("orderNo")String orderNo);
    /**
     * No-argument query returning a list of {@code OrderCoinsEntity}.
     * NOTE(review): in this view the declaration sits directly above a {@code List<String>} overload of
     * the same name and looks like the signature that overload replaces — confirm whether it still exists.
     *
     * @return the orders produced by the mapper statement of the same name
     */
    List<OrderCoinsEntity> selectAllEntrustingCoinOrderList();
    /**
     * Overload of the entrusting-order query that takes a list used to filter the result.
     * NOTE(review): the only caller visible here passes a list of symbols to ignore, and the mapper
     * fragment shown for this DAO filters on {@code order_status=1} with {@code symbol not in (list)};
     * presumably that fragment is this statement — confirm against the mapper XML.
     *
     * @param list filter values; presumably the symbols to exclude from the result
     * @return the orders produced by the mapper statement of the same name
     */
    List<OrderCoinsEntity> selectAllEntrustingCoinOrderList(List<String> list);
    /**
     * Runs the {@code selectCoinOrderOnTrade} mapper statement, which selects rows of
     * {@code coins_order} with {@code order_status=1} and, when the list is non-null,
     * restricts them to {@code symbol in (list)}.
     *
     * @param list symbols to keep; a null list applies no symbol filter
     * @return the matching orders
     */
    List<OrderCoinsEntity> selectCoinOrderOnTrade(List<String> list);
}
src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java
@@ -744,7 +744,9 @@
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void dealEntrustCoinOrder() {
        List<OrderCoinsEntity> list = orderCoinsDao.selectAllEntrustingCoinOrderList();
        List<String> ignoreTypes = new ArrayList<>();
        ignoreTypes.add("NEKK");
        List<OrderCoinsEntity> list = orderCoinsDao.selectAllEntrustingCoinOrderList(ignoreTypes);
        if (CollUtil.isNotEmpty(list)) {
            for (OrderCoinsEntity orderCoinsEntity : list) {
                BigDecimal nowPrice = new BigDecimal(redisUtils.getString(CoinTypeConvert.convertToKey(orderCoinsEntity.getSymbol() + "/USDT")));
@@ -813,7 +815,9 @@
    public void handleOrder(List<ExchangeTrade> trades){
        // 处理撮合交易的订单
        for(ExchangeTrade exchangeTrade : trades){
            if(exchangeTrade==null){
                continue;
            }
            BigDecimal amount = exchangeTrade.getAmount();
            Long buyOrderId = exchangeTrade.getBuyOrderId();
            BigDecimal buyTurnover = exchangeTrade.getBuyTurnover();
src/main/java/com/xcong/excoin/modules/exchange/service/impl/HandleKlineServiceImpl.java
@@ -39,10 +39,14 @@
        CoinProcessor processor = processorFactory.getProcessor(symbol);
        Map<String, Candlestick> currentKlineMap = processor.getCurrentKlineMap();
        Collection<Candlestick> values = currentKlineMap.values();
        BigDecimal newPrice = trades.get(trades.size()-1).getPrice();
        BigDecimal newPrice =null;
        for (Candlestick candlestick : values) {
            for (ExchangeTrade exchangeTrade : trades) {
                if(exchangeTrade==null){
                    continue;
                }
                processor.processTrade(candlestick, exchangeTrade);
                newPrice=exchangeTrade.getPrice();
            }
        }
@@ -51,6 +55,9 @@
        key = StrUtil.format(key, symbolUsdt);
        redisUtils.set(key,currentKlineMap);
        // 更新最新价
        if(newPrice!=null){
        redisUtils.set(CoinTypeConvert.convertToKey(symbolUsdt), newPrice);
    }
    }
}
src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java
@@ -14,6 +14,7 @@
import com.xcong.excoin.processor.DefaultCoinProcessor;
import com.xcong.excoin.processor.MarketService;
import com.xcong.excoin.rabbit.pricequeue.WebsocketPriceService;
import com.xcong.excoin.rabbit.producer.ExchangeProducer;
import com.xcong.excoin.trade.CoinTrader;
import com.xcong.excoin.trade.CoinTraderFactory;
import com.xcong.excoin.utils.CoinTypeConvert;
@@ -56,12 +57,15 @@
    @Resource
    private CoinProcessorFactory processorFactory;
    @Resource
    ExchangeProducer exchangeProducer;
    @PostConstruct
    public void initCoinTrade() {
        log.info("#=======撮合交易器开启=======#");
        String symbol = "NEKK";
        CoinTrader newTrader = new CoinTrader(symbol);
        newTrader.setOrderCoinService(coinService);
        newTrader.setExchangeProducer(exchangeProducer);
        //newTrader.setKafkaTemplate(kafkaTemplate);
        //newTrader.setBaseCoinScale(coin.getBaseCoinScale());
        //newTrader.setCoinScale(coin.getCoinScale());
@@ -70,7 +74,9 @@
        // 创建成功以后需要对未处理订单预处理
        log.info("======CoinTrader Process: " + symbol + "======");
        List<OrderCoinsEntity> orders = orderCoinsDao.selectAllEntrustingCoinOrderList();
        List<String> symbolList = new ArrayList<>();
        symbolList.add(symbol);
        List<OrderCoinsEntity> orders = orderCoinsDao.selectCoinOrderOnTrade(symbolList);
        List<OrderCoinsEntity> tradingOrders = new ArrayList<>();
        List<OrderCoinsEntity> completedOrders = new ArrayList<>();
        orders.forEach(order -> {
src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java
@@ -33,9 +33,9 @@
    @Resource
    private RedisUtils redisUtils;
    @Scheduled(cron = "0/2 * * * * *")
    @Scheduled(cron = "0/10 * * * * *")
    public void tradePlate(){
        redisUtils.set("NEKK_NEW_PRICE",new BigDecimal(Math.random()));
        redisUtils.set("NEKK_NEW_PRICE",new BigDecimal(Math.random()*20));
        Candlestick candlestick = new Candlestick();
        candlestick.setOpen(new BigDecimal("10.33"));
        candlestick.setHigh(new BigDecimal("15.23"));
@@ -46,7 +46,7 @@
        candlestick.setId(1599840000L);
        candlestick.setCount(100002);
        candlestick.setClose(new BigDecimal("12.2323"));
        redisUtils.set("NEKK/USDT",candlestick);
        //redisUtils.set("NEKK/USDT",candlestick);
        // [[10244.21, 0.000855], [10243.7, 0.008777], [10243.59, 0.14], [10243.37, 0.467663]]
        TradePlateModel tradePlateModel = new TradePlateModel();
        List<BigDecimal> buy;
@@ -61,11 +61,11 @@
            sell.add(new BigDecimal(Math.random()*i*2));
            tradePlateModel.getSell().add(sell);
        }
        plateSendWebSocket.sendMessagePlate(JSON.toJSONString(tradePlateModel),null);
        plateSendWebSocket.sendMessageKline("nekkusdt","1min","{amount: 114419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 311958.06091543}",null);
        plateSendWebSocket.sendMessageKline("nekkusdt","5min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null);
        plateSendWebSocket.sendMessageKline("nekkusdt","15min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null);
        log.info("准备发送消息");
        plateSendWebSocket.sendMessagePlate("NEKK/USDT",JSON.toJSONString(tradePlateModel),null);
        plateSendWebSocket.sendMessageKline("NEKK/USDT","1min","{amount: 114419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 311958.06091543}",null);
        plateSendWebSocket.sendMessageKline("NEKK/USDT","5min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null);
        plateSendWebSocket.sendMessageKline("NEKK/USDT","15min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null);
    }
    /**
@@ -156,4 +156,5 @@
            }
        });
    }
}
src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java
@@ -14,6 +14,7 @@
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -44,8 +45,8 @@
     */
    @RabbitListener(queues = RabbitMqConfig.QUEUE_TRADE_PLATE)
    public void tradePlate(String content) {
        // Listener for RabbitMqConfig.QUEUE_TRADE_PLATE: logs the raw message body and hands it,
        // unchanged, to tradePlateSendWebSocket.sendMessagePlate with a null session argument.
        // NOTE(review): this view shows two log/send pairs — a two-argument call and a three-argument
        // call with the hard-coded symbol "NEKK/USDT". They look like the pre- and post-change versions
        // of the same statements from a diff; confirm which pair is actually in the file.
        log.info("#---->{}#", content);
        tradePlateSendWebSocket.sendMessagePlate(content,null);
        log.info("#盘口信息消费者---->{}#", content);
        // The symbol is fixed here rather than taken from the message.
        tradePlateSendWebSocket.sendMessagePlate("NEKK/USDT",content,null);
    }
    /**
@@ -56,6 +57,13 @@
    public void handleTradeExchange(String content) {
        log.info("#---->{}#", content);
        List<ExchangeTrade> exchangeTrades = JSONObject.parseArray(content, ExchangeTrade.class);
        // 去掉空的  暂时这样
        Iterator<ExchangeTrade> iterator = exchangeTrades.iterator();
        while (iterator.hasNext()){
            if(iterator.next()==null){
                iterator.remove();
            }
        }
        // 处理K线 并更新最新价
        handleKlineService.handleExchangeOrderToKline(exchangeTrades);
        // 推送最新K线
src/main/java/com/xcong/excoin/trade/CoinTrader.java
@@ -17,7 +17,6 @@
public class CoinTrader {
    private String symbol;
    private ExchangeProducer exchangeProducer;
    private OrderCoinService orderCoinService;
    //交易币种的精度
    private int coinScale = 4;
    //基币的精度
@@ -752,11 +751,12 @@
        return count;
    }
    public OrderCoinService getOrderCoinService() {
        return orderCoinService;
    public ExchangeProducer getExchangeProducer() {
        return exchangeProducer;
    }
    public void setOrderCoinService(OrderCoinService orderCoinService) {
        this.orderCoinService = orderCoinService;
    public void setExchangeProducer(ExchangeProducer exchangeProducer) {
        this.exchangeProducer = exchangeProducer;
    }
}
src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java
@@ -75,6 +75,7 @@
        JSONObject jsonObject = JSON.parseObject(message);
        // 盘口的判断
        if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("depth")) {
            log.info("订阅盘口消息:{}", session.getId());
            String sub = jsonObject.get("sub").toString();
            String symbol = sub.split("\\.")[1];
            symbol = CoinTypeConvert.convert(symbol);
@@ -89,6 +90,7 @@
        // 取消盘口订阅
        if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("depth")) {
            // `market.${symbol}.kline.${strPeriod}
            log.info("取消订阅盘口消息:{}", session.getId());
            String unsub = jsonObject.get("unsub").toString();
            String[] split = unsub.split("\\.");
            String symbol = split[1];
@@ -110,6 +112,7 @@
        // 取消订阅 {unsub: xxx(标识)}
        if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("kline")) {
            // 订阅
            log.info("订阅最新K线消息:{}", session.getId());
            String sub = jsonObject.get("sub").toString();
            String[] split = sub.split("\\.");
            String symbol = split[1];
@@ -132,6 +135,7 @@
        // 取消订阅
        if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("kline")) {
            // `market.${symbol}.kline.${strPeriod}
            log.info("取消订阅最新K消息:{}", session.getId());
            String unsub = jsonObject.get("unsub").toString();
            String[] split = unsub.split("\\.");
            String strPeriod = split[3];
@@ -173,14 +177,14 @@
     *
     * @param message 消息内容
     */
    public void sendMessagePlate(String message, Session fromSession) {
        if (tradeplateClients.containsKey("nekkusdt")) {
            Map<String, Session> nekk = tradeplateClients.get("nekkusdt");
    public void sendMessagePlate(String symbol,String message, Session fromSession) {
        if (tradeplateClients.containsKey(symbol)) {
            Map<String, Session> nekk = tradeplateClients.get(symbol);
            for (Map.Entry<String, Session> sessionEntry : nekk.entrySet()) {
                Session toSession = sessionEntry.getValue();
                // 排除掉自己
                //if (!fromSession.getId().equals(toSession.getId())) {
                log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message);
                log.info("服务端给客户端[{}]发送盘口消息{}", toSession.getId(), message);
                boolean open = toSession.isOpen();
                if (open) {
                    toSession.getAsyncRemote().sendText(message);
src/main/resources/mapper/walletCoinOrder/OrderCoinsDao.xml
@@ -25,5 +25,23 @@
        select *
        from coins_order
        where order_status=1
        <if test="list != null">
            and symbol not in
            <foreach collection="list" separator="," item="item" open="(" close=")">
                #{item}
            </foreach>
        </if>
    </select>
    <select id="selectCoinOrderOnTrade" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsEntity">
        select *
        from coins_order
        where order_status=1
        <if test="list != null">
            and symbol in
            <foreach collection="list" separator="," item="item" open="(" close=")">
                #{item}
            </foreach>
        </if>
    </select>
</mapper>