From df1716a9abacac95261d686bdf0776bc7d6deca2 Mon Sep 17 00:00:00 2001 From: zainali5120 <512061637@qq.com> Date: Sun, 13 Sep 2020 01:15:03 +0800 Subject: [PATCH] 撮合交易代码提交 --- src/main/java/com/xcong/excoin/configurations/WebSocketConfig.java | 16 src/main/java/com/xcong/excoin/processor/MarketHandler.java | 21 src/main/java/com/xcong/excoin/modules/exchange/service/HandleKlineService.java | 14 src/main/java/com/xcong/excoin/utils/CoinTypeConvert.java | 4 src/main/java/com/xcong/excoin/trade/ExchangeTrade.java | 27 src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java | 220 ++++ src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java | 392 +++++++ src/main/java/com/xcong/excoin/processor/ExchangeOrderDirection.java | 5 pom.xml | 5 src/main/java/com/xcong/excoin/trade/TradePlateModel.java | 13 src/main/java/com/xcong/excoin/processor/CoinProcessor.java | 54 + src/main/java/com/xcong/excoin/trade/CoinTraderFactory.java | 40 src/main/java/com/xcong/excoin/modules/coin/parameter/dto/SubmitSalesWalletCoinOrderDto.java | 8 src/main/java/com/xcong/excoin/processor/CoinThumb.java | 26 src/main/java/com/xcong/excoin/modules/coin/entity/OrderCoinsEntity.java | 5 src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java | 17 src/test/java/com/xcong/excoin/GuijiTest.java | 4 src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java | 98 + src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java | 98 + src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java | 290 +++++ src/main/java/com/xcong/excoin/modules/exchange/service/impl/HandleKlineServiceImpl.java | 56 + src/main/java/com/xcong/excoin/trade/TradePlateItem.java | 11 src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java | 65 + src/main/java/com/xcong/excoin/processor/MarketService.java | 125 ++ src/main/resources/mapper/member/MemberWalletCoinDao.xml | 16 src/main/java/com/xcong/excoin/trade/ExchangeOrderDirection.java | 5 src/main/java/com/xcong/excoin/common/enumerates/SymbolEnum.java | 3 src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinDealDao.java | 3 src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml | 73 src/main/java/com/xcong/excoin/configurations/security/WebSecurityConfig.java | 1 src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java | 47 src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java | 7 src/main/java/com/xcong/excoin/trade/MergeOrder.java | 42 src/main/java/com/xcong/excoin/trade/TradePlate.java | 181 +++ src/main/java/com/xcong/excoin/modules/member/dao/MemberWalletCoinDao.java | 1 src/main/java/com/xcong/excoin/processor/CoinProcessorFactory.java | 34 src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java | 159 +++ src/main/java/com/xcong/excoin/trade/CoinTrader.java | 762 +++++++++++++++ 38 files changed, 2,907 insertions(+), 41 deletions(-) diff --git a/pom.xml b/pom.xml index 0438973..c77623f 100644 --- a/pom.xml +++ b/pom.xml @@ -76,6 +76,11 @@ <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> + <dependency> + <!-- websocket --> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-websocket</artifactId> + </dependency> <!-- <dependency>--> <!-- <groupId>org.springframework.security</groupId>--> diff --git a/src/main/java/com/xcong/excoin/common/enumerates/SymbolEnum.java b/src/main/java/com/xcong/excoin/common/enumerates/SymbolEnum.java index ca590aa..70888bd 100644 --- a/src/main/java/com/xcong/excoin/common/enumerates/SymbolEnum.java +++ b/src/main/java/com/xcong/excoin/common/enumerates/SymbolEnum.java @@ -14,7 +14,8 @@ ,BCH("BCH", "BCH/USDT") ,EOS("EOS", "EOS/USDT") ,XRP("XRP", "XRP/USDT") - ,ETC("ETC", "ETC/USDT"); + ,ETC("ETC", "ETC/USDT") + ,NEKK("NEKK", "NEKK/USDT"); private String name; diff --git a/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java b/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java index 0a791bf..1389b69 100644 --- a/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java +++ b/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java @@ -29,6 +29,11 @@ public static final String EXCHANGE_A = "biue-exchange-A"; + /** + * 撮合交易 + */ + public static final String EXCHANGE_B = "biue-exchange-B"; + // 开多止盈队列 public static final String QUEUE_MOREPRO = "QUEUE_MOREPRO_NEW"; @@ -51,6 +56,12 @@ // 平仓队列 public static final String QUEUE_CLOSETRADE = "QUEUE_CLOSETRADE_NEW"; + // 盘口队列 + public static final String QUEUE_TRADE_PLATE = "QUEUE_TRADE_PLATE"; + + // 处理交易 + public static final String QUEUE_HANDLE_TRADE = "QUEUE_HANDLE_TRADE"; + // 开多止盈路由键 public static final String ROUTINGKEY_MOREPRO = "ROUTINGKEY_MOREPRO"; @@ -71,6 +82,12 @@ public static final String ROUTINGKEY_PRICEOPERATE = "ROUTINGKEY_PRICEOPERATE"; // 平仓路由 public static final String ROUTINGKEY_CLOSETRADE = "ROUTINGKEY_CLOSETRADE"; + + // 盘口理路由 + public static final String ROUTINGKEY_TRADE_PLATE = "ROUTINGKEY_TRADE_PLATE"; + + // 交易订单处理 + public static final String ROUTINGKEY_HANDLE_TRADE = "ROUTINGKEY_HANDLE_TRADE"; @Resource private ConnectionFactory connectionFactory; @@ -94,6 +111,7 @@ public DirectExchange defaultExchange() { return new DirectExchange(EXCHANGE_ONE); } + @Bean public Queue testQueue() { @@ -204,6 +222,27 @@ /** + * 盘口推送 + * + * @return + */ + @Bean + public Queue queuePlateTrade() { + return new Queue(QUEUE_TRADE_PLATE, true); + } + + /** + * 交易订单处理 + * + * @return + */ + @Bean + public Queue queueHandleTrade() { + return new Queue(QUEUE_HANDLE_TRADE, true); + } + + + /** * 开多止盈 * * @return @@ -286,4 +325,30 @@ return BindingBuilder.bind(queueCloseTrade()).to(orderExchange()).with(RabbitMqConfig.ROUTINGKEY_CLOSETRADE); } + + @Bean + public DirectExchange matchTradeExchange() { + return new DirectExchange(EXCHANGE_B); + } + + /** + * 盘口变化绑定 + * + * @return + */ + @Bean + public Binding bindingPlateTrade() { + return BindingBuilder.bind(queuePlateTrade()).to(matchTradeExchange()).with(RabbitMqConfig.ROUTINGKEY_TRADE_PLATE); + } + + /** + * 交易订单处理 + * + * @return + */ + @Bean + public Binding bindingHandleTrade() { + return BindingBuilder.bind(queueHandleTrade()).to(matchTradeExchange()).with(RabbitMqConfig.ROUTINGKEY_HANDLE_TRADE); + } + } diff --git a/src/main/java/com/xcong/excoin/configurations/WebSocketConfig.java b/src/main/java/com/xcong/excoin/configurations/WebSocketConfig.java new file mode 100644 index 0000000..31b5eeb --- /dev/null +++ b/src/main/java/com/xcong/excoin/configurations/WebSocketConfig.java @@ -0,0 +1,16 @@ +package com.xcong.excoin.configurations; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.server.standard.ServerEndpointExporter; + +@Configuration +public class WebSocketConfig { + /** + * 注入一个ServerEndpointExporter,该Bean会自动注册使用@ServerEndpoint注解申明的websocket endpoint + */ + @Bean + public ServerEndpointExporter serverEndpointExporter() { + return new ServerEndpointExporter(); + } +} diff --git a/src/main/java/com/xcong/excoin/configurations/security/WebSecurityConfig.java b/src/main/java/com/xcong/excoin/configurations/security/WebSecurityConfig.java index dfe6523..dce5e35 100644 --- a/src/main/java/com/xcong/excoin/configurations/security/WebSecurityConfig.java +++ b/src/main/java/com/xcong/excoin/configurations/security/WebSecurityConfig.java @@ -53,6 +53,7 @@ .antMatchers("/api/member/getAppVersionInfo").permitAll() .antMatchers("/api/orderCoin/searchSymbolResultList").permitAll() .antMatchers("/api/orderCoin/findCollect").permitAll() + .antMatchers("/trade/**").permitAll() .anyRequest().authenticated() .and().apply(securityConfiguereAdapter()); } diff --git a/src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java b/src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java index f9f4ece..ca953be 100644 --- a/src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java +++ b/src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java @@ -67,7 +67,12 @@ Integer tradeType = submitSalesWalletCoinOrderDto.getTradeType(); BigDecimal price = submitSalesWalletCoinOrderDto.getPrice(); BigDecimal amount = submitSalesWalletCoinOrderDto.getAmount(); - return orderCoinService.submitSalesWalletCoinOrder(symbol,type,tradeType,price,amount); + if("NEKK".equals(symbol)){ + return orderCoinService.submitSalesWalletCoinOrderWithMatch(symbol,type,tradeType,price,amount,submitSalesWalletCoinOrderDto.getEntrustAmount()); + + }else{ + return orderCoinService.submitSalesWalletCoinOrder(symbol,type,tradeType,price,amount); + } } /** diff --git a/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinDealDao.java b/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinDealDao.java index ab900b8..e38459d 100644 --- a/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinDealDao.java +++ b/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinDealDao.java @@ -1,7 +1,9 @@ package com.xcong.excoin.modules.coin.dao; +import java.util.Date; import java.util.List; +import com.xcong.excoin.trade.ExchangeTrade; import org.apache.ibatis.annotations.Param; import com.baomidou.mybatisplus.core.mapper.BaseMapper; @@ -21,5 +23,6 @@ IPage<OrderCoinsDealEntity> findAllWalletCoinOrderInPage(Page<OrderCoinsDealEntity> page, @Param("record") OrderCoinsDealEntity orderCoinsDealEntity); + List<ExchangeTrade> selectOrderCoinDealByTime(@Param("symbol")String symbol, @Param("startTime")Date startTime, @Param("endTime")Date endTime); } diff --git a/src/main/java/com/xcong/excoin/modules/coin/entity/OrderCoinsEntity.java b/src/main/java/com/xcong/excoin/modules/coin/entity/OrderCoinsEntity.java index 54cadd8..f536c5d 100644 --- a/src/main/java/com/xcong/excoin/modules/coin/entity/OrderCoinsEntity.java +++ b/src/main/java/com/xcong/excoin/modules/coin/entity/OrderCoinsEntity.java @@ -57,6 +57,11 @@ * 成交价 */ private BigDecimal dealPrice; + + /** + * 市价委托时的委托金额 + */ + private BigDecimal entrustAmount; /** * 成交金额 */ diff --git a/src/main/java/com/xcong/excoin/modules/coin/parameter/dto/SubmitSalesWalletCoinOrderDto.java b/src/main/java/com/xcong/excoin/modules/coin/parameter/dto/SubmitSalesWalletCoinOrderDto.java index b15186a..1395732 100644 --- a/src/main/java/com/xcong/excoin/modules/coin/parameter/dto/SubmitSalesWalletCoinOrderDto.java +++ b/src/main/java/com/xcong/excoin/modules/coin/parameter/dto/SubmitSalesWalletCoinOrderDto.java @@ -26,12 +26,16 @@ private Integer tradeType; @Min(0) - @NotNull(message = "数量不能为空") + //@NotNull(message = "数量不能为空") @ApiModelProperty(value = "数量", example = "100") private BigDecimal amount; - @NotNull(message = "建仓价不能为空") + //@NotNull(message = "建仓价不能为空") @ApiModelProperty(value = "建仓价", example = "20.0000") private BigDecimal price; + + @ApiModelProperty(value = "市价时输入的总金额", example = "20.0000") + private BigDecimal entrustAmount; + } diff --git a/src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java b/src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java index a8f2914..8bf291c 100644 --- a/src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java +++ b/src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java @@ -1,11 +1,13 @@ package com.xcong.excoin.modules.coin.service; import java.math.BigDecimal; +import java.util.List; import com.baomidou.mybatisplus.extension.service.IService; import com.xcong.excoin.common.response.Result; import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity; import com.xcong.excoin.modules.coin.parameter.dto.FindAllWalletCoinOrderDto; +import com.xcong.excoin.trade.ExchangeTrade; public interface OrderCoinService extends IService<OrderCoinsEntity>{ @@ -15,6 +17,19 @@ Result submitSalesWalletCoinOrder(String symbol, Integer type, Integer tradeType, BigDecimal price, BigDecimal amount); + + /** + * 需要撮合交易的币种提交买卖单 + * @param symbol + * @param type + * @param tradeType + * @param price + * @param amount + * @param entrustAmount + * @return + */ + Result submitSalesWalletCoinOrderWithMatch(String symbol, Integer type, Integer tradeType, BigDecimal price, + BigDecimal amount,BigDecimal entrustAmount); public Result getEntrustWalletCoinOrder(String symbol, Integer status); @@ -34,4 +49,6 @@ public void dealEntrustCoinOrder(); + public void handleOrder(List<ExchangeTrade> trades); + } diff --git a/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java b/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java index c96299f..200f4a7 100644 --- a/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java +++ b/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java @@ -1,14 +1,9 @@ package com.xcong.excoin.modules.coin.service.impl; import java.math.BigDecimal; +import java.text.ParseException; import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Calendar; -import java.util.Date; -import java.util.GregorianCalendar; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import javax.annotation.Resource; @@ -16,6 +11,10 @@ import com.xcong.excoin.modules.platform.entity.PlatformCnyUsdtExchangeEntity; import com.xcong.excoin.modules.platform.entity.PlatformSymbolsCoinEntity; +import com.xcong.excoin.trade.CoinTrader; +import com.xcong.excoin.trade.CoinTraderFactory; +import com.xcong.excoin.trade.ExchangeTrade; +import org.apache.commons.collections.CollectionUtils; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -79,6 +78,10 @@ RedisUtils redisUtils; @Resource PlatformSymbolsCoinDao platformSymbolsCoinDao; + + @Resource + private CoinTraderFactory factory; + @Override public String generateSimpleSerialno(String userId) { @@ -309,6 +312,174 @@ record.setBalance(walletCoinUsdt.getAvailableBalance()); memberAccountFlowEntityDao.insert(record); + + return Result.ok(MessageSourceUtils.getString("order_service_0011")); + } + + @Override + public Result submitSalesWalletCoinOrderWithMatch(String symbol, Integer type, Integer tradeType, BigDecimal price, BigDecimal amount, BigDecimal entrustAmount) { + //获取用户ID + Long memberId = 13L; + BigDecimal nowPriceinBigDecimal = price; + //查询当前价 + //BigDecimal nowPrice = new BigDecimal(redisUtils.getString(CoinTypeConvert.convertToKey(symbol + "/USDT"))); + + // 获取交易管理的杠杠倍率,手续费率等信息,由平台进行设置 + symbol = symbol.toUpperCase(); + MemberWalletCoinEntity walletCoin = memberWalletCoinDao.selectWalletCoinBymIdAndCode(memberId, symbol); + if (ObjectUtil.isEmpty(walletCoin)) { + return Result.fail(MessageSourceUtils.getString("order_service_0003")); + } + // 查询交易设置 + PlatformTradeSettingEntity tradeSetting = platformTradeSettingDao.findTradeSetting(); + if (ObjectUtil.isEmpty(tradeSetting)) { + return Result.fail(MessageSourceUtils.getString("order_service_0009")); + } + // 手续费用(手续费=建仓价X数量X手续费率) + BigDecimal closingPrice ; + // 总费用分两种 1,限价交易 是价格*数量+手续费 2,市价交易 用户输入的金额+手续费 + //总费用 = 成交价*数量+手续费 + BigDecimal totalPayPrice ; + if(OrderCoinsEntity.TRADETYPE_FIXEDPRICE.equals(tradeType) ){ + // 限价 + closingPrice = price.multiply(amount).multiply(tradeSetting.getCoinFeeRatio()); + totalPayPrice = price.multiply(amount).add(closingPrice); + }else{ + // 市价 + closingPrice = entrustAmount.multiply(tradeSetting.getCoinFeeRatio()); + totalPayPrice = entrustAmount.add(closingPrice); + } + // BigDecimal totalPayPricCoin = nowPrice.multiply(amount).add(closingPrice); + + String walletCode = MemberWalletCoinEnum.WALLETCOINCODE.getValue(); + MemberWalletCoinEntity walletCoinUsdt = memberWalletCoinDao.selectWalletCoinBymIdAndCode(memberId, walletCode); + if (OrderCoinsEntity.ORDERTYPE_BUY.equals(type)) { + //买入,所需总费用跟用户USDT金额进行比较 + BigDecimal availableBalance = walletCoinUsdt.getAvailableBalance(); + if (totalPayPrice.compareTo(availableBalance) > 0) { + return Result.fail(MessageSourceUtils.getString("order_service_0010")); + } + } else { + //卖出,用户币是否足够 + BigDecimal availableBalance = walletCoin.getAvailableBalance(); + if (amount.compareTo(availableBalance) > 0) { + return Result.fail(MessageSourceUtils.getString("order_service_0010")); + } + } + + // 首先将单插入到数据库主表(委托表) + // 创建订单 + OrderCoinsEntity order = new OrderCoinsEntity(); + //根据委托类型生成不同数据 + // 如果是限价交易直接插入主表数据 + order.setMemberId(memberId); + order.setOrderNo(generateSimpleSerialno(memberId.toString())); + order.setOrderType(type); + order.setSymbol(symbol); + //order.setMarkPrice(nowPrice); + + // 成交量 先设置为0 + order.setDealCnt(BigDecimal.ZERO); + // 成交价 + //order.setDealPrice(price); + // 成交金额 + order.setDealAmount(BigDecimal.ZERO); + order.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DODING); + order.setTradeType(tradeType); + // 手续费 + order.setFeeAmount(closingPrice); + if(OrderCoinsEntity.TRADETYPE_FIXEDPRICE.equals(tradeType)){ + + // 限价 是需要价格和数量 可以得到成交金额 + // 下单量 + order.setEntrustCnt(amount); + // 下单价格 + order.setEntrustPrice(price); + order.setEntrustAmount(amount.multiply(price)); + + + }else{ + if(OrderCoinsEntity.ORDERTYPE_BUY.equals(type)){ + // 市价 只有金额 + order.setEntrustAmount(entrustAmount); + }else{ + // 下单量 + order.setEntrustCnt(amount); + // 下单价格 + order.setEntrustPrice(price); + order.setEntrustAmount(amount.multiply(price)); + } + + } + orderCoinsDao.insert(order); + + //更新用户钱包信息 + //冻结相应的资产 + if (OrderCoinsEntity.ORDERTYPE_BUY.equals(type)) { + //如果是买入,所对应的币种增加,USDT账户减少金额 + BigDecimal availableBalance = walletCoinUsdt.getAvailableBalance().subtract(totalPayPrice); + BigDecimal frozenBalance = walletCoinUsdt.getFrozenBalance().add(totalPayPrice); + walletCoinUsdt.setAvailableBalance(availableBalance); + walletCoinUsdt.setFrozenBalance(frozenBalance); + memberWalletCoinDao.updateById(walletCoinUsdt); + } else { + //如果是卖出,币种减少,USDT增加 + BigDecimal availableBalance = walletCoin.getAvailableBalance().subtract(amount); + BigDecimal frozenBalance = walletCoin.getFrozenBalance().add(amount); + walletCoin.setAvailableBalance(availableBalance); + walletCoin.setFrozenBalance(frozenBalance); + memberWalletCoinDao.updateById(walletCoin); + } + // 加入到撮合 + CoinTrader trader = factory.getTrader(symbol); +// if(trader==null){ +// +// trader = new CoinTrader("NEKK"); +// //newTrader.setKafkaTemplate(kafkaTemplate); +// //newTrader.setBaseCoinScale(coin.getBaseCoinScale()); +// //newTrader.setCoinScale(coin.getCoinScale()); +// // newTrader.setPublishType(coin.getPublishType()); +// //newTrader.setClearTime(coin.getClearTime()); +// +// // 创建成功以后需要对未处理订单预处理 +// //log.info("======CoinTrader Process: " + symbol + "======"); +// List<OrderCoinsEntity> orders = orderCoinsDao.selectAllEntrustingCoinOrderList(); +// if(CollectionUtils.isNotEmpty(orders)){ +// List<OrderCoinsEntity> tradingOrders = new ArrayList<>(); +// List<OrderCoinsEntity> completedOrders = new ArrayList<>(); +// orders.forEach(order1 -> { +// tradingOrders.add(order1); +// }); +// try { +// trader.trade(tradingOrders); +// } catch (ParseException e) { +// e.printStackTrace(); +// // log.info("异常:trader.trade(tradingOrders);"); +// } +// } +// +// trader.setReady(true); +// factory.addTrader(symbol, trader); +// } + trader.trade(order); + + +// // 流水记录 TODO +// MemberAccountFlowEntity record = new MemberAccountFlowEntity(); +// record.setMemberId(memberId); +// if (OrderCoinsEntity.ORDERTYPE_BUY.equals(type)) { +// record.setPrice(totalPayPrice.setScale(4, BigDecimal.ROUND_DOWN)); +// record.setSource(MemberAccountFlowEntity.SOURCE_BUY + symbol); +// record.setRemark(MemberAccountFlowEntity.REMARK_BUY + symbol + ":" + amount); +// } else { +// record.setPrice(totalPayPrice.negate().setScale(4, BigDecimal.ROUND_DOWN)); +// record.setSource(MemberAccountFlowEntity.SOURCE_SALE + symbol); +// record.setRemark(MemberAccountFlowEntity.REMARK_SALE + symbol + ":" + amount); +// } +// record.setSymbol(symbol); +// record.setBalance(walletCoinUsdt.getAvailableBalance()); +// +// memberAccountFlowEntityDao.insert(record); return Result.ok(MessageSourceUtils.getString("order_service_0011")); } @@ -638,4 +809,109 @@ } } } + + public void handleOrder(List<ExchangeTrade> trades){ + // 处理撮合交易的订单 + for(ExchangeTrade exchangeTrade : trades){ + + BigDecimal amount = exchangeTrade.getAmount(); + Long buyOrderId = exchangeTrade.getBuyOrderId(); + BigDecimal buyTurnover = exchangeTrade.getBuyTurnover(); + int direction = exchangeTrade.getDirection(); + BigDecimal price = exchangeTrade.getPrice(); + Long sellOrderId = exchangeTrade.getSellOrderId(); + // 买卖单都需要处理 + // 买单 + OrderCoinsEntity buyOrderCoinsEntity = orderCoinsDao.selectById(buyOrderId); + if(buyOrderCoinsEntity!=null){ + // 比较剩余的量 + BigDecimal dealAmount = buyOrderCoinsEntity.getDealAmount(); + // 单的总金额 + BigDecimal entrustAmount = buyOrderCoinsEntity.getEntrustAmount(); + BigDecimal add = dealAmount.add(buyTurnover); + // 创建一个完成的单 + OrderCoinsDealEntity detail = new OrderCoinsDealEntity(); + detail.setMemberId(buyOrderCoinsEntity.getMemberId()); + //detail.setOrderId(order.getId()); + detail.setOrderNo(buyOrderCoinsEntity.getOrderNo()); + detail.setOrderType(buyOrderCoinsEntity.getOrderType()); + detail.setTradeType(buyOrderCoinsEntity.getTradeType()); + detail.setSymbol(buyOrderCoinsEntity.getSymbol()); + detail.setSymbolCnt(amount); + detail.setEntrustPrice(buyOrderCoinsEntity.getEntrustPrice()); + detail.setDealPrice(price); + detail.setDealAmount(buyTurnover); + //detail.setFeeAmount(closingPrice); + detail.setOrderStatus(OrderCoinsDealEntity.ORDERSTATUS_DONE); + orderCoinDealDao.insert(detail); + // 如果这个单成交完 更改状态 + if(add.compareTo(entrustAmount)>=0){ + OrderCoinsEntity update = new OrderCoinsEntity(); + update.setId(buyOrderId); + update.setDealAmount(entrustAmount); + update.setDealCnt(buyOrderCoinsEntity.getDealCnt().add(amount)); + update.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE); + update.setUpdateTime(new Date()); + orderCoinsDao.updateById(update); + }else{ + OrderCoinsEntity update = new OrderCoinsEntity(); + update.setId(buyOrderId); + update.setDealAmount(add); + update.setDealCnt(buyOrderCoinsEntity.getDealCnt().add(amount)); + update.setUpdateTime(new Date()); + orderCoinsDao.updateById(update); + } + } + // 卖单 + OrderCoinsEntity sellOrderCoinsEntity = orderCoinsDao.selectById(sellOrderId); + if(sellOrderCoinsEntity!=null){ + // 比较剩余的量 + BigDecimal dealAmount = sellOrderCoinsEntity.getDealCnt(); + // 单的总数量 + BigDecimal entrustCnt = sellOrderCoinsEntity.getEntrustCnt(); + BigDecimal add = dealAmount.add(amount); + // 创建一个完成的单 + OrderCoinsDealEntity detail = new OrderCoinsDealEntity(); + detail.setMemberId(sellOrderCoinsEntity.getMemberId()); + //detail.setOrderId(order.getId()); + detail.setOrderNo(sellOrderCoinsEntity.getOrderNo()); + detail.setOrderType(sellOrderCoinsEntity.getOrderType()); + detail.setTradeType(sellOrderCoinsEntity.getTradeType()); + detail.setSymbol(sellOrderCoinsEntity.getSymbol()); + detail.setSymbolCnt(amount); + detail.setEntrustPrice(sellOrderCoinsEntity.getEntrustPrice()); + detail.setDealPrice(price); + detail.setDealAmount(buyTurnover); + //detail.setFeeAmount(closingPrice); + detail.setOrderStatus(OrderCoinsDealEntity.ORDERSTATUS_DONE); + orderCoinDealDao.insert(detail); + // 如果这个单成交完 更改状态 + if(add.compareTo(entrustCnt)>=0){ + OrderCoinsEntity update = new OrderCoinsEntity(); + update.setId(sellOrderId); + // 总成交额 + update.setDealAmount(buyTurnover.add(sellOrderCoinsEntity.getDealAmount())); + update.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE); + update.setDealCnt(entrustCnt); + update.setUpdateTime(new Date()); + orderCoinsDao.updateById(update); + }else{ + // 未完成 + OrderCoinsEntity update = new OrderCoinsEntity(); + update.setId(sellOrderId); + // 总成交额 + update.setDealAmount(buyTurnover.add(sellOrderCoinsEntity.getDealAmount())); + update.setDealCnt(sellOrderCoinsEntity.getDealCnt().add(amount)); + update.setUpdateTime(new Date()); + orderCoinsDao.updateById(update); + } + // 卖币得到的usdt + MemberWalletCoinEntity memberWalletCoinEntity = memberWalletCoinDao.selectWalletCoinBymIdAndCode(sellOrderCoinsEntity.getMemberId(), sellOrderCoinsEntity.getSymbol()); + if(memberWalletCoinEntity!=null){ + memberWalletCoinDao.updateWalletBalance(memberWalletCoinEntity.getId(),buyTurnover,buyTurnover,null); + } + + } + } + } } diff --git a/src/main/java/com/xcong/excoin/modules/exchange/service/HandleKlineService.java b/src/main/java/com/xcong/excoin/modules/exchange/service/HandleKlineService.java new file mode 100644 index 0000000..cdedee9 --- /dev/null +++ b/src/main/java/com/xcong/excoin/modules/exchange/service/HandleKlineService.java @@ -0,0 +1,14 @@ +package com.xcong.excoin.modules.exchange.service; + +import com.xcong.excoin.trade.ExchangeTrade; + +import java.util.List; + +public interface HandleKlineService { + + /** + * 处理交易后的最新K线 + * @param trades + */ + void handleExchangeOrderToKline(List<ExchangeTrade> trades); +} diff --git a/src/main/java/com/xcong/excoin/modules/exchange/service/impl/HandleKlineServiceImpl.java b/src/main/java/com/xcong/excoin/modules/exchange/service/impl/HandleKlineServiceImpl.java new file mode 100644 index 0000000..7add4f8 --- /dev/null +++ b/src/main/java/com/xcong/excoin/modules/exchange/service/impl/HandleKlineServiceImpl.java @@ -0,0 +1,56 @@ +package com.xcong.excoin.modules.exchange.service.impl; + +import cn.hutool.core.util.StrUtil; +import com.huobi.client.model.Candlestick; +import com.xcong.excoin.modules.exchange.service.HandleKlineService; +import com.xcong.excoin.processor.CoinProcessor; +import com.xcong.excoin.processor.CoinProcessorFactory; +import com.xcong.excoin.trade.ExchangeTrade; +import com.xcong.excoin.utils.CoinTypeConvert; +import com.xcong.excoin.utils.RedisUtils; +import org.apache.commons.collections.CollectionUtils; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.math.BigDecimal; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +@Service +public class HandleKlineServiceImpl implements HandleKlineService { + + @Resource + private CoinProcessorFactory processorFactory; + + @Resource + private RedisUtils redisUtils; + + @Override + public void handleExchangeOrderToKline(List<ExchangeTrade> trades) { + if (CollectionUtils.isEmpty(trades)) { + return; + } + String symbol = trades.get(0).getSymbol(); + String symbolUsdt = symbol; + if(!symbol.contains("USDT")){ + symbolUsdt = symbol+"/USDT"; + } + CoinProcessor processor = processorFactory.getProcessor(symbol); + Map<String, Candlestick> currentKlineMap = processor.getCurrentKlineMap(); + Collection<Candlestick> values = currentKlineMap.values(); + BigDecimal newPrice = trades.get(trades.size()-1).getPrice(); + for (Candlestick candlestick : values) { + for (ExchangeTrade exchangeTrade : trades) { + processor.processTrade(candlestick, exchangeTrade); + } + } + + // 存入redis,websocket去取 + String key = "NEW_KINE_{}"; + key = StrUtil.format(key, symbolUsdt); + redisUtils.set(key,currentKlineMap); + // 更新最新价 + redisUtils.set(CoinTypeConvert.convertToKey(symbolUsdt), newPrice); + } +} diff --git a/src/main/java/com/xcong/excoin/modules/member/dao/MemberWalletCoinDao.java b/src/main/java/com/xcong/excoin/modules/member/dao/MemberWalletCoinDao.java index 66a50dc..de32153 100644 --- a/src/main/java/com/xcong/excoin/modules/member/dao/MemberWalletCoinDao.java +++ b/src/main/java/com/xcong/excoin/modules/member/dao/MemberWalletCoinDao.java @@ -22,4 +22,5 @@ int subFrozenBalance(@Param("memberId") Long memberId, @Param("id") Long id, @Param("amount") BigDecimal amount); int updateBlockBalance(@Param("id") Long id, @Param("availableBalance") BigDecimal availableBalance, @Param("earlyBalance") BigDecimal earlyBalance, @Param("blockNumber") Integer blockNumber); + int updateWalletBalance(@Param("id") Long id, @Param("availableBalance") BigDecimal availableBalance, @Param("totalBalance") BigDecimal totalBalance, @Param("frozenBalance") BigDecimal frozenBalance); } diff --git a/src/main/java/com/xcong/excoin/processor/CoinProcessor.java b/src/main/java/com/xcong/excoin/processor/CoinProcessor.java new file mode 100644 index 0000000..6400c54 --- /dev/null +++ b/src/main/java/com/xcong/excoin/processor/CoinProcessor.java @@ -0,0 +1,54 @@ +package com.xcong.excoin.processor; + + +import com.huobi.client.model.Candlestick; +import com.xcong.excoin.trade.ExchangeTrade; +import com.xcong.excoin.utils.RedisUtils; + +import java.util.List; +import java.util.Map; + +public interface CoinProcessor { + + void setIsHalt(boolean status); + + void setIsStopKLine(boolean stop); + + boolean isStopKline(); + /** + * 处理新生成的交易信息 + * @param trades + * @return + */ + void process(List<ExchangeTrade> trades); + + void processTrade(Candlestick kLine, ExchangeTrade exchangeTrade); + /** + * 添加存储器 + * @param storage + */ + void addHandler(MarketHandler storage); + + CoinThumb getThumb(); + + void setMarketService(MarketService service); + + void generateKLine(int range, int field, long time); + + Candlestick getKLine(); + + void initializeThumb(); + + void autoGenerate(); + + void resetThumb(); + + // void setExchangeRate(CoinExchangeRate coinExchangeRate); + + void update24HVolume(long time); + + //void initializeUsdRate(); + Map<String,Candlestick> getCurrentKlineMap(); + + void setRedisUtils(RedisUtils redisUtils); +} diff --git a/src/main/java/com/xcong/excoin/processor/CoinProcessorFactory.java b/src/main/java/com/xcong/excoin/processor/CoinProcessorFactory.java new file mode 100644 index 0000000..8c3ebda --- /dev/null +++ b/src/main/java/com/xcong/excoin/processor/CoinProcessorFactory.java @@ -0,0 +1,34 @@ +package com.xcong.excoin.processor; + + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.concurrent.ConcurrentHashMap; + +@Slf4j +@Component +public class CoinProcessorFactory { + private ConcurrentHashMap<String, CoinProcessor> processorMap; + + public CoinProcessorFactory() { + processorMap = new ConcurrentHashMap<>(); + } + + public void addProcessor(String symbol, CoinProcessor processor) { + log.info("CoinProcessorFactory addProcessor = {}" + symbol); + processorMap.put(symbol, processor); + } + + public boolean containsProcessor(String symbol) { + return processorMap != null && processorMap.containsKey(symbol); + } + + public CoinProcessor getProcessor(String symbol) { + return processorMap.get(symbol); + } + + public ConcurrentHashMap<String, CoinProcessor> getProcessorMap() { + return processorMap; + } +} diff --git a/src/main/java/com/xcong/excoin/processor/CoinThumb.java b/src/main/java/com/xcong/excoin/processor/CoinThumb.java new file mode 100644 index 0000000..1e9a198 --- /dev/null +++ b/src/main/java/com/xcong/excoin/processor/CoinThumb.java @@ -0,0 +1,26 @@ +package com.xcong.excoin.processor; + +import lombok.Data; + +import java.math.BigDecimal; + +@Data +public class CoinThumb { + private String symbol; + private BigDecimal open = BigDecimal.ZERO; + private BigDecimal high= BigDecimal.ZERO; + private BigDecimal low= BigDecimal.ZERO; + private BigDecimal close=BigDecimal.ZERO; + private BigDecimal chg = BigDecimal.ZERO.setScale(2); + private BigDecimal change = BigDecimal.ZERO.setScale(2); + private BigDecimal volume = BigDecimal.ZERO.setScale(2); + private BigDecimal turnover= BigDecimal.ZERO; + //昨日收盘价 + private BigDecimal lastDayClose = BigDecimal.ZERO; + //交易币对usd汇率 + private BigDecimal usdRate; + //基币对usd的汇率 + private BigDecimal baseUsdRate; + // 交易區 + private int zone; +} diff --git a/src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java b/src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java new file mode 100644 index 0000000..ca6dc28 --- /dev/null +++ b/src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java @@ -0,0 +1,392 @@ +package com.xcong.excoin.processor; + + +import com.alibaba.fastjson.JSON; +import com.huobi.client.model.Candlestick; +import com.xcong.excoin.trade.ExchangeTrade; +import com.xcong.excoin.utils.RedisUtils; +import lombok.ToString; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 默认交易处理器,产生1mK线信息 + */ +@ToString +public class DefaultCoinProcessor implements CoinProcessor { + private Logger logger = LoggerFactory.getLogger(DefaultCoinProcessor.class); + private String symbol; + private String baseCoin; + private Candlestick currentKLine; + private List<MarketHandler> handlers; + private CoinThumb coinThumb; + private MarketService service; + private RedisUtils redisUtils; + //private CoinExchangeRate coinExchangeRate; + //是否暂时处理 + private Boolean isHalt = true; + //是否停止K线生成 + private Boolean stopKLine = false; + + /** + * 每个时间段的K线在生成后,生成一个最新的K线 + */ + private Map<String,Candlestick> currentKlineMap = new ConcurrentHashMap<>(); + + public DefaultCoinProcessor(String symbol, String baseCoin) { + //handlers = new ArrayList<>(); + createNewKLine(); + this.baseCoin = baseCoin; + this.symbol = symbol; + } + + public String getSymbol() { + return symbol; + } + + @Override + public void initializeThumb() { + Calendar calendar = Calendar.getInstance(); + //将秒、微秒字段置为0 + calendar.set(Calendar.SECOND, 0); + calendar.set(Calendar.MILLISECOND, 0); + long nowTime = calendar.getTimeInMillis(); + calendar.set(Calendar.MINUTE, 0); + calendar.set(Calendar.HOUR_OF_DAY, 0); + long firstTimeOfToday = calendar.getTimeInMillis(); + String period = "1min"; + logger.info("initializeThumb from {} to {}", firstTimeOfToday, nowTime); + List<Candlestick> lines = service.findAllKLine(this.symbol, firstTimeOfToday, nowTime, period); + coinThumb = new CoinThumb(); + synchronized (coinThumb) { + coinThumb.setSymbol(symbol); + for (Candlestick kline : lines) { + if (kline.getOpen().compareTo(BigDecimal.ZERO) == 0) { + continue; + } + if (coinThumb.getOpen().compareTo(BigDecimal.ZERO) == 0) { + coinThumb.setOpen(kline.getOpen()); + } + if (coinThumb.getHigh().compareTo(kline.getHigh()) < 0) { + coinThumb.setHigh(kline.getHigh()); + } + if (kline.getLow().compareTo(BigDecimal.ZERO) > 0 && coinThumb.getLow().compareTo(kline.getLow()) > 0) { + coinThumb.setLow(kline.getLow()); + } + if (kline.getClose().compareTo(BigDecimal.ZERO) > 0) { + coinThumb.setClose(kline.getClose()); + } + coinThumb.setVolume(coinThumb.getVolume().add(kline.getVolume())); + // TODO + coinThumb.setTurnover(coinThumb.getTurnover().add(kline.getAmount())); + } + coinThumb.setChange(coinThumb.getClose().subtract(coinThumb.getOpen())); + // 此处计算涨幅并没有以开盘价为标准,而是以最低价 + if (coinThumb.getLow().compareTo(BigDecimal.ZERO) > 0) { + coinThumb.setChg(coinThumb.getChange().divide(coinThumb.getLow(), 4, RoundingMode.UP)); + } + } + } + + public void createNewKLine() { + currentKLine = new Candlestick(); + synchronized (currentKLine) { + Calendar calendar = Calendar.getInstance(); + calendar.set(Calendar.SECOND, 0); + calendar.set(Calendar.MILLISECOND, 0); + //1Min时间要是下一整分钟的 + calendar.add(Calendar.MINUTE, 1); + currentKLine.setTimestamp(calendar.getTimeInMillis()); + // K线类型 + //currentKLine.setPeriod("1min"); + currentKLine.setCount(0); + } + } + + /** + * 00:00:00 时重置CoinThumb + */ + @Override + public void resetThumb() { + logger.info("reset coinThumb"); + synchronized (coinThumb) { + coinThumb.setOpen(BigDecimal.ZERO); + coinThumb.setHigh(BigDecimal.ZERO); + //设置昨收价格 + coinThumb.setLastDayClose(coinThumb.getClose()); + //coinThumb.setClose(BigDecimal.ZERO); + coinThumb.setLow(BigDecimal.ZERO); + coinThumb.setChg(BigDecimal.ZERO); + coinThumb.setChange(BigDecimal.ZERO); + } + } + +// @Override +// public void setExchangeRate(CoinExchangeRate coinExchangeRate) { +// this.coinExchangeRate = coinExchangeRate; +// } + + @Override + public void update24HVolume(long time) { + if(coinThumb!=null) { + synchronized (coinThumb) { + Calendar calendar = Calendar.getInstance(); + calendar.setTimeInMillis(time); + calendar.add(Calendar.HOUR_OF_DAY, -24); + long timeStart = calendar.getTimeInMillis(); + // TODO + BigDecimal volume = service.findTradeVolume(this.symbol, timeStart, time); + coinThumb.setVolume(volume.setScale(4, RoundingMode.DOWN)); + } + } + } + +// @Override +// public void initializeUsdRate() { +// //logger.info("symbol = {} ,baseCoin = {}",this.symbol,this.baseCoin); +// BigDecimal baseUsdRate = coinExchangeRate.getUsdRate(baseCoin); +// coinThumb.setBaseUsdRate(baseUsdRate); +// //logger.info("setBaseUsdRate = ",baseUsdRate); +// BigDecimal multiply = coinThumb.getClose().multiply(coinExchangeRate.getUsdRate(baseCoin)); +// //logger.info("setUsdRate = ",multiply); +// coinThumb.setUsdRate(multiply); +// } + + + @Override + public void autoGenerate() { + DateFormat df = new SimpleDateFormat("HH:mm:ss"); + //logger.info("auto generate 1min kline in {},data={}", df.format(new Date(currentKLine.getTime())), JSON.toJSONString(currentKLine)); + if(coinThumb != null) { + synchronized (currentKLine) { + //没有成交价时存储上一笔成交价 + if(currentKLine.getOpen()==null){ + currentKLine.setOpen(BigDecimal.ZERO); + } + if (currentKLine.getOpen().compareTo(BigDecimal.ZERO) == 0) { + currentKLine.setOpen(coinThumb.getClose()); + currentKLine.setLow(coinThumb.getClose()); + currentKLine.setHigh(coinThumb.getClose()); + currentKLine.setClose(coinThumb.getClose()); + } + Calendar calendar = Calendar.getInstance(); + calendar.set(Calendar.SECOND, 0); + calendar.set(Calendar.MILLISECOND, 0); + currentKLine.setTimestamp(calendar.getTimeInMillis()); + handleKLineStorage(currentKLine); + createNewKLine(); + } + } + } + + @Override + public void setIsHalt(boolean status) { + this.isHalt = status; + } + + @Override + public void process(List<ExchangeTrade> trades) { + if (!isHalt) { + if (trades == null || trades.size() == 0) { + return; + } + synchronized (currentKLine) { + for (ExchangeTrade exchangeTrade : trades) { + //处理K线 + processTrade(currentKLine, exchangeTrade); + //处理今日概况信息 + logger.debug("处理今日概况信息"); + handleThumb(exchangeTrade); + //存储并推送成交信息 + handleTradeStorage(exchangeTrade); + } + } + } + } + + public void processTrade(Candlestick kLine, ExchangeTrade exchangeTrade) { + if (kLine.getClose().compareTo(BigDecimal.ZERO) == 0) { + //第一次设置K线值 + kLine.setOpen(exchangeTrade.getPrice()); + kLine.setHigh(exchangeTrade.getPrice()); + kLine.setLow(exchangeTrade.getPrice()); + kLine.setClose(exchangeTrade.getPrice()); + } else { + kLine.setHigh(exchangeTrade.getPrice().max(kLine.getHigh())); + kLine.setLow(exchangeTrade.getPrice().min(kLine.getLow())); + kLine.setClose(exchangeTrade.getPrice()); + } + kLine.setCount(kLine.getCount() + 1); + kLine.setVolume(kLine.getVolume().add(exchangeTrade.getAmount())); + BigDecimal turnover = exchangeTrade.getPrice().multiply(exchangeTrade.getAmount()); + // kLine.setTurnover(kLine.getTurnover().add(turnover)); + } + + public void handleTradeStorage(ExchangeTrade exchangeTrade) { + for (MarketHandler storage : handlers) { + storage.handleTrade(symbol, exchangeTrade, coinThumb); + } + } + + public void handleKLineStorage(Candlestick kLine) { + // 存储交易信息 TODO 发送最新的一根K线 +// for (MarketHandler storage : handlers) { +// storage.handleKLine(symbol, kLine); +// } + } + + public void handleThumb(ExchangeTrade exchangeTrade) { + logger.info("handleThumb symbol = {}", this.symbol); + synchronized (coinThumb) { + if (coinThumb.getOpen().compareTo(BigDecimal.ZERO) == 0) { + //第一笔交易记为开盘价 + coinThumb.setOpen(exchangeTrade.getPrice()); + } + coinThumb.setHigh(exchangeTrade.getPrice().max(coinThumb.getHigh())); + if (coinThumb.getLow().compareTo(BigDecimal.ZERO) == 0) { + coinThumb.setLow(exchangeTrade.getPrice()); + } else { + coinThumb.setLow(exchangeTrade.getPrice().min(coinThumb.getLow())); + } + coinThumb.setClose(exchangeTrade.getPrice()); + coinThumb.setVolume(coinThumb.getVolume().add(exchangeTrade.getAmount()).setScale(4, RoundingMode.UP)); + BigDecimal turnover = exchangeTrade.getPrice().multiply(exchangeTrade.getAmount()).setScale(4, RoundingMode.UP); + coinThumb.setTurnover(coinThumb.getTurnover().add(turnover)); + BigDecimal change = coinThumb.getClose().subtract(coinThumb.getOpen()); + coinThumb.setChange(change); + if (coinThumb.getLow().compareTo(BigDecimal.ZERO) > 0) { + coinThumb.setChg(change.divide(coinThumb.getLow(), 4, BigDecimal.ROUND_UP)); + } + if ("USDT".equalsIgnoreCase(baseCoin)) { + logger.info("setUsdRate", exchangeTrade.getPrice()); + coinThumb.setUsdRate(exchangeTrade.getPrice()); + } else { + + } + //coinThumb.setBaseUsdRate(coinExchangeRate.getUsdRate(baseCoin)); + //coinThumb.setUsdRate(exchangeTrade.getPrice().multiply(coinExchangeRate.getUsdRate(baseCoin))); + //logger.info("setUsdRate", exchangeTrade.getPrice().multiply(coinExchangeRate.getUsdRate(baseCoin))); + //logger.info("thumb = {}", coinThumb); + } + } + + @Override + public void addHandler(MarketHandler storage) { + handlers.add(storage); + } + + + @Override + public CoinThumb getThumb() { + return coinThumb; + } + + @Override + public void setMarketService(MarketService service) { + this.service = service; + } + + @Override + public void generateKLine(int range, int field, long time) { + Calendar calendar = Calendar.getInstance(); + calendar.setTimeInMillis(time); + DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + long endTick = calendar.getTimeInMillis(); + String endTime = df.format(calendar.getTime()); + //往前推range个时间单位 + calendar.add(field, -range); + String fromTime = df.format(calendar.getTime()); + long startTick = calendar.getTimeInMillis(); + System.out.println("time range from " + fromTime + " to " + endTime); + List<ExchangeTrade> exchangeTrades = service.findTradeByTimeRange(this.symbol, startTick, endTick); + + Candlestick kLine = new Candlestick(); + kLine.setTimestamp(endTick); + kLine.setAmount(BigDecimal.ZERO); + kLine.setClose(BigDecimal.ZERO); + kLine.setLow(BigDecimal.ZERO); + kLine.setOpen(BigDecimal.ZERO); + kLine.setVolume(BigDecimal.ZERO); + kLine.setHigh(BigDecimal.ZERO); + String rangeUnit = ""; + if (field == Calendar.MINUTE) { + rangeUnit = "min"; + } else if (field == Calendar.HOUR_OF_DAY) { + rangeUnit = "hour"; + } else if (field == Calendar.DAY_OF_WEEK) { + rangeUnit = "week"; + } else if (field == Calendar.DAY_OF_YEAR) { + rangeUnit = "day"; + } else if (field == Calendar.MONTH) { + rangeUnit = "month"; + } + // kLine.setPeriod(range + rangeUnit); + String period = range + rangeUnit; + // 处理K线信息 + for (ExchangeTrade exchangeTrade : exchangeTrades) { + processTrade(kLine, exchangeTrade); + } + // 如果开盘价为0,则设置为前一个价格 + if(kLine.getOpen().compareTo(BigDecimal.ZERO) == 0) { + kLine.setOpen(coinThumb.getClose()); + kLine.setClose(coinThumb.getClose()); + kLine.setLow(coinThumb.getClose()); + kLine.setHigh(coinThumb.getClose()); + } + logger.info("generate " + range + rangeUnit + " kline in {},data={}", df.format(new Date(kLine.getTimestamp())), JSON.toJSONString(kLine)); + service.saveKLine(symbol,period, kLine); + // 生成一个对应的新K线 后续的交易会更新这个最新K线数据 + Candlestick newKline = new Candlestick(); + //kLine.setTimestamp(endTick); + newKline.setAmount(BigDecimal.ZERO); + newKline.setClose(kLine.getClose()); + newKline.setLow(kLine.getClose()); + // 开盘价是上个K线的收盘价 + newKline.setOpen(kLine.getClose()); + newKline.setVolume(BigDecimal.ZERO); + newKline.setHigh(kLine.getClose()); + currentKlineMap.put(period,newKline); + + // 存储昨日K线 + if("day".equals(rangeUnit)){ + redisUtils.set("NEKK/USDT",kLine); + } + } + + @Override + public Candlestick getKLine() { + return currentKLine; + } + + @Override + public void setIsStopKLine(boolean stop) { + this.stopKLine = stop; + } + + @Override + public boolean isStopKline() { + return this.stopKLine; + } + + + @Override + public Map<String, Candlestick> getCurrentKlineMap() { + return currentKlineMap; + } + + @Override + public void setRedisUtils(RedisUtils redisUtils) { + this.redisUtils = redisUtils; + } + + public void setCurrentKlineMap(Map<String, Candlestick> currentKlineMap) { + this.currentKlineMap = currentKlineMap; + } +} diff --git a/src/main/java/com/xcong/excoin/processor/ExchangeOrderDirection.java b/src/main/java/com/xcong/excoin/processor/ExchangeOrderDirection.java new file mode 100644 index 0000000..4583886 --- /dev/null +++ b/src/main/java/com/xcong/excoin/processor/ExchangeOrderDirection.java @@ -0,0 +1,5 @@ +package com.xcong.excoin.processor; + +public enum ExchangeOrderDirection { + BUY,SELL; +} diff --git a/src/main/java/com/xcong/excoin/processor/MarketHandler.java b/src/main/java/com/xcong/excoin/processor/MarketHandler.java new file mode 100644 index 0000000..526fa71 --- /dev/null +++ b/src/main/java/com/xcong/excoin/processor/MarketHandler.java @@ -0,0 +1,21 @@ +package com.xcong.excoin.processor; + +import com.huobi.client.model.Candlestick; +import com.xcong.excoin.trade.ExchangeTrade; + +public interface MarketHandler { + + /** + * 存储交易信息 + * @param exchangeTrade + */ + void handleTrade(String symbol, ExchangeTrade exchangeTrade, CoinThumb thumb); + + + /** + * 存储K线信息 + * + * @param kLine + */ + void handleKLine(String symbol, Candlestick kLine); +} diff --git a/src/main/java/com/xcong/excoin/processor/MarketService.java b/src/main/java/com/xcong/excoin/processor/MarketService.java new file mode 100644 index 0000000..11c6c56 --- /dev/null +++ b/src/main/java/com/xcong/excoin/processor/MarketService.java @@ -0,0 +1,125 @@ +package com.xcong.excoin.processor; + + +import com.huobi.client.model.Candlestick; +import com.xcong.excoin.modules.coin.dao.OrderCoinDealDao; +import com.xcong.excoin.trade.ExchangeTrade; +import com.xcong.excoin.utils.RedisUtils; +import org.apache.commons.collections.CollectionUtils; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +@Service +public class MarketService { +// @Autowired +// private MongoTemplate mongoTemplate; + + @Resource + private OrderCoinDealDao orderCoinDealDao; + + @Resource + private RedisUtils redisUtils; + + public List<Candlestick> findAllKLine(String symbol, String peroid) { +// Sort sort = new Sort(new Sort.Order(Sort.Direction.DESC,"time")); +// Query query = new Query().with(sort).limit(1000); +// +// return mongoTemplate.find(query,KLine.class,"exchange_kline_"+symbol+"_"+peroid); + return null; + } + + public List<Candlestick> findAllKLine(String symbol, long fromTime, long toTime, String period) { +// Criteria criteria = Criteria.where("time").gte(fromTime).andOperator(Criteria.where("time").lte(toTime)); +// Sort sort = new Sort(new Sort.Order(Sort.Direction.ASC,"time")); +// Query query = new Query(criteria).with(sort); +// List<KLine> kLines = mongoTemplate.find(query,KLine.class,"exchange_kline_"+symbol+"_"+ period); +// return kLines; + String key = "KINE_" + symbol + "_" + period; + Object data = redisUtils.get(key); + List list = new ArrayList(); + if (data != null) { + list = (List) data; + } + return list; + } +// +// public ExchangeTrade findFirstTrade(String symbol,long fromTime,long toTime){ +// Criteria criteria = Criteria.where("time").gte(fromTime).andOperator(Criteria.where("time").lte(toTime)); +// Sort sort = new Sort(new Sort.Order(Sort.Direction.ASC,"time")); +// Query query = new Query(criteria).with(sort); +// return mongoTemplate.findOne(query,ExchangeTrade.class,"exchange_trade_"+symbol); +// } + +// public ExchangeTrade findLastTrade(String symbol,long fromTime,long toTime){ +// Criteria criteria = Criteria.where("time").gte(fromTime).andOperator(Criteria.where("time").lte(toTime)); +// Sort sort = new Sort(new Sort.Order(Sort.Direction.DESC,"time")); +// Query query = new Query(criteria).with(sort); +// return mongoTemplate.findOne(query,ExchangeTrade.class,"exchange_trade_"+symbol); +// } +// +// public ExchangeTrade findTrade(String symbol, long fromTime, long toTime, Sort.Order order){ +// Criteria criteria = Criteria.where("time").gte(fromTime).andOperator(Criteria.where("time").lte(toTime)); +// Sort sort = new Sort(order); +// Query query = new Query(criteria).with(sort); +// return mongoTemplate.findOne(query,ExchangeTrade.class,"exchange_trade_"+symbol); +// } + + public List<ExchangeTrade> findTradeByTimeRange(String symbol, long timeStart, long timeEnd) { +// Criteria criteria = Criteria.where("time").gte(timeStart).andOperator(Criteria.where("time").lt(timeEnd)); +// Sort sort = new Sort(new Sort.Order(Sort.Direction.ASC,"time")); +// Query query = new Query(criteria).with(sort); +// +// return mongoTemplate.find(query,ExchangeTrade.class,"exchange_trade_"+symbol); + return orderCoinDealDao.selectOrderCoinDealByTime(symbol, new Date(timeStart), new Date(timeStart)); + // return null; + } + + public void saveKLine(String symbol, String period, Candlestick kLine) { + // 先获取 + String key = "KINE_" + symbol + "_" + period; + Object data = redisUtils.get(key); + List list = new ArrayList(); + if (data != null) { + list = (List) data; + } + list.add(kLine); + redisUtils.set("KINE_" + symbol + "_" + period, list); + // mongoTemplate.insert(kLine,"exchange_kline_"+symbol+"_"+kLine.getPeriod()); + } + + /** + * 查找某时间段内的交易量 + * + * @param symbol + * @param timeStart + * @param timeEnd + * @return + */ + public BigDecimal findTradeVolume(String symbol, long timeStart, long timeEnd) { +// Criteria criteria = Criteria.where("time").gt(timeStart) +// .andOperator(Criteria.where("time").lte(timeEnd)); +// //.andOperator(Criteria.where("volume").gt(0)); +// Sort sort = new Sort(new Sort.Order(Sort.Direction.ASC,"time")); +// Query query = new Query(criteria).with(sort); +// List<KLine> kLines = mongoTemplate.find(query,KLine.class,"exchange_kline_"+symbol+"_1min"); +// BigDecimal totalVolume = BigDecimal.ZERO; +// for(KLine kLine:kLines){ +// totalVolume = totalVolume.add(kLine.getVolume()); +// } +// return totalVolume; + List<ExchangeTrade> exchangeTrades = orderCoinDealDao.selectOrderCoinDealByTime(symbol, new Date(timeStart), new Date(timeStart)); + BigDecimal totalVolume = BigDecimal.ZERO; + if (CollectionUtils.isNotEmpty(exchangeTrades)) { + for (ExchangeTrade kLine : exchangeTrades) { + totalVolume = totalVolume.add(kLine.getBuyTurnover()); + } + } + + return totalVolume; + } +} diff --git a/src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java b/src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java new file mode 100644 index 0000000..90a3e9c --- /dev/null +++ b/src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java @@ -0,0 +1,98 @@ +package com.xcong.excoin.quartz.job; + +import com.alibaba.fastjson.JSON; +import com.huobi.client.SubscriptionClient; +import com.huobi.client.SubscriptionOptions; +import com.huobi.client.model.Candlestick; +import com.huobi.client.model.enums.CandlestickInterval; +import com.xcong.excoin.modules.coin.dao.OrderCoinsDao; +import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity; +import com.xcong.excoin.modules.coin.service.OrderCoinService; +import com.xcong.excoin.modules.symbols.service.SymbolsService; +import com.xcong.excoin.processor.CoinProcessor; +import com.xcong.excoin.processor.CoinProcessorFactory; +import com.xcong.excoin.processor.DefaultCoinProcessor; +import com.xcong.excoin.processor.MarketService; +import com.xcong.excoin.rabbit.pricequeue.WebsocketPriceService; +import com.xcong.excoin.trade.CoinTrader; +import com.xcong.excoin.trade.CoinTraderFactory; +import com.xcong.excoin.utils.CoinTypeConvert; +import com.xcong.excoin.utils.RedisUtils; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.Resource; +import java.math.BigDecimal; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.List; + +/** + * 开启撮合交易 + * + * @author wzy + * @date 2020-05-28 + **/ +@Slf4j +@Component +//@ConditionalOnProperty(prefix = "app", name = "trade", havingValue = "true") +public class CoinTradeInitJob { + + @Resource + private OrderCoinsDao orderCoinsDao; + + @Resource + private CoinTraderFactory factory; + + @Resource + private OrderCoinService coinService; + + @Resource + private MarketService marketService; + + @Resource + private CoinProcessorFactory processorFactory; + + @PostConstruct + public void initCoinTrade() { + log.info("#=======撮合交易器开启=======#"); + String symbol = "NEKK"; + CoinTrader newTrader = new CoinTrader(symbol); + newTrader.setOrderCoinService(coinService); + //newTrader.setKafkaTemplate(kafkaTemplate); + //newTrader.setBaseCoinScale(coin.getBaseCoinScale()); + //newTrader.setCoinScale(coin.getCoinScale()); + // newTrader.setPublishType(coin.getPublishType()); + //newTrader.setClearTime(coin.getClearTime()); + + // 创建成功以后需要对未处理订单预处理 + log.info("======CoinTrader Process: " + symbol + "======"); + List<OrderCoinsEntity> orders = orderCoinsDao.selectAllEntrustingCoinOrderList(); + List<OrderCoinsEntity> tradingOrders = new ArrayList<>(); + List<OrderCoinsEntity> completedOrders = new ArrayList<>(); + orders.forEach(order -> { + tradingOrders.add(order); + }); + try { + newTrader.trade(tradingOrders); + } catch (ParseException e) { + e.printStackTrace(); + log.info("异常:trader.trade(tradingOrders);"); + } + newTrader.setReady(true); + factory.addTrader(symbol, newTrader); + + // 创建K线生成器 + CoinProcessor processor = new DefaultCoinProcessor(symbol, "USDT"); + processor.setMarketService(marketService); + //processor.setExchangeRate(exchangeRate); + processor.initializeThumb(); + //processor.initializeUsdRate(); + processor.setIsHalt(false); + + processorFactory.addProcessor(symbol, processor); + } +} diff --git a/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java b/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java new file mode 100644 index 0000000..110c1fc --- /dev/null +++ b/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java @@ -0,0 +1,159 @@ +package com.xcong.excoin.quartz.job; + +import com.alibaba.fastjson.JSON; +import com.huobi.client.model.Candlestick; +import com.xcong.excoin.processor.CoinProcessorFactory; +import com.xcong.excoin.trade.TradePlateModel; +import com.xcong.excoin.utils.RedisUtils; +import com.xcong.excoin.websocket.TradePlateSendWebSocket; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.List; + +/** + * 生成各时间段的K线信息 + * + */ +@Component +@Slf4j +public class KLineGeneratorJob { + @Resource + private CoinProcessorFactory processorFactory; + + @Resource + private TradePlateSendWebSocket plateSendWebSocket; + + @Resource + private RedisUtils redisUtils; + + @Scheduled(cron = "0/2 * * * * *") + public void tradePlate(){ + redisUtils.set("NEKK_NEW_PRICE",new BigDecimal(Math.random())); + Candlestick candlestick = new Candlestick(); + candlestick.setOpen(new BigDecimal("10.33")); + candlestick.setHigh(new BigDecimal("15.23")); + candlestick.setVolume(new BigDecimal("12121.34")); + candlestick.setLow(new BigDecimal("8.234")); + candlestick.setAmount(new BigDecimal("1199")); + candlestick.setTimestamp(1599840000); + candlestick.setId(1599840000L); + candlestick.setCount(100002); + candlestick.setClose(new BigDecimal("12.2323")); + redisUtils.set("NEKK/USDT",candlestick); + // [[10244.21, 0.000855], [10243.7, 0.008777], [10243.59, 0.14], [10243.37, 0.467663]] + TradePlateModel tradePlateModel = new TradePlateModel(); + List<BigDecimal> buy; + List<BigDecimal> sell; + for(int i=0;i<5;i++){ + buy = new ArrayList<>(2); + buy.add(new BigDecimal(Math.random()*i)); + buy.add(new BigDecimal(Math.random()*i)); + tradePlateModel.getBuy().add(buy); + sell = new ArrayList<>(2); + sell.add(new BigDecimal(Math.random()*i*2)); + sell.add(new BigDecimal(Math.random()*i*2)); + tradePlateModel.getSell().add(sell); + } + + plateSendWebSocket.sendMessagePlate(JSON.toJSONString(tradePlateModel),null); + plateSendWebSocket.sendMessageKline("nekkusdt","1min","{amount: 114419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 311958.06091543}",null); + plateSendWebSocket.sendMessageKline("nekkusdt","5min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null); + plateSendWebSocket.sendMessageKline("nekkusdt","15min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null); + } + + /** + * 每分钟定时器,处理分钟K线 + */ + @Scheduled(cron = "0 * * * * *") + public void handle5minKLine(){ + + Calendar calendar = Calendar.getInstance(); + log.debug("分钟K线:{}",calendar.getTime()); + //将秒、微秒字段置为0 + calendar.set(Calendar.SECOND,0); + calendar.set(Calendar.MILLISECOND,0); + long time = calendar.getTimeInMillis(); + int minute = calendar.get(Calendar.MINUTE); + int hour = calendar.get(Calendar.HOUR_OF_DAY); + processorFactory.getProcessorMap().forEach((symbol,processor)->{ + if(!processor.isStopKline()) { + log.debug("生成{}分钟k线:{}",symbol); + //生成1分钟K线 + processor.autoGenerate(); + //更新24H成交量 + processor.update24HVolume(time); + if(minute%5 == 0) { + // 五分钟的当前K线 + processor.generateKLine(5, Calendar.MINUTE, time); + } + if(minute%10 == 0){ + processor.generateKLine(10, Calendar.MINUTE,time); + } + if(minute%15 == 0){ + processor.generateKLine(15, Calendar.MINUTE,time); + } + if(minute%30 == 0){ + processor.generateKLine(30, Calendar.MINUTE,time); + } + if(hour == 0 && minute == 0){ + processor.resetThumb(); + } + } + }); + } + + /** + * 每小时运行 + */ + @Scheduled(cron = "0 0 * * * *") + public void handleHourKLine(){ + processorFactory.getProcessorMap().forEach((symbol,processor)-> { + if(!processor.isStopKline()) { + Calendar calendar = Calendar.getInstance(); + log.info("小时K线:{}",calendar.getTime()); + //将秒、微秒字段置为0 + calendar.set(Calendar.MINUTE, 0); + calendar.set(Calendar.SECOND, 0); + calendar.set(Calendar.MILLISECOND, 0); + long time = calendar.getTimeInMillis(); + + processor.generateKLine(1, Calendar.HOUR_OF_DAY, time); + } + }); + } + + /** + * 每日0点处理器,处理日K线 + */ + @Scheduled(cron = "0 0 0 * * *") + public void handleDayKLine(){ + processorFactory.getProcessorMap().forEach((symbol,processor)->{ + if(!processor.isStopKline()) { + Calendar calendar = Calendar.getInstance(); + log.info("日K线:{}",calendar.getTime()); + //将秒、微秒字段置为0 + calendar.set(Calendar.HOUR_OF_DAY,0); + calendar.set(Calendar.MINUTE,0); + calendar.set(Calendar.SECOND,0); + calendar.set(Calendar.MILLISECOND,0); + long time = calendar.getTimeInMillis(); + int week = calendar.get(Calendar.DAY_OF_WEEK); + int dayOfMonth = calendar.get(Calendar.DAY_OF_MONTH); + if(week == 1){ + processor.generateKLine(1, Calendar.DAY_OF_WEEK, time); + } + if(dayOfMonth == 1){ + processor.generateKLine(1, Calendar.DAY_OF_MONTH, time); + } + processor.generateKLine(1, Calendar.DAY_OF_YEAR,time); + } + }); + } +} diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java new file mode 100644 index 0000000..9b03546 --- /dev/null +++ b/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java @@ -0,0 +1,98 @@ +package com.xcong.excoin.rabbit.consumer; + +import cn.hutool.core.util.StrUtil; +import com.alibaba.fastjson.JSONObject; +import com.huobi.client.model.Candlestick; +import com.xcong.excoin.configurations.RabbitMqConfig; +import com.xcong.excoin.modules.coin.service.OrderCoinService; +import com.xcong.excoin.modules.exchange.service.HandleKlineService; +import com.xcong.excoin.trade.ExchangeTrade; +import com.xcong.excoin.utils.RedisUtils; +import com.xcong.excoin.websocket.TradePlateSendWebSocket; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * @author wzy + * @date 2020-05-25 + **/ +@Slf4j +@Component +public class ExchangeConsumer { + + @Resource + private TradePlateSendWebSocket tradePlateSendWebSocket; + + @Resource + private RedisUtils redisUtils; + + @Resource + private HandleKlineService handleKlineService; + + @Resource + private OrderCoinService orderCoinService; + + /** + * 发送盘口信息 + * @param content + */ + @RabbitListener(queues = RabbitMqConfig.QUEUE_TRADE_PLATE) + public void tradePlate(String content) { + log.info("#---->{}#", content); + tradePlateSendWebSocket.sendMessagePlate(content,null); + } + + /** + * 处理订单 + * @param content + */ + @RabbitListener(queues = RabbitMqConfig.QUEUE_HANDLE_TRADE) + public void handleTradeExchange(String content) { + log.info("#---->{}#", content); + List<ExchangeTrade> exchangeTrades = JSONObject.parseArray(content, ExchangeTrade.class); + // 处理K线 并更新最新价 + handleKlineService.handleExchangeOrderToKline(exchangeTrades); + // 推送最新K线 + String symbol = exchangeTrades.get(0).getSymbol(); + String symbolUsdt = symbol; + if(!symbol.contains("USDT")){ + symbolUsdt = symbol+"/USDT"; + } + String key = "NEW_KINE_{}"; + key = StrUtil.format(key, symbolUsdt); + Object o = redisUtils.get(key); + Map<String, Candlestick> currentKlineMap = (Map<String, Candlestick> )o; + Set<Map.Entry<String, Candlestick>> entries = currentKlineMap.entrySet(); + for(Map.Entry<String, Candlestick> map : entries){ + tradePlateSendWebSocket.sendMessageKline(symbolUsdt,map.getKey(),JSONObject.toJSONString(map.getValue()),null); + } + // 处理用户订单 + orderCoinService.handleOrder(exchangeTrades); + } + + /** + * 更新最新K线 + * @param content + */ +// @RabbitListener(queues = RabbitMqConfig.QUEUE_TRADE_PLATE) +// public void newKling(String content) { +// log.info("#---->{}#", content); +// // 最新K线的币种 +// String key = "NEW_KINE_{}"; +// key = StrUtil.format(key, content); +// Object o = redisUtils.get(key); +// Map<String, Candlestick> currentKlineMap = (Map<String, Candlestick>)o; +// // 推送最新K线 +// Set<Map.Entry<String, Candlestick>> entries = currentKlineMap.entrySet(); +// for(Map.Entry<String, Candlestick> map : entries){ +// tradePlateSendWebSocket.sendMessageKline(content,map.getKey(),JSONObject.toJSONString(map.getValue()),null); +// } +// +// } +} diff --git a/src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java b/src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java new file mode 100644 index 0000000..42e67d4 --- /dev/null +++ b/src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java @@ -0,0 +1,47 @@ +package com.xcong.excoin.rabbit.producer; + +import cn.hutool.core.util.IdUtil; +import com.xcong.excoin.configurations.RabbitMqConfig; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * @author wzy + * @date 2020-05-25 + **/ +@Slf4j +@Component +public class ExchangeProducer implements RabbitTemplate.ConfirmCallback { + + private RabbitTemplate rabbitTemplate; + + @Autowired + public ExchangeProducer(RabbitTemplate rabbitTemplate) { + this.rabbitTemplate = rabbitTemplate; + rabbitTemplate.setConfirmCallback(this); + } + + public void sendPlateMsg(String content) { + CorrelationData correlationData = new CorrelationData(IdUtil.simpleUUID()); + rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_B, RabbitMqConfig.ROUTINGKEY_TRADE_PLATE, content, correlationData); + } + + public void sendHandleTrade(String content) { + CorrelationData correlationData = new CorrelationData(IdUtil.simpleUUID()); + rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_B, RabbitMqConfig.ROUTINGKEY_HANDLE_TRADE, content, correlationData); + } + + + @Override + public void confirm(CorrelationData correlationData, boolean ack, String cause) { + log.info("#----->{}#", correlationData); + if (ack) { + log.info("success"); + } else { + log.info("--->{}", cause); + } + } +} diff --git a/src/main/java/com/xcong/excoin/trade/CoinTrader.java b/src/main/java/com/xcong/excoin/trade/CoinTrader.java new file mode 100644 index 0000000..46289af --- /dev/null +++ b/src/main/java/com/xcong/excoin/trade/CoinTrader.java @@ -0,0 +1,762 @@ +package com.xcong.excoin.trade; + +import com.alibaba.fastjson.JSON; + +import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity; +import com.xcong.excoin.modules.coin.service.OrderCoinService; +import com.xcong.excoin.rabbit.producer.ExchangeProducer; +import org.apache.commons.collections.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigDecimal; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.*; + +public class CoinTrader { + private String symbol; + private ExchangeProducer exchangeProducer; + private OrderCoinService orderCoinService; + //交易币种的精度 + private int coinScale = 4; + //基币的精度 + private int baseCoinScale = 4; + private Logger logger = LoggerFactory.getLogger(CoinTrader.class); + //买入限价订单链表,价格从高到低排列 + private TreeMap<BigDecimal, MergeOrder> buyLimitPriceQueue; + //卖出限价订单链表,价格从低到高排列 + private TreeMap<BigDecimal, MergeOrder> sellLimitPriceQueue; + //买入市价订单链表,按时间从小到大排序 + private LinkedList<OrderCoinsEntity> buyMarketQueue; + //卖出市价订单链表,按时间从小到大排序 + private LinkedList<OrderCoinsEntity> sellMarketQueue; + //卖盘盘口信息 + private TradePlate sellTradePlate; + //买盘盘口信息 + private TradePlate buyTradePlate; + //是否暂停交易 + private boolean tradingHalt = false; + private boolean ready = false; + //交易对信息 + //private ExchangeCoinPublishType publishType; + private String clearTime; + + private SimpleDateFormat dateTimeFormat; + + + public CoinTrader(String symbol) { + this.symbol = symbol; + initialize(); + } + + /** + * 初始化交易线程 + */ + public void initialize() { + logger.info("init CoinTrader for symbol {}", symbol); + //买单队列价格降序排列 + buyLimitPriceQueue = new TreeMap<>(Comparator.reverseOrder()); + //卖单队列价格升序排列 + this.sellLimitPriceQueue = new TreeMap<>(Comparator.naturalOrder()); + this.buyMarketQueue = new LinkedList<>(); + this.sellMarketQueue = new LinkedList<>(); + this.sellTradePlate = new TradePlate(symbol, OrderCoinsEntity.ORDERTYPE_SELL); + this.buyTradePlate = new TradePlate(symbol, OrderCoinsEntity.ORDERTYPE_BUY); + this.dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + } + + /** + * 增加限价订单到队列,买入单按从价格高到低排,卖出单按价格从低到高排 + * + * @param exchangeOrder + */ + public void addLimitPriceOrder(OrderCoinsEntity exchangeOrder) { + if (exchangeOrder.getTradeType() != OrderCoinsEntity.TRADETYPE_FIXEDPRICE) { + return; + } + //logger.info("addLimitPriceOrder,orderId = {}", exchangeOrder.getOrderId()); + TreeMap<BigDecimal, MergeOrder> list; + if (exchangeOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) { + list = buyLimitPriceQueue; + // 添加盘口信息 TODO + buyTradePlate.add(exchangeOrder); + if (ready) { + // 发送盘口信息 + sendTradePlateMessage(buyTradePlate, sellTradePlate); + } + } else { + list = sellLimitPriceQueue; + sellTradePlate.add(exchangeOrder); + if (ready) { + sendTradePlateMessage(buyTradePlate, sellTradePlate); + } + } + synchronized (list) { + MergeOrder mergeOrder = list.get(exchangeOrder.getEntrustPrice()); + if (mergeOrder == null) { + mergeOrder = new MergeOrder(); + mergeOrder.add(exchangeOrder); + list.put(exchangeOrder.getEntrustPrice(), mergeOrder); + } else { + mergeOrder.add(exchangeOrder); + } + } + } + + public void addMarketPriceOrder(OrderCoinsEntity exchangeOrder) { + if (exchangeOrder.getTradeType() != OrderCoinsEntity.TRADETYPE_MARKETPRICE) { + return; + } + logger.info("addMarketPriceOrder,orderId = {}", exchangeOrder.getId()); + LinkedList<OrderCoinsEntity> list = exchangeOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY ? buyMarketQueue : sellMarketQueue; + synchronized (list) { + list.addLast(exchangeOrder); + } + } + + public void trade(List<OrderCoinsEntity> orders) throws ParseException { + if (tradingHalt) { + return; + } + for (OrderCoinsEntity order : orders) { + trade(order); + } + } + + + /** + * 主动交易输入的订单,交易不完成的会输入到队列 + * + * @param exchangeOrder + * @throws ParseException + */ + public void trade(OrderCoinsEntity exchangeOrder) { + if (tradingHalt) { + return; + } + //logger.info("trade order={}",exchangeOrder); + if (!symbol.equalsIgnoreCase(exchangeOrder.getSymbol())) { + logger.info("unsupported symbol,coin={},base={}", exchangeOrder.getSymbol(), "USDT"); + return; + } + // 如果 + if (exchangeOrder.getEntrustAmount().compareTo(BigDecimal.ZERO) <= 0 || exchangeOrder.getEntrustAmount().subtract(exchangeOrder.getDealAmount()).compareTo(BigDecimal.ZERO) <= 0) { + return; + } + + TreeMap<BigDecimal, MergeOrder> limitPriceOrderList; + LinkedList<OrderCoinsEntity> marketPriceOrderList; + if (exchangeOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) { + // 买单 和限价卖单以及市价市场卖单队列对比 完成撮合 + limitPriceOrderList = sellLimitPriceQueue; + marketPriceOrderList = sellMarketQueue; + } else { + limitPriceOrderList = buyLimitPriceQueue; + marketPriceOrderList = buyMarketQueue; + } + if (exchangeOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) { + //logger.info(">>>>>市价单>>>交易与限价单交易"); + //与限价单交易 + matchMarketPriceWithLPList(limitPriceOrderList, exchangeOrder); + } else if (exchangeOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_FIXEDPRICE) { + //限价单价格必须大于0 + if (exchangeOrder.getEntrustPrice().compareTo(BigDecimal.ZERO) <= 0) { + return; + } + + //logger.info(">>>>>限价单>>>交易与限价单交易"); + //先与限价单交易 + matchLimitPriceWithLPList(limitPriceOrderList, exchangeOrder, false); + if (exchangeOrder.getEntrustCnt().compareTo(exchangeOrder.getDealCnt()) > 0) { + //logger.info(">>>>限价单未交易完>>>>与市价单交易>>>>"); + //后与市价单交易 + matchLimitPriceWithMPList(marketPriceOrderList, exchangeOrder); + } + } + } + + /** + * 限价委托单与限价队列匹配 + * + * @param lpList 限价对手单队列 + * @param focusedOrder 交易订单 + */ + public void matchLimitPriceWithLPList(TreeMap<BigDecimal, MergeOrder> lpList, OrderCoinsEntity focusedOrder, boolean canEnterList) { + List<ExchangeTrade> exchangeTrades = new ArrayList<>(); + List<OrderCoinsEntity> completedOrders = new ArrayList<>(); + synchronized (lpList) { + Iterator<Map.Entry<BigDecimal, MergeOrder>> mergeOrderIterator = lpList.entrySet().iterator(); + boolean exitLoop = false; + while (!exitLoop && mergeOrderIterator.hasNext()) { + Map.Entry<BigDecimal, MergeOrder> entry = mergeOrderIterator.next(); + MergeOrder mergeOrder = entry.getValue(); + Iterator<OrderCoinsEntity> orderIterator = mergeOrder.iterator(); + //买入单需要匹配的价格不大于委托价,否则退出 + if (focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && mergeOrder.getPrice().compareTo(focusedOrder.getEntrustPrice()) > 0) { + break; + } + //卖出单需要匹配的价格不小于委托价,否则退出 + if (focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_SELL && mergeOrder.getPrice().compareTo(focusedOrder.getEntrustPrice()) < 0) { + break; + } + while (orderIterator.hasNext()) { + OrderCoinsEntity matchOrder = orderIterator.next(); + //处理匹配 + ExchangeTrade trade = processMatch(focusedOrder, matchOrder); + exchangeTrades.add(trade); + //判断匹配单是否完成 + if (matchOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) { + //当前匹配的订单完成交易,删除该订单 + orderIterator.remove(); + completedOrders.add(matchOrder); + } + //判断交易单是否完成 + if (focusedOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) { + //交易完成 + completedOrders.add(focusedOrder); + //退出循环 + exitLoop = true; + break; + } + } + if (mergeOrder.size() == 0) { + mergeOrderIterator.remove(); + } + } + } + //如果还没有交易完,订单压入列表中 + if (focusedOrder.getDealCnt().compareTo(focusedOrder.getEntrustCnt()) < 0 && canEnterList) { + addLimitPriceOrder(focusedOrder); + } + //每个订单的匹配批量推送 + handleExchangeTrade(exchangeTrades); + if (completedOrders.size() > 0) { + orderCompleted(completedOrders); + TradePlate plate = focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY ? sellTradePlate : buyTradePlate; + sendTradePlateMessage(buyTradePlate, sellTradePlate); + } + } + + /** + * 限价委托单与市价队列匹配 + * + * @param mpList 市价对手单队列 + * @param focusedOrder 交易订单 + */ + public void matchLimitPriceWithMPList(LinkedList<OrderCoinsEntity> mpList, OrderCoinsEntity focusedOrder) { + List<ExchangeTrade> exchangeTrades = new ArrayList<>(); + List<OrderCoinsEntity> completedOrders = new ArrayList<>(); + synchronized (mpList) { + Iterator<OrderCoinsEntity> iterator = mpList.iterator(); + while (iterator.hasNext()) { + OrderCoinsEntity matchOrder = iterator.next(); + ExchangeTrade trade = processMatch(focusedOrder, matchOrder); + logger.info(">>>>>" + trade); + if (trade != null) { + exchangeTrades.add(trade); + } + //判断匹配单是否完成,市价单amount为成交量 + if (matchOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) { + iterator.remove(); + completedOrders.add(matchOrder); + } + //判断吃单是否完成,判断成交量是否完成 + if (focusedOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) { + //交易完成 + completedOrders.add(focusedOrder); + //退出循环 + break; + } + } + } + //如果还没有交易完,订单压入列表中 + if (focusedOrder.getDealCnt().compareTo(focusedOrder.getEntrustCnt()) < 0) { + addLimitPriceOrder(focusedOrder); + } + //每个订单的匹配批量推送 + handleExchangeTrade(exchangeTrades); + orderCompleted(completedOrders); + } + + + /** + * 市价委托单与限价对手单列表交易 + * + * @param lpList 限价对手单列表 + * @param focusedOrder 待交易订单 + */ + public void matchMarketPriceWithLPList(TreeMap<BigDecimal, MergeOrder> lpList, OrderCoinsEntity focusedOrder) { + List<ExchangeTrade> exchangeTrades = new ArrayList<>(); + List<OrderCoinsEntity> completedOrders = new ArrayList<>(); + // 加锁 同步 + synchronized (lpList) { + Iterator<Map.Entry<BigDecimal, MergeOrder>> mergeOrderIterator = lpList.entrySet().iterator(); + boolean exitLoop = false; + while (!exitLoop && mergeOrderIterator.hasNext()) { + Map.Entry<BigDecimal, MergeOrder> entry = mergeOrderIterator.next(); + MergeOrder mergeOrder = entry.getValue(); + Iterator<OrderCoinsEntity> orderIterator = mergeOrder.iterator(); + while (orderIterator.hasNext()) { + OrderCoinsEntity matchOrder = orderIterator.next(); + //处理匹配 用户单和买卖盘内的委托单进行比较,得到可以交易的量 + // 例如 市价买单 1000个 卖1 500 卖2 300 卖3 500 此时从卖1开始吃到卖3只剩300个 + ExchangeTrade trade = processMatch(focusedOrder, matchOrder); + if (trade != null) { + exchangeTrades.add(trade); + } + //判断匹配单是否完成 + if (matchOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) { + //当前匹配的订单完成交易,删除该订单 + orderIterator.remove(); + completedOrders.add(matchOrder); + } + //判断焦点订单是否完成 + if (focusedOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) { + completedOrders.add(focusedOrder); + //退出循环 + exitLoop = true; + break; + } + } + if (mergeOrder.size() == 0) { + mergeOrderIterator.remove(); + } + } + } + //如果还没有交易完,订单压入列表中,市价买单按成交量算 + if (focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_SELL && focusedOrder.getDealCnt().compareTo(focusedOrder.getEntrustCnt()) < 0 + || focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && focusedOrder.getDealAmount().compareTo(focusedOrder.getEntrustAmount()) < 0) { + addMarketPriceOrder(focusedOrder); + } + //每个订单的匹配批量推送 + handleExchangeTrade(exchangeTrades); + if (completedOrders.size() > 0) { + orderCompleted(completedOrders); + TradePlate plate = focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY ? sellTradePlate : buyTradePlate; + sendTradePlateMessage(buyTradePlate, sellTradePlate); + } + } + + /** + * 计算委托单剩余可成交的数量 + * + * @param order 委托单 + * @param dealPrice 成交价 + * @return + */ + private BigDecimal calculateTradedAmount(OrderCoinsEntity order, BigDecimal dealPrice) { + if (order.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && order.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) { + //剩余成交量 TODO ? + // 委托量-成交量=剩余量 + BigDecimal leftTurnover = order.getEntrustAmount().subtract(order.getDealAmount()); + return leftTurnover.divide(dealPrice, coinScale, BigDecimal.ROUND_DOWN); + } else { + return order.getEntrustCnt().subtract(order.getDealCnt()); + } + } + + /** + * 调整市价单剩余成交额,当剩余成交额不足时设置订单完成 + * + * @param order + * @param dealPrice + * @return + */ + private BigDecimal adjustMarketOrderTurnover(OrderCoinsEntity order, BigDecimal dealPrice) { + if (order.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && order.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) { +// BigDecimal leftTurnover = order.getAmount().subtract(order.getTurnover()); +// if(leftTurnover.divide(dealPrice,coinScale,BigDecimal.ROUND_DOWN) +// .compareTo(BigDecimal.ZERO)==0){ +// order.setTurnover(order.getAmount()); +// return leftTurnover; +// } + } + return BigDecimal.ZERO; + } + + /** + * 处理两个匹配的委托订单 + * + * @param focusedOrder 焦点单 + * @param matchOrder 匹配单 + * @return + */ + private ExchangeTrade processMatch(OrderCoinsEntity focusedOrder, OrderCoinsEntity matchOrder) { + //需要交易的数量,成交量,成交价,可用数量 + BigDecimal needAmount, dealPrice, availAmount; + //如果匹配单是限价单,则以其价格为成交价 + if (matchOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_FIXEDPRICE) { + dealPrice = matchOrder.getEntrustPrice(); + } else { + dealPrice = focusedOrder.getEntrustPrice(); + } + //成交价必须大于0 + if (dealPrice.compareTo(BigDecimal.ZERO) <= 0) { + return null; + } + // 需要的成交量 + needAmount = calculateTradedAmount(focusedOrder, dealPrice); + // 队列单可提供的成交量 + availAmount = calculateTradedAmount(matchOrder, dealPrice); + //计算成交量 取少的 + BigDecimal tradedAmount = (availAmount.compareTo(needAmount) >= 0 ? needAmount : availAmount); + logger.info("dealPrice={},amount={}", dealPrice, tradedAmount); + //如果成交额为0说明剩余额度无法成交,退出 + if (tradedAmount.compareTo(BigDecimal.ZERO) == 0) { + return null; + } + + //计算成交额,成交额要保留足够精度 + BigDecimal turnover = tradedAmount.multiply(dealPrice); + matchOrder.setDealCnt(matchOrder.getDealCnt().add(tradedAmount)); + // 成交金额 + matchOrder.setDealAmount(matchOrder.getDealAmount().add(turnover)); + // 用户单成交量 + focusedOrder.setDealCnt(focusedOrder.getDealCnt().add(tradedAmount)); + // 用户单成交金额 + focusedOrder.setDealAmount(focusedOrder.getDealAmount().add(turnover)); + + //创建成交记录 + ExchangeTrade exchangeTrade = new ExchangeTrade(); + exchangeTrade.setSymbol(symbol); + // 成交量 + exchangeTrade.setAmount(tradedAmount); + exchangeTrade.setDirection(focusedOrder.getOrderType()); + // 成交价格 + exchangeTrade.setPrice(dealPrice); + // 成交金额 + exchangeTrade.setBuyTurnover(turnover); + exchangeTrade.setSellTurnover(turnover); + //校正市价单剩余成交额 + if (OrderCoinsEntity.TRADETYPE_MARKETPRICE == focusedOrder.getTradeType() && focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) { + BigDecimal adjustTurnover = adjustMarketOrderTurnover(focusedOrder, dealPrice); + exchangeTrade.setBuyTurnover(turnover.add(adjustTurnover)); + } else if (OrderCoinsEntity.TRADETYPE_MARKETPRICE == matchOrder.getTradeType() && matchOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) { + BigDecimal adjustTurnover = adjustMarketOrderTurnover(matchOrder, dealPrice); + exchangeTrade.setBuyTurnover(turnover.add(adjustTurnover)); + } + + if (focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) { + exchangeTrade.setBuyOrderId(focusedOrder.getId()); + exchangeTrade.setSellOrderId(matchOrder.getId()); + } else { + exchangeTrade.setBuyOrderId(matchOrder.getId()); + exchangeTrade.setSellOrderId(focusedOrder.getId()); + } + + exchangeTrade.setTime(Calendar.getInstance().getTimeInMillis()); + if (matchOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_FIXEDPRICE) { + if (matchOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) { + buyTradePlate.remove(matchOrder, tradedAmount); + } else { + sellTradePlate.remove(matchOrder, tradedAmount); + } + } + return exchangeTrade; + } + + + // 这里是交易完的单 + public void handleExchangeTrade(List<ExchangeTrade> trades) { + //logger.info("handleExchangeTrade:{}", trades); + if (trades.size() > 0) { + int maxSize = 1000; + //发送消息,key为交易对符号 + if (trades.size() > maxSize) { + int size = trades.size(); + for (int index = 0; index < size; index += maxSize) { + int length = (size - index) > maxSize ? maxSize : size - index; + List<ExchangeTrade> subTrades = trades.subList(index, index + length); + exchangeProducer.sendHandleTrade(JSON.toJSONString(subTrades)); + //orderCoinService.handleOrder(subTrades); + } + } else { + trades.forEach(e -> { + System.out.println(e); + }); + exchangeProducer.sendHandleTrade(JSON.toJSONString(trades)); + //orderCoinService.handleOrder(trades); + // kafkaTemplate.send("exchange-trade", JSON.toJSONString(trades)); + } + // 更新最新K线 TODO + + } + } + + /** + * 订单完成,执行消息通知,订单数超1000个要拆分发送 + * + * @param orders + */ + public void orderCompleted(List<OrderCoinsEntity> orders) { + //logger.info("orderCompleted ,order={}",orders); + if (orders.size() > 0) { + int maxSize = 1000; + if (orders.size() > maxSize) { + int size = orders.size(); + for (int index = 0; index < size; index += maxSize) { + int length = (size - index) > maxSize ? maxSize : size - index; + List<OrderCoinsEntity> subOrders = orders.subList(index, index + length); + // TODO 通知订单完成 + //kafkaTemplate.send("exchange-order-completed", JSON.toJSONString(subOrders)); + } + } else { + // kafkaTemplate.send("exchange-order-completed", JSON.toJSONString(orders)); + } + } + } + + /** + * 发送盘口变化消息 + * + * @param buyTradePlate sellTradePlate + */ + public void sendTradePlateMessage(TradePlate buyTradePlate, TradePlate sellTradePlate) { + //防止并发引起数组越界,造成盘口倒挂 TODO + List<List<BigDecimal>> plate; + List<BigDecimal> plateItem; + TradePlateModel tradePlateModel = new TradePlateModel(); + // 转换格式 + if (buyTradePlate != null && buyTradePlate.getItems() != null) { + plate = new ArrayList<>(); + LinkedList<TradePlateItem> items = buyTradePlate.getItems(); + for (TradePlateItem item : items) { + plateItem = new ArrayList<>(2); + BigDecimal price = item.getPrice(); + BigDecimal amount = item.getAmount(); + plateItem.add(price); + plateItem.add(amount); + plate.add(plateItem); + } + tradePlateModel.setBuy(plate); + } + + if (sellTradePlate != null && sellTradePlate.getItems() != null) { + plate = new ArrayList<>(); + LinkedList<TradePlateItem> items = sellTradePlate.getItems(); + for (TradePlateItem item : items) { + plateItem = new ArrayList<>(2); + BigDecimal price = item.getPrice(); + BigDecimal amount = item.getAmount(); + plateItem.add(price); + plateItem.add(amount); + plate.add(plateItem); + } + tradePlateModel.setSell(plate); + } + + // 盘口发生变化通知TODO + + exchangeProducer.sendPlateMsg(JSON.toJSONString(tradePlateModel)); + } + + /** + * 取消委托订单 + * + * @param exchangeOrder + * @return + */ + public OrderCoinsEntity cancelOrder(OrderCoinsEntity exchangeOrder) { + logger.info("cancelOrder,orderId={}", exchangeOrder.getId()); + if (exchangeOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) { + //处理市价单 + Iterator<OrderCoinsEntity> orderIterator; + List<OrderCoinsEntity> list = null; + if (exchangeOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) { + list = this.buyMarketQueue; + } else { + list = this.sellMarketQueue; + } + synchronized (list) { + orderIterator = list.iterator(); + while ((orderIterator.hasNext())) { + OrderCoinsEntity order = orderIterator.next(); + if (order.getId().equals(exchangeOrder.getId())) { + orderIterator.remove(); + onRemoveOrder(order); + return order; + } + } + } + } else { + //处理限价单 + TreeMap<BigDecimal, MergeOrder> list = null; + Iterator<MergeOrder> mergeOrderIterator; + if (exchangeOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) { + list = this.buyLimitPriceQueue; + } else { + list = this.sellLimitPriceQueue; + } + synchronized (list) { + MergeOrder mergeOrder = list.get(exchangeOrder.getEntrustPrice()); + if (mergeOrder != null) { + Iterator<OrderCoinsEntity> orderIterator = mergeOrder.iterator(); + while (orderIterator.hasNext()) { + OrderCoinsEntity order = orderIterator.next(); + if (order.getId().equals(exchangeOrder.getId())) { + orderIterator.remove(); + if (mergeOrder.size() == 0) { + list.remove(exchangeOrder.getEntrustPrice()); + } + onRemoveOrder(order); + return order; + } + } + } + } + } + return null; + } + + public void onRemoveOrder(OrderCoinsEntity order) { + if (order.getTradeType() == OrderCoinsEntity.TRADETYPE_FIXEDPRICE) { + if (order.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) { + buyTradePlate.remove(order); + sendTradePlateMessage(buyTradePlate, sellTradePlate); + } else { + sellTradePlate.remove(order); + sendTradePlateMessage(buyTradePlate, sellTradePlate); + } + } + } + + + public TradePlate getTradePlate(ExchangeOrderDirection direction) { + if (direction == ExchangeOrderDirection.BUY) { + return buyTradePlate; + } else { + return sellTradePlate; + } + } + + + /** + * 查询交易器里的订单 + * + * @param orderId + * @param type + * @param direction + * @return + */ + public OrderCoinsEntity findOrder(String orderId, int type, int direction) { + if (type == OrderCoinsEntity.TRADETYPE_MARKETPRICE) { + LinkedList<OrderCoinsEntity> list; + if (direction == OrderCoinsEntity.ORDERTYPE_BUY) { + list = this.buyMarketQueue; + } else { + list = this.sellMarketQueue; + } + synchronized (list) { + Iterator<OrderCoinsEntity> orderIterator = list.iterator(); + while ((orderIterator.hasNext())) { + OrderCoinsEntity order = orderIterator.next(); + if (order.getId().equals(orderId)) { + return order; + } + } + } + } else { + TreeMap<BigDecimal, MergeOrder> list; + if (direction == OrderCoinsEntity.ORDERTYPE_BUY) { + list = this.buyLimitPriceQueue; + } else { + list = this.sellLimitPriceQueue; + } + synchronized (list) { + Iterator<Map.Entry<BigDecimal, MergeOrder>> mergeOrderIterator = list.entrySet().iterator(); + while (mergeOrderIterator.hasNext()) { + Map.Entry<BigDecimal, MergeOrder> entry = mergeOrderIterator.next(); + MergeOrder mergeOrder = entry.getValue(); + Iterator<OrderCoinsEntity> orderIterator = mergeOrder.iterator(); + while ((orderIterator.hasNext())) { + OrderCoinsEntity order = orderIterator.next(); + if (order.getId().equals(orderId)) { + return order; + } + } + } + } + } + return null; + } + + public TreeMap<BigDecimal, MergeOrder> getBuyLimitPriceQueue() { + return buyLimitPriceQueue; + } + + public LinkedList<OrderCoinsEntity> getBuyMarketQueue() { + return buyMarketQueue; + } + + public TreeMap<BigDecimal, MergeOrder> getSellLimitPriceQueue() { + return sellLimitPriceQueue; + } + + public LinkedList<OrderCoinsEntity> getSellMarketQueue() { + return sellMarketQueue; + } + + + public void setCoinScale(int scale) { + this.coinScale = scale; + } + + public void setBaseCoinScale(int scale) { + this.baseCoinScale = scale; + } + + public boolean isTradingHalt() { + return this.tradingHalt; + } + + /** + * 暂停交易,不接收新的订单 + */ + public void haltTrading() { + this.tradingHalt = true; + } + + /** + * 恢复交易 + */ + public void resumeTrading() { + this.tradingHalt = false; + } + + public void stopTrading() { + //TODO:停止交易,取消当前所有订单 + } + + public boolean getReady() { + return this.ready; + } + + public void setReady(boolean ready) { + this.ready = ready; + } + + public void setClearTime(String clearTime) { + this.clearTime = clearTime; + } + + public int getLimitPriceOrderCount(ExchangeOrderDirection direction) { + int count = 0; + TreeMap<BigDecimal, MergeOrder> queue = direction == ExchangeOrderDirection.BUY ? buyLimitPriceQueue : sellLimitPriceQueue; + Iterator<Map.Entry<BigDecimal, MergeOrder>> mergeOrderIterator = queue.entrySet().iterator(); + while (mergeOrderIterator.hasNext()) { + Map.Entry<BigDecimal, MergeOrder> entry = mergeOrderIterator.next(); + MergeOrder mergeOrder = entry.getValue(); + count += mergeOrder.size(); + } + return count; + } + + public OrderCoinService getOrderCoinService() { + return orderCoinService; + } + + public void setOrderCoinService(OrderCoinService orderCoinService) { + this.orderCoinService = orderCoinService; + } +} diff --git a/src/main/java/com/xcong/excoin/trade/CoinTraderFactory.java b/src/main/java/com/xcong/excoin/trade/CoinTraderFactory.java new file mode 100644 index 0000000..e4eb159 --- /dev/null +++ b/src/main/java/com/xcong/excoin/trade/CoinTraderFactory.java @@ -0,0 +1,40 @@ +package com.xcong.excoin.trade; + +import org.springframework.stereotype.Service; + +import java.util.concurrent.ConcurrentHashMap; + +@Service +public class CoinTraderFactory { + + private ConcurrentHashMap<String, CoinTrader> traderMap; + + public CoinTraderFactory() { + traderMap = new ConcurrentHashMap<>(); + } + + //添加,已存在的无法添加 + public void addTrader(String symbol, CoinTrader trader) { + if(!traderMap.containsKey(symbol)) { + traderMap.put(symbol, trader); + } + } + + //重置,即使已经存在也会覆盖 + public void resetTrader(String symbol, CoinTrader trader) { + traderMap.put(symbol, trader); + } + + public boolean containsTrader(String symbol) { + return traderMap.containsKey(symbol); + } + + public CoinTrader getTrader(String symbol) { + return traderMap.get(symbol); + } + + public ConcurrentHashMap<String, CoinTrader> getTraderMap() { + return traderMap; + } + +} diff --git a/src/main/java/com/xcong/excoin/trade/ExchangeOrderDirection.java b/src/main/java/com/xcong/excoin/trade/ExchangeOrderDirection.java new file mode 100644 index 0000000..8da593e --- /dev/null +++ b/src/main/java/com/xcong/excoin/trade/ExchangeOrderDirection.java @@ -0,0 +1,5 @@ +package com.xcong.excoin.trade; + +public enum ExchangeOrderDirection { + BUY,SELL; +} diff --git a/src/main/java/com/xcong/excoin/trade/ExchangeTrade.java b/src/main/java/com/xcong/excoin/trade/ExchangeTrade.java new file mode 100644 index 0000000..ea56d87 --- /dev/null +++ b/src/main/java/com/xcong/excoin/trade/ExchangeTrade.java @@ -0,0 +1,27 @@ +package com.xcong.excoin.trade; + +import com.alibaba.fastjson.JSON; +import lombok.Data; + +import java.io.Serializable; +import java.math.BigDecimal; + +/** + * 撮合交易信息 + */ +@Data +public class ExchangeTrade implements Serializable{ + private String symbol; + private BigDecimal price; + private BigDecimal amount; + private BigDecimal buyTurnover; + private BigDecimal sellTurnover; + private int direction; + private Long buyOrderId; + private Long sellOrderId; + private Long time; + @Override + public String toString() { + return JSON.toJSONString(this); + } +} diff --git a/src/main/java/com/xcong/excoin/trade/MergeOrder.java b/src/main/java/com/xcong/excoin/trade/MergeOrder.java new file mode 100644 index 0000000..76f1909 --- /dev/null +++ b/src/main/java/com/xcong/excoin/trade/MergeOrder.java @@ -0,0 +1,42 @@ +package com.xcong.excoin.trade; + +import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +public class MergeOrder { + private List<OrderCoinsEntity> orders = new ArrayList<>(); + + //最后位置添加一个 + public void add(OrderCoinsEntity order){ + orders.add(order); + } + + + public OrderCoinsEntity get(){ + return orders.get(0); + } + + public int size(){ + return orders.size(); + } + + public BigDecimal getPrice(){ + return orders.get(0).getEntrustPrice(); + } + + public Iterator<OrderCoinsEntity> iterator(){ + return orders.iterator(); + } + + public BigDecimal getTotalAmount() { + BigDecimal total = new BigDecimal(0); + for(OrderCoinsEntity item : orders) { + total = total.add(item.getEntrustCnt()); + } + return total; + } +} diff --git a/src/main/java/com/xcong/excoin/trade/TradePlate.java b/src/main/java/com/xcong/excoin/trade/TradePlate.java new file mode 100644 index 0000000..fe24e0b --- /dev/null +++ b/src/main/java/com/xcong/excoin/trade/TradePlate.java @@ -0,0 +1,181 @@ +package com.xcong.excoin.trade; + + +import com.alibaba.fastjson.JSONObject; +import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +import java.math.BigDecimal; +import java.util.LinkedList; + +/** + * 盘口信息 + */ +@Data +@Slf4j +public class TradePlate { + private LinkedList<TradePlateItem> items; + //最大深度 + private int maxDepth = 100; + //方向 订单类型 1、买入2、卖出 + private int direction; + private String symbol; + public TradePlate(){ + + } + + public TradePlate(String symbol,int direction) { + this.direction = direction; + this.symbol = symbol; + items = new LinkedList<>(); + } + + public boolean add(OrderCoinsEntity exchangeOrder) { + //log.info("add TradePlate order={}",exchangeOrder); + synchronized (items) { + int index = 0; + if (exchangeOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) { + return false; + } + if (exchangeOrder.getOrderType() != direction) { + return false; + } + if (items.size() > 0) { + for (index = 0; index < items.size(); index++) { + TradePlateItem item = items.get(index); + if ((exchangeOrder.getTradeType() == OrderCoinsEntity.ORDERTYPE_BUY && item.getPrice().compareTo(exchangeOrder.getEntrustPrice()) > 0) + || (exchangeOrder.getTradeType() == OrderCoinsEntity.ORDERTYPE_SELL && item.getPrice().compareTo(exchangeOrder.getEntrustPrice()) < 0)) { + continue; + } else if (item.getPrice().compareTo(exchangeOrder.getEntrustPrice()) == 0) { + BigDecimal deltaAmount = exchangeOrder.getEntrustCnt().subtract(exchangeOrder.getDealCnt()); + item.setAmount(item.getAmount().add(deltaAmount)); + return true; + } else { + break; + } + } + } + if(index < maxDepth) { + TradePlateItem newItem = new TradePlateItem(); + newItem.setAmount(exchangeOrder.getEntrustCnt().subtract(exchangeOrder.getDealCnt())); + newItem.setPrice(exchangeOrder.getEntrustPrice()); + items.add(index, newItem); + } + } + return true; + } + + public void remove(OrderCoinsEntity order,BigDecimal amount) { + synchronized (items) { + //log.info("items>>init_size={},orderPrice={}",items.size(),order.getPrice()); + for (int index = 0; index < items.size(); index++) { + TradePlateItem item = items.get(index); + if (item.getPrice().compareTo(order.getEntrustPrice()) == 0) { + item.setAmount(item.getAmount().subtract(amount)); + if (item.getAmount().compareTo(BigDecimal.ZERO) <= 0) { + items.remove(index); + } + //log.info("items>>final_size={},itemAmount={},itemPrice={}",items.size(),item.getAmount(),item.getPrice()); + return; + } + } + log.info("items>>return_size={}",items.size()); + } + } + + public void remove(OrderCoinsEntity order){ + remove(order,order.getEntrustCnt().subtract(order.getDealCnt())); + } + + public void setItems(LinkedList<TradePlateItem> items){ + this.items = items; + } + + public BigDecimal getHighestPrice(){ + if(items.size() == 0) { + return BigDecimal.ZERO; + } + if(direction == OrderCoinsEntity.ORDERTYPE_BUY){ + return items.getFirst().getPrice(); + } + else{ + return items.getLast().getPrice(); + } + } + + public int getDepth(){ + return items.size(); + } + + + public BigDecimal getLowestPrice(){ + if(items.size() == 0) { + return BigDecimal.ZERO; + } + if(direction == OrderCoinsEntity.ORDERTYPE_BUY){ + return items.getLast().getPrice(); + } + else{ + return items.getFirst().getPrice(); + } + } + + /** + * 获取委托量最大的档位 + * @return + */ + public BigDecimal getMaxAmount(){ + if(items.size() == 0) { + return BigDecimal.ZERO; + } + BigDecimal amount = BigDecimal.ZERO; + for(TradePlateItem item:items){ + if(item.getAmount().compareTo(amount)>0){ + amount = item.getAmount(); + } + } + return amount; + } + + /** + * 获取委托量最小的档位 + * @return + */ + public BigDecimal getMinAmount(){ + if(items.size() == 0) { + return BigDecimal.ZERO; + } + BigDecimal amount = items.getFirst().getAmount(); + for(TradePlateItem item:items){ + if(item.getAmount().compareTo(amount) < 0){ + amount = item.getAmount(); + } + } + return amount; + } + + public JSONObject toJSON(){ + JSONObject json = new JSONObject(); + json.put("direction",direction); + json.put("maxAmount",getMaxAmount()); + json.put("minAmount",getMinAmount()); + json.put("highestPrice",getHighestPrice()); + json.put("lowestPrice",getLowestPrice()); + json.put("symbol",getSymbol()); + json.put("items",items); + return json; + } + + public JSONObject toJSON(int limit){ + JSONObject json = new JSONObject(); + json.put("direction",direction); + json.put("maxAmount",getMaxAmount()); + json.put("minAmount",getMinAmount()); + json.put("highestPrice",getHighestPrice()); + json.put("lowestPrice",getLowestPrice()); + json.put("symbol",getSymbol()); + json.put("items",items.size() > limit ? items.subList(0,limit) : items); + return json; + } +} diff --git a/src/main/java/com/xcong/excoin/trade/TradePlateItem.java b/src/main/java/com/xcong/excoin/trade/TradePlateItem.java new file mode 100644 index 0000000..9a6f83a --- /dev/null +++ b/src/main/java/com/xcong/excoin/trade/TradePlateItem.java @@ -0,0 +1,11 @@ +package com.xcong.excoin.trade; + +import lombok.Data; + +import java.math.BigDecimal; + +@Data +public class TradePlateItem { + private BigDecimal price; + private BigDecimal amount; +} diff --git a/src/main/java/com/xcong/excoin/trade/TradePlateModel.java b/src/main/java/com/xcong/excoin/trade/TradePlateModel.java new file mode 100644 index 0000000..9760925 --- /dev/null +++ b/src/main/java/com/xcong/excoin/trade/TradePlateModel.java @@ -0,0 +1,13 @@ +package com.xcong.excoin.trade; + +import lombok.Data; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; + +@Data +public class TradePlateModel { + private List<List<BigDecimal>> buy = new ArrayList<>(); + private List<List<BigDecimal>> sell = new ArrayList<>(); +} diff --git a/src/main/java/com/xcong/excoin/utils/CoinTypeConvert.java b/src/main/java/com/xcong/excoin/utils/CoinTypeConvert.java index fc2dc6c..e9ff270 100644 --- a/src/main/java/com/xcong/excoin/utils/CoinTypeConvert.java +++ b/src/main/java/com/xcong/excoin/utils/CoinTypeConvert.java @@ -22,6 +22,8 @@ return "EOS/USDT"; case "etcusdt": return "ETC/USDT"; + case "nekkusdt": + return "NEKK/USDT"; default: return null; } @@ -43,6 +45,8 @@ return "EOS_NEW_PRICE"; case "ETC/USDT": return "ETC_NEW_PRICE"; + case "NEKK/USDT": + return "NEKK_NEW_PRICE"; default: return null; } diff --git a/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java b/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java new file mode 100644 index 0000000..b955f11 --- /dev/null +++ b/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java @@ -0,0 +1,220 @@ +package com.xcong.excoin.websocket; + +import cn.hutool.core.util.StrUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.xcong.excoin.common.contants.AppContants; +import com.xcong.excoin.utils.CoinTypeConvert; +import com.xcong.excoin.utils.RedisUtils; +import com.xcong.excoin.utils.SpringContextHolder; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import javax.websocket.*; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +@Slf4j +@ServerEndpoint(value = "/trade/market") +@Component +public class TradePlateSendWebSocket { + @Resource + RedisUtils redisUtils; + + /** + * 记录当前在线连接数 + */ + private static AtomicInteger onlineCount = new AtomicInteger(0); + + + private static Map<String, Map<String, Session>> tradeplateClients = new ConcurrentHashMap<>(); + + private static Map<String, Map<String, Session>> klineClients = new ConcurrentHashMap<>(); + + /** + * 连接建立成功调用的方法 + */ + @OnOpen + public void onOpen(Session session) { + onlineCount.incrementAndGet(); // 在线数加1 + log.info("有新连接加入:{},当前在线人数为:{}", session.getId(), onlineCount.get()); + } + + /** + * 连接关闭调用的方法 + */ + @OnClose + public void onClose(Session session) { + onlineCount.decrementAndGet(); // 在线数减1 +// Collection<Map<String, Session>> values = tradeplateClients.values(); +// if(CollectionUtils.isNotEmpty(values)){ +// for(Map<String,Session> map : values){ +// map.remove(session.getId()); +// } +// } + log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get()); + } + + /** + * 收到客户端消息后调用的方法 + * + * @param message 客户端发送过来的消息 + */ + @OnMessage + public void onMessage(String message, Session session) { + log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message); + // 订阅方法 {sub: 'market.' + symbol + '.depth.' + this._caculateType(),id: symbol + //} + JSONObject jsonObject = JSON.parseObject(message); + // 盘口的判断 + if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("depth")) { + String sub = jsonObject.get("sub").toString(); + String symbol = sub.split("\\.")[1]; + symbol = CoinTypeConvert.convert(symbol); + if (tradeplateClients.containsKey(symbol)) { + tradeplateClients.get(symbol).put(session.getId(), session); + } else { + Map<String, Session> map = new HashMap<>(); + map.put(session.getId(), session); + tradeplateClients.put(symbol, map); + } + } + // 取消盘口订阅 + if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("depth")) { + // `market.${symbol}.kline.${strPeriod} + String unsub = jsonObject.get("unsub").toString(); + String[] split = unsub.split("\\."); + String symbol = split[1]; + symbol = CoinTypeConvert.convert(symbol); + String key = symbol; + if (tradeplateClients.containsKey(key)) { + tradeplateClients.get(key).remove(session.getId()); + } + } + + + // 最新K线订阅 + + // 根据消息判断这个用户需要订阅哪种数据 + // {sub: `market.${symbol}.kline.${strPeriod}`, + // symbol: symbol, + // period: strPeriod + //} + // 取消订阅 {unsub: xxx(标识)} + if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("kline")) { + // 订阅 + String sub = jsonObject.get("sub").toString(); + String[] split = sub.split("\\."); + String symbol = split[1]; + symbol = CoinTypeConvert.convert(symbol); + String period = split[3]; + String key = symbol + "-" + period; + if (klineClients.containsKey(key)) { + // 有这个币种K线 + Map<String, Session> stringSessionMap = klineClients.get(key); + if (!stringSessionMap.containsKey(session.getId())) { + stringSessionMap.put(session.getId(), session); + } + } else { + Map<String, Session> stringSessionMap = new HashMap<>(); + stringSessionMap.put(session.getId(), session); + klineClients.put(key, stringSessionMap); + } + } + + // 取消订阅 + if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("kline")) { + // `market.${symbol}.kline.${strPeriod} + String unsub = jsonObject.get("unsub").toString(); + String[] split = unsub.split("\\."); + String strPeriod = split[3]; + String symbol = split[1]; + symbol = CoinTypeConvert.convert(symbol); + String key = symbol + "-" + strPeriod; + if (klineClients.containsKey(key)) { + klineClients.get(key).remove(session.getId()); + } + } + + // 历史K线订阅 + // {req: "market.nekkusdt.kline.1min", symbol: "nekkusdt", period: "1min"} + if (jsonObject.containsKey("req") && jsonObject.get("req").toString().contains("kline")) { + String sub = jsonObject.get("req").toString(); + String[] split = sub.split("\\."); + String symbol = split[1]; + symbol = CoinTypeConvert.convert(symbol); + String period = split[3]; + //String key = symbol+"-"+period; + // String key = "KINE_BCH/USDT_1week"; + String key = "KINE_{}_{}"; + // 币币k线数据 + key = StrUtil.format(key, symbol, period); + RedisUtils bean = SpringContextHolder.getBean(RedisUtils.class); + Object o = bean.get(key); + sendMessageHistory(JSON.toJSONString(o), session); + } + } + + @OnError + public void onError(Session session, Throwable error) { + log.error("发生错误"); + error.printStackTrace(); + } + + /** + * 群发消息 + * + * @param message 消息内容 + */ + public void sendMessagePlate(String message, Session fromSession) { + if (tradeplateClients.containsKey("nekkusdt")) { + Map<String, Session> nekk = tradeplateClients.get("nekkusdt"); + for (Map.Entry<String, Session> sessionEntry : nekk.entrySet()) { + Session toSession = sessionEntry.getValue(); + // 排除掉自己 + //if (!fromSession.getId().equals(toSession.getId())) { + log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message); + boolean open = toSession.isOpen(); + if (open) { + toSession.getAsyncRemote().sendText(message); + } + + // } + } + } + + } + + public void sendMessageHistory(String message, Session toSession) { + log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message); + boolean open = toSession.isOpen(); + if (open) { + toSession.getAsyncRemote().sendText(message); + } + } + + public void sendMessageKline(String symbol, String period, String message, Session fromSession) { + String key = symbol + "-" + period; + if (klineClients.containsKey(key)) { + Map<String, Session> stringSessionMap = klineClients.get(key); + for (Map.Entry<String, Session> sessionEntry : stringSessionMap.entrySet()) { + Session toSession = sessionEntry.getValue(); + // 排除掉自己 + //if (!fromSession.getId().equals(toSession.getId())) { + log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message); + boolean open = toSession.isOpen(); + if (open) { + toSession.getAsyncRemote().sendText(message); + } + // } + } + } + } +} diff --git a/src/main/resources/mapper/member/MemberWalletCoinDao.xml b/src/main/resources/mapper/member/MemberWalletCoinDao.xml index 79979c5..e7fd894 100644 --- a/src/main/resources/mapper/member/MemberWalletCoinDao.xml +++ b/src/main/resources/mapper/member/MemberWalletCoinDao.xml @@ -39,5 +39,21 @@ where id=#{id} </update> + <update id="updateWalletBalance" parameterType="map"> + update member_wallet_coin + <set> + <if test="availableBalance != null"> + available_balance = IFNULL(available_balance, 0) + #{availableBalance}, + </if> + <if test="totalBalance != null"> + total_balance = IFNULL(total_balance, 0) + #{totalBalance}, + </if> + <if test="frozenBalance != null"> + frozen_balance = IFNULL(frozen_balance, 0) + #{frozenBalance}, + </if> + </set> + where id=#{id} + </update> + </mapper> \ No newline at end of file diff --git a/src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml b/src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml index 03a89ae..303667f 100644 --- a/src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml +++ b/src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml @@ -1,38 +1,55 @@ <?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > -<mapper namespace="com.xcong.excoin.modules.coin.dao.OrderCoinDealDao"> - - <select id="selectAllWalletCoinOrder" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity"> - select * from coins_order_deal - <where> - <if test="memberId != null and memberId != ''"> - and member_id = #{memberId} - </if> - </where> - order by create_time desc +<mapper namespace="com.xcong.excoin.modules.coin.dao.OrderCoinDealDao"> + + <select id="selectAllWalletCoinOrder" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity"> + select * from coins_order_deal + <where> + <if test="memberId != null and memberId != ''"> + and member_id = #{memberId} + </if> + </where> + order by create_time desc + </select> + + <select id="selectOrderCoinDealByTime" resultType="com.xcong.excoin.trade.ExchangeTrade"> + SELECT + symbol symbol, + deal_price price, + symbol_cnt amount, + deal_amount buyTurnover, + deal_amount sellTurnover, + order_type direction, + create_time time + from coins_order_deal + where symbol = #{symbol} + and order_type = 1 + and order_status = 3 + and create_time between #{startTime} and #{endTime} </select> - <select id="selectAllWalletCoinOrderBySymbol" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity"> - select * from coins_order_deal - <where> - <if test="memberId != null and memberId != ''"> - and member_id = #{memberId} - </if> - <if test="symbol != null and symbol != ''"> - and symbol = #{symbol} - </if> - </where> - order by create_time desc - </select> - - <select id="selectWalletCoinOrder" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity"> + <select id="selectAllWalletCoinOrderBySymbol" + resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity"> + select * from coins_order_deal + <where> + <if test="memberId != null and memberId != ''"> + and member_id = #{memberId} + </if> + <if test="symbol != null and symbol != ''"> + and symbol = #{symbol} + </if> + </where> + order by create_time desc + </select> + + <select id="selectWalletCoinOrder" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity"> select * from coins_order_deal where order_id= #{orderId} and member_id = #{memberId} </select> - - <select id="findAllWalletCoinOrderInPage" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity"> + + <select id="findAllWalletCoinOrderInPage" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity"> select * from coins_order_deal <if test="record != null"> <where> - <if test="record.memberId != null" > + <if test="record.memberId != null"> and member_id=#{record.memberId} </if> <if test="record.symbol != null and record.symbol != ''"> @@ -42,5 +59,5 @@ </if> order by create_time desc </select> - + </mapper> diff --git a/src/test/java/com/xcong/excoin/GuijiTest.java b/src/test/java/com/xcong/excoin/GuijiTest.java index 6b99989..8babbbe 100644 --- a/src/test/java/com/xcong/excoin/GuijiTest.java +++ b/src/test/java/com/xcong/excoin/GuijiTest.java @@ -116,8 +116,8 @@ e.printStackTrace(); } } - - + + } -- Gitblit v1.9.1