pom.xml
@@ -76,6 +76,11 @@ <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <!-- websocket --> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> <!-- <dependency>--> <!-- <groupId>org.springframework.security</groupId>--> src/main/java/com/xcong/excoin/common/enumerates/SymbolEnum.java
@@ -14,7 +14,8 @@ ,BCH("BCH", "BCH/USDT") ,EOS("EOS", "EOS/USDT") ,XRP("XRP", "XRP/USDT") ,ETC("ETC", "ETC/USDT"); ,ETC("ETC", "ETC/USDT") ,NEKK("NEKK", "NEKK/USDT"); private String name; src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
@@ -29,6 +29,11 @@ public static final String EXCHANGE_A = "biue-exchange-A"; /** * 撮合交易 */ public static final String EXCHANGE_B = "biue-exchange-B"; // 开多止盈队列 public static final String QUEUE_MOREPRO = "QUEUE_MOREPRO_NEW"; @@ -51,6 +56,12 @@ // 平仓队列 public static final String QUEUE_CLOSETRADE = "QUEUE_CLOSETRADE_NEW"; // 盘口队列 public static final String QUEUE_TRADE_PLATE = "QUEUE_TRADE_PLATE"; // 处理交易 public static final String QUEUE_HANDLE_TRADE = "QUEUE_HANDLE_TRADE"; // 开多止盈路由键 public static final String ROUTINGKEY_MOREPRO = "ROUTINGKEY_MOREPRO"; @@ -71,6 +82,12 @@ public static final String ROUTINGKEY_PRICEOPERATE = "ROUTINGKEY_PRICEOPERATE"; // 平仓路由 public static final String ROUTINGKEY_CLOSETRADE = "ROUTINGKEY_CLOSETRADE"; // 盘口理路由 public static final String ROUTINGKEY_TRADE_PLATE = "ROUTINGKEY_TRADE_PLATE"; // 交易订单处理 public static final String ROUTINGKEY_HANDLE_TRADE = "ROUTINGKEY_HANDLE_TRADE"; @Resource private ConnectionFactory connectionFactory; @@ -94,6 +111,7 @@ public DirectExchange defaultExchange() { return new DirectExchange(EXCHANGE_ONE); } @Bean public Queue testQueue() { @@ -204,6 +222,27 @@ /** * 盘口推送 * * @return */ @Bean public Queue queuePlateTrade() { return new Queue(QUEUE_TRADE_PLATE, true); } /** * 交易订单处理 * * @return */ @Bean public Queue queueHandleTrade() { return new Queue(QUEUE_HANDLE_TRADE, true); } /** * 开多止盈 * * @return @@ -286,4 +325,30 @@ return BindingBuilder.bind(queueCloseTrade()).to(orderExchange()).with(RabbitMqConfig.ROUTINGKEY_CLOSETRADE); } @Bean public DirectExchange matchTradeExchange() { return new DirectExchange(EXCHANGE_B); } /** * 盘口变化绑定 * * @return */ @Bean public Binding bindingPlateTrade() { return BindingBuilder.bind(queuePlateTrade()).to(matchTradeExchange()).with(RabbitMqConfig.ROUTINGKEY_TRADE_PLATE); } /** * 交易订单处理 * * @return */ @Bean public Binding bindingHandleTrade() { return BindingBuilder.bind(queueHandleTrade()).to(matchTradeExchange()).with(RabbitMqConfig.ROUTINGKEY_HANDLE_TRADE); } } 
src/main/java/com/xcong/excoin/configurations/WebSocketConfig.java
New file @@ -0,0 +1,16 @@
package com.xcong.excoin.configurations;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * Spring configuration that contributes the WebSocket endpoint exporter bean.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Exposes a {@link ServerEndpointExporter} bean. Per the original author's note, this bean
     * automatically registers the websocket endpoints declared with {@code @ServerEndpoint}.
     *
     * @return a new exporter instance
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
src/main/java/com/xcong/excoin/configurations/security/WebSecurityConfig.java
@@ -53,6 +53,7 @@ .antMatchers("/api/member/getAppVersionInfo").permitAll() .antMatchers("/api/orderCoin/searchSymbolResultList").permitAll() .antMatchers("/api/orderCoin/findCollect").permitAll() .antMatchers("/trade/**").permitAll() .anyRequest().authenticated() .and().apply(securityConfiguereAdapter()); } src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java
@@ -67,7 +67,12 @@ Integer tradeType = submitSalesWalletCoinOrderDto.getTradeType(); BigDecimal price = submitSalesWalletCoinOrderDto.getPrice(); BigDecimal amount = submitSalesWalletCoinOrderDto.getAmount(); return orderCoinService.submitSalesWalletCoinOrder(symbol,type,tradeType,price,amount); if("NEKK".equals(symbol)){ return orderCoinService.submitSalesWalletCoinOrderWithMatch(symbol,type,tradeType,price,amount,submitSalesWalletCoinOrderDto.getEntrustAmount()); }else{ return orderCoinService.submitSalesWalletCoinOrder(symbol,type,tradeType,price,amount); } } /** src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinDealDao.java
@@ -1,7 +1,9 @@ package com.xcong.excoin.modules.coin.dao; import java.util.Date; import java.util.List; import com.xcong.excoin.trade.ExchangeTrade; import org.apache.ibatis.annotations.Param; import com.baomidou.mybatisplus.core.mapper.BaseMapper; @@ -21,5 +23,6 @@ IPage<OrderCoinsDealEntity> findAllWalletCoinOrderInPage(Page<OrderCoinsDealEntity> page, @Param("record") OrderCoinsDealEntity orderCoinsDealEntity); List<ExchangeTrade> selectOrderCoinDealByTime(@Param("symbol")String symbol, @Param("startTime")Date startTime, @Param("endTime")Date endTime); } src/main/java/com/xcong/excoin/modules/coin/entity/OrderCoinsEntity.java
@@ -57,6 +57,11 @@ * 成交价 */ private BigDecimal dealPrice; /** * 市价委托时的委托金额 */ private BigDecimal entrustAmount; /** * 成交金额 */ src/main/java/com/xcong/excoin/modules/coin/parameter/dto/SubmitSalesWalletCoinOrderDto.java
@@ -26,12 +26,16 @@ private Integer tradeType; @Min(0) @NotNull(message = "数量不能为空") //@NotNull(message = "数量不能为空") @ApiModelProperty(value = "数量", example = "100") private BigDecimal amount; @NotNull(message = "建仓价不能为空") //@NotNull(message = "建仓价不能为空") @ApiModelProperty(value = "建仓价", example = "20.0000") private BigDecimal price; @ApiModelProperty(value = "市价时输入的总金额", example = "20.0000") private BigDecimal entrustAmount; } src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java
@@ -1,11 +1,13 @@ package com.xcong.excoin.modules.coin.service; import java.math.BigDecimal; import java.util.List; import com.baomidou.mybatisplus.extension.service.IService; import com.xcong.excoin.common.response.Result; import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity; import com.xcong.excoin.modules.coin.parameter.dto.FindAllWalletCoinOrderDto; import com.xcong.excoin.trade.ExchangeTrade; public interface OrderCoinService extends IService<OrderCoinsEntity>{ @@ -15,6 +17,19 @@ Result submitSalesWalletCoinOrder(String symbol, Integer type, Integer tradeType, BigDecimal price, BigDecimal amount); /** * 需要撮合交易的币种提交买卖单 * @param symbol * @param type * @param tradeType * @param price * @param amount * @param entrustAmount * @return */ Result submitSalesWalletCoinOrderWithMatch(String symbol, Integer type, Integer tradeType, BigDecimal price, BigDecimal amount,BigDecimal entrustAmount); public Result getEntrustWalletCoinOrder(String symbol, Integer status); @@ -34,4 +49,6 @@ public void dealEntrustCoinOrder(); public void handleOrder(List<ExchangeTrade> trades); } src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java
@@ -1,14 +1,9 @@ package com.xcong.excoin.modules.coin.service.impl; import java.math.BigDecimal; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Calendar; import java.util.Date; import java.util.GregorianCalendar; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.*; import javax.annotation.Resource; @@ -16,6 +11,10 @@ import com.xcong.excoin.modules.platform.entity.PlatformCnyUsdtExchangeEntity; import com.xcong.excoin.modules.platform.entity.PlatformSymbolsCoinEntity; import com.xcong.excoin.trade.CoinTrader; import com.xcong.excoin.trade.CoinTraderFactory; import com.xcong.excoin.trade.ExchangeTrade; import org.apache.commons.collections.CollectionUtils; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -79,6 +78,10 @@ RedisUtils redisUtils; @Resource PlatformSymbolsCoinDao platformSymbolsCoinDao; @Resource private CoinTraderFactory factory; @Override public String generateSimpleSerialno(String userId) { @@ -309,6 +312,174 @@ record.setBalance(walletCoinUsdt.getAvailableBalance()); memberAccountFlowEntityDao.insert(record); return Result.ok(MessageSourceUtils.getString("order_service_0011")); } @Override public Result submitSalesWalletCoinOrderWithMatch(String symbol, Integer type, Integer tradeType, BigDecimal price, BigDecimal amount, BigDecimal entrustAmount) { //获取用户ID Long memberId = 13L; BigDecimal nowPriceinBigDecimal = price; //查询当前价 //BigDecimal nowPrice = new BigDecimal(redisUtils.getString(CoinTypeConvert.convertToKey(symbol + "/USDT"))); // 获取交易管理的杠杠倍率,手续费率等信息,由平台进行设置 symbol = symbol.toUpperCase(); MemberWalletCoinEntity walletCoin = memberWalletCoinDao.selectWalletCoinBymIdAndCode(memberId, symbol); if (ObjectUtil.isEmpty(walletCoin)) { return Result.fail(MessageSourceUtils.getString("order_service_0003")); } // 查询交易设置 PlatformTradeSettingEntity tradeSetting = 
platformTradeSettingDao.findTradeSetting(); if (ObjectUtil.isEmpty(tradeSetting)) { return Result.fail(MessageSourceUtils.getString("order_service_0009")); } // 手续费用(手续费=建仓价X数量X手续费率) BigDecimal closingPrice ; // 总费用分两种 1,限价交易 是价格*数量+手续费 2,市价交易 用户输入的金额+手续费 //总费用 = 成交价*数量+手续费 BigDecimal totalPayPrice ; if(OrderCoinsEntity.TRADETYPE_FIXEDPRICE.equals(tradeType) ){ // 限价 closingPrice = price.multiply(amount).multiply(tradeSetting.getCoinFeeRatio()); totalPayPrice = price.multiply(amount).add(closingPrice); }else{ // 市价 closingPrice = entrustAmount.multiply(tradeSetting.getCoinFeeRatio()); totalPayPrice = entrustAmount.add(closingPrice); } // BigDecimal totalPayPricCoin = nowPrice.multiply(amount).add(closingPrice); String walletCode = MemberWalletCoinEnum.WALLETCOINCODE.getValue(); MemberWalletCoinEntity walletCoinUsdt = memberWalletCoinDao.selectWalletCoinBymIdAndCode(memberId, walletCode); if (OrderCoinsEntity.ORDERTYPE_BUY.equals(type)) { //买入,所需总费用跟用户USDT金额进行比较 BigDecimal availableBalance = walletCoinUsdt.getAvailableBalance(); if (totalPayPrice.compareTo(availableBalance) > 0) { return Result.fail(MessageSourceUtils.getString("order_service_0010")); } } else { //卖出,用户币是否足够 BigDecimal availableBalance = walletCoin.getAvailableBalance(); if (amount.compareTo(availableBalance) > 0) { return Result.fail(MessageSourceUtils.getString("order_service_0010")); } } // 首先将单插入到数据库主表(委托表) // 创建订单 OrderCoinsEntity order = new OrderCoinsEntity(); //根据委托类型生成不同数据 // 如果是限价交易直接插入主表数据 order.setMemberId(memberId); order.setOrderNo(generateSimpleSerialno(memberId.toString())); order.setOrderType(type); order.setSymbol(symbol); //order.setMarkPrice(nowPrice); // 成交量 先设置为0 order.setDealCnt(BigDecimal.ZERO); // 成交价 //order.setDealPrice(price); // 成交金额 order.setDealAmount(BigDecimal.ZERO); order.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DODING); order.setTradeType(tradeType); // 手续费 order.setFeeAmount(closingPrice); if(OrderCoinsEntity.TRADETYPE_FIXEDPRICE.equals(tradeType)){ // 限价 
是需要价格和数量 可以得到成交金额 // 下单量 order.setEntrustCnt(amount); // 下单价格 order.setEntrustPrice(price); order.setEntrustAmount(amount.multiply(price)); }else{ if(OrderCoinsEntity.ORDERTYPE_BUY.equals(type)){ // 市价 只有金额 order.setEntrustAmount(entrustAmount); }else{ // 下单量 order.setEntrustCnt(amount); // 下单价格 order.setEntrustPrice(price); order.setEntrustAmount(amount.multiply(price)); } } orderCoinsDao.insert(order); //更新用户钱包信息 //冻结相应的资产 if (OrderCoinsEntity.ORDERTYPE_BUY.equals(type)) { //如果是买入,所对应的币种增加,USDT账户减少金额 BigDecimal availableBalance = walletCoinUsdt.getAvailableBalance().subtract(totalPayPrice); BigDecimal frozenBalance = walletCoinUsdt.getFrozenBalance().add(totalPayPrice); walletCoinUsdt.setAvailableBalance(availableBalance); walletCoinUsdt.setFrozenBalance(frozenBalance); memberWalletCoinDao.updateById(walletCoinUsdt); } else { //如果是卖出,币种减少,USDT增加 BigDecimal availableBalance = walletCoin.getAvailableBalance().subtract(amount); BigDecimal frozenBalance = walletCoin.getFrozenBalance().add(amount); walletCoin.setAvailableBalance(availableBalance); walletCoin.setFrozenBalance(frozenBalance); memberWalletCoinDao.updateById(walletCoin); } // 加入到撮合 CoinTrader trader = factory.getTrader(symbol); // if(trader==null){ // // trader = new CoinTrader("NEKK"); // //newTrader.setKafkaTemplate(kafkaTemplate); // //newTrader.setBaseCoinScale(coin.getBaseCoinScale()); // //newTrader.setCoinScale(coin.getCoinScale()); // // newTrader.setPublishType(coin.getPublishType()); // //newTrader.setClearTime(coin.getClearTime()); // // // 创建成功以后需要对未处理订单预处理 // //log.info("======CoinTrader Process: " + symbol + "======"); // List<OrderCoinsEntity> orders = orderCoinsDao.selectAllEntrustingCoinOrderList(); // if(CollectionUtils.isNotEmpty(orders)){ // List<OrderCoinsEntity> tradingOrders = new ArrayList<>(); // List<OrderCoinsEntity> completedOrders = new ArrayList<>(); // orders.forEach(order1 -> { // tradingOrders.add(order1); // }); // try { // trader.trade(tradingOrders); // } catch 
(ParseException e) { // e.printStackTrace(); // // log.info("异常:trader.trade(tradingOrders);"); // } // } // // trader.setReady(true); // factory.addTrader(symbol, trader); // } trader.trade(order); // // 流水记录 TODO // MemberAccountFlowEntity record = new MemberAccountFlowEntity(); // record.setMemberId(memberId); // if (OrderCoinsEntity.ORDERTYPE_BUY.equals(type)) { // record.setPrice(totalPayPrice.setScale(4, BigDecimal.ROUND_DOWN)); // record.setSource(MemberAccountFlowEntity.SOURCE_BUY + symbol); // record.setRemark(MemberAccountFlowEntity.REMARK_BUY + symbol + ":" + amount); // } else { // record.setPrice(totalPayPrice.negate().setScale(4, BigDecimal.ROUND_DOWN)); // record.setSource(MemberAccountFlowEntity.SOURCE_SALE + symbol); // record.setRemark(MemberAccountFlowEntity.REMARK_SALE + symbol + ":" + amount); // } // record.setSymbol(symbol); // record.setBalance(walletCoinUsdt.getAvailableBalance()); // // memberAccountFlowEntityDao.insert(record); return Result.ok(MessageSourceUtils.getString("order_service_0011")); } @@ -638,4 +809,109 @@ } } } public void handleOrder(List<ExchangeTrade> trades){ // 处理撮合交易的订单 for(ExchangeTrade exchangeTrade : trades){ BigDecimal amount = exchangeTrade.getAmount(); Long buyOrderId = exchangeTrade.getBuyOrderId(); BigDecimal buyTurnover = exchangeTrade.getBuyTurnover(); int direction = exchangeTrade.getDirection(); BigDecimal price = exchangeTrade.getPrice(); Long sellOrderId = exchangeTrade.getSellOrderId(); // 买卖单都需要处理 // 买单 OrderCoinsEntity buyOrderCoinsEntity = orderCoinsDao.selectById(buyOrderId); if(buyOrderCoinsEntity!=null){ // 比较剩余的量 BigDecimal dealAmount = buyOrderCoinsEntity.getDealAmount(); // 单的总金额 BigDecimal entrustAmount = buyOrderCoinsEntity.getEntrustAmount(); BigDecimal add = dealAmount.add(buyTurnover); // 创建一个完成的单 OrderCoinsDealEntity detail = new OrderCoinsDealEntity(); detail.setMemberId(buyOrderCoinsEntity.getMemberId()); //detail.setOrderId(order.getId()); 
detail.setOrderNo(buyOrderCoinsEntity.getOrderNo()); detail.setOrderType(buyOrderCoinsEntity.getOrderType()); detail.setTradeType(buyOrderCoinsEntity.getTradeType()); detail.setSymbol(buyOrderCoinsEntity.getSymbol()); detail.setSymbolCnt(amount); detail.setEntrustPrice(buyOrderCoinsEntity.getEntrustPrice()); detail.setDealPrice(price); detail.setDealAmount(buyTurnover); //detail.setFeeAmount(closingPrice); detail.setOrderStatus(OrderCoinsDealEntity.ORDERSTATUS_DONE); orderCoinDealDao.insert(detail); // 如果这个单成交完 更改状态 if(add.compareTo(entrustAmount)>=0){ OrderCoinsEntity update = new OrderCoinsEntity(); update.setId(buyOrderId); update.setDealAmount(entrustAmount); update.setDealCnt(buyOrderCoinsEntity.getDealCnt().add(amount)); update.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE); update.setUpdateTime(new Date()); orderCoinsDao.updateById(update); }else{ OrderCoinsEntity update = new OrderCoinsEntity(); update.setId(buyOrderId); update.setDealAmount(add); update.setDealCnt(buyOrderCoinsEntity.getDealCnt().add(amount)); update.setUpdateTime(new Date()); orderCoinsDao.updateById(update); } } // 卖单 OrderCoinsEntity sellOrderCoinsEntity = orderCoinsDao.selectById(sellOrderId); if(sellOrderCoinsEntity!=null){ // 比较剩余的量 BigDecimal dealAmount = sellOrderCoinsEntity.getDealCnt(); // 单的总数量 BigDecimal entrustCnt = sellOrderCoinsEntity.getEntrustCnt(); BigDecimal add = dealAmount.add(amount); // 创建一个完成的单 OrderCoinsDealEntity detail = new OrderCoinsDealEntity(); detail.setMemberId(sellOrderCoinsEntity.getMemberId()); //detail.setOrderId(order.getId()); detail.setOrderNo(sellOrderCoinsEntity.getOrderNo()); detail.setOrderType(sellOrderCoinsEntity.getOrderType()); detail.setTradeType(sellOrderCoinsEntity.getTradeType()); detail.setSymbol(sellOrderCoinsEntity.getSymbol()); detail.setSymbolCnt(amount); detail.setEntrustPrice(sellOrderCoinsEntity.getEntrustPrice()); detail.setDealPrice(price); detail.setDealAmount(buyTurnover); //detail.setFeeAmount(closingPrice); 
detail.setOrderStatus(OrderCoinsDealEntity.ORDERSTATUS_DONE); orderCoinDealDao.insert(detail); // 如果这个单成交完 更改状态 if(add.compareTo(entrustCnt)>=0){ OrderCoinsEntity update = new OrderCoinsEntity(); update.setId(sellOrderId); // 总成交额 update.setDealAmount(buyTurnover.add(sellOrderCoinsEntity.getDealAmount())); update.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE); update.setDealCnt(entrustCnt); update.setUpdateTime(new Date()); orderCoinsDao.updateById(update); }else{ // 未完成 OrderCoinsEntity update = new OrderCoinsEntity(); update.setId(sellOrderId); // 总成交额 update.setDealAmount(buyTurnover.add(sellOrderCoinsEntity.getDealAmount())); update.setDealCnt(sellOrderCoinsEntity.getDealCnt().add(amount)); update.setUpdateTime(new Date()); orderCoinsDao.updateById(update); } // 卖币得到的usdt MemberWalletCoinEntity memberWalletCoinEntity = memberWalletCoinDao.selectWalletCoinBymIdAndCode(sellOrderCoinsEntity.getMemberId(), sellOrderCoinsEntity.getSymbol()); if(memberWalletCoinEntity!=null){ memberWalletCoinDao.updateWalletBalance(memberWalletCoinEntity.getId(),buyTurnover,buyTurnover,null); } } } } } src/main/java/com/xcong/excoin/modules/exchange/service/HandleKlineService.java
New file @@ -0,0 +1,14 @@
package com.xcong.excoin.modules.exchange.service;

import com.xcong.excoin.trade.ExchangeTrade;

import java.util.List;

/**
 * Service that applies executed trades to the latest K-line (candlestick) data.
 */
public interface HandleKlineService {

    /**
     * Handles the latest K-line after trades have been executed.
     *
     * @param trades the executed trades to apply
     */
    void handleExchangeOrderToKline(List<ExchangeTrade> trades);
}
src/main/java/com/xcong/excoin/modules/exchange/service/impl/HandleKlineServiceImpl.java
New file @@ -0,0 +1,56 @@
package com.xcong.excoin.modules.exchange.service.impl;

import cn.hutool.core.util.StrUtil;
import com.huobi.client.model.Candlestick;
import com.xcong.excoin.modules.exchange.service.HandleKlineService;
import com.xcong.excoin.processor.CoinProcessor;
import com.xcong.excoin.processor.CoinProcessorFactory;
import com.xcong.excoin.trade.ExchangeTrade;
import com.xcong.excoin.utils.CoinTypeConvert;
import com.xcong.excoin.utils.RedisUtils;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.Collection;
import java.util.List;
import java.util.Map;

/**
 * Folds executed trades into the in-progress K-lines of the symbol's {@link CoinProcessor}
 * and publishes the result, together with the latest price, to Redis.
 */
@Service
public class HandleKlineServiceImpl implements HandleKlineService {

    @Resource
    private CoinProcessorFactory processorFactory;

    @Resource
    private RedisUtils redisUtils;

    /**
     * Applies every trade to every in-progress K-line of the symbol, then stores the K-line
     * map and the price of the last trade in Redis.
     *
     * <p>The symbol is taken from the first trade; the batch is treated as belonging to a
     * single symbol.
     *
     * @param trades executed trades; {@code null} or empty is a no-op
     * @throws IllegalStateException when no processor is registered for the symbol
     */
    @Override
    public void handleExchangeOrderToKline(List<ExchangeTrade> trades) {
        if (CollectionUtils.isEmpty(trades)) {
            return;
        }
        String symbol = trades.get(0).getSymbol();
        // Redis keys use the "<COIN>/USDT" form; append the quote coin when it is missing.
        String symbolUsdt = symbol.contains("USDT") ? symbol : symbol + "/USDT";

        CoinProcessor processor = processorFactory.getProcessor(symbol);
        if (processor == null) {
            // Fail with a descriptive error instead of an anonymous NullPointerException.
            throw new IllegalStateException("No CoinProcessor registered for symbol " + symbol);
        }

        Map<String, Candlestick> currentKlineMap = processor.getCurrentKlineMap();
        for (Candlestick candlestick : currentKlineMap.values()) {
            for (ExchangeTrade exchangeTrade : trades) {
                processor.processTrade(candlestick, exchangeTrade);
            }
        }

        // Stored in Redis for the websocket side to pick up.
        redisUtils.set(StrUtil.format("NEW_KINE_{}", symbolUsdt), currentKlineMap);

        // Publish the latest price: the price of the last trade in the batch.
        BigDecimal newPrice = trades.get(trades.size() - 1).getPrice();
        redisUtils.set(CoinTypeConvert.convertToKey(symbolUsdt), newPrice);
    }
}
src/main/java/com/xcong/excoin/modules/member/dao/MemberWalletCoinDao.java
@@ -22,4 +22,5 @@ int subFrozenBalance(@Param("memberId") Long memberId, @Param("id") Long id, @Param("amount") BigDecimal amount); int updateBlockBalance(@Param("id") Long id, @Param("availableBalance") BigDecimal availableBalance, @Param("earlyBalance") BigDecimal earlyBalance, @Param("blockNumber") Integer blockNumber); int updateWalletBalance(@Param("id") Long id, @Param("availableBalance") BigDecimal availableBalance, @Param("totalBalance") BigDecimal totalBalance, @Param("frozenBalance") BigDecimal frozenBalance); } src/main/java/com/xcong/excoin/processor/CoinProcessor.java
New file @@ -0,0 +1,54 @@
package com.xcong.excoin.processor;

import com.huobi.client.model.Candlestick;
import com.xcong.excoin.trade.ExchangeTrade;
import com.xcong.excoin.utils.RedisUtils;

import java.util.List;
import java.util.Map;

/**
 * Per-symbol processor that turns executed trades into K-line (candlestick) data and a
 * daily summary ({@link CoinThumb}).
 */
public interface CoinProcessor {

    /** Sets the halt flag; the default implementation ignores trades in {@link #process} while it is true. */
    void setIsHalt(boolean status);

    /** Sets the "stop K-line generation" flag. */
    void setIsStopKLine(boolean stop);

    /** @return the current value of the "stop K-line generation" flag */
    boolean isStopKline();

    /**
     * Processes newly generated trade information.
     *
     * @param trades the new trades
     */
    void process(List<ExchangeTrade> trades);

    /**
     * Applies a single trade to the given K-line.
     *
     * @param kLine         the K-line to update
     * @param exchangeTrade the trade to apply
     */
    void processTrade(Candlestick kLine, ExchangeTrade exchangeTrade);

    /**
     * Adds a storage handler.
     *
     * @param storage the handler to add
     */
    void addHandler(MarketHandler storage);

    /** @return the summary held by this processor */
    CoinThumb getThumb();

    /** Supplies the market service used for K-line and trade queries. */
    void setMarketService(MarketService service);

    /**
     * Generates a K-line for a period ending at {@code time}.
     *
     * @param range number of time units the period spans
     * @param field {@link java.util.Calendar} field constant that identifies the time unit
     * @param time  end of the period, in epoch milliseconds
     */
    void generateKLine(int range, int field, long time);

    /** @return the K-line currently being built */
    Candlestick getKLine();

    /** Initialises the summary. */
    void initializeThumb();

    /** Triggers automatic generation of the current K-line. */
    void autoGenerate();

    /** Resets the summary. */
    void resetThumb();

    // void setExchangeRate(CoinExchangeRate coinExchangeRate);

    /**
     * Updates the 24-hour volume.
     *
     * @param time reference time in epoch milliseconds
     */
    void update24HVolume(long time);

    //void initializeUsdRate();

    /** @return the map of in-progress K-lines */
    Map<String,Candlestick> getCurrentKlineMap();

    /** Supplies the Redis helper. */
    void setRedisUtils(RedisUtils redisUtils);
}
src/main/java/com/xcong/excoin/processor/CoinProcessorFactory.java
New file @@ -0,0 +1,34 @@
package com.xcong.excoin.processor;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.concurrent.ConcurrentHashMap;

/**
 * Registry of {@link CoinProcessor} instances keyed by symbol.
 */
@Slf4j
@Component
public class CoinProcessorFactory {

    /** Symbol -> processor. Created once and never replaced, so it is never null. */
    private final ConcurrentHashMap<String, CoinProcessor> processorMap = new ConcurrentHashMap<>();

    public CoinProcessorFactory() {
    }

    /**
     * Registers (or replaces) the processor for a symbol.
     *
     * @param symbol    registry key
     * @param processor processor to register
     */
    public void addProcessor(String symbol, CoinProcessor processor) {
        // Pass the symbol as a log argument; the original concatenated it after the "{}"
        // placeholder, so the placeholder was printed literally.
        log.info("CoinProcessorFactory addProcessor = {}", symbol);
        processorMap.put(symbol, processor);
    }

    /**
     * @param symbol registry key
     * @return {@code true} when a processor is registered for the symbol
     */
    public boolean containsProcessor(String symbol) {
        return processorMap.containsKey(symbol);
    }

    /**
     * @param symbol registry key
     * @return the registered processor, or {@code null} when none is registered
     */
    public CoinProcessor getProcessor(String symbol) {
        return processorMap.get(symbol);
    }

    /** @return the live backing map (not a copy) */
    public ConcurrentHashMap<String, CoinProcessor> getProcessorMap() {
        return processorMap;
    }
}
src/main/java/com/xcong/excoin/processor/CoinThumb.java
New file @@ -0,0 +1,26 @@
package com.xcong.excoin.processor;

import lombok.Data;

import java.math.BigDecimal;

/**
 * Summary ("thumb") of a symbol: open/high/low/close, change, volume and turnover.
 * Numeric fields default to zero unless noted.
 */
@Data
public class CoinThumb {
    private String symbol;
    private BigDecimal open = BigDecimal.ZERO;
    private BigDecimal high= BigDecimal.ZERO;
    private BigDecimal low= BigDecimal.ZERO;
    private BigDecimal close=BigDecimal.ZERO;
    // Zero with an explicit scale of 2.
    private BigDecimal chg = BigDecimal.ZERO.setScale(2);
    private BigDecimal change = BigDecimal.ZERO.setScale(2);
    private BigDecimal volume = BigDecimal.ZERO.setScale(2);
    private BigDecimal turnover= BigDecimal.ZERO;
    // Previous day's closing price
    private BigDecimal lastDayClose = BigDecimal.ZERO;
    // Exchange rate of the traded coin to USD (no default; null until set)
    private BigDecimal usdRate;
    // Exchange rate of the base coin to USD (no default; null until set)
    private BigDecimal baseUsdRate;
    // Trading zone
    private int zone;
}
src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java
New file @@ -0,0 +1,392 @@
package com.xcong.excoin.processor;

import com.alibaba.fastjson.JSON;
import com.huobi.client.model.Candlestick;
import com.xcong.excoin.trade.ExchangeTrade;
import com.xcong.excoin.utils.RedisUtils;
import lombok.ToString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Default trade processor; produces 1-minute K-line information and maintains the
 * daily summary ({@link CoinThumb}) of one symbol.
 */
@ToString
public class DefaultCoinProcessor implements CoinProcessor {

    private Logger logger = LoggerFactory.getLogger(DefaultCoinProcessor.class);

    private String symbol;
    private String baseCoin;
    private Candlestick currentKLine;
    // Initialised eagerly: the original never assigned this list, so addHandler() and
    // handleTradeStorage() failed with a NullPointerException.
    private List<MarketHandler> handlers = new ArrayList<>();
    private CoinThumb coinThumb;
    private MarketService service;
    private RedisUtils redisUtils;
    //private CoinExchangeRate coinExchangeRate;

    // Whether processing is halted; process() ignores trades while this is true.
    private Boolean isHalt = true;
    // Whether K-line generation is stopped.
    private Boolean stopKLine = false;

    /**
     * After the K-line of a period has been generated, a fresh "latest" K-line for that
     * period is stored here, keyed by period.
     */
    private Map<String,Candlestick> currentKlineMap = new ConcurrentHashMap<>();

    /**
     * @param symbol   symbol handled by this processor
     * @param baseCoin base coin of the symbol
     */
    public DefaultCoinProcessor(String symbol, String baseCoin) {
        createNewKLine();
        this.baseCoin = baseCoin;
        this.symbol = symbol;
    }

    public String getSymbol() {
        return symbol;
    }

    /**
     * Rebuilds the daily summary from the 1-minute K-lines between today's 00:00 and the
     * start of the current minute.
     */
    @Override
    public void initializeThumb() {
        Calendar calendar = Calendar.getInstance();
        // Truncate to the start of the current minute.
        calendar.set(Calendar.SECOND, 0);
        calendar.set(Calendar.MILLISECOND, 0);
        long nowTime = calendar.getTimeInMillis();
        calendar.set(Calendar.MINUTE, 0);
        calendar.set(Calendar.HOUR_OF_DAY, 0);
        long firstTimeOfToday = calendar.getTimeInMillis();
        String period = "1min";
        logger.info("initializeThumb from {} to {}", firstTimeOfToday, nowTime);
        List<Candlestick> lines = service.findAllKLine(this.symbol, firstTimeOfToday, nowTime, period);
        coinThumb = new CoinThumb();
        synchronized (coinThumb) {
            coinThumb.setSymbol(symbol);
            for (Candlestick kline : lines) {
                // K-lines without an opening price carry no trades.
                if (kline.getOpen().compareTo(BigDecimal.ZERO) == 0) {
                    continue;
                }
                if (coinThumb.getOpen().compareTo(BigDecimal.ZERO) == 0) {
                    coinThumb.setOpen(kline.getOpen());
                }
                if (coinThumb.getHigh().compareTo(kline.getHigh()) < 0) {
                    coinThumb.setHigh(kline.getHigh());
                }
                // The low starts at zero, so take the first positive low as-is and only
                // then look for smaller values. The original compared against the zero
                // start value, which meant the low (and therefore chg) was never set.
                BigDecimal klineLow = kline.getLow();
                if (klineLow.compareTo(BigDecimal.ZERO) > 0
                        && (coinThumb.getLow().compareTo(BigDecimal.ZERO) == 0
                        || coinThumb.getLow().compareTo(klineLow) > 0)) {
                    coinThumb.setLow(klineLow);
                }
                if (kline.getClose().compareTo(BigDecimal.ZERO) > 0) {
                    coinThumb.setClose(kline.getClose());
                }
                coinThumb.setVolume(coinThumb.getVolume().add(kline.getVolume()));
                // TODO
                coinThumb.setTurnover(coinThumb.getTurnover().add(kline.getAmount()));
            }
            coinThumb.setChange(coinThumb.getClose().subtract(coinThumb.getOpen()));
            // The change ratio is computed against the low, not the opening price.
            if (coinThumb.getLow().compareTo(BigDecimal.ZERO) > 0) {
                coinThumb.setChg(coinThumb.getChange().divide(coinThumb.getLow(), 4, RoundingMode.UP));
            }
        }
    }

    /**
     * Starts a new current K-line stamped with the next full minute.
     */
    public void createNewKLine() {
        currentKLine = new Candlestick();
        synchronized (currentKLine) {
            Calendar calendar = Calendar.getInstance();
            calendar.set(Calendar.SECOND, 0);
            calendar.set(Calendar.MILLISECOND, 0);
            // The 1-minute K-line is stamped with the next full minute.
            calendar.add(Calendar.MINUTE, 1);
            currentKLine.setTimestamp(calendar.getTimeInMillis());
            //currentKLine.setPeriod("1min");
            currentKLine.setCount(0);
        }
    }

    /**
     * Resets the summary at 00:00:00; the current close is kept and also stored as the
     * previous day's close.
     */
    @Override
    public void resetThumb() {
        logger.info("reset coinThumb");
        synchronized (coinThumb) {
            coinThumb.setOpen(BigDecimal.ZERO);
            coinThumb.setHigh(BigDecimal.ZERO);
            // Remember yesterday's closing price.
            coinThumb.setLastDayClose(coinThumb.getClose());
            //coinThumb.setClose(BigDecimal.ZERO);
            coinThumb.setLow(BigDecimal.ZERO);
            coinThumb.setChg(BigDecimal.ZERO);
            coinThumb.setChange(BigDecimal.ZERO);
        }
    }

    // @Override
    // public void setExchangeRate(CoinExchangeRate coinExchangeRate) {
    //     this.coinExchangeRate = coinExchangeRate;
    // }

    /**
     * Replaces the summary volume with the trade volume of the 24 hours before {@code time}.
     * Does nothing when the summary has not been initialised.
     *
     * @param time end of the window, in epoch milliseconds
     */
    @Override
    public void update24HVolume(long time) {
        if (coinThumb == null) {
            return;
        }
        synchronized (coinThumb) {
            Calendar calendar = Calendar.getInstance();
            calendar.setTimeInMillis(time);
            calendar.add(Calendar.HOUR_OF_DAY, -24);
            long timeStart = calendar.getTimeInMillis();
            // TODO
            BigDecimal volume = service.findTradeVolume(this.symbol, timeStart, time);
            coinThumb.setVolume(volume.setScale(4, RoundingMode.DOWN));
        }
    }

    // @Override
    // public void initializeUsdRate() {
    //     BigDecimal baseUsdRate = coinExchangeRate.getUsdRate(baseCoin);
    //     coinThumb.setBaseUsdRate(baseUsdRate);
    //     BigDecimal multiply = coinThumb.getClose().multiply(coinExchangeRate.getUsdRate(baseCoin));
    //     coinThumb.setUsdRate(multiply);
    // }

    /**
     * Closes the current 1-minute K-line, hands it to K-line storage and starts a new one.
     * Does nothing when the summary has not been initialised.
     */
    @Override
    public void autoGenerate() {
        if (coinThumb == null) {
            return;
        }
        synchronized (currentKLine) {
            // Without any trade in this minute, carry over the last traded price.
            if (currentKLine.getOpen() == null) {
                currentKLine.setOpen(BigDecimal.ZERO);
            }
            if (currentKLine.getOpen().compareTo(BigDecimal.ZERO) == 0) {
                currentKLine.setOpen(coinThumb.getClose());
                currentKLine.setLow(coinThumb.getClose());
                currentKLine.setHigh(coinThumb.getClose());
                currentKLine.setClose(coinThumb.getClose());
            }
            Calendar calendar = Calendar.getInstance();
            calendar.set(Calendar.SECOND, 0);
            calendar.set(Calendar.MILLISECOND, 0);
            currentKLine.setTimestamp(calendar.getTimeInMillis());
            handleKLineStorage(currentKLine);
            createNewKLine();
        }
    }

    @Override
    public void setIsHalt(boolean status) {
        this.isHalt = status;
    }

    /**
     * Applies each trade to the current K-line and the daily summary and forwards it to
     * the registered handlers. Ignored while halted or when there are no trades.
     *
     * @param trades newly generated trades
     */
    @Override
    public void process(List<ExchangeTrade> trades) {
        if (isHalt || trades == null || trades.isEmpty()) {
            return;
        }
        synchronized (currentKLine) {
            for (ExchangeTrade exchangeTrade : trades) {
                // Update the K-line.
                processTrade(currentKLine, exchangeTrade);
                // Update today's summary.
                logger.debug("处理今日概况信息");
                handleThumb(exchangeTrade);
                // Store and push the trade.
                handleTradeStorage(exchangeTrade);
            }
        }
    }

    /**
     * Folds one trade into a K-line: sets open/high/low/close on the first trade, otherwise
     * extends high/low and moves the close; always increments the count and adds the traded
     * amount to the volume.
     *
     * <p>Null price fields and a null volume are treated as "not set yet", so a freshly
     * constructed {@link Candlestick} (as created by {@link #createNewKLine()}) can be used.
     *
     * @param kLine         K-line to update
     * @param exchangeTrade trade to apply
     */
    @Override
    public void processTrade(Candlestick kLine, ExchangeTrade exchangeTrade) {
        BigDecimal price = exchangeTrade.getPrice();
        BigDecimal close = kLine.getClose();
        if (close == null || close.compareTo(BigDecimal.ZERO) == 0) {
            // First trade of this K-line.
            kLine.setOpen(price);
            kLine.setHigh(price);
            kLine.setLow(price);
            kLine.setClose(price);
        } else {
            kLine.setHigh(kLine.getHigh() == null ? price : price.max(kLine.getHigh()));
            kLine.setLow(kLine.getLow() == null ? price : price.min(kLine.getLow()));
            kLine.setClose(price);
        }
        kLine.setCount(kLine.getCount() + 1);
        BigDecimal volume = kLine.getVolume() == null ? BigDecimal.ZERO : kLine.getVolume();
        kLine.setVolume(volume.add(exchangeTrade.getAmount()));
    }

    /** Forwards a trade, together with the current summary, to every registered handler. */
    public void handleTradeStorage(ExchangeTrade exchangeTrade) {
        for (MarketHandler storage : handlers) {
            storage.handleTrade(symbol, exchangeTrade, coinThumb);
        }
    }

    /** Placeholder for K-line storage; currently does nothing. */
    public void handleKLineStorage(Candlestick kLine) {
        // TODO: store the K-line / push the latest one to the handlers:
        // for (MarketHandler storage : handlers) {
        //     storage.handleKLine(symbol, kLine);
        // }
    }

    /**
     * Updates the daily summary with one trade.
     *
     * @param exchangeTrade trade to apply
     */
    public void handleThumb(ExchangeTrade exchangeTrade) {
        logger.info("handleThumb symbol = {}", this.symbol);
        synchronized (coinThumb) {
            BigDecimal price = exchangeTrade.getPrice();
            if (coinThumb.getOpen().compareTo(BigDecimal.ZERO) == 0) {
                // The first trade defines the opening price.
                coinThumb.setOpen(price);
            }
            coinThumb.setHigh(price.max(coinThumb.getHigh()));
            if (coinThumb.getLow().compareTo(BigDecimal.ZERO) == 0) {
                coinThumb.setLow(price);
            } else {
                coinThumb.setLow(price.min(coinThumb.getLow()));
            }
            coinThumb.setClose(price);
            coinThumb.setVolume(coinThumb.getVolume().add(exchangeTrade.getAmount()).setScale(4, RoundingMode.UP));
            BigDecimal turnover = price.multiply(exchangeTrade.getAmount()).setScale(4, RoundingMode.UP);
            coinThumb.setTurnover(coinThumb.getTurnover().add(turnover));
            BigDecimal change = coinThumb.getClose().subtract(coinThumb.getOpen());
            coinThumb.setChange(change);
            // As in initializeThumb, the ratio is taken against the low.
            if (coinThumb.getLow().compareTo(BigDecimal.ZERO) > 0) {
                coinThumb.setChg(change.divide(coinThumb.getLow(), 4, RoundingMode.UP));
            }
            if ("USDT".equalsIgnoreCase(baseCoin)) {
                // The original message had no placeholder, so the price was never logged.
                logger.info("setUsdRate {}", price);
                coinThumb.setUsdRate(price);
            }
            //coinThumb.setBaseUsdRate(coinExchangeRate.getUsdRate(baseCoin));
            //coinThumb.setUsdRate(exchangeTrade.getPrice().multiply(coinExchangeRate.getUsdRate(baseCoin)));
        }
    }

    @Override
    public void addHandler(MarketHandler storage) {
        handlers.add(storage);
    }

    @Override
    public CoinThumb getThumb() {
        return coinThumb;
    }

    @Override
    public void setMarketService(MarketService service) {
        this.service = service;
    }

    /**
     * Builds and saves the K-line of the period {@code [time - range units, time]} from the
     * trades in that window, then seeds a new in-progress K-line for the same period whose
     * prices all start at the saved K-line's close.
     *
     * @param range number of time units in the period
     * @param field {@link Calendar} field constant that identifies the time unit
     * @param time  end of the period, in epoch milliseconds
     */
    @Override
    public void generateKLine(int range, int field, long time) {
        Calendar calendar = Calendar.getInstance();
        calendar.setTimeInMillis(time);
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        long endTick = calendar.getTimeInMillis();
        String endTime = df.format(calendar.getTime());
        // Move back by `range` time units.
        calendar.add(field, -range);
        String fromTime = df.format(calendar.getTime());
        long startTick = calendar.getTimeInMillis();
        logger.info("time range from {} to {}", fromTime, endTime);
        List<ExchangeTrade> exchangeTrades = service.findTradeByTimeRange(this.symbol, startTick, endTick);

        Candlestick kLine = new Candlestick();
        kLine.setTimestamp(endTick);
        kLine.setAmount(BigDecimal.ZERO);
        kLine.setClose(BigDecimal.ZERO);
        kLine.setLow(BigDecimal.ZERO);
        kLine.setOpen(BigDecimal.ZERO);
        kLine.setVolume(BigDecimal.ZERO);
        kLine.setHigh(BigDecimal.ZERO);

        String rangeUnit = "";
        if (field == Calendar.MINUTE) {
            rangeUnit = "min";
        } else if (field == Calendar.HOUR_OF_DAY) {
            rangeUnit = "hour";
        } else if (field == Calendar.DAY_OF_WEEK) {
            rangeUnit = "week";
        } else if (field == Calendar.DAY_OF_YEAR) {
            rangeUnit = "day";
        } else if (field == Calendar.MONTH) {
            rangeUnit = "month";
        }
        // kLine.setPeriod(range + rangeUnit);
        String period = range + rangeUnit;

        // Fold the trades of the window into the K-line.
        for (ExchangeTrade exchangeTrade : exchangeTrades) {
            processTrade(kLine, exchangeTrade);
        }
        // Without any trade, fall back to the previous price. Guarded against an
        // uninitialised summary, as autoGenerate() and update24HVolume() already are.
        if (kLine.getOpen().compareTo(BigDecimal.ZERO) == 0 && coinThumb != null) {
            kLine.setOpen(coinThumb.getClose());
            kLine.setClose(coinThumb.getClose());
            kLine.setLow(coinThumb.getClose());
            kLine.setHigh(coinThumb.getClose());
        }
        logger.info("generate {} kline in {},data={}", period, df.format(new Date(kLine.getTimestamp())), JSON.toJSONString(kLine));
        service.saveKLine(symbol, period, kLine);

        // Seed the next in-progress K-line; later trades update it.
        Candlestick newKline = new Candlestick();
        //kLine.setTimestamp(endTick);
        newKline.setAmount(BigDecimal.ZERO);
        newKline.setClose(kLine.getClose());
        newKline.setLow(kLine.getClose());
        // The opening price is the previous K-line's close.
        newKline.setOpen(kLine.getClose());
        newKline.setVolume(BigDecimal.ZERO);
        newKline.setHigh(kLine.getClose());
        currentKlineMap.put(period, newKline);

        // Keep yesterday's K-line in Redis.
        // NOTE(review): the key is hard-coded to "NEKK/USDT" regardless of this processor's
        // symbol - confirm before a second matched symbol is added.
        if ("day".equals(rangeUnit)) {
            redisUtils.set("NEKK/USDT", kLine);
        }
    }

    @Override
    public Candlestick getKLine() {
        return currentKLine;
    }

    @Override
    public void setIsStopKLine(boolean stop) {
        this.stopKLine = stop;
    }

    @Override
    public boolean isStopKline() {
        return this.stopKLine;
    }

    @Override
    public Map<String, Candlestick> getCurrentKlineMap() {
        return currentKlineMap;
    }

    @Override
    public void setRedisUtils(RedisUtils redisUtils) {
        this.redisUtils = redisUtils;
    }

    public void setCurrentKlineMap(Map<String, Candlestick> currentKlineMap) {
        this.currentKlineMap = currentKlineMap;
    }
}
src/main/java/com/xcong/excoin/processor/ExchangeOrderDirection.java
New file @@ -0,0 +1,5 @@
package com.xcong.excoin.processor;

/**
 * Direction of an exchange order: either a buy or a sell.
 */
public enum ExchangeOrderDirection {
    /** Buy side. */
    BUY,
    /** Sell side. */
    SELL
}
src/main/java/com/xcong/excoin/processor/MarketHandler.java
New file @@ -0,0 +1,21 @@
package com.xcong.excoin.processor;

import com.huobi.client.model.Candlestick;
import com.xcong.excoin.trade.ExchangeTrade;

/**
 * Callback registered on a coin processor to persist or forward market data.
 */
public interface MarketHandler {

    /**
     * Stores the information of one executed trade.
     *
     * @param symbol        symbol the trade belongs to
     * @param exchangeTrade the executed trade
     * @param thumb         market summary of the symbol at the time of the call
     */
    void handleTrade(String symbol, ExchangeTrade exchangeTrade, CoinThumb thumb);

    /**
     * Stores one K-line (candlestick).
     *
     * @param symbol symbol the K-line belongs to
     * @param kLine  the K-line to store
     */
    void handleKLine(String symbol, Candlestick kLine);
}
src/main/java/com/xcong/excoin/processor/MarketService.java
New file @@ -0,0 +1,125 @@
package com.xcong.excoin.processor;

import com.huobi.client.model.Candlestick;
import com.xcong.excoin.modules.coin.dao.OrderCoinDealDao;
import com.xcong.excoin.trade.ExchangeTrade;
import com.xcong.excoin.utils.RedisUtils;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * Market data access for the K-line processors.
 *
 * <p>Trades are read through {@link OrderCoinDealDao}; generated K-lines are kept in Redis as one
 * list per symbol and period. An earlier MongoDB-based implementation has been removed; where its
 * query semantics are still relevant they are mentioned on the individual methods.
 */
@Service
public class MarketService {

    /**
     * Prefix of the Redis keys holding K-line lists. The spelling "KINE" is kept on purpose:
     * changing it would orphan data already stored under the existing keys.
     */
    private static final String KLINE_KEY_PREFIX = "KINE_";

    @Resource
    private OrderCoinDealDao orderCoinDealDao;

    @Resource
    private RedisUtils redisUtils;

    /**
     * Not implemented yet (the MongoDB version returned the latest 1000 K-lines).
     *
     * @param symbol trading symbol
     * @param peroid K-line period
     * @return always an empty list; never {@code null}
     */
    public List<Candlestick> findAllKLine(String symbol, String peroid) {
        return new ArrayList<>();
    }

    /**
     * Returns the K-lines stored in Redis for the given symbol and period.
     *
     * <p>NOTE(review): {@code fromTime} and {@code toTime} are currently ignored and the whole
     * stored list is returned. The former MongoDB query selected
     * {@code fromTime <= time <= toTime} in ascending time order; confirm the time unit used by
     * callers before adding a filter here.
     *
     * @param symbol   trading symbol
     * @param fromTime start of the requested range (currently unused)
     * @param toTime   end of the requested range (currently unused)
     * @param period   K-line period, as used when the K-lines were saved
     * @return the stored K-lines, or an empty list when nothing is stored
     */
    public List<Candlestick> findAllKLine(String symbol, long fromTime, long toTime, String period) {
        return loadKLines(symbol, period);
    }

    /**
     * Finds the trades of a symbol executed between two instants.
     *
     * <p>Fix: the end of the range was previously built from {@code timeStart}, so
     * {@code timeEnd} never reached the DAO.
     *
     * @param symbol    trading symbol
     * @param timeStart range start, epoch milliseconds
     * @param timeEnd   range end, epoch milliseconds
     * @return the matching trades; an empty list (never {@code null}) when there are none
     */
    public List<ExchangeTrade> findTradeByTimeRange(String symbol, long timeStart, long timeEnd) {
        List<ExchangeTrade> trades =
                orderCoinDealDao.selectOrderCoinDealByTime(symbol, new Date(timeStart), new Date(timeEnd));
        // Callers iterate the result directly, so normalise a missing result to an empty list.
        return trades == null ? new ArrayList<>() : trades;
    }

    /**
     * Appends a K-line to the list stored in Redis for the given symbol and period.
     *
     * <p>The list is read, extended and written back; this is not atomic, so concurrent writers
     * for the same key can lose an entry.
     *
     * @param symbol trading symbol
     * @param period K-line period, part of the Redis key
     * @param kLine  K-line to append
     */
    public void saveKLine(String symbol, String period, Candlestick kLine) {
        List<Candlestick> stored = loadKLines(symbol, period);
        stored.add(kLine);
        redisUtils.set(klineKey(symbol, period), stored);
    }

    /**
     * Sums the buy turnover of all trades of a symbol executed between two instants.
     *
     * <p>Fix: the end of the range was previously built from {@code timeStart}, so
     * {@code timeEnd} never reached the DAO.
     *
     * <p>NOTE(review): despite the method name this adds up {@code getBuyTurnover()} rather than
     * the traded amount; confirm which one the callers expect.
     *
     * @param symbol    trading symbol
     * @param timeStart range start, epoch milliseconds
     * @param timeEnd   range end, epoch milliseconds
     * @return the summed turnover, {@link BigDecimal#ZERO} when there are no trades
     */
    public BigDecimal findTradeVolume(String symbol, long timeStart, long timeEnd) {
        List<ExchangeTrade> exchangeTrades =
                orderCoinDealDao.selectOrderCoinDealByTime(symbol, new Date(timeStart), new Date(timeEnd));
        BigDecimal totalVolume = BigDecimal.ZERO;
        if (CollectionUtils.isNotEmpty(exchangeTrades)) {
            for (ExchangeTrade trade : exchangeTrades) {
                totalVolume = totalVolume.add(trade.getBuyTurnover());
            }
        }
        return totalVolume;
    }

    /** Builds the Redis key under which the K-lines of a symbol and period are stored. */
    private static String klineKey(String symbol, String period) {
        return KLINE_KEY_PREFIX + symbol + "_" + period;
    }

    /** Reads the stored K-line list for a symbol and period, or a new empty list when absent. */
    @SuppressWarnings("unchecked")
    private List<Candlestick> loadKLines(String symbol, String period) {
        Object data = redisUtils.get(klineKey(symbol, period));
        // Within this class the key is only written by saveKLine, which stores a List<Candlestick>.
        return data == null ? new ArrayList<>() : (List<Candlestick>) data;
    }
}
src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java
New file @@ -0,0 +1,98 @@
package com.xcong.excoin.quartz.job;

import com.alibaba.fastjson.JSON;
import com.huobi.client.SubscriptionClient;
import com.huobi.client.SubscriptionOptions;
import com.huobi.client.model.Candlestick;
import com.huobi.client.model.enums.CandlestickInterval;
import com.xcong.excoin.modules.coin.dao.OrderCoinsDao;
import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity;
import com.xcong.excoin.modules.coin.service.OrderCoinService;
import com.xcong.excoin.modules.symbols.service.SymbolsService;
import com.xcong.excoin.processor.CoinProcessor;
import com.xcong.excoin.processor.CoinProcessorFactory;
import com.xcong.excoin.processor.DefaultCoinProcessor;
import com.xcong.excoin.processor.MarketService;
import com.xcong.excoin.rabbit.pricequeue.WebsocketPriceService;
import com.xcong.excoin.trade.CoinTrader;
import com.xcong.excoin.trade.CoinTraderFactory;
import com.xcong.excoin.utils.CoinTypeConvert;
import com.xcong.excoin.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;

/**
 * Starts the matching engine on application start-up: creates the trader for the NEKK symbol,
 * replays the orders that are still entrusted, and registers the K-line processor.
 *
 * @author wzy
 * @date 2020-05-28
 **/
@Slf4j
@Component
//@ConditionalOnProperty(prefix = "app", name = "trade", havingValue = "true")
public class CoinTradeInitJob {

    @Resource
    private OrderCoinsDao orderCoinsDao;

    @Resource
    private CoinTraderFactory factory;

    @Resource
    private OrderCoinService coinService;

    @Resource
    private MarketService marketService;

    @Resource
    private CoinProcessorFactory processorFactory;

    @Resource
    private RedisUtils redisUtils;

    /**
     * Builds and registers the trader and the K-line processor for the NEKK symbol.
     *
     * <p>Orders that were still entrusted when the application stopped are fed back into the
     * trader before it is marked ready. A failure while replaying them is logged with its stack
     * trace and does not prevent the trader from being registered.
     */
    @PostConstruct
    public void initCoinTrade() {
        log.info("#=======撮合交易器开启=======#");
        String symbol = "NEKK";
        CoinTrader newTrader = new CoinTrader(symbol);
        newTrader.setOrderCoinService(coinService);
        // NOTE(review): no ExchangeProducer is handed to the trader here; confirm it is supplied
        // elsewhere before the trader starts publishing order-book updates.

        // Once the trader exists, pre-process the orders that have not been handled yet.
        log.info("======CoinTrader Process: {}======", symbol);
        List<OrderCoinsEntity> orders = orderCoinsDao.selectAllEntrustingCoinOrderList();
        List<OrderCoinsEntity> tradingOrders = orders == null ? new ArrayList<>() : new ArrayList<>(orders);
        try {
            newTrader.trade(tradingOrders);
        } catch (ParseException e) {
            // Log through SLF4J (with the cause) instead of printStackTrace so the failure is
            // visible in the application log.
            log.error("异常:trader.trade(tradingOrders);", e);
        }
        newTrader.setReady(true);
        factory.addTrader(symbol, newTrader);

        // Create the K-line generator for the symbol.
        CoinProcessor processor = new DefaultCoinProcessor(symbol, "USDT");
        processor.setMarketService(marketService);
        // The processor is not a Spring bean; it needs RedisUtils to store the day K-line.
        processor.setRedisUtils(redisUtils);
        processor.initializeThumb();
        processor.setIsHalt(false);
        processorFactory.addProcessor(symbol, processor);
    }
}
src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java
New file @@ -0,0 +1,159 @@
package com.xcong.excoin.quartz.job;

import com.alibaba.fastjson.JSON;
import com.huobi.client.model.Candlestick;
import com.xcong.excoin.processor.CoinProcessorFactory;
import com.xcong.excoin.trade.TradePlateModel;
import com.xcong.excoin.utils.RedisUtils;
import com.xcong.excoin.websocket.TradePlateSendWebSocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;

/**
 * Scheduled jobs that generate the K-lines of every registered coin processor for each period
 * (minutes, hour, day, week, month).
 */
@Component
@Slf4j
public class KLineGeneratorJob {

    @Resource
    private CoinProcessorFactory processorFactory;

    @Resource
    private TradePlateSendWebSocket plateSendWebSocket;

    @Resource
    private RedisUtils redisUtils;

    /**
     * Every two seconds: writes a random latest price and a fixed candlestick to Redis and pushes
     * a random order book plus three fixed K-line payloads over the WebSocket.
     *
     * <p>NOTE(review): everything produced here is hard-coded or random sample data, yet it is
     * written to the same "NEKK/USDT" Redis key that the day K-line generation uses. This looks
     * like demo scaffolding; confirm whether it should run outside a test environment.
     */
    @Scheduled(cron = "0/2 * * * * *")
    public void tradePlate() {
        redisUtils.set("NEKK_NEW_PRICE", new BigDecimal(Math.random()));

        Candlestick candlestick = new Candlestick();
        candlestick.setOpen(new BigDecimal("10.33"));
        candlestick.setHigh(new BigDecimal("15.23"));
        candlestick.setVolume(new BigDecimal("12121.34"));
        candlestick.setLow(new BigDecimal("8.234"));
        candlestick.setAmount(new BigDecimal("1199"));
        candlestick.setTimestamp(1599840000);
        candlestick.setId(1599840000L);
        candlestick.setCount(100002);
        candlestick.setClose(new BigDecimal("12.2323"));
        redisUtils.set("NEKK/USDT", candlestick);

        // Order book shape: [[10244.21, 0.000855], [10243.7, 0.008777], [10243.59, 0.14], ...]
        TradePlateModel tradePlateModel = new TradePlateModel();
        List<BigDecimal> buy;
        List<BigDecimal> sell;
        for (int i = 0; i < 5; i++) {
            buy = new ArrayList<>(2);
            buy.add(new BigDecimal(Math.random() * i));
            buy.add(new BigDecimal(Math.random() * i));
            tradePlateModel.getBuy().add(buy);

            sell = new ArrayList<>(2);
            sell.add(new BigDecimal(Math.random() * i * 2));
            sell.add(new BigDecimal(Math.random() * i * 2));
            tradePlateModel.getSell().add(sell);
        }
        plateSendWebSocket.sendMessagePlate(JSON.toJSONString(tradePlateModel), null);
        plateSendWebSocket.sendMessageKline("nekkusdt", "1min", "{amount: 114419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 311958.06091543}", null);
        plateSendWebSocket.sendMessageKline("nekkusdt", "5min", "{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}", null);
        plateSendWebSocket.sendMessageKline("nekkusdt", "15min", "{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}", null);
    }

    /**
     * Runs at second 0 of every minute and handles all minute-based K-lines: the 1-minute K-line
     * and 24h volume on every run, and the 5/10/15/30-minute K-lines when the current minute is a
     * multiple of the period. At 00:00 the processor's thumb is reset.
     */
    @Scheduled(cron = "0 * * * * *")
    public void handle5minKLine() {
        Calendar calendar = Calendar.getInstance();
        log.debug("分钟K线:{}", calendar.getTime());
        // Truncate to the start of the current minute.
        calendar.set(Calendar.SECOND, 0);
        calendar.set(Calendar.MILLISECOND, 0);
        long time = calendar.getTimeInMillis();
        int minute = calendar.get(Calendar.MINUTE);
        int hour = calendar.get(Calendar.HOUR_OF_DAY);
        processorFactory.getProcessorMap().forEach((symbol, processor) -> {
            if (!processor.isStopKline()) {
                // Fix: the message has two placeholders but only the symbol was supplied.
                log.debug("生成{}分钟k线:{}", symbol, calendar.getTime());
                // Generate the 1-minute K-line.
                processor.autoGenerate();
                // Update the 24h volume.
                processor.update24HVolume(time);
                if (minute % 5 == 0) {
                    processor.generateKLine(5, Calendar.MINUTE, time);
                }
                if (minute % 10 == 0) {
                    processor.generateKLine(10, Calendar.MINUTE, time);
                }
                if (minute % 15 == 0) {
                    processor.generateKLine(15, Calendar.MINUTE, time);
                }
                if (minute % 30 == 0) {
                    processor.generateKLine(30, Calendar.MINUTE, time);
                }
                if (hour == 0 && minute == 0) {
                    processor.resetThumb();
                }
            }
        });
    }

    /**
     * Runs at the start of every hour and generates the 1-hour K-line of each active processor.
     */
    @Scheduled(cron = "0 0 * * * *")
    public void handleHourKLine() {
        processorFactory.getProcessorMap().forEach((symbol, processor) -> {
            if (!processor.isStopKline()) {
                Calendar calendar = Calendar.getInstance();
                log.info("小时K线:{}", calendar.getTime());
                // Truncate to the start of the current hour.
                calendar.set(Calendar.MINUTE, 0);
                calendar.set(Calendar.SECOND, 0);
                calendar.set(Calendar.MILLISECOND, 0);
                long time = calendar.getTimeInMillis();

                processor.generateKLine(1, Calendar.HOUR_OF_DAY, time);
            }
        });
    }

    /**
     * Runs every day at 00:00 and generates the day K-line of each active processor, plus the
     * week K-line when the day is the first day of the week ({@code DAY_OF_WEEK == 1}) and the
     * month K-line on the first day of the month.
     */
    @Scheduled(cron = "0 0 0 * * *")
    public void handleDayKLine() {
        processorFactory.getProcessorMap().forEach((symbol, processor) -> {
            if (!processor.isStopKline()) {
                Calendar calendar = Calendar.getInstance();
                log.info("日K线:{}", calendar.getTime());
                // Truncate to the start of the current day.
                calendar.set(Calendar.HOUR_OF_DAY, 0);
                calendar.set(Calendar.MINUTE, 0);
                calendar.set(Calendar.SECOND, 0);
                calendar.set(Calendar.MILLISECOND, 0);
                long time = calendar.getTimeInMillis();
                int week = calendar.get(Calendar.DAY_OF_WEEK);
                int dayOfMonth = calendar.get(Calendar.DAY_OF_MONTH);
                if (week == 1) {
                    // NOTE(review): the processor steps back `range` units of the given field, and
                    // one DAY_OF_WEEK unit is a single day, so this "week" K-line appears to
                    // cover only the last day. Fixing that needs a change in the processor.
                    processor.generateKLine(1, Calendar.DAY_OF_WEEK, time);
                }
                if (dayOfMonth == 1) {
                    // Fix: the processor only recognises Calendar.MONTH as the "month" period.
                    // DAY_OF_MONTH produced a K-line with an empty unit covering one day.
                    processor.generateKLine(1, Calendar.MONTH, time);
                }
                processor.generateKLine(1, Calendar.DAY_OF_YEAR, time);
            }
        });
    }
}
src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java
New file @@ -0,0 +1,98 @@
package com.xcong.excoin.rabbit.consumer;

import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.huobi.client.model.Candlestick;
import com.xcong.excoin.configurations.RabbitMqConfig;
import com.xcong.excoin.modules.coin.service.OrderCoinService;
import com.xcong.excoin.modules.exchange.service.HandleKlineService;
import com.xcong.excoin.trade.ExchangeTrade;
import com.xcong.excoin.utils.RedisUtils;
import com.xcong.excoin.websocket.TradePlateSendWebSocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * RabbitMQ consumer for the matching engine: forwards order-book updates to WebSocket clients
 * and processes executed trades.
 *
 * @author wzy
 * @date 2020-05-25
 **/
@Slf4j
@Component
public class ExchangeConsumer {

    @Resource
    private TradePlateSendWebSocket tradePlateSendWebSocket;

    @Resource
    private RedisUtils redisUtils;

    @Resource
    private HandleKlineService handleKlineService;

    @Resource
    private OrderCoinService orderCoinService;

    /**
     * Forwards an order-book message to the WebSocket clients unchanged.
     *
     * @param content message body taken from the order-book queue
     */
    @RabbitListener(queues = RabbitMqConfig.QUEUE_TRADE_PLATE)
    public void tradePlate(String content) {
        log.info("#---->{}#", content);
        tradePlateSendWebSocket.sendMessagePlate(content, null);
    }

    /**
     * Processes a batch of executed trades: updates the K-lines and latest price, pushes the
     * current K-lines to the WebSocket clients, then settles the users' orders.
     *
     * <p>An empty or unparsable batch is ignored. Missing K-line data in Redis only skips the
     * push; the orders are still settled.
     *
     * @param content JSON array of {@link ExchangeTrade}
     */
    @RabbitListener(queues = RabbitMqConfig.QUEUE_HANDLE_TRADE)
    public void handleTradeExchange(String content) {
        log.info("#---->{}#", content);
        List<ExchangeTrade> exchangeTrades = JSONObject.parseArray(content, ExchangeTrade.class);
        if (exchangeTrades == null || exchangeTrades.isEmpty()) {
            // Nothing to process; previously this failed on get(0).
            log.warn("handleTradeExchange received no trades, content={}", content);
            return;
        }
        // Update the K-lines and the latest price.
        handleKlineService.handleExchangeOrderToKline(exchangeTrades);
        // Push the latest K-lines.
        pushLatestKlines(exchangeTrades.get(0).getSymbol());
        // Settle the users' orders.
        orderCoinService.handleOrder(exchangeTrades);
    }

    /**
     * Reads the current K-lines of a symbol from Redis and pushes one message per period.
     *
     * @param symbol symbol of the trades; "/USDT" is appended when it does not contain "USDT"
     */
    @SuppressWarnings("unchecked")
    private void pushLatestKlines(String symbol) {
        String symbolUsdt = symbol;
        if (!symbol.contains("USDT")) {
            symbolUsdt = symbol + "/USDT";
        }
        String key = StrUtil.format("NEW_KINE_{}", symbolUsdt);
        // presumably stored as a period -> K-line map by the K-line service; verify against the writer
        Map<String, Candlestick> currentKlineMap = (Map<String, Candlestick>) redisUtils.get(key);
        if (currentKlineMap == null) {
            // Previously a missing key caused an NPE that also aborted order handling.
            log.warn("no current K-lines found in redis, key={}", key);
            return;
        }
        Set<Map.Entry<String, Candlestick>> entries = currentKlineMap.entrySet();
        for (Map.Entry<String, Candlestick> entry : entries) {
            tradePlateSendWebSocket.sendMessageKline(
                    symbolUsdt, entry.getKey(), JSONObject.toJSONString(entry.getValue()), null);
        }
    }

    // A separate, disabled listener that only re-pushed the latest K-lines for a symbol used to be
    // kept here as commented-out code; pushLatestKlines now covers that logic.
}
src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java
New file @@ -0,0 +1,47 @@
package com.xcong.excoin.rabbit.producer;

import cn.hutool.core.util.IdUtil;
import com.xcong.excoin.configurations.RabbitMqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Publishes matching-engine messages to {@link RabbitMqConfig#EXCHANGE_B} and logs the broker's
 * publisher confirms.
 *
 * @author wzy
 * @date 2020-05-25
 **/
@Slf4j
@Component
public class ExchangeProducer implements RabbitTemplate.ConfirmCallback {

    private final RabbitTemplate rabbitTemplate;

    /**
     * Stores the template and registers this producer as its confirm callback.
     *
     * @param rabbitTemplate template used for every publish
     */
    @Autowired
    public ExchangeProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        this.rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * Publishes an order-book message.
     *
     * @param content message body
     */
    public void sendPlateMsg(String content) {
        publish(RabbitMqConfig.ROUTINGKEY_TRADE_PLATE, content);
    }

    /**
     * Publishes a batch of executed trades for processing.
     *
     * @param content message body
     */
    public void sendHandleTrade(String content) {
        publish(RabbitMqConfig.ROUTINGKEY_HANDLE_TRADE, content);
    }

    /**
     * Logs the confirm received for a published message: "success" on ack, the cause otherwise.
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("#----->{}#", correlationData);
        if (!ack) {
            log.info("--->{}", cause);
            return;
        }
        log.info("success");
    }

    /** Sends the content to the match-trade exchange under a fresh UUID correlation id. */
    private void publish(String routingKey, String content) {
        CorrelationData correlation = new CorrelationData(IdUtil.simpleUUID());
        rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_B, routingKey, content, correlation);
    }
}
src/main/java/com/xcong/excoin/trade/CoinTrader.java
New file @@ -0,0 +1,762 @@ package com.xcong.excoin.trade; import com.alibaba.fastjson.JSON; import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity; import com.xcong.excoin.modules.coin.service.OrderCoinService; import com.xcong.excoin.rabbit.producer.ExchangeProducer; import org.apache.commons.collections.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.math.BigDecimal; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.*; public class CoinTrader { private String symbol; private ExchangeProducer exchangeProducer; private OrderCoinService orderCoinService; //交易币种的精度 private int coinScale = 4; //基币的精度 private int baseCoinScale = 4; private Logger logger = LoggerFactory.getLogger(CoinTrader.class); //买入限价订单链表,价格从高到低排列 private TreeMap<BigDecimal, MergeOrder> buyLimitPriceQueue; //卖出限价订单链表,价格从低到高排列 private TreeMap<BigDecimal, MergeOrder> sellLimitPriceQueue; //买入市价订单链表,按时间从小到大排序 private LinkedList<OrderCoinsEntity> buyMarketQueue; //卖出市价订单链表,按时间从小到大排序 private LinkedList<OrderCoinsEntity> sellMarketQueue; //卖盘盘口信息 private TradePlate sellTradePlate; //买盘盘口信息 private TradePlate buyTradePlate; //是否暂停交易 private boolean tradingHalt = false; private boolean ready = false; //交易对信息 //private ExchangeCoinPublishType publishType; private String clearTime; private SimpleDateFormat dateTimeFormat; public CoinTrader(String symbol) { this.symbol = symbol; initialize(); } /** * 初始化交易线程 */ public void initialize() { logger.info("init CoinTrader for symbol {}", symbol); //买单队列价格降序排列 buyLimitPriceQueue = new TreeMap<>(Comparator.reverseOrder()); //卖单队列价格升序排列 this.sellLimitPriceQueue = new TreeMap<>(Comparator.naturalOrder()); this.buyMarketQueue = new LinkedList<>(); this.sellMarketQueue = new LinkedList<>(); this.sellTradePlate = new TradePlate(symbol, OrderCoinsEntity.ORDERTYPE_SELL); this.buyTradePlate = new TradePlate(symbol, OrderCoinsEntity.ORDERTYPE_BUY); this.dateTimeFormat = new 
SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); } /** * 增加限价订单到队列,买入单按从价格高到低排,卖出单按价格从低到高排 * * @param exchangeOrder */ public void addLimitPriceOrder(OrderCoinsEntity exchangeOrder) { if (exchangeOrder.getTradeType() != OrderCoinsEntity.TRADETYPE_FIXEDPRICE) { return; } //logger.info("addLimitPriceOrder,orderId = {}", exchangeOrder.getOrderId()); TreeMap<BigDecimal, MergeOrder> list; if (exchangeOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) { list = buyLimitPriceQueue; // 添加盘口信息 TODO buyTradePlate.add(exchangeOrder); if (ready) { // 发送盘口信息 sendTradePlateMessage(buyTradePlate, sellTradePlate); } } else { list = sellLimitPriceQueue; sellTradePlate.add(exchangeOrder); if (ready) { sendTradePlateMessage(buyTradePlate, sellTradePlate); } } synchronized (list) { MergeOrder mergeOrder = list.get(exchangeOrder.getEntrustPrice()); if (mergeOrder == null) { mergeOrder = new MergeOrder(); mergeOrder.add(exchangeOrder); list.put(exchangeOrder.getEntrustPrice(), mergeOrder); } else { mergeOrder.add(exchangeOrder); } } } public void addMarketPriceOrder(OrderCoinsEntity exchangeOrder) { if (exchangeOrder.getTradeType() != OrderCoinsEntity.TRADETYPE_MARKETPRICE) { return; } logger.info("addMarketPriceOrder,orderId = {}", exchangeOrder.getId()); LinkedList<OrderCoinsEntity> list = exchangeOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY ? 
buyMarketQueue : sellMarketQueue; synchronized (list) { list.addLast(exchangeOrder); } } public void trade(List<OrderCoinsEntity> orders) throws ParseException { if (tradingHalt) { return; } for (OrderCoinsEntity order : orders) { trade(order); } } /** * 主动交易输入的订单,交易不完成的会输入到队列 * * @param exchangeOrder * @throws ParseException */ public void trade(OrderCoinsEntity exchangeOrder) { if (tradingHalt) { return; } //logger.info("trade order={}",exchangeOrder); if (!symbol.equalsIgnoreCase(exchangeOrder.getSymbol())) { logger.info("unsupported symbol,coin={},base={}", exchangeOrder.getSymbol(), "USDT"); return; } // 如果 if (exchangeOrder.getEntrustAmount().compareTo(BigDecimal.ZERO) <= 0 || exchangeOrder.getEntrustAmount().subtract(exchangeOrder.getDealAmount()).compareTo(BigDecimal.ZERO) <= 0) { return; } TreeMap<BigDecimal, MergeOrder> limitPriceOrderList; LinkedList<OrderCoinsEntity> marketPriceOrderList; if (exchangeOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) { // 买单 和限价卖单以及市价市场卖单队列对比 完成撮合 limitPriceOrderList = sellLimitPriceQueue; marketPriceOrderList = sellMarketQueue; } else { limitPriceOrderList = buyLimitPriceQueue; marketPriceOrderList = buyMarketQueue; } if (exchangeOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) { //logger.info(">>>>>市价单>>>交易与限价单交易"); //与限价单交易 matchMarketPriceWithLPList(limitPriceOrderList, exchangeOrder); } else if (exchangeOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_FIXEDPRICE) { //限价单价格必须大于0 if (exchangeOrder.getEntrustPrice().compareTo(BigDecimal.ZERO) <= 0) { return; } //logger.info(">>>>>限价单>>>交易与限价单交易"); //先与限价单交易 matchLimitPriceWithLPList(limitPriceOrderList, exchangeOrder, false); if (exchangeOrder.getEntrustCnt().compareTo(exchangeOrder.getDealCnt()) > 0) { //logger.info(">>>>限价单未交易完>>>>与市价单交易>>>>"); //后与市价单交易 matchLimitPriceWithMPList(marketPriceOrderList, exchangeOrder); } } } /** * 限价委托单与限价队列匹配 * * @param lpList 限价对手单队列 * @param focusedOrder 交易订单 */ public void 
matchLimitPriceWithLPList(TreeMap<BigDecimal, MergeOrder> lpList, OrderCoinsEntity focusedOrder, boolean canEnterList) { List<ExchangeTrade> exchangeTrades = new ArrayList<>(); List<OrderCoinsEntity> completedOrders = new ArrayList<>(); synchronized (lpList) { Iterator<Map.Entry<BigDecimal, MergeOrder>> mergeOrderIterator = lpList.entrySet().iterator(); boolean exitLoop = false; while (!exitLoop && mergeOrderIterator.hasNext()) { Map.Entry<BigDecimal, MergeOrder> entry = mergeOrderIterator.next(); MergeOrder mergeOrder = entry.getValue(); Iterator<OrderCoinsEntity> orderIterator = mergeOrder.iterator(); //买入单需要匹配的价格不大于委托价,否则退出 if (focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && mergeOrder.getPrice().compareTo(focusedOrder.getEntrustPrice()) > 0) { break; } //卖出单需要匹配的价格不小于委托价,否则退出 if (focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_SELL && mergeOrder.getPrice().compareTo(focusedOrder.getEntrustPrice()) < 0) { break; } while (orderIterator.hasNext()) { OrderCoinsEntity matchOrder = orderIterator.next(); //处理匹配 ExchangeTrade trade = processMatch(focusedOrder, matchOrder); exchangeTrades.add(trade); //判断匹配单是否完成 if (matchOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) { //当前匹配的订单完成交易,删除该订单 orderIterator.remove(); completedOrders.add(matchOrder); } //判断交易单是否完成 if (focusedOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) { //交易完成 completedOrders.add(focusedOrder); //退出循环 exitLoop = true; break; } } if (mergeOrder.size() == 0) { mergeOrderIterator.remove(); } } } //如果还没有交易完,订单压入列表中 if (focusedOrder.getDealCnt().compareTo(focusedOrder.getEntrustCnt()) < 0 && canEnterList) { addLimitPriceOrder(focusedOrder); } //每个订单的匹配批量推送 handleExchangeTrade(exchangeTrades); if (completedOrders.size() > 0) { orderCompleted(completedOrders); TradePlate plate = focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY ? 
sellTradePlate : buyTradePlate; sendTradePlateMessage(buyTradePlate, sellTradePlate); } } /** * 限价委托单与市价队列匹配 * * @param mpList 市价对手单队列 * @param focusedOrder 交易订单 */ public void matchLimitPriceWithMPList(LinkedList<OrderCoinsEntity> mpList, OrderCoinsEntity focusedOrder) { List<ExchangeTrade> exchangeTrades = new ArrayList<>(); List<OrderCoinsEntity> completedOrders = new ArrayList<>(); synchronized (mpList) { Iterator<OrderCoinsEntity> iterator = mpList.iterator(); while (iterator.hasNext()) { OrderCoinsEntity matchOrder = iterator.next(); ExchangeTrade trade = processMatch(focusedOrder, matchOrder); logger.info(">>>>>" + trade); if (trade != null) { exchangeTrades.add(trade); } //判断匹配单是否完成,市价单amount为成交量 if (matchOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) { iterator.remove(); completedOrders.add(matchOrder); } //判断吃单是否完成,判断成交量是否完成 if (focusedOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) { //交易完成 completedOrders.add(focusedOrder); //退出循环 break; } } } //如果还没有交易完,订单压入列表中 if (focusedOrder.getDealCnt().compareTo(focusedOrder.getEntrustCnt()) < 0) { addLimitPriceOrder(focusedOrder); } //每个订单的匹配批量推送 handleExchangeTrade(exchangeTrades); orderCompleted(completedOrders); } /** * 市价委托单与限价对手单列表交易 * * @param lpList 限价对手单列表 * @param focusedOrder 待交易订单 */ public void matchMarketPriceWithLPList(TreeMap<BigDecimal, MergeOrder> lpList, OrderCoinsEntity focusedOrder) { List<ExchangeTrade> exchangeTrades = new ArrayList<>(); List<OrderCoinsEntity> completedOrders = new ArrayList<>(); // 加锁 同步 synchronized (lpList) { Iterator<Map.Entry<BigDecimal, MergeOrder>> mergeOrderIterator = lpList.entrySet().iterator(); boolean exitLoop = false; while (!exitLoop && mergeOrderIterator.hasNext()) { Map.Entry<BigDecimal, MergeOrder> entry = mergeOrderIterator.next(); MergeOrder mergeOrder = entry.getValue(); Iterator<OrderCoinsEntity> orderIterator = mergeOrder.iterator(); while (orderIterator.hasNext()) { OrderCoinsEntity matchOrder = orderIterator.next(); //处理匹配 
用户单和买卖盘内的委托单进行比较,得到可以交易的量 // 例如 市价买单 1000个 卖1 500 卖2 300 卖3 500 此时从卖1开始吃到卖3只剩300个 ExchangeTrade trade = processMatch(focusedOrder, matchOrder); if (trade != null) { exchangeTrades.add(trade); } //判断匹配单是否完成 if (matchOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) { //当前匹配的订单完成交易,删除该订单 orderIterator.remove(); completedOrders.add(matchOrder); } //判断焦点订单是否完成 if (focusedOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) { completedOrders.add(focusedOrder); //退出循环 exitLoop = true; break; } } if (mergeOrder.size() == 0) { mergeOrderIterator.remove(); } } } //如果还没有交易完,订单压入列表中,市价买单按成交量算 if (focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_SELL && focusedOrder.getDealCnt().compareTo(focusedOrder.getEntrustCnt()) < 0 || focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && focusedOrder.getDealAmount().compareTo(focusedOrder.getEntrustAmount()) < 0) { addMarketPriceOrder(focusedOrder); } //每个订单的匹配批量推送 handleExchangeTrade(exchangeTrades); if (completedOrders.size() > 0) { orderCompleted(completedOrders); TradePlate plate = focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY ? sellTradePlate : buyTradePlate; sendTradePlateMessage(buyTradePlate, sellTradePlate); } } /** * 计算委托单剩余可成交的数量 * * @param order 委托单 * @param dealPrice 成交价 * @return */ private BigDecimal calculateTradedAmount(OrderCoinsEntity order, BigDecimal dealPrice) { if (order.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && order.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) { //剩余成交量 TODO ? 
// 委托量-成交量=剩余量 BigDecimal leftTurnover = order.getEntrustAmount().subtract(order.getDealAmount()); return leftTurnover.divide(dealPrice, coinScale, BigDecimal.ROUND_DOWN); } else { return order.getEntrustCnt().subtract(order.getDealCnt()); } } /** * 调整市价单剩余成交额,当剩余成交额不足时设置订单完成 * * @param order * @param dealPrice * @return */ private BigDecimal adjustMarketOrderTurnover(OrderCoinsEntity order, BigDecimal dealPrice) { if (order.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && order.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) { // BigDecimal leftTurnover = order.getAmount().subtract(order.getTurnover()); // if(leftTurnover.divide(dealPrice,coinScale,BigDecimal.ROUND_DOWN) // .compareTo(BigDecimal.ZERO)==0){ // order.setTurnover(order.getAmount()); // return leftTurnover; // } } return BigDecimal.ZERO; } /** * 处理两个匹配的委托订单 * * @param focusedOrder 焦点单 * @param matchOrder 匹配单 * @return */ private ExchangeTrade processMatch(OrderCoinsEntity focusedOrder, OrderCoinsEntity matchOrder) { //需要交易的数量,成交量,成交价,可用数量 BigDecimal needAmount, dealPrice, availAmount; //如果匹配单是限价单,则以其价格为成交价 if (matchOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_FIXEDPRICE) { dealPrice = matchOrder.getEntrustPrice(); } else { dealPrice = focusedOrder.getEntrustPrice(); } //成交价必须大于0 if (dealPrice.compareTo(BigDecimal.ZERO) <= 0) { return null; } // 需要的成交量 needAmount = calculateTradedAmount(focusedOrder, dealPrice); // 队列单可提供的成交量 availAmount = calculateTradedAmount(matchOrder, dealPrice); //计算成交量 取少的 BigDecimal tradedAmount = (availAmount.compareTo(needAmount) >= 0 ? 
needAmount : availAmount); logger.info("dealPrice={},amount={}", dealPrice, tradedAmount); //如果成交额为0说明剩余额度无法成交,退出 if (tradedAmount.compareTo(BigDecimal.ZERO) == 0) { return null; } //计算成交额,成交额要保留足够精度 BigDecimal turnover = tradedAmount.multiply(dealPrice); matchOrder.setDealCnt(matchOrder.getDealCnt().add(tradedAmount)); // 成交金额 matchOrder.setDealAmount(matchOrder.getDealAmount().add(turnover)); // 用户单成交量 focusedOrder.setDealCnt(focusedOrder.getDealCnt().add(tradedAmount)); // 用户单成交金额 focusedOrder.setDealAmount(focusedOrder.getDealAmount().add(turnover)); //创建成交记录 ExchangeTrade exchangeTrade = new ExchangeTrade(); exchangeTrade.setSymbol(symbol); // 成交量 exchangeTrade.setAmount(tradedAmount); exchangeTrade.setDirection(focusedOrder.getOrderType()); // 成交价格 exchangeTrade.setPrice(dealPrice); // 成交金额 exchangeTrade.setBuyTurnover(turnover); exchangeTrade.setSellTurnover(turnover); //校正市价单剩余成交额 if (OrderCoinsEntity.TRADETYPE_MARKETPRICE == focusedOrder.getTradeType() && focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) { BigDecimal adjustTurnover = adjustMarketOrderTurnover(focusedOrder, dealPrice); exchangeTrade.setBuyTurnover(turnover.add(adjustTurnover)); } else if (OrderCoinsEntity.TRADETYPE_MARKETPRICE == matchOrder.getTradeType() && matchOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) { BigDecimal adjustTurnover = adjustMarketOrderTurnover(matchOrder, dealPrice); exchangeTrade.setBuyTurnover(turnover.add(adjustTurnover)); } if (focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) { exchangeTrade.setBuyOrderId(focusedOrder.getId()); exchangeTrade.setSellOrderId(matchOrder.getId()); } else { exchangeTrade.setBuyOrderId(matchOrder.getId()); exchangeTrade.setSellOrderId(focusedOrder.getId()); } exchangeTrade.setTime(Calendar.getInstance().getTimeInMillis()); if (matchOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_FIXEDPRICE) { if (matchOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) { buyTradePlate.remove(matchOrder, 
tradedAmount); } else { sellTradePlate.remove(matchOrder, tradedAmount); } } return exchangeTrade; } // 这里是交易完的单 public void handleExchangeTrade(List<ExchangeTrade> trades) { //logger.info("handleExchangeTrade:{}", trades); if (trades.size() > 0) { int maxSize = 1000; //发送消息,key为交易对符号 if (trades.size() > maxSize) { int size = trades.size(); for (int index = 0; index < size; index += maxSize) { int length = (size - index) > maxSize ? maxSize : size - index; List<ExchangeTrade> subTrades = trades.subList(index, index + length); exchangeProducer.sendHandleTrade(JSON.toJSONString(subTrades)); //orderCoinService.handleOrder(subTrades); } } else { trades.forEach(e -> { System.out.println(e); }); exchangeProducer.sendHandleTrade(JSON.toJSONString(trades)); //orderCoinService.handleOrder(trades); // kafkaTemplate.send("exchange-trade", JSON.toJSONString(trades)); } // 更新最新K线 TODO } } /** * 订单完成,执行消息通知,订单数超1000个要拆分发送 * * @param orders */ public void orderCompleted(List<OrderCoinsEntity> orders) { //logger.info("orderCompleted ,order={}",orders); if (orders.size() > 0) { int maxSize = 1000; if (orders.size() > maxSize) { int size = orders.size(); for (int index = 0; index < size; index += maxSize) { int length = (size - index) > maxSize ? 
maxSize : size - index; List<OrderCoinsEntity> subOrders = orders.subList(index, index + length); // TODO 通知订单完成 //kafkaTemplate.send("exchange-order-completed", JSON.toJSONString(subOrders)); } } else { // kafkaTemplate.send("exchange-order-completed", JSON.toJSONString(orders)); } } } /** * 发送盘口变化消息 * * @param buyTradePlate sellTradePlate */ public void sendTradePlateMessage(TradePlate buyTradePlate, TradePlate sellTradePlate) { //防止并发引起数组越界,造成盘口倒挂 TODO List<List<BigDecimal>> plate; List<BigDecimal> plateItem; TradePlateModel tradePlateModel = new TradePlateModel(); // 转换格式 if (buyTradePlate != null && buyTradePlate.getItems() != null) { plate = new ArrayList<>(); LinkedList<TradePlateItem> items = buyTradePlate.getItems(); for (TradePlateItem item : items) { plateItem = new ArrayList<>(2); BigDecimal price = item.getPrice(); BigDecimal amount = item.getAmount(); plateItem.add(price); plateItem.add(amount); plate.add(plateItem); } tradePlateModel.setBuy(plate); } if (sellTradePlate != null && sellTradePlate.getItems() != null) { plate = new ArrayList<>(); LinkedList<TradePlateItem> items = sellTradePlate.getItems(); for (TradePlateItem item : items) { plateItem = new ArrayList<>(2); BigDecimal price = item.getPrice(); BigDecimal amount = item.getAmount(); plateItem.add(price); plateItem.add(amount); plate.add(plateItem); } tradePlateModel.setSell(plate); } // 盘口发生变化通知TODO exchangeProducer.sendPlateMsg(JSON.toJSONString(tradePlateModel)); } /** * 取消委托订单 * * @param exchangeOrder * @return */ public OrderCoinsEntity cancelOrder(OrderCoinsEntity exchangeOrder) { logger.info("cancelOrder,orderId={}", exchangeOrder.getId()); if (exchangeOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) { //处理市价单 Iterator<OrderCoinsEntity> orderIterator; List<OrderCoinsEntity> list = null; if (exchangeOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) { list = this.buyMarketQueue; } else { list = this.sellMarketQueue; } synchronized (list) { orderIterator = 
list.iterator(); while ((orderIterator.hasNext())) { OrderCoinsEntity order = orderIterator.next(); if (order.getId().equals(exchangeOrder.getId())) { orderIterator.remove(); onRemoveOrder(order); return order; } } } } else { //处理限价单 TreeMap<BigDecimal, MergeOrder> list = null; Iterator<MergeOrder> mergeOrderIterator; if (exchangeOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) { list = this.buyLimitPriceQueue; } else { list = this.sellLimitPriceQueue; } synchronized (list) { MergeOrder mergeOrder = list.get(exchangeOrder.getEntrustPrice()); if (mergeOrder != null) { Iterator<OrderCoinsEntity> orderIterator = mergeOrder.iterator(); while (orderIterator.hasNext()) { OrderCoinsEntity order = orderIterator.next(); if (order.getId().equals(exchangeOrder.getId())) { orderIterator.remove(); if (mergeOrder.size() == 0) { list.remove(exchangeOrder.getEntrustPrice()); } onRemoveOrder(order); return order; } } } } } return null; } public void onRemoveOrder(OrderCoinsEntity order) { if (order.getTradeType() == OrderCoinsEntity.TRADETYPE_FIXEDPRICE) { if (order.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) { buyTradePlate.remove(order); sendTradePlateMessage(buyTradePlate, sellTradePlate); } else { sellTradePlate.remove(order); sendTradePlateMessage(buyTradePlate, sellTradePlate); } } } public TradePlate getTradePlate(ExchangeOrderDirection direction) { if (direction == ExchangeOrderDirection.BUY) { return buyTradePlate; } else { return sellTradePlate; } } /** * 查询交易器里的订单 * * @param orderId * @param type * @param direction * @return */ public OrderCoinsEntity findOrder(String orderId, int type, int direction) { if (type == OrderCoinsEntity.TRADETYPE_MARKETPRICE) { LinkedList<OrderCoinsEntity> list; if (direction == OrderCoinsEntity.ORDERTYPE_BUY) { list = this.buyMarketQueue; } else { list = this.sellMarketQueue; } synchronized (list) { Iterator<OrderCoinsEntity> orderIterator = list.iterator(); while ((orderIterator.hasNext())) { OrderCoinsEntity order = 
orderIterator.next(); if (order.getId().equals(orderId)) { return order; } } } } else { TreeMap<BigDecimal, MergeOrder> list; if (direction == OrderCoinsEntity.ORDERTYPE_BUY) { list = this.buyLimitPriceQueue; } else { list = this.sellLimitPriceQueue; } synchronized (list) { Iterator<Map.Entry<BigDecimal, MergeOrder>> mergeOrderIterator = list.entrySet().iterator(); while (mergeOrderIterator.hasNext()) { Map.Entry<BigDecimal, MergeOrder> entry = mergeOrderIterator.next(); MergeOrder mergeOrder = entry.getValue(); Iterator<OrderCoinsEntity> orderIterator = mergeOrder.iterator(); while ((orderIterator.hasNext())) { OrderCoinsEntity order = orderIterator.next(); if (order.getId().equals(orderId)) { return order; } } } } } return null; } public TreeMap<BigDecimal, MergeOrder> getBuyLimitPriceQueue() { return buyLimitPriceQueue; } public LinkedList<OrderCoinsEntity> getBuyMarketQueue() { return buyMarketQueue; } public TreeMap<BigDecimal, MergeOrder> getSellLimitPriceQueue() { return sellLimitPriceQueue; } public LinkedList<OrderCoinsEntity> getSellMarketQueue() { return sellMarketQueue; } public void setCoinScale(int scale) { this.coinScale = scale; } public void setBaseCoinScale(int scale) { this.baseCoinScale = scale; } public boolean isTradingHalt() { return this.tradingHalt; } /** * 暂停交易,不接收新的订单 */ public void haltTrading() { this.tradingHalt = true; } /** * 恢复交易 */ public void resumeTrading() { this.tradingHalt = false; } public void stopTrading() { //TODO:停止交易,取消当前所有订单 } public boolean getReady() { return this.ready; } public void setReady(boolean ready) { this.ready = ready; } public void setClearTime(String clearTime) { this.clearTime = clearTime; } public int getLimitPriceOrderCount(ExchangeOrderDirection direction) { int count = 0; TreeMap<BigDecimal, MergeOrder> queue = direction == ExchangeOrderDirection.BUY ? 
buyLimitPriceQueue : sellLimitPriceQueue; Iterator<Map.Entry<BigDecimal, MergeOrder>> mergeOrderIterator = queue.entrySet().iterator(); while (mergeOrderIterator.hasNext()) { Map.Entry<BigDecimal, MergeOrder> entry = mergeOrderIterator.next(); MergeOrder mergeOrder = entry.getValue(); count += mergeOrder.size(); } return count; } public OrderCoinService getOrderCoinService() { return orderCoinService; } public void setOrderCoinService(OrderCoinService orderCoinService) { this.orderCoinService = orderCoinService; } } src/main/java/com/xcong/excoin/trade/CoinTraderFactory.java
New file @@ -0,0 +1,40 @@
package com.xcong.excoin.trade;

import org.springframework.stereotype.Service;

import java.util.concurrent.ConcurrentHashMap;

/**
 * Registry of {@link CoinTrader} instances keyed by trading-pair symbol.
 *
 * <p>Backed by a {@link ConcurrentHashMap}; every operation is a single atomic map call.
 */
@Service
public class CoinTraderFactory {

    private final ConcurrentHashMap<String, CoinTrader> traderMap = new ConcurrentHashMap<>();

    /**
     * Registers a trader unless one is already registered for the symbol.
     *
     * <p>Uses {@code putIfAbsent} so that two concurrent callers cannot both pass a
     * "not present" check and overwrite each other.
     *
     * @param symbol the trading-pair symbol
     * @param trader the trader to register
     */
    public void addTrader(String symbol, CoinTrader trader) {
        traderMap.putIfAbsent(symbol, trader);
    }

    /**
     * Registers a trader, replacing any trader already registered for the symbol.
     *
     * @param symbol the trading-pair symbol
     * @param trader the trader to register
     */
    public void resetTrader(String symbol, CoinTrader trader) {
        traderMap.put(symbol, trader);
    }

    /**
     * @param symbol the trading-pair symbol
     * @return whether a trader is registered for the symbol
     */
    public boolean containsTrader(String symbol) {
        return traderMap.containsKey(symbol);
    }

    /**
     * @param symbol the trading-pair symbol
     * @return the registered trader, or {@code null} if none is registered
     */
    public CoinTrader getTrader(String symbol) {
        return traderMap.get(symbol);
    }

    /**
     * @return the live backing map (not a copy); changes to it change this registry
     */
    public ConcurrentHashMap<String, CoinTrader> getTraderMap() {
        return traderMap;
    }
}
src/main/java/com/xcong/excoin/trade/ExchangeOrderDirection.java
New file @@ -0,0 +1,5 @@
package com.xcong.excoin.trade;

/**
 * Side of the order book an operation refers to.
 */
public enum ExchangeOrderDirection {
    /** The buy side. */
    BUY,
    /** The sell side. */
    SELL;
}
src/main/java/com/xcong/excoin/trade/ExchangeTrade.java
New file @@ -0,0 +1,27 @@
package com.xcong.excoin.trade;

import com.alibaba.fastjson.JSON;
import lombok.Data;

import java.io.Serializable;
import java.math.BigDecimal;

/**
 * Record of one matched trade between a buy order and a sell order.
 *
 * <p>Accessors are generated by Lombok; {@link #toString()} renders the record as JSON.
 */
@Data
public class ExchangeTrade implements Serializable {

    /** Trading-pair symbol. */
    private String symbol;

    /** Execution price. */
    private BigDecimal price;

    /** Traded quantity. */
    private BigDecimal amount;

    /** Traded value on the buy side. */
    private BigDecimal buyTurnover;

    /** Traded value on the sell side. */
    private BigDecimal sellTurnover;

    /** Order type of the order that triggered the trade. */
    private int direction;

    /** Id of the buy order. */
    private Long buyOrderId;

    /** Id of the sell order. */
    private Long sellOrderId;

    /** Trade time as epoch milliseconds. */
    private Long time;

    /**
     * @return this record serialized as a JSON string
     */
    @Override
    public String toString() {
        return JSON.toJSONString(this);
    }
}
src/main/java/com/xcong/excoin/trade/MergeOrder.java
New file @@ -0,0 +1,42 @@
package com.xcong.excoin.trade;

import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Ordered group of orders held together as one unit; new orders are appended at the end.
 */
public class MergeOrder {

    private List<OrderCoinsEntity> orders = new ArrayList<>();

    /**
     * Appends an order at the end of the group.
     *
     * @param order the order to append
     */
    public void add(OrderCoinsEntity order) {
        orders.add(order);
    }

    /**
     * @return the first order of the group
     * @throws IndexOutOfBoundsException if the group is empty
     */
    public OrderCoinsEntity get() {
        return orders.get(0);
    }

    /**
     * @return the number of orders in the group
     */
    public int size() {
        return orders.size();
    }

    /**
     * @return the entrust price of the first order
     * @throws IndexOutOfBoundsException if the group is empty
     */
    public BigDecimal getPrice() {
        OrderCoinsEntity first = orders.get(0);
        return first.getEntrustPrice();
    }

    /**
     * @return an iterator over the orders in insertion order; supports removal
     */
    public Iterator<OrderCoinsEntity> iterator() {
        return orders.iterator();
    }

    /**
     * @return the sum of the entrusted quantities of all orders; zero for an empty group
     */
    public BigDecimal getTotalAmount() {
        BigDecimal sum = new BigDecimal(0);
        Iterator<OrderCoinsEntity> cursor = orders.iterator();
        while (cursor.hasNext()) {
            sum = sum.add(cursor.next().getEntrustCnt());
        }
        return sum;
    }
}
src/main/java/com/xcong/excoin/trade/TradePlate.java
New file @@ -0,0 +1,181 @@
package com.xcong.excoin.trade;

import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.math.BigDecimal;
import java.util.LinkedList;

/**
 * One side of the order-book plate: the aggregated quantity per price level.
 *
 * <p>Levels are kept sorted so that the best price comes first: highest price first on a
 * buy plate, lowest price first on a sell plate. Mutations synchronize on the item list.
 */
@Data
@Slf4j
public class TradePlate {
    private LinkedList<TradePlateItem> items;
    /** Maximum index at which a new price level may be inserted. */
    private int maxDepth = 100;
    /** Side of this plate, an order type: 1 = buy, 2 = sell. */
    private int direction;
    private String symbol;

    public TradePlate() {
    }

    /**
     * Creates an empty plate.
     *
     * @param symbol    trading-pair symbol
     * @param direction order type this plate holds
     */
    public TradePlate(String symbol, int direction) {
        this.direction = direction;
        this.symbol = symbol;
        items = new LinkedList<>();
    }

    /**
     * Adds the unfilled quantity of a limit-price order to the plate.
     *
     * <p>The insert position is chosen from this plate's side. Comparing the order's
     * trade type (market/limit) against the buy/sell constants, as was done before,
     * picks the wrong sort direction.
     *
     * @param exchangeOrder the order to add
     * @return {@code false} if the order is a market-price order or belongs to the other
     *         side; {@code true} otherwise, including when the level was beyond
     *         {@code maxDepth} and therefore not inserted
     */
    public boolean add(OrderCoinsEntity exchangeOrder) {
        synchronized (items) {
            if (exchangeOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
                return false;
            }
            if (exchangeOrder.getOrderType() != direction) {
                return false;
            }
            BigDecimal price = exchangeOrder.getEntrustPrice();
            BigDecimal remaining = exchangeOrder.getEntrustCnt().subtract(exchangeOrder.getDealCnt());
            boolean buySide = direction == OrderCoinsEntity.ORDERTYPE_BUY;
            boolean sellSide = direction == OrderCoinsEntity.ORDERTYPE_SELL;

            int index = 0;
            for (; index < items.size(); index++) {
                TradePlateItem item = items.get(index);
                int cmp = item.getPrice().compareTo(price);
                if (cmp == 0) {
                    // Existing level: just grow it.
                    item.setAmount(item.getAmount().add(remaining));
                    return true;
                }
                // Keep scanning past levels that are better than the new price.
                boolean betterLevel = (buySide && cmp > 0) || (sellSide && cmp < 0);
                if (!betterLevel) {
                    break;
                }
            }
            if (index < maxDepth) {
                TradePlateItem newItem = new TradePlateItem();
                newItem.setAmount(remaining);
                newItem.setPrice(price);
                items.add(index, newItem);
            }
        }
        return true;
    }

    /**
     * Subtracts a quantity from the level at the order's entrust price and drops the level
     * once its amount is zero or negative. Does nothing but log if no level has that price.
     *
     * @param order  the order whose price identifies the level
     * @param amount the quantity to subtract
     */
    public void remove(OrderCoinsEntity order, BigDecimal amount) {
        synchronized (items) {
            for (int index = 0; index < items.size(); index++) {
                TradePlateItem item = items.get(index);
                if (item.getPrice().compareTo(order.getEntrustPrice()) == 0) {
                    item.setAmount(item.getAmount().subtract(amount));
                    if (item.getAmount().compareTo(BigDecimal.ZERO) <= 0) {
                        items.remove(index);
                    }
                    return;
                }
            }
            log.info("items>>return_size={}", items.size());
        }
    }

    /**
     * Subtracts the order's unfilled quantity from its price level.
     *
     * @param order the order to remove
     */
    public void remove(OrderCoinsEntity order) {
        remove(order, order.getEntrustCnt().subtract(order.getDealCnt()));
    }

    public void setItems(LinkedList<TradePlateItem> items) {
        this.items = items;
    }

    /**
     * @return the price of the first level on a buy plate or the last level on any other
     *         plate; zero if the plate is empty
     */
    public BigDecimal getHighestPrice() {
        if (items.size() == 0) {
            return BigDecimal.ZERO;
        }
        if (direction == OrderCoinsEntity.ORDERTYPE_BUY) {
            return items.getFirst().getPrice();
        } else {
            return items.getLast().getPrice();
        }
    }

    /**
     * @return the number of price levels
     */
    public int getDepth() {
        return items.size();
    }

    /**
     * @return the price of the last level on a buy plate or the first level on any other
     *         plate; zero if the plate is empty
     */
    public BigDecimal getLowestPrice() {
        if (items.size() == 0) {
            return BigDecimal.ZERO;
        }
        if (direction == OrderCoinsEntity.ORDERTYPE_BUY) {
            return items.getLast().getPrice();
        } else {
            return items.getFirst().getPrice();
        }
    }

    /**
     * @return the largest level amount that is greater than zero, or zero if there is none
     */
    public BigDecimal getMaxAmount() {
        if (items.size() == 0) {
            return BigDecimal.ZERO;
        }
        BigDecimal amount = BigDecimal.ZERO;
        for (TradePlateItem item : items) {
            if (item.getAmount().compareTo(amount) > 0) {
                amount = item.getAmount();
            }
        }
        return amount;
    }

    /**
     * @return the smallest level amount, or zero if the plate is empty
     */
    public BigDecimal getMinAmount() {
        if (items.size() == 0) {
            return BigDecimal.ZERO;
        }
        BigDecimal amount = items.getFirst().getAmount();
        for (TradePlateItem item : items) {
            if (item.getAmount().compareTo(amount) < 0) {
                amount = item.getAmount();
            }
        }
        return amount;
    }

    /**
     * @return a JSON summary of the plate with all levels
     */
    public JSONObject toJSON() {
        JSONObject json = summaryJson();
        json.put("items", items);
        return json;
    }

    /**
     * @param limit maximum number of levels to include
     * @return a JSON summary of the plate with at most {@code limit} leading levels
     */
    public JSONObject toJSON(int limit) {
        JSONObject json = summaryJson();
        json.put("items", items.size() > limit ? items.subList(0, limit) : items);
        return json;
    }

    /** Builds the fields shared by both JSON views (everything except {@code items}). */
    private JSONObject summaryJson() {
        JSONObject json = new JSONObject();
        json.put("direction", direction);
        json.put("maxAmount", getMaxAmount());
        json.put("minAmount", getMinAmount());
        json.put("highestPrice", getHighestPrice());
        json.put("lowestPrice", getLowestPrice());
        json.put("symbol", getSymbol());
        return json;
    }
}
src/main/java/com/xcong/excoin/trade/TradePlateItem.java
New file @@ -0,0 +1,11 @@
package com.xcong.excoin.trade;

import lombok.Data;

import java.math.BigDecimal;

/**
 * One price level of a trade plate; accessors are generated by Lombok.
 */
@Data
public class TradePlateItem {

    /** Price of the level. */
    private BigDecimal price;

    /** Quantity held at this price. */
    private BigDecimal amount;
}
src/main/java/com/xcong/excoin/trade/TradePlateModel.java
New file @@ -0,0 +1,13 @@
package com.xcong.excoin.trade;

import lombok.Data;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;

/**
 * Payload carrying both sides of the plate; accessors are generated by Lombok.
 * Both lists start out empty.
 */
@Data
public class TradePlateModel {

    /** Buy-side levels. */
    private List<List<BigDecimal>> buy = new ArrayList<>();

    /** Sell-side levels. */
    private List<List<BigDecimal>> sell = new ArrayList<>();
}
src/main/java/com/xcong/excoin/utils/CoinTypeConvert.java
@@ -22,6 +22,8 @@ return "EOS/USDT"; case "etcusdt": return "ETC/USDT"; case "nekkusdt": return "NEKK/USDT"; default: return null; } @@ -43,6 +45,8 @@ return "EOS_NEW_PRICE"; case "ETC/USDT": return "ETC_NEW_PRICE"; case "NEKK/USDT": return "NEKK_NEW_PRICE"; default: return null; } src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java
New file @@ -0,0 +1,220 @@
package com.xcong.excoin.websocket;

import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.common.contants.AppContants;
import com.xcong.excoin.utils.CoinTypeConvert;
import com.xcong.excoin.utils.RedisUtils;
import com.xcong.excoin.utils.SpringContextHolder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * WebSocket endpoint ({@code /trade/market}) for plate (depth) and K-line subscriptions.
 *
 * <p>Clients send JSON messages with a {@code sub}, {@code unsub} or {@code req} field whose
 * value is a dot-separated topic such as {@code market.<symbol>.depth.<type>} or
 * {@code market.<symbol>.kline.<period>}. Subscriptions are held in static, concurrent maps
 * shared by all connections.
 */
@Slf4j
@ServerEndpoint(value = "/trade/market")
@Component
public class TradePlateSendWebSocket {

    /** Raw symbol whose plate is broadcast by {@link #sendMessagePlate(String, Session)}. */
    private static final String PLATE_SYMBOL = "nekkusdt";

    // NOTE(review): this field is not read anywhere in this class; the history request
    // obtains RedisUtils through SpringContextHolder instead.
    @Resource
    RedisUtils redisUtils;

    /**
     * Number of currently open connections.
     */
    private static final AtomicInteger onlineCount = new AtomicInteger(0);

    /** Plate subscribers: converted symbol -> (session id -> session). */
    private static final Map<String, Map<String, Session>> tradeplateClients = new ConcurrentHashMap<>();

    /** K-line subscribers: "symbol-period" -> (session id -> session). */
    private static final Map<String, Map<String, Session>> klineClients = new ConcurrentHashMap<>();

    /**
     * Called when a connection is established.
     */
    @OnOpen
    public void onOpen(Session session) {
        onlineCount.incrementAndGet();
        log.info("有新连接加入:{},当前在线人数为:{}", session.getId(), onlineCount.get());
    }

    /**
     * Called when a connection is closed; drops the session from every subscription so
     * closed sessions do not accumulate in the static maps.
     */
    @OnClose
    public void onClose(Session session) {
        onlineCount.decrementAndGet();
        removeFromAll(tradeplateClients, session);
        removeFromAll(klineClients, session);
        log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get());
    }

    /**
     * Handles a client message.
     *
     * <p>Recognized requests: plate subscribe/unsubscribe ({@code sub}/{@code unsub}
     * containing {@code depth}), K-line subscribe/unsubscribe ({@code sub}/{@code unsub}
     * containing {@code kline}) and K-line history ({@code req} containing {@code kline}).
     * Topics with too few segments are ignored; subscriptions for symbols the converter
     * does not know are ignored.
     *
     * @param message the message sent by the client
     * @param session the sending session
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message);
        JSONObject jsonObject = JSON.parseObject(message);
        if (jsonObject == null) {
            return;
        }

        // Plate subscribe: {sub: "market.<symbol>.depth.<type>", id: symbol}
        String[] depthSub = topicParts(jsonObject, "sub", "depth", 2);
        if (depthSub != null) {
            String symbol = CoinTypeConvert.convert(depthSub[1]);
            if (symbol != null) {
                subscribers(tradeplateClients, symbol).put(session.getId(), session);
            }
        }

        // Plate unsubscribe.
        String[] depthUnsub = topicParts(jsonObject, "unsub", "depth", 2);
        if (depthUnsub != null) {
            String symbol = CoinTypeConvert.convert(depthUnsub[1]);
            if (symbol != null) {
                unsubscribe(tradeplateClients, symbol, session);
            }
        }

        // Latest K-line subscribe: {sub: "market.<symbol>.kline.<period>", symbol, period}
        String[] klineSub = topicParts(jsonObject, "sub", "kline", 4);
        if (klineSub != null) {
            String symbol = CoinTypeConvert.convert(klineSub[1]);
            if (symbol != null) {
                subscribers(klineClients, symbol + "-" + klineSub[3]).putIfAbsent(session.getId(), session);
            }
        }

        // Latest K-line unsubscribe.
        String[] klineUnsub = topicParts(jsonObject, "unsub", "kline", 4);
        if (klineUnsub != null) {
            String symbol = CoinTypeConvert.convert(klineUnsub[1]);
            if (symbol != null) {
                unsubscribe(klineClients, symbol + "-" + klineUnsub[3], session);
            }
        }

        // K-line history: {req: "market.nekkusdt.kline.1min", symbol: "nekkusdt", period: "1min"}
        String[] klineReq = topicParts(jsonObject, "req", "kline", 4);
        if (klineReq != null) {
            String symbol = CoinTypeConvert.convert(klineReq[1]);
            String key = StrUtil.format("KINE_{}_{}", symbol, klineReq[3]);
            RedisUtils bean = SpringContextHolder.getBean(RedisUtils.class);
            Object o = bean.get(key);
            sendMessageHistory(JSON.toJSONString(o), session);
        }
    }

    /**
     * Logs an error raised on the connection, including its stack trace.
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误", error);
    }

    /**
     * Sends a plate message to every session subscribed to the plate of
     * {@link #PLATE_SYMBOL}.
     *
     * <p>The lookup key is produced by the same symbol conversion used when subscribing;
     * looking up the raw symbol would never match a stored subscription.
     *
     * @param message     the message text
     * @param fromSession unused
     */
    public void sendMessagePlate(String message, Session fromSession) {
        String key = CoinTypeConvert.convert(PLATE_SYMBOL);
        if (key == null) {
            return;
        }
        broadcast(tradeplateClients.get(key), message);
    }

    /**
     * Sends a message to one session if it is still open.
     *
     * @param message   the message text
     * @param toSession the receiving session
     */
    public void sendMessageHistory(String message, Session toSession) {
        log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message);
        boolean open = toSession.isOpen();
        if (open) {
            toSession.getAsyncRemote().sendText(message);
        }
    }

    /**
     * Sends a K-line message to every session subscribed to {@code symbol-period}.
     *
     * @param symbol      symbol part of the subscription key, in the converted form used
     *                    when subscribing
     * @param period      period part of the subscription key
     * @param message     the message text
     * @param fromSession unused
     */
    public void sendMessageKline(String symbol, String period, String message, Session fromSession) {
        broadcast(klineClients.get(symbol + "-" + period), message);
    }

    /**
     * Reads {@code field} from the message and splits it on dots when it mentions
     * {@code channel}.
     *
     * @return the topic segments, or {@code null} if the field is absent, does not mention
     *         the channel, or has fewer than {@code minParts} segments
     */
    private static String[] topicParts(JSONObject msg, String field, String channel, int minParts) {
        Object raw = msg.get(field);
        if (raw == null) {
            return null;
        }
        String topic = raw.toString();
        if (!topic.contains(channel)) {
            return null;
        }
        String[] parts = topic.split("\\.");
        if (parts.length < minParts) {
            log.warn("ignoring malformed {} topic: {}", field, topic);
            return null;
        }
        return parts;
    }

    /** Returns the subscriber map for a key, creating a concurrent one on first use. */
    private static Map<String, Session> subscribers(Map<String, Map<String, Session>> registry, String key) {
        return registry.computeIfAbsent(key, k -> new ConcurrentHashMap<>());
    }

    /** Removes the session from the subscriber map of one key, if that map exists. */
    private static void unsubscribe(Map<String, Map<String, Session>> registry, String key, Session session) {
        Map<String, Session> subscribers = registry.get(key);
        if (subscribers != null) {
            subscribers.remove(session.getId());
        }
    }

    /** Removes the session from every subscriber map of a registry. */
    private static void removeFromAll(Map<String, Map<String, Session>> registry, Session session) {
        for (Map<String, Session> subscribers : registry.values()) {
            subscribers.remove(session.getId());
        }
    }

    /** Sends the message to every open session in the map; a {@code null} map is a no-op. */
    private void broadcast(Map<String, Session> subscribers, String message) {
        if (subscribers == null) {
            return;
        }
        for (Session toSession : subscribers.values()) {
            sendMessageHistory(message, toSession);
        }
    }
}
src/main/resources/mapper/member/MemberWalletCoinDao.xml
@@ -39,5 +39,21 @@
		where id=#{id}
	</update>

	<!--
		Adds the supplied deltas to the balance columns of the wallet row with the given id.
		Each column is updated only when its parameter is non-null, and a NULL column value
		is treated as 0 before the delta is added.
		NOTE(review): when all three parameters are null the <set> block is empty, which
		presumably yields an invalid UPDATE; confirm callers always pass at least one delta.
	-->
	<update id="updateWalletBalance" parameterType="map">
		update member_wallet_coin
		<set>
			<if test="availableBalance != null">
				available_balance = IFNULL(available_balance, 0) + #{availableBalance},
			</if>
			<if test="totalBalance != null">
				total_balance = IFNULL(total_balance, 0) + #{totalBalance},
			</if>
			<if test="frozenBalance != null">
				frozen_balance = IFNULL(frozen_balance, 0) + #{frozenBalance},
			</if>
		</set>
		where id=#{id}
	</update>

</mapper>
src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml
@@ -1,38 +1,55 @@ <?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > <mapper namespace="com.xcong.excoin.modules.coin.dao.OrderCoinDealDao"> <select id="selectAllWalletCoinOrder" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity"> select * from coins_order_deal <where> <if test="memberId != null and memberId != ''"> and member_id = #{memberId} </if> </where> order by create_time desc <mapper namespace="com.xcong.excoin.modules.coin.dao.OrderCoinDealDao"> <select id="selectAllWalletCoinOrder" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity"> select * from coins_order_deal <where> <if test="memberId != null and memberId != ''"> and member_id = #{memberId} </if> </where> order by create_time desc </select> <select id="selectOrderCoinDealByTime" resultType="com.xcong.excoin.trade.ExchangeTrade"> SELECT symbol symbol, deal_price price, symbol_cnt amount, deal_amount buyTurnover, deal_amount sellTurnover, order_type direction, create_time time from coins_order_deal where symbol = #{symbol} and order_type = 1 and order_status = 3 and create_time between #{startTime} and #{endTime} </select> <select id="selectAllWalletCoinOrderBySymbol" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity"> select * from coins_order_deal <where> <if test="memberId != null and memberId != ''"> and member_id = #{memberId} </if> <if test="symbol != null and symbol != ''"> and symbol = #{symbol} </if> </where> order by create_time desc </select> <select id="selectWalletCoinOrder" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity"> <select id="selectAllWalletCoinOrderBySymbol" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity"> select * from coins_order_deal <where> <if test="memberId != null and memberId != ''"> and member_id = #{memberId} </if> <if test="symbol != null and symbol != ''"> and symbol = 
#{symbol} </if> </where> order by create_time desc </select> <select id="selectWalletCoinOrder" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity"> select * from coins_order_deal where order_id= #{orderId} and member_id = #{memberId} </select> <select id="findAllWalletCoinOrderInPage" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity"> <select id="findAllWalletCoinOrderInPage" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity"> select * from coins_order_deal <if test="record != null"> <where> <if test="record.memberId != null" > <if test="record.memberId != null"> and member_id=#{record.memberId} </if> <if test="record.symbol != null and record.symbol != ''"> @@ -42,5 +59,5 @@ </if> order by create_time desc </select> </mapper> src/test/java/com/xcong/excoin/GuijiTest.java
@@ -116,8 +116,8 @@ e.printStackTrace(); } } }