From df1716a9abacac95261d686bdf0776bc7d6deca2 Mon Sep 17 00:00:00 2001
From: zainali5120 <512061637@qq.com>
Date: Sun, 13 Sep 2020 01:15:03 +0800
Subject: [PATCH] 撮合交易代码提交
---
src/main/java/com/xcong/excoin/configurations/WebSocketConfig.java | 16
src/main/java/com/xcong/excoin/processor/MarketHandler.java | 21
src/main/java/com/xcong/excoin/modules/exchange/service/HandleKlineService.java | 14
src/main/java/com/xcong/excoin/utils/CoinTypeConvert.java | 4
src/main/java/com/xcong/excoin/trade/ExchangeTrade.java | 27
src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java | 220 ++++
src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java | 392 +++++++
src/main/java/com/xcong/excoin/processor/ExchangeOrderDirection.java | 5
pom.xml | 5
src/main/java/com/xcong/excoin/trade/TradePlateModel.java | 13
src/main/java/com/xcong/excoin/processor/CoinProcessor.java | 54 +
src/main/java/com/xcong/excoin/trade/CoinTraderFactory.java | 40
src/main/java/com/xcong/excoin/modules/coin/parameter/dto/SubmitSalesWalletCoinOrderDto.java | 8
src/main/java/com/xcong/excoin/processor/CoinThumb.java | 26
src/main/java/com/xcong/excoin/modules/coin/entity/OrderCoinsEntity.java | 5
src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java | 17
src/test/java/com/xcong/excoin/GuijiTest.java | 4
src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java | 98 +
src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java | 98 +
src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java | 290 +++++
src/main/java/com/xcong/excoin/modules/exchange/service/impl/HandleKlineServiceImpl.java | 56 +
src/main/java/com/xcong/excoin/trade/TradePlateItem.java | 11
src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java | 65 +
src/main/java/com/xcong/excoin/processor/MarketService.java | 125 ++
src/main/resources/mapper/member/MemberWalletCoinDao.xml | 16
src/main/java/com/xcong/excoin/trade/ExchangeOrderDirection.java | 5
src/main/java/com/xcong/excoin/common/enumerates/SymbolEnum.java | 3
src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinDealDao.java | 3
src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml | 73
src/main/java/com/xcong/excoin/configurations/security/WebSecurityConfig.java | 1
src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java | 47
src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java | 7
src/main/java/com/xcong/excoin/trade/MergeOrder.java | 42
src/main/java/com/xcong/excoin/trade/TradePlate.java | 181 +++
src/main/java/com/xcong/excoin/modules/member/dao/MemberWalletCoinDao.java | 1
src/main/java/com/xcong/excoin/processor/CoinProcessorFactory.java | 34
src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java | 159 +++
src/main/java/com/xcong/excoin/trade/CoinTrader.java | 762 +++++++++++++++
38 files changed, 2,907 insertions(+), 41 deletions(-)
diff --git a/pom.xml b/pom.xml
index 0438973..c77623f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -76,6 +76,11 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
+ <dependency>
+ <!-- websocket -->
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-websocket</artifactId>
+ </dependency>
<!-- <dependency>-->
<!-- <groupId>org.springframework.security</groupId>-->
diff --git a/src/main/java/com/xcong/excoin/common/enumerates/SymbolEnum.java b/src/main/java/com/xcong/excoin/common/enumerates/SymbolEnum.java
index ca590aa..70888bd 100644
--- a/src/main/java/com/xcong/excoin/common/enumerates/SymbolEnum.java
+++ b/src/main/java/com/xcong/excoin/common/enumerates/SymbolEnum.java
@@ -14,7 +14,8 @@
,BCH("BCH", "BCH/USDT")
,EOS("EOS", "EOS/USDT")
,XRP("XRP", "XRP/USDT")
- ,ETC("ETC", "ETC/USDT");
+ ,ETC("ETC", "ETC/USDT")
+ ,NEKK("NEKK", "NEKK/USDT");
private String name;
diff --git a/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java b/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
index 0a791bf..1389b69 100644
--- a/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
+++ b/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
@@ -29,6 +29,11 @@
public static final String EXCHANGE_A = "biue-exchange-A";
+ /**
+ * 撮合交易
+ */
+ public static final String EXCHANGE_B = "biue-exchange-B";
+
// 开多止盈队列
public static final String QUEUE_MOREPRO = "QUEUE_MOREPRO_NEW";
@@ -51,6 +56,12 @@
// 平仓队列
public static final String QUEUE_CLOSETRADE = "QUEUE_CLOSETRADE_NEW";
+ // 盘口队列
+ public static final String QUEUE_TRADE_PLATE = "QUEUE_TRADE_PLATE";
+
+ // 处理交易
+ public static final String QUEUE_HANDLE_TRADE = "QUEUE_HANDLE_TRADE";
+
// 开多止盈路由键
public static final String ROUTINGKEY_MOREPRO = "ROUTINGKEY_MOREPRO";
@@ -71,6 +82,12 @@
public static final String ROUTINGKEY_PRICEOPERATE = "ROUTINGKEY_PRICEOPERATE";
// 平仓路由
public static final String ROUTINGKEY_CLOSETRADE = "ROUTINGKEY_CLOSETRADE";
+
+ // 盘口理路由
+ public static final String ROUTINGKEY_TRADE_PLATE = "ROUTINGKEY_TRADE_PLATE";
+
+ // 交易订单处理
+ public static final String ROUTINGKEY_HANDLE_TRADE = "ROUTINGKEY_HANDLE_TRADE";
@Resource
private ConnectionFactory connectionFactory;
@@ -94,6 +111,7 @@
public DirectExchange defaultExchange() {
return new DirectExchange(EXCHANGE_ONE);
}
+
@Bean
public Queue testQueue() {
@@ -204,6 +222,27 @@
/**
+ * 盘口推送
+ *
+ * @return
+ */
+ @Bean
+ public Queue queuePlateTrade() {
+ return new Queue(QUEUE_TRADE_PLATE, true);
+ }
+
+ /**
+ * 交易订单处理
+ *
+ * @return
+ */
+ @Bean
+ public Queue queueHandleTrade() {
+ return new Queue(QUEUE_HANDLE_TRADE, true);
+ }
+
+
+ /**
* 开多止盈
*
* @return
@@ -286,4 +325,30 @@
return BindingBuilder.bind(queueCloseTrade()).to(orderExchange()).with(RabbitMqConfig.ROUTINGKEY_CLOSETRADE);
}
+
+ @Bean
+ public DirectExchange matchTradeExchange() {
+ return new DirectExchange(EXCHANGE_B);
+ }
+
+ /**
+ * 盘口变化绑定
+ *
+ * @return
+ */
+ @Bean
+ public Binding bindingPlateTrade() {
+ return BindingBuilder.bind(queuePlateTrade()).to(matchTradeExchange()).with(RabbitMqConfig.ROUTINGKEY_TRADE_PLATE);
+ }
+
+ /**
+ * 交易订单处理
+ *
+ * @return
+ */
+ @Bean
+ public Binding bindingHandleTrade() {
+ return BindingBuilder.bind(queueHandleTrade()).to(matchTradeExchange()).with(RabbitMqConfig.ROUTINGKEY_HANDLE_TRADE);
+ }
+
}
diff --git a/src/main/java/com/xcong/excoin/configurations/WebSocketConfig.java b/src/main/java/com/xcong/excoin/configurations/WebSocketConfig.java
new file mode 100644
index 0000000..31b5eeb
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/configurations/WebSocketConfig.java
@@ -0,0 +1,16 @@
+package com.xcong.excoin.configurations;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.socket.server.standard.ServerEndpointExporter;
+
+@Configuration
+public class WebSocketConfig {
+ /**
+ * 注入一个ServerEndpointExporter,该Bean会自动注册使用@ServerEndpoint注解申明的websocket endpoint
+ */
+ @Bean
+ public ServerEndpointExporter serverEndpointExporter() {
+ return new ServerEndpointExporter();
+ }
+}
diff --git a/src/main/java/com/xcong/excoin/configurations/security/WebSecurityConfig.java b/src/main/java/com/xcong/excoin/configurations/security/WebSecurityConfig.java
index dfe6523..dce5e35 100644
--- a/src/main/java/com/xcong/excoin/configurations/security/WebSecurityConfig.java
+++ b/src/main/java/com/xcong/excoin/configurations/security/WebSecurityConfig.java
@@ -53,6 +53,7 @@
.antMatchers("/api/member/getAppVersionInfo").permitAll()
.antMatchers("/api/orderCoin/searchSymbolResultList").permitAll()
.antMatchers("/api/orderCoin/findCollect").permitAll()
+ .antMatchers("/trade/**").permitAll()
.anyRequest().authenticated()
.and().apply(securityConfiguereAdapter());
}
diff --git a/src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java b/src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java
index f9f4ece..ca953be 100644
--- a/src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java
+++ b/src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java
@@ -67,7 +67,12 @@
Integer tradeType = submitSalesWalletCoinOrderDto.getTradeType();
BigDecimal price = submitSalesWalletCoinOrderDto.getPrice();
BigDecimal amount = submitSalesWalletCoinOrderDto.getAmount();
- return orderCoinService.submitSalesWalletCoinOrder(symbol,type,tradeType,price,amount);
+ if("NEKK".equals(symbol)){
+ return orderCoinService.submitSalesWalletCoinOrderWithMatch(symbol,type,tradeType,price,amount,submitSalesWalletCoinOrderDto.getEntrustAmount());
+
+ }else{
+ return orderCoinService.submitSalesWalletCoinOrder(symbol,type,tradeType,price,amount);
+ }
}
/**
diff --git a/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinDealDao.java b/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinDealDao.java
index ab900b8..e38459d 100644
--- a/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinDealDao.java
+++ b/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinDealDao.java
@@ -1,7 +1,9 @@
package com.xcong.excoin.modules.coin.dao;
+import java.util.Date;
import java.util.List;
+import com.xcong.excoin.trade.ExchangeTrade;
import org.apache.ibatis.annotations.Param;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
@@ -21,5 +23,6 @@
IPage<OrderCoinsDealEntity> findAllWalletCoinOrderInPage(Page<OrderCoinsDealEntity> page,
@Param("record") OrderCoinsDealEntity orderCoinsDealEntity);
+ List<ExchangeTrade> selectOrderCoinDealByTime(@Param("symbol")String symbol, @Param("startTime")Date startTime, @Param("endTime")Date endTime);
}
diff --git a/src/main/java/com/xcong/excoin/modules/coin/entity/OrderCoinsEntity.java b/src/main/java/com/xcong/excoin/modules/coin/entity/OrderCoinsEntity.java
index 54cadd8..f536c5d 100644
--- a/src/main/java/com/xcong/excoin/modules/coin/entity/OrderCoinsEntity.java
+++ b/src/main/java/com/xcong/excoin/modules/coin/entity/OrderCoinsEntity.java
@@ -57,6 +57,11 @@
* 成交价
*/
private BigDecimal dealPrice;
+
+ /**
+ * 市价委托时的委托金额
+ */
+ private BigDecimal entrustAmount;
/**
* 成交金额
*/
diff --git a/src/main/java/com/xcong/excoin/modules/coin/parameter/dto/SubmitSalesWalletCoinOrderDto.java b/src/main/java/com/xcong/excoin/modules/coin/parameter/dto/SubmitSalesWalletCoinOrderDto.java
index b15186a..1395732 100644
--- a/src/main/java/com/xcong/excoin/modules/coin/parameter/dto/SubmitSalesWalletCoinOrderDto.java
+++ b/src/main/java/com/xcong/excoin/modules/coin/parameter/dto/SubmitSalesWalletCoinOrderDto.java
@@ -26,12 +26,16 @@
private Integer tradeType;
@Min(0)
- @NotNull(message = "数量不能为空")
+ //@NotNull(message = "数量不能为空")
@ApiModelProperty(value = "数量", example = "100")
private BigDecimal amount;
- @NotNull(message = "建仓价不能为空")
+ //@NotNull(message = "建仓价不能为空")
@ApiModelProperty(value = "建仓价", example = "20.0000")
private BigDecimal price;
+
+ @ApiModelProperty(value = "市价时输入的总金额", example = "20.0000")
+ private BigDecimal entrustAmount;
+
}
diff --git a/src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java b/src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java
index a8f2914..8bf291c 100644
--- a/src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java
+++ b/src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java
@@ -1,11 +1,13 @@
package com.xcong.excoin.modules.coin.service;
import java.math.BigDecimal;
+import java.util.List;
import com.baomidou.mybatisplus.extension.service.IService;
import com.xcong.excoin.common.response.Result;
import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity;
import com.xcong.excoin.modules.coin.parameter.dto.FindAllWalletCoinOrderDto;
+import com.xcong.excoin.trade.ExchangeTrade;
public interface OrderCoinService extends IService<OrderCoinsEntity>{
@@ -15,6 +17,19 @@
Result submitSalesWalletCoinOrder(String symbol, Integer type, Integer tradeType, BigDecimal price,
BigDecimal amount);
+
+ /**
+ * 需要撮合交易的币种提交买卖单
+ * @param symbol
+ * @param type
+ * @param tradeType
+ * @param price
+ * @param amount
+ * @param entrustAmount
+ * @return
+ */
+ Result submitSalesWalletCoinOrderWithMatch(String symbol, Integer type, Integer tradeType, BigDecimal price,
+ BigDecimal amount,BigDecimal entrustAmount);
public Result getEntrustWalletCoinOrder(String symbol, Integer status);
@@ -34,4 +49,6 @@
public void dealEntrustCoinOrder();
+ public void handleOrder(List<ExchangeTrade> trades);
+
}
diff --git a/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java b/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java
index c96299f..200f4a7 100644
--- a/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java
+++ b/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java
@@ -1,14 +1,9 @@
package com.xcong.excoin.modules.coin.service.impl;
import java.math.BigDecimal;
+import java.text.ParseException;
import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.GregorianCalendar;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import javax.annotation.Resource;
@@ -16,6 +11,10 @@
import com.xcong.excoin.modules.platform.entity.PlatformCnyUsdtExchangeEntity;
import com.xcong.excoin.modules.platform.entity.PlatformSymbolsCoinEntity;
+import com.xcong.excoin.trade.CoinTrader;
+import com.xcong.excoin.trade.CoinTraderFactory;
+import com.xcong.excoin.trade.ExchangeTrade;
+import org.apache.commons.collections.CollectionUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@@ -79,6 +78,10 @@
RedisUtils redisUtils;
@Resource
PlatformSymbolsCoinDao platformSymbolsCoinDao;
+
+ @Resource
+ private CoinTraderFactory factory;
+
@Override
public String generateSimpleSerialno(String userId) {
@@ -309,6 +312,174 @@
record.setBalance(walletCoinUsdt.getAvailableBalance());
memberAccountFlowEntityDao.insert(record);
+
+ return Result.ok(MessageSourceUtils.getString("order_service_0011"));
+ }
+
+ @Override
+ public Result submitSalesWalletCoinOrderWithMatch(String symbol, Integer type, Integer tradeType, BigDecimal price, BigDecimal amount, BigDecimal entrustAmount) {
+ //获取用户ID
+ Long memberId = 13L;
+ BigDecimal nowPriceinBigDecimal = price;
+ //查询当前价
+ //BigDecimal nowPrice = new BigDecimal(redisUtils.getString(CoinTypeConvert.convertToKey(symbol + "/USDT")));
+
+ // 获取交易管理的杠杠倍率,手续费率等信息,由平台进行设置
+ symbol = symbol.toUpperCase();
+ MemberWalletCoinEntity walletCoin = memberWalletCoinDao.selectWalletCoinBymIdAndCode(memberId, symbol);
+ if (ObjectUtil.isEmpty(walletCoin)) {
+ return Result.fail(MessageSourceUtils.getString("order_service_0003"));
+ }
+ // 查询交易设置
+ PlatformTradeSettingEntity tradeSetting = platformTradeSettingDao.findTradeSetting();
+ if (ObjectUtil.isEmpty(tradeSetting)) {
+ return Result.fail(MessageSourceUtils.getString("order_service_0009"));
+ }
+ // 手续费用(手续费=建仓价X数量X手续费率)
+ BigDecimal closingPrice ;
+ // 总费用分两种 1,限价交易 是价格*数量+手续费 2,市价交易 用户输入的金额+手续费
+ //总费用 = 成交价*数量+手续费
+ BigDecimal totalPayPrice ;
+ if(OrderCoinsEntity.TRADETYPE_FIXEDPRICE.equals(tradeType) ){
+ // 限价
+ closingPrice = price.multiply(amount).multiply(tradeSetting.getCoinFeeRatio());
+ totalPayPrice = price.multiply(amount).add(closingPrice);
+ }else{
+ // 市价
+ closingPrice = entrustAmount.multiply(tradeSetting.getCoinFeeRatio());
+ totalPayPrice = entrustAmount.add(closingPrice);
+ }
+ // BigDecimal totalPayPricCoin = nowPrice.multiply(amount).add(closingPrice);
+
+ String walletCode = MemberWalletCoinEnum.WALLETCOINCODE.getValue();
+ MemberWalletCoinEntity walletCoinUsdt = memberWalletCoinDao.selectWalletCoinBymIdAndCode(memberId, walletCode);
+ if (OrderCoinsEntity.ORDERTYPE_BUY.equals(type)) {
+ //买入,所需总费用跟用户USDT金额进行比较
+ BigDecimal availableBalance = walletCoinUsdt.getAvailableBalance();
+ if (totalPayPrice.compareTo(availableBalance) > 0) {
+ return Result.fail(MessageSourceUtils.getString("order_service_0010"));
+ }
+ } else {
+ //卖出,用户币是否足够
+ BigDecimal availableBalance = walletCoin.getAvailableBalance();
+ if (amount.compareTo(availableBalance) > 0) {
+ return Result.fail(MessageSourceUtils.getString("order_service_0010"));
+ }
+ }
+
+ // 首先将单插入到数据库主表(委托表)
+ // 创建订单
+ OrderCoinsEntity order = new OrderCoinsEntity();
+ //根据委托类型生成不同数据
+ // 如果是限价交易直接插入主表数据
+ order.setMemberId(memberId);
+ order.setOrderNo(generateSimpleSerialno(memberId.toString()));
+ order.setOrderType(type);
+ order.setSymbol(symbol);
+ //order.setMarkPrice(nowPrice);
+
+ // 成交量 先设置为0
+ order.setDealCnt(BigDecimal.ZERO);
+ // 成交价
+ //order.setDealPrice(price);
+ // 成交金额
+ order.setDealAmount(BigDecimal.ZERO);
+ order.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DODING);
+ order.setTradeType(tradeType);
+ // 手续费
+ order.setFeeAmount(closingPrice);
+ if(OrderCoinsEntity.TRADETYPE_FIXEDPRICE.equals(tradeType)){
+
+ // 限价 是需要价格和数量 可以得到成交金额
+ // 下单量
+ order.setEntrustCnt(amount);
+ // 下单价格
+ order.setEntrustPrice(price);
+ order.setEntrustAmount(amount.multiply(price));
+
+
+ }else{
+ if(OrderCoinsEntity.ORDERTYPE_BUY.equals(type)){
+ // 市价 只有金额
+ order.setEntrustAmount(entrustAmount);
+ }else{
+ // 下单量
+ order.setEntrustCnt(amount);
+ // 下单价格
+ order.setEntrustPrice(price);
+ order.setEntrustAmount(amount.multiply(price));
+ }
+
+ }
+ orderCoinsDao.insert(order);
+
+ //更新用户钱包信息
+ //冻结相应的资产
+ if (OrderCoinsEntity.ORDERTYPE_BUY.equals(type)) {
+ //如果是买入,所对应的币种增加,USDT账户减少金额
+ BigDecimal availableBalance = walletCoinUsdt.getAvailableBalance().subtract(totalPayPrice);
+ BigDecimal frozenBalance = walletCoinUsdt.getFrozenBalance().add(totalPayPrice);
+ walletCoinUsdt.setAvailableBalance(availableBalance);
+ walletCoinUsdt.setFrozenBalance(frozenBalance);
+ memberWalletCoinDao.updateById(walletCoinUsdt);
+ } else {
+ //如果是卖出,币种减少,USDT增加
+ BigDecimal availableBalance = walletCoin.getAvailableBalance().subtract(amount);
+ BigDecimal frozenBalance = walletCoin.getFrozenBalance().add(amount);
+ walletCoin.setAvailableBalance(availableBalance);
+ walletCoin.setFrozenBalance(frozenBalance);
+ memberWalletCoinDao.updateById(walletCoin);
+ }
+ // 加入到撮合
+ CoinTrader trader = factory.getTrader(symbol);
+// if(trader==null){
+//
+// trader = new CoinTrader("NEKK");
+// //newTrader.setKafkaTemplate(kafkaTemplate);
+// //newTrader.setBaseCoinScale(coin.getBaseCoinScale());
+// //newTrader.setCoinScale(coin.getCoinScale());
+// // newTrader.setPublishType(coin.getPublishType());
+// //newTrader.setClearTime(coin.getClearTime());
+//
+// // 创建成功以后需要对未处理订单预处理
+// //log.info("======CoinTrader Process: " + symbol + "======");
+// List<OrderCoinsEntity> orders = orderCoinsDao.selectAllEntrustingCoinOrderList();
+// if(CollectionUtils.isNotEmpty(orders)){
+// List<OrderCoinsEntity> tradingOrders = new ArrayList<>();
+// List<OrderCoinsEntity> completedOrders = new ArrayList<>();
+// orders.forEach(order1 -> {
+// tradingOrders.add(order1);
+// });
+// try {
+// trader.trade(tradingOrders);
+// } catch (ParseException e) {
+// e.printStackTrace();
+// // log.info("异常:trader.trade(tradingOrders);");
+// }
+// }
+//
+// trader.setReady(true);
+// factory.addTrader(symbol, trader);
+// }
+ trader.trade(order);
+
+
+// // 流水记录 TODO
+// MemberAccountFlowEntity record = new MemberAccountFlowEntity();
+// record.setMemberId(memberId);
+// if (OrderCoinsEntity.ORDERTYPE_BUY.equals(type)) {
+// record.setPrice(totalPayPrice.setScale(4, BigDecimal.ROUND_DOWN));
+// record.setSource(MemberAccountFlowEntity.SOURCE_BUY + symbol);
+// record.setRemark(MemberAccountFlowEntity.REMARK_BUY + symbol + ":" + amount);
+// } else {
+// record.setPrice(totalPayPrice.negate().setScale(4, BigDecimal.ROUND_DOWN));
+// record.setSource(MemberAccountFlowEntity.SOURCE_SALE + symbol);
+// record.setRemark(MemberAccountFlowEntity.REMARK_SALE + symbol + ":" + amount);
+// }
+// record.setSymbol(symbol);
+// record.setBalance(walletCoinUsdt.getAvailableBalance());
+//
+// memberAccountFlowEntityDao.insert(record);
return Result.ok(MessageSourceUtils.getString("order_service_0011"));
}
@@ -638,4 +809,109 @@
}
}
}
+
+ public void handleOrder(List<ExchangeTrade> trades){
+ // 处理撮合交易的订单
+ for(ExchangeTrade exchangeTrade : trades){
+
+ BigDecimal amount = exchangeTrade.getAmount();
+ Long buyOrderId = exchangeTrade.getBuyOrderId();
+ BigDecimal buyTurnover = exchangeTrade.getBuyTurnover();
+ int direction = exchangeTrade.getDirection();
+ BigDecimal price = exchangeTrade.getPrice();
+ Long sellOrderId = exchangeTrade.getSellOrderId();
+ // 买卖单都需要处理
+ // 买单
+ OrderCoinsEntity buyOrderCoinsEntity = orderCoinsDao.selectById(buyOrderId);
+ if(buyOrderCoinsEntity!=null){
+ // 比较剩余的量
+ BigDecimal dealAmount = buyOrderCoinsEntity.getDealAmount();
+ // 单的总金额
+ BigDecimal entrustAmount = buyOrderCoinsEntity.getEntrustAmount();
+ BigDecimal add = dealAmount.add(buyTurnover);
+ // 创建一个完成的单
+ OrderCoinsDealEntity detail = new OrderCoinsDealEntity();
+ detail.setMemberId(buyOrderCoinsEntity.getMemberId());
+ //detail.setOrderId(order.getId());
+ detail.setOrderNo(buyOrderCoinsEntity.getOrderNo());
+ detail.setOrderType(buyOrderCoinsEntity.getOrderType());
+ detail.setTradeType(buyOrderCoinsEntity.getTradeType());
+ detail.setSymbol(buyOrderCoinsEntity.getSymbol());
+ detail.setSymbolCnt(amount);
+ detail.setEntrustPrice(buyOrderCoinsEntity.getEntrustPrice());
+ detail.setDealPrice(price);
+ detail.setDealAmount(buyTurnover);
+ //detail.setFeeAmount(closingPrice);
+ detail.setOrderStatus(OrderCoinsDealEntity.ORDERSTATUS_DONE);
+ orderCoinDealDao.insert(detail);
+ // 如果这个单成交完 更改状态
+ if(add.compareTo(entrustAmount)>=0){
+ OrderCoinsEntity update = new OrderCoinsEntity();
+ update.setId(buyOrderId);
+ update.setDealAmount(entrustAmount);
+ update.setDealCnt(buyOrderCoinsEntity.getDealCnt().add(amount));
+ update.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE);
+ update.setUpdateTime(new Date());
+ orderCoinsDao.updateById(update);
+ }else{
+ OrderCoinsEntity update = new OrderCoinsEntity();
+ update.setId(buyOrderId);
+ update.setDealAmount(add);
+ update.setDealCnt(buyOrderCoinsEntity.getDealCnt().add(amount));
+ update.setUpdateTime(new Date());
+ orderCoinsDao.updateById(update);
+ }
+ }
+ // 卖单
+ OrderCoinsEntity sellOrderCoinsEntity = orderCoinsDao.selectById(sellOrderId);
+ if(sellOrderCoinsEntity!=null){
+ // 比较剩余的量
+ BigDecimal dealAmount = sellOrderCoinsEntity.getDealCnt();
+ // 单的总数量
+ BigDecimal entrustCnt = sellOrderCoinsEntity.getEntrustCnt();
+ BigDecimal add = dealAmount.add(amount);
+ // 创建一个完成的单
+ OrderCoinsDealEntity detail = new OrderCoinsDealEntity();
+ detail.setMemberId(sellOrderCoinsEntity.getMemberId());
+ //detail.setOrderId(order.getId());
+ detail.setOrderNo(sellOrderCoinsEntity.getOrderNo());
+ detail.setOrderType(sellOrderCoinsEntity.getOrderType());
+ detail.setTradeType(sellOrderCoinsEntity.getTradeType());
+ detail.setSymbol(sellOrderCoinsEntity.getSymbol());
+ detail.setSymbolCnt(amount);
+ detail.setEntrustPrice(sellOrderCoinsEntity.getEntrustPrice());
+ detail.setDealPrice(price);
+ detail.setDealAmount(buyTurnover);
+ //detail.setFeeAmount(closingPrice);
+ detail.setOrderStatus(OrderCoinsDealEntity.ORDERSTATUS_DONE);
+ orderCoinDealDao.insert(detail);
+ // 如果这个单成交完 更改状态
+ if(add.compareTo(entrustCnt)>=0){
+ OrderCoinsEntity update = new OrderCoinsEntity();
+ update.setId(sellOrderId);
+ // 总成交额
+ update.setDealAmount(buyTurnover.add(sellOrderCoinsEntity.getDealAmount()));
+ update.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE);
+ update.setDealCnt(entrustCnt);
+ update.setUpdateTime(new Date());
+ orderCoinsDao.updateById(update);
+ }else{
+ // 未完成
+ OrderCoinsEntity update = new OrderCoinsEntity();
+ update.setId(sellOrderId);
+ // 总成交额
+ update.setDealAmount(buyTurnover.add(sellOrderCoinsEntity.getDealAmount()));
+ update.setDealCnt(sellOrderCoinsEntity.getDealCnt().add(amount));
+ update.setUpdateTime(new Date());
+ orderCoinsDao.updateById(update);
+ }
+ // 卖币得到的usdt
+ MemberWalletCoinEntity memberWalletCoinEntity = memberWalletCoinDao.selectWalletCoinBymIdAndCode(sellOrderCoinsEntity.getMemberId(), sellOrderCoinsEntity.getSymbol());
+ if(memberWalletCoinEntity!=null){
+ memberWalletCoinDao.updateWalletBalance(memberWalletCoinEntity.getId(),buyTurnover,buyTurnover,null);
+ }
+
+ }
+ }
+ }
}
diff --git a/src/main/java/com/xcong/excoin/modules/exchange/service/HandleKlineService.java b/src/main/java/com/xcong/excoin/modules/exchange/service/HandleKlineService.java
new file mode 100644
index 0000000..cdedee9
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/modules/exchange/service/HandleKlineService.java
@@ -0,0 +1,14 @@
+package com.xcong.excoin.modules.exchange.service;
+
+import com.xcong.excoin.trade.ExchangeTrade;
+
+import java.util.List;
+
+public interface HandleKlineService {
+
+ /**
+ * 处理交易后的最新K线
+ * @param trades
+ */
+ void handleExchangeOrderToKline(List<ExchangeTrade> trades);
+}
diff --git a/src/main/java/com/xcong/excoin/modules/exchange/service/impl/HandleKlineServiceImpl.java b/src/main/java/com/xcong/excoin/modules/exchange/service/impl/HandleKlineServiceImpl.java
new file mode 100644
index 0000000..7add4f8
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/modules/exchange/service/impl/HandleKlineServiceImpl.java
@@ -0,0 +1,56 @@
+package com.xcong.excoin.modules.exchange.service.impl;
+
+import cn.hutool.core.util.StrUtil;
+import com.huobi.client.model.Candlestick;
+import com.xcong.excoin.modules.exchange.service.HandleKlineService;
+import com.xcong.excoin.processor.CoinProcessor;
+import com.xcong.excoin.processor.CoinProcessorFactory;
+import com.xcong.excoin.trade.ExchangeTrade;
+import com.xcong.excoin.utils.CoinTypeConvert;
+import com.xcong.excoin.utils.RedisUtils;
+import org.apache.commons.collections.CollectionUtils;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.math.BigDecimal;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+@Service
+public class HandleKlineServiceImpl implements HandleKlineService {
+
+ @Resource
+ private CoinProcessorFactory processorFactory;
+
+ @Resource
+ private RedisUtils redisUtils;
+
+ @Override
+ public void handleExchangeOrderToKline(List<ExchangeTrade> trades) {
+ if (CollectionUtils.isEmpty(trades)) {
+ return;
+ }
+ String symbol = trades.get(0).getSymbol();
+ String symbolUsdt = symbol;
+ if(!symbol.contains("USDT")){
+ symbolUsdt = symbol+"/USDT";
+ }
+ CoinProcessor processor = processorFactory.getProcessor(symbol);
+ Map<String, Candlestick> currentKlineMap = processor.getCurrentKlineMap();
+ Collection<Candlestick> values = currentKlineMap.values();
+ BigDecimal newPrice = trades.get(trades.size()-1).getPrice();
+ for (Candlestick candlestick : values) {
+ for (ExchangeTrade exchangeTrade : trades) {
+ processor.processTrade(candlestick, exchangeTrade);
+ }
+ }
+
+ // 存入redis,websocket去取
+ String key = "NEW_KINE_{}";
+ key = StrUtil.format(key, symbolUsdt);
+ redisUtils.set(key,currentKlineMap);
+ // 更新最新价
+ redisUtils.set(CoinTypeConvert.convertToKey(symbolUsdt), newPrice);
+ }
+}
diff --git a/src/main/java/com/xcong/excoin/modules/member/dao/MemberWalletCoinDao.java b/src/main/java/com/xcong/excoin/modules/member/dao/MemberWalletCoinDao.java
index 66a50dc..de32153 100644
--- a/src/main/java/com/xcong/excoin/modules/member/dao/MemberWalletCoinDao.java
+++ b/src/main/java/com/xcong/excoin/modules/member/dao/MemberWalletCoinDao.java
@@ -22,4 +22,5 @@
int subFrozenBalance(@Param("memberId") Long memberId, @Param("id") Long id, @Param("amount") BigDecimal amount);
int updateBlockBalance(@Param("id") Long id, @Param("availableBalance") BigDecimal availableBalance, @Param("earlyBalance") BigDecimal earlyBalance, @Param("blockNumber") Integer blockNumber);
+ int updateWalletBalance(@Param("id") Long id, @Param("availableBalance") BigDecimal availableBalance, @Param("totalBalance") BigDecimal totalBalance, @Param("frozenBalance") BigDecimal frozenBalance);
}
diff --git a/src/main/java/com/xcong/excoin/processor/CoinProcessor.java b/src/main/java/com/xcong/excoin/processor/CoinProcessor.java
new file mode 100644
index 0000000..6400c54
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/processor/CoinProcessor.java
@@ -0,0 +1,54 @@
+package com.xcong.excoin.processor;
+
+
+import com.huobi.client.model.Candlestick;
+import com.xcong.excoin.trade.ExchangeTrade;
+import com.xcong.excoin.utils.RedisUtils;
+
+import java.util.List;
+import java.util.Map;
+
+public interface CoinProcessor {
+
+ void setIsHalt(boolean status);
+
+ void setIsStopKLine(boolean stop);
+
+ boolean isStopKline();
+ /**
+ * 处理新生成的交易信息
+ * @param trades
+ * @return
+ */
+ void process(List<ExchangeTrade> trades);
+
+ void processTrade(Candlestick kLine, ExchangeTrade exchangeTrade);
+ /**
+ * 添加存储器
+ * @param storage
+ */
+ void addHandler(MarketHandler storage);
+
+ CoinThumb getThumb();
+
+ void setMarketService(MarketService service);
+
+ void generateKLine(int range, int field, long time);
+
+ Candlestick getKLine();
+
+ void initializeThumb();
+
+ void autoGenerate();
+
+ void resetThumb();
+
+ // void setExchangeRate(CoinExchangeRate coinExchangeRate);
+
+ void update24HVolume(long time);
+
+ //void initializeUsdRate();
+ Map<String,Candlestick> getCurrentKlineMap();
+
+ void setRedisUtils(RedisUtils redisUtils);
+}
diff --git a/src/main/java/com/xcong/excoin/processor/CoinProcessorFactory.java b/src/main/java/com/xcong/excoin/processor/CoinProcessorFactory.java
new file mode 100644
index 0000000..8c3ebda
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/processor/CoinProcessorFactory.java
@@ -0,0 +1,34 @@
+package com.xcong.excoin.processor;
+
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+@Slf4j
+@Component
+public class CoinProcessorFactory {
+ private ConcurrentHashMap<String, CoinProcessor> processorMap;
+
+ public CoinProcessorFactory() {
+ processorMap = new ConcurrentHashMap<>();
+ }
+
+ public void addProcessor(String symbol, CoinProcessor processor) {
+ log.info("CoinProcessorFactory addProcessor = {}" + symbol);
+ processorMap.put(symbol, processor);
+ }
+
+ public boolean containsProcessor(String symbol) {
+ return processorMap != null && processorMap.containsKey(symbol);
+ }
+
+ public CoinProcessor getProcessor(String symbol) {
+ return processorMap.get(symbol);
+ }
+
+ public ConcurrentHashMap<String, CoinProcessor> getProcessorMap() {
+ return processorMap;
+ }
+}
diff --git a/src/main/java/com/xcong/excoin/processor/CoinThumb.java b/src/main/java/com/xcong/excoin/processor/CoinThumb.java
new file mode 100644
index 0000000..1e9a198
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/processor/CoinThumb.java
@@ -0,0 +1,26 @@
+package com.xcong.excoin.processor;
+
+import lombok.Data;
+
+import java.math.BigDecimal;
+
+@Data
+public class CoinThumb {
+ private String symbol;
+ private BigDecimal open = BigDecimal.ZERO;
+ private BigDecimal high= BigDecimal.ZERO;
+ private BigDecimal low= BigDecimal.ZERO;
+ private BigDecimal close=BigDecimal.ZERO;
+ private BigDecimal chg = BigDecimal.ZERO.setScale(2);
+ private BigDecimal change = BigDecimal.ZERO.setScale(2);
+ private BigDecimal volume = BigDecimal.ZERO.setScale(2);
+ private BigDecimal turnover= BigDecimal.ZERO;
+ //昨日收盘价
+ private BigDecimal lastDayClose = BigDecimal.ZERO;
+ //交易币对usd汇率
+ private BigDecimal usdRate;
+ //基币对usd的汇率
+ private BigDecimal baseUsdRate;
+ // 交易區
+ private int zone;
+}
diff --git a/src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java b/src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java
new file mode 100644
index 0000000..ca6dc28
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java
@@ -0,0 +1,392 @@
+package com.xcong.excoin.processor;
+
+
+import com.alibaba.fastjson.JSON;
+import com.huobi.client.model.Candlestick;
+import com.xcong.excoin.trade.ExchangeTrade;
+import com.xcong.excoin.utils.RedisUtils;
+import lombok.ToString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * 默认交易处理器,产生1mK线信息
+ */
+@ToString
+public class DefaultCoinProcessor implements CoinProcessor {
+ private Logger logger = LoggerFactory.getLogger(DefaultCoinProcessor.class);
+ private String symbol;
+ private String baseCoin;
+ private Candlestick currentKLine;
+ private List<MarketHandler> handlers;
+ private CoinThumb coinThumb;
+ private MarketService service;
+ private RedisUtils redisUtils;
+ //private CoinExchangeRate coinExchangeRate;
+ //是否暂时处理
+ private Boolean isHalt = true;
+ //是否停止K线生成
+ private Boolean stopKLine = false;
+
+ /**
+ * 每个时间段的K线在生成后,生成一个最新的K线
+ */
+ private Map<String,Candlestick> currentKlineMap = new ConcurrentHashMap<>();
+
+ public DefaultCoinProcessor(String symbol, String baseCoin) {
+ //handlers = new ArrayList<>();
+ createNewKLine();
+ this.baseCoin = baseCoin;
+ this.symbol = symbol;
+ }
+
+ public String getSymbol() {
+ return symbol;
+ }
+
+ @Override
+ public void initializeThumb() {
+ Calendar calendar = Calendar.getInstance();
+ //将秒、微秒字段置为0
+ calendar.set(Calendar.SECOND, 0);
+ calendar.set(Calendar.MILLISECOND, 0);
+ long nowTime = calendar.getTimeInMillis();
+ calendar.set(Calendar.MINUTE, 0);
+ calendar.set(Calendar.HOUR_OF_DAY, 0);
+ long firstTimeOfToday = calendar.getTimeInMillis();
+ String period = "1min";
+ logger.info("initializeThumb from {} to {}", firstTimeOfToday, nowTime);
+ List<Candlestick> lines = service.findAllKLine(this.symbol, firstTimeOfToday, nowTime, period);
+ coinThumb = new CoinThumb();
+ synchronized (coinThumb) {
+ coinThumb.setSymbol(symbol);
+ for (Candlestick kline : lines) {
+ if (kline.getOpen().compareTo(BigDecimal.ZERO) == 0) {
+ continue;
+ }
+ if (coinThumb.getOpen().compareTo(BigDecimal.ZERO) == 0) {
+ coinThumb.setOpen(kline.getOpen());
+ }
+ if (coinThumb.getHigh().compareTo(kline.getHigh()) < 0) {
+ coinThumb.setHigh(kline.getHigh());
+ }
+ if (kline.getLow().compareTo(BigDecimal.ZERO) > 0 && coinThumb.getLow().compareTo(kline.getLow()) > 0) {
+ coinThumb.setLow(kline.getLow());
+ }
+ if (kline.getClose().compareTo(BigDecimal.ZERO) > 0) {
+ coinThumb.setClose(kline.getClose());
+ }
+ coinThumb.setVolume(coinThumb.getVolume().add(kline.getVolume()));
+ // TODO
+ coinThumb.setTurnover(coinThumb.getTurnover().add(kline.getAmount()));
+ }
+ coinThumb.setChange(coinThumb.getClose().subtract(coinThumb.getOpen()));
+ // 此处计算涨幅并没有以开盘价为标准,而是以最低价
+ if (coinThumb.getLow().compareTo(BigDecimal.ZERO) > 0) {
+ coinThumb.setChg(coinThumb.getChange().divide(coinThumb.getLow(), 4, RoundingMode.UP));
+ }
+ }
+ }
+
+ public void createNewKLine() {
+ currentKLine = new Candlestick();
+ synchronized (currentKLine) {
+ Calendar calendar = Calendar.getInstance();
+ calendar.set(Calendar.SECOND, 0);
+ calendar.set(Calendar.MILLISECOND, 0);
+ //1Min时间要是下一整分钟的
+ calendar.add(Calendar.MINUTE, 1);
+ currentKLine.setTimestamp(calendar.getTimeInMillis());
+ // K线类型
+ //currentKLine.setPeriod("1min");
+ currentKLine.setCount(0);
+ }
+ }
+
+ /**
+ * 00:00:00 时重置CoinThumb
+ */
+ @Override
+ public void resetThumb() {
+ logger.info("reset coinThumb");
+ synchronized (coinThumb) {
+ coinThumb.setOpen(BigDecimal.ZERO);
+ coinThumb.setHigh(BigDecimal.ZERO);
+ //设置昨收价格
+ coinThumb.setLastDayClose(coinThumb.getClose());
+ //coinThumb.setClose(BigDecimal.ZERO);
+ coinThumb.setLow(BigDecimal.ZERO);
+ coinThumb.setChg(BigDecimal.ZERO);
+ coinThumb.setChange(BigDecimal.ZERO);
+ }
+ }
+
+// @Override
+// public void setExchangeRate(CoinExchangeRate coinExchangeRate) {
+// this.coinExchangeRate = coinExchangeRate;
+// }
+
+ @Override
+ public void update24HVolume(long time) {
+ if(coinThumb!=null) {
+ synchronized (coinThumb) {
+ Calendar calendar = Calendar.getInstance();
+ calendar.setTimeInMillis(time);
+ calendar.add(Calendar.HOUR_OF_DAY, -24);
+ long timeStart = calendar.getTimeInMillis();
+ // TODO
+ BigDecimal volume = service.findTradeVolume(this.symbol, timeStart, time);
+ coinThumb.setVolume(volume.setScale(4, RoundingMode.DOWN));
+ }
+ }
+ }
+
+// @Override
+// public void initializeUsdRate() {
+// //logger.info("symbol = {} ,baseCoin = {}",this.symbol,this.baseCoin);
+// BigDecimal baseUsdRate = coinExchangeRate.getUsdRate(baseCoin);
+// coinThumb.setBaseUsdRate(baseUsdRate);
+// //logger.info("setBaseUsdRate = ",baseUsdRate);
+// BigDecimal multiply = coinThumb.getClose().multiply(coinExchangeRate.getUsdRate(baseCoin));
+// //logger.info("setUsdRate = ",multiply);
+// coinThumb.setUsdRate(multiply);
+// }
+
+
+ @Override
+ public void autoGenerate() {
+ DateFormat df = new SimpleDateFormat("HH:mm:ss");
+ //logger.info("auto generate 1min kline in {},data={}", df.format(new Date(currentKLine.getTime())), JSON.toJSONString(currentKLine));
+ if(coinThumb != null) {
+ synchronized (currentKLine) {
+ //没有成交价时存储上一笔成交价
+ if(currentKLine.getOpen()==null){
+ currentKLine.setOpen(BigDecimal.ZERO);
+ }
+ if (currentKLine.getOpen().compareTo(BigDecimal.ZERO) == 0) {
+ currentKLine.setOpen(coinThumb.getClose());
+ currentKLine.setLow(coinThumb.getClose());
+ currentKLine.setHigh(coinThumb.getClose());
+ currentKLine.setClose(coinThumb.getClose());
+ }
+ Calendar calendar = Calendar.getInstance();
+ calendar.set(Calendar.SECOND, 0);
+ calendar.set(Calendar.MILLISECOND, 0);
+ currentKLine.setTimestamp(calendar.getTimeInMillis());
+ handleKLineStorage(currentKLine);
+ createNewKLine();
+ }
+ }
+ }
+
+ @Override
+ public void setIsHalt(boolean status) {
+ this.isHalt = status;
+ }
+
+ @Override
+ public void process(List<ExchangeTrade> trades) {
+ if (!isHalt) {
+ if (trades == null || trades.size() == 0) {
+ return;
+ }
+ synchronized (currentKLine) {
+ for (ExchangeTrade exchangeTrade : trades) {
+ //处理K线
+ processTrade(currentKLine, exchangeTrade);
+ //处理今日概况信息
+ logger.debug("处理今日概况信息");
+ handleThumb(exchangeTrade);
+ //存储并推送成交信息
+ handleTradeStorage(exchangeTrade);
+ }
+ }
+ }
+ }
+
+ public void processTrade(Candlestick kLine, ExchangeTrade exchangeTrade) {
+ if (kLine.getClose().compareTo(BigDecimal.ZERO) == 0) {
+ //第一次设置K线值
+ kLine.setOpen(exchangeTrade.getPrice());
+ kLine.setHigh(exchangeTrade.getPrice());
+ kLine.setLow(exchangeTrade.getPrice());
+ kLine.setClose(exchangeTrade.getPrice());
+ } else {
+ kLine.setHigh(exchangeTrade.getPrice().max(kLine.getHigh()));
+ kLine.setLow(exchangeTrade.getPrice().min(kLine.getLow()));
+ kLine.setClose(exchangeTrade.getPrice());
+ }
+ kLine.setCount(kLine.getCount() + 1);
+ kLine.setVolume(kLine.getVolume().add(exchangeTrade.getAmount()));
+ BigDecimal turnover = exchangeTrade.getPrice().multiply(exchangeTrade.getAmount());
+ // kLine.setTurnover(kLine.getTurnover().add(turnover));
+ }
+
+ public void handleTradeStorage(ExchangeTrade exchangeTrade) {
+ for (MarketHandler storage : handlers) {
+ storage.handleTrade(symbol, exchangeTrade, coinThumb);
+ }
+ }
+
+ public void handleKLineStorage(Candlestick kLine) {
+ // 存储交易信息 TODO 发送最新的一根K线
+// for (MarketHandler storage : handlers) {
+// storage.handleKLine(symbol, kLine);
+// }
+ }
+
+ public void handleThumb(ExchangeTrade exchangeTrade) {
+ logger.info("handleThumb symbol = {}", this.symbol);
+ synchronized (coinThumb) {
+ if (coinThumb.getOpen().compareTo(BigDecimal.ZERO) == 0) {
+ //第一笔交易记为开盘价
+ coinThumb.setOpen(exchangeTrade.getPrice());
+ }
+ coinThumb.setHigh(exchangeTrade.getPrice().max(coinThumb.getHigh()));
+ if (coinThumb.getLow().compareTo(BigDecimal.ZERO) == 0) {
+ coinThumb.setLow(exchangeTrade.getPrice());
+ } else {
+ coinThumb.setLow(exchangeTrade.getPrice().min(coinThumb.getLow()));
+ }
+ coinThumb.setClose(exchangeTrade.getPrice());
+ coinThumb.setVolume(coinThumb.getVolume().add(exchangeTrade.getAmount()).setScale(4, RoundingMode.UP));
+ BigDecimal turnover = exchangeTrade.getPrice().multiply(exchangeTrade.getAmount()).setScale(4, RoundingMode.UP);
+ coinThumb.setTurnover(coinThumb.getTurnover().add(turnover));
+ BigDecimal change = coinThumb.getClose().subtract(coinThumb.getOpen());
+ coinThumb.setChange(change);
+ if (coinThumb.getLow().compareTo(BigDecimal.ZERO) > 0) {
+ coinThumb.setChg(change.divide(coinThumb.getLow(), 4, BigDecimal.ROUND_UP));
+ }
+ if ("USDT".equalsIgnoreCase(baseCoin)) {
+ logger.info("setUsdRate", exchangeTrade.getPrice());
+ coinThumb.setUsdRate(exchangeTrade.getPrice());
+ } else {
+
+ }
+ //coinThumb.setBaseUsdRate(coinExchangeRate.getUsdRate(baseCoin));
+ //coinThumb.setUsdRate(exchangeTrade.getPrice().multiply(coinExchangeRate.getUsdRate(baseCoin)));
+ //logger.info("setUsdRate", exchangeTrade.getPrice().multiply(coinExchangeRate.getUsdRate(baseCoin)));
+ //logger.info("thumb = {}", coinThumb);
+ }
+ }
+
+ @Override
+ public void addHandler(MarketHandler storage) {
+ handlers.add(storage);
+ }
+
+
+ @Override
+ public CoinThumb getThumb() {
+ return coinThumb;
+ }
+
+ @Override
+ public void setMarketService(MarketService service) {
+ this.service = service;
+ }
+
+ @Override
+ public void generateKLine(int range, int field, long time) {
+ Calendar calendar = Calendar.getInstance();
+ calendar.setTimeInMillis(time);
+ DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ long endTick = calendar.getTimeInMillis();
+ String endTime = df.format(calendar.getTime());
+ //往前推range个时间单位
+ calendar.add(field, -range);
+ String fromTime = df.format(calendar.getTime());
+ long startTick = calendar.getTimeInMillis();
+ System.out.println("time range from " + fromTime + " to " + endTime);
+ List<ExchangeTrade> exchangeTrades = service.findTradeByTimeRange(this.symbol, startTick, endTick);
+
+ Candlestick kLine = new Candlestick();
+ kLine.setTimestamp(endTick);
+ kLine.setAmount(BigDecimal.ZERO);
+ kLine.setClose(BigDecimal.ZERO);
+ kLine.setLow(BigDecimal.ZERO);
+ kLine.setOpen(BigDecimal.ZERO);
+ kLine.setVolume(BigDecimal.ZERO);
+ kLine.setHigh(BigDecimal.ZERO);
+ String rangeUnit = "";
+ if (field == Calendar.MINUTE) {
+ rangeUnit = "min";
+ } else if (field == Calendar.HOUR_OF_DAY) {
+ rangeUnit = "hour";
+ } else if (field == Calendar.DAY_OF_WEEK) {
+ rangeUnit = "week";
+ } else if (field == Calendar.DAY_OF_YEAR) {
+ rangeUnit = "day";
+ } else if (field == Calendar.MONTH) {
+ rangeUnit = "month";
+ }
+ // kLine.setPeriod(range + rangeUnit);
+ String period = range + rangeUnit;
+ // 处理K线信息
+ for (ExchangeTrade exchangeTrade : exchangeTrades) {
+ processTrade(kLine, exchangeTrade);
+ }
+ // 如果开盘价为0,则设置为前一个价格
+ if(kLine.getOpen().compareTo(BigDecimal.ZERO) == 0) {
+ kLine.setOpen(coinThumb.getClose());
+ kLine.setClose(coinThumb.getClose());
+ kLine.setLow(coinThumb.getClose());
+ kLine.setHigh(coinThumb.getClose());
+ }
+ logger.info("generate " + range + rangeUnit + " kline in {},data={}", df.format(new Date(kLine.getTimestamp())), JSON.toJSONString(kLine));
+ service.saveKLine(symbol,period, kLine);
+ // 生成一个对应的新K线 后续的交易会更新这个最新K线数据
+ Candlestick newKline = new Candlestick();
+ //kLine.setTimestamp(endTick);
+ newKline.setAmount(BigDecimal.ZERO);
+ newKline.setClose(kLine.getClose());
+ newKline.setLow(kLine.getClose());
+ // 开盘价是上个K线的收盘价
+ newKline.setOpen(kLine.getClose());
+ newKline.setVolume(BigDecimal.ZERO);
+ newKline.setHigh(kLine.getClose());
+ currentKlineMap.put(period,newKline);
+
+ // 存储昨日K线
+ if("day".equals(rangeUnit)){
+ redisUtils.set("NEKK/USDT",kLine);
+ }
+ }
+
+ @Override
+ public Candlestick getKLine() {
+ return currentKLine;
+ }
+
+ @Override
+ public void setIsStopKLine(boolean stop) {
+ this.stopKLine = stop;
+ }
+
+ @Override
+ public boolean isStopKline() {
+ return this.stopKLine;
+ }
+
+
+ @Override
+ public Map<String, Candlestick> getCurrentKlineMap() {
+ return currentKlineMap;
+ }
+
+ @Override
+ public void setRedisUtils(RedisUtils redisUtils) {
+ this.redisUtils = redisUtils;
+ }
+
+ public void setCurrentKlineMap(Map<String, Candlestick> currentKlineMap) {
+ this.currentKlineMap = currentKlineMap;
+ }
+}
diff --git a/src/main/java/com/xcong/excoin/processor/ExchangeOrderDirection.java b/src/main/java/com/xcong/excoin/processor/ExchangeOrderDirection.java
new file mode 100644
index 0000000..4583886
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/processor/ExchangeOrderDirection.java
@@ -0,0 +1,5 @@
+package com.xcong.excoin.processor;
+
+public enum ExchangeOrderDirection {
+ BUY,SELL;
+}
diff --git a/src/main/java/com/xcong/excoin/processor/MarketHandler.java b/src/main/java/com/xcong/excoin/processor/MarketHandler.java
new file mode 100644
index 0000000..526fa71
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/processor/MarketHandler.java
@@ -0,0 +1,21 @@
+package com.xcong.excoin.processor;
+
+import com.huobi.client.model.Candlestick;
+import com.xcong.excoin.trade.ExchangeTrade;
+
+public interface MarketHandler {
+
+ /**
+ * 存储交易信息
+ * @param exchangeTrade
+ */
+ void handleTrade(String symbol, ExchangeTrade exchangeTrade, CoinThumb thumb);
+
+
+ /**
+ * 存储K线信息
+ *
+ * @param kLine
+ */
+ void handleKLine(String symbol, Candlestick kLine);
+}
diff --git a/src/main/java/com/xcong/excoin/processor/MarketService.java b/src/main/java/com/xcong/excoin/processor/MarketService.java
new file mode 100644
index 0000000..11c6c56
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/processor/MarketService.java
@@ -0,0 +1,125 @@
+package com.xcong.excoin.processor;
+
+
+import com.huobi.client.model.Candlestick;
+import com.xcong.excoin.modules.coin.dao.OrderCoinDealDao;
+import com.xcong.excoin.trade.ExchangeTrade;
+import com.xcong.excoin.utils.RedisUtils;
+import org.apache.commons.collections.CollectionUtils;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+@Service
+public class MarketService {
+// @Autowired
+// private MongoTemplate mongoTemplate;
+
+ @Resource
+ private OrderCoinDealDao orderCoinDealDao;
+
+ @Resource
+ private RedisUtils redisUtils;
+
+ public List<Candlestick> findAllKLine(String symbol, String peroid) {
+// Sort sort = new Sort(new Sort.Order(Sort.Direction.DESC,"time"));
+// Query query = new Query().with(sort).limit(1000);
+//
+// return mongoTemplate.find(query,KLine.class,"exchange_kline_"+symbol+"_"+peroid);
+ return null;
+ }
+
+ public List<Candlestick> findAllKLine(String symbol, long fromTime, long toTime, String period) {
+// Criteria criteria = Criteria.where("time").gte(fromTime).andOperator(Criteria.where("time").lte(toTime));
+// Sort sort = new Sort(new Sort.Order(Sort.Direction.ASC,"time"));
+// Query query = new Query(criteria).with(sort);
+// List<KLine> kLines = mongoTemplate.find(query,KLine.class,"exchange_kline_"+symbol+"_"+ period);
+// return kLines;
+ String key = "KINE_" + symbol + "_" + period;
+ Object data = redisUtils.get(key);
+ List list = new ArrayList();
+ if (data != null) {
+ list = (List) data;
+ }
+ return list;
+ }
+//
+// public ExchangeTrade findFirstTrade(String symbol,long fromTime,long toTime){
+// Criteria criteria = Criteria.where("time").gte(fromTime).andOperator(Criteria.where("time").lte(toTime));
+// Sort sort = new Sort(new Sort.Order(Sort.Direction.ASC,"time"));
+// Query query = new Query(criteria).with(sort);
+// return mongoTemplate.findOne(query,ExchangeTrade.class,"exchange_trade_"+symbol);
+// }
+
+// public ExchangeTrade findLastTrade(String symbol,long fromTime,long toTime){
+// Criteria criteria = Criteria.where("time").gte(fromTime).andOperator(Criteria.where("time").lte(toTime));
+// Sort sort = new Sort(new Sort.Order(Sort.Direction.DESC,"time"));
+// Query query = new Query(criteria).with(sort);
+// return mongoTemplate.findOne(query,ExchangeTrade.class,"exchange_trade_"+symbol);
+// }
+//
+// public ExchangeTrade findTrade(String symbol, long fromTime, long toTime, Sort.Order order){
+// Criteria criteria = Criteria.where("time").gte(fromTime).andOperator(Criteria.where("time").lte(toTime));
+// Sort sort = new Sort(order);
+// Query query = new Query(criteria).with(sort);
+// return mongoTemplate.findOne(query,ExchangeTrade.class,"exchange_trade_"+symbol);
+// }
+
+ public List<ExchangeTrade> findTradeByTimeRange(String symbol, long timeStart, long timeEnd) {
+// Criteria criteria = Criteria.where("time").gte(timeStart).andOperator(Criteria.where("time").lt(timeEnd));
+// Sort sort = new Sort(new Sort.Order(Sort.Direction.ASC,"time"));
+// Query query = new Query(criteria).with(sort);
+//
+// return mongoTemplate.find(query,ExchangeTrade.class,"exchange_trade_"+symbol);
+ return orderCoinDealDao.selectOrderCoinDealByTime(symbol, new Date(timeStart), new Date(timeStart));
+ // return null;
+ }
+
+ public void saveKLine(String symbol, String period, Candlestick kLine) {
+ // 先获取
+ String key = "KINE_" + symbol + "_" + period;
+ Object data = redisUtils.get(key);
+ List list = new ArrayList();
+ if (data != null) {
+ list = (List) data;
+ }
+ list.add(kLine);
+ redisUtils.set("KINE_" + symbol + "_" + period, list);
+ // mongoTemplate.insert(kLine,"exchange_kline_"+symbol+"_"+kLine.getPeriod());
+ }
+
+ /**
+ * 查找某时间段内的交易量
+ *
+ * @param symbol
+ * @param timeStart
+ * @param timeEnd
+ * @return
+ */
+ public BigDecimal findTradeVolume(String symbol, long timeStart, long timeEnd) {
+// Criteria criteria = Criteria.where("time").gt(timeStart)
+// .andOperator(Criteria.where("time").lte(timeEnd));
+// //.andOperator(Criteria.where("volume").gt(0));
+// Sort sort = new Sort(new Sort.Order(Sort.Direction.ASC,"time"));
+// Query query = new Query(criteria).with(sort);
+// List<KLine> kLines = mongoTemplate.find(query,KLine.class,"exchange_kline_"+symbol+"_1min");
+// BigDecimal totalVolume = BigDecimal.ZERO;
+// for(KLine kLine:kLines){
+// totalVolume = totalVolume.add(kLine.getVolume());
+// }
+// return totalVolume;
+ List<ExchangeTrade> exchangeTrades = orderCoinDealDao.selectOrderCoinDealByTime(symbol, new Date(timeStart), new Date(timeStart));
+ BigDecimal totalVolume = BigDecimal.ZERO;
+ if (CollectionUtils.isNotEmpty(exchangeTrades)) {
+ for (ExchangeTrade kLine : exchangeTrades) {
+ totalVolume = totalVolume.add(kLine.getBuyTurnover());
+ }
+ }
+
+ return totalVolume;
+ }
+}
diff --git a/src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java b/src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java
new file mode 100644
index 0000000..90a3e9c
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java
@@ -0,0 +1,98 @@
+package com.xcong.excoin.quartz.job;
+
+import com.alibaba.fastjson.JSON;
+import com.huobi.client.SubscriptionClient;
+import com.huobi.client.SubscriptionOptions;
+import com.huobi.client.model.Candlestick;
+import com.huobi.client.model.enums.CandlestickInterval;
+import com.xcong.excoin.modules.coin.dao.OrderCoinsDao;
+import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity;
+import com.xcong.excoin.modules.coin.service.OrderCoinService;
+import com.xcong.excoin.modules.symbols.service.SymbolsService;
+import com.xcong.excoin.processor.CoinProcessor;
+import com.xcong.excoin.processor.CoinProcessorFactory;
+import com.xcong.excoin.processor.DefaultCoinProcessor;
+import com.xcong.excoin.processor.MarketService;
+import com.xcong.excoin.rabbit.pricequeue.WebsocketPriceService;
+import com.xcong.excoin.trade.CoinTrader;
+import com.xcong.excoin.trade.CoinTraderFactory;
+import com.xcong.excoin.utils.CoinTypeConvert;
+import com.xcong.excoin.utils.RedisUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+import java.math.BigDecimal;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * 开启撮合交易
+ *
+ * @author wzy
+ * @date 2020-05-28
+ **/
+@Slf4j
+@Component
+//@ConditionalOnProperty(prefix = "app", name = "trade", havingValue = "true")
+public class CoinTradeInitJob {
+
+ @Resource
+ private OrderCoinsDao orderCoinsDao;
+
+ @Resource
+ private CoinTraderFactory factory;
+
+ @Resource
+ private OrderCoinService coinService;
+
+ @Resource
+ private MarketService marketService;
+
+ @Resource
+ private CoinProcessorFactory processorFactory;
+
+ @PostConstruct
+ public void initCoinTrade() {
+ log.info("#=======撮合交易器开启=======#");
+ String symbol = "NEKK";
+ CoinTrader newTrader = new CoinTrader(symbol);
+ newTrader.setOrderCoinService(coinService);
+ //newTrader.setKafkaTemplate(kafkaTemplate);
+ //newTrader.setBaseCoinScale(coin.getBaseCoinScale());
+ //newTrader.setCoinScale(coin.getCoinScale());
+ // newTrader.setPublishType(coin.getPublishType());
+ //newTrader.setClearTime(coin.getClearTime());
+
+ // 创建成功以后需要对未处理订单预处理
+ log.info("======CoinTrader Process: " + symbol + "======");
+ List<OrderCoinsEntity> orders = orderCoinsDao.selectAllEntrustingCoinOrderList();
+ List<OrderCoinsEntity> tradingOrders = new ArrayList<>();
+ List<OrderCoinsEntity> completedOrders = new ArrayList<>();
+ orders.forEach(order -> {
+ tradingOrders.add(order);
+ });
+ try {
+ newTrader.trade(tradingOrders);
+ } catch (ParseException e) {
+ e.printStackTrace();
+ log.info("异常:trader.trade(tradingOrders);");
+ }
+ newTrader.setReady(true);
+ factory.addTrader(symbol, newTrader);
+
+ // 创建K线生成器
+ CoinProcessor processor = new DefaultCoinProcessor(symbol, "USDT");
+ processor.setMarketService(marketService);
+ //processor.setExchangeRate(exchangeRate);
+ processor.initializeThumb();
+ //processor.initializeUsdRate();
+ processor.setIsHalt(false);
+
+ processorFactory.addProcessor(symbol, processor);
+ }
+}
diff --git a/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java b/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java
new file mode 100644
index 0000000..110c1fc
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java
@@ -0,0 +1,159 @@
+package com.xcong.excoin.quartz.job;
+
+import com.alibaba.fastjson.JSON;
+import com.huobi.client.model.Candlestick;
+import com.xcong.excoin.processor.CoinProcessorFactory;
+import com.xcong.excoin.trade.TradePlateModel;
+import com.xcong.excoin.utils.RedisUtils;
+import com.xcong.excoin.websocket.TradePlateSendWebSocket;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+
+/**
+ * 生成各时间段的K线信息
+ *
+ */
+@Component
+@Slf4j
+public class KLineGeneratorJob {
+ @Resource
+ private CoinProcessorFactory processorFactory;
+
+ @Resource
+ private TradePlateSendWebSocket plateSendWebSocket;
+
+ @Resource
+ private RedisUtils redisUtils;
+
+ @Scheduled(cron = "0/2 * * * * *")
+ public void tradePlate(){
+ redisUtils.set("NEKK_NEW_PRICE",new BigDecimal(Math.random()));
+ Candlestick candlestick = new Candlestick();
+ candlestick.setOpen(new BigDecimal("10.33"));
+ candlestick.setHigh(new BigDecimal("15.23"));
+ candlestick.setVolume(new BigDecimal("12121.34"));
+ candlestick.setLow(new BigDecimal("8.234"));
+ candlestick.setAmount(new BigDecimal("1199"));
+ candlestick.setTimestamp(1599840000);
+ candlestick.setId(1599840000L);
+ candlestick.setCount(100002);
+ candlestick.setClose(new BigDecimal("12.2323"));
+ redisUtils.set("NEKK/USDT",candlestick);
+ // [[10244.21, 0.000855], [10243.7, 0.008777], [10243.59, 0.14], [10243.37, 0.467663]]
+ TradePlateModel tradePlateModel = new TradePlateModel();
+ List<BigDecimal> buy;
+ List<BigDecimal> sell;
+ for(int i=0;i<5;i++){
+ buy = new ArrayList<>(2);
+ buy.add(new BigDecimal(Math.random()*i));
+ buy.add(new BigDecimal(Math.random()*i));
+ tradePlateModel.getBuy().add(buy);
+ sell = new ArrayList<>(2);
+ sell.add(new BigDecimal(Math.random()*i*2));
+ sell.add(new BigDecimal(Math.random()*i*2));
+ tradePlateModel.getSell().add(sell);
+ }
+
+ plateSendWebSocket.sendMessagePlate(JSON.toJSONString(tradePlateModel),null);
+ plateSendWebSocket.sendMessageKline("nekkusdt","1min","{amount: 114419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 311958.06091543}",null);
+ plateSendWebSocket.sendMessageKline("nekkusdt","5min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null);
+ plateSendWebSocket.sendMessageKline("nekkusdt","15min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null);
+ }
+
+ /**
+ * 每分钟定时器,处理分钟K线
+ */
+ @Scheduled(cron = "0 * * * * *")
+ public void handle5minKLine(){
+
+ Calendar calendar = Calendar.getInstance();
+ log.debug("分钟K线:{}",calendar.getTime());
+ //将秒、微秒字段置为0
+ calendar.set(Calendar.SECOND,0);
+ calendar.set(Calendar.MILLISECOND,0);
+ long time = calendar.getTimeInMillis();
+ int minute = calendar.get(Calendar.MINUTE);
+ int hour = calendar.get(Calendar.HOUR_OF_DAY);
+ processorFactory.getProcessorMap().forEach((symbol,processor)->{
+ if(!processor.isStopKline()) {
+ log.debug("生成{}分钟k线:{}",symbol);
+ //生成1分钟K线
+ processor.autoGenerate();
+ //更新24H成交量
+ processor.update24HVolume(time);
+ if(minute%5 == 0) {
+ // 五分钟的当前K线
+ processor.generateKLine(5, Calendar.MINUTE, time);
+ }
+ if(minute%10 == 0){
+ processor.generateKLine(10, Calendar.MINUTE,time);
+ }
+ if(minute%15 == 0){
+ processor.generateKLine(15, Calendar.MINUTE,time);
+ }
+ if(minute%30 == 0){
+ processor.generateKLine(30, Calendar.MINUTE,time);
+ }
+ if(hour == 0 && minute == 0){
+ processor.resetThumb();
+ }
+ }
+ });
+ }
+
+ /**
+ * 每小时运行
+ */
+ @Scheduled(cron = "0 0 * * * *")
+ public void handleHourKLine(){
+ processorFactory.getProcessorMap().forEach((symbol,processor)-> {
+ if(!processor.isStopKline()) {
+ Calendar calendar = Calendar.getInstance();
+ log.info("小时K线:{}",calendar.getTime());
+ //将秒、微秒字段置为0
+ calendar.set(Calendar.MINUTE, 0);
+ calendar.set(Calendar.SECOND, 0);
+ calendar.set(Calendar.MILLISECOND, 0);
+ long time = calendar.getTimeInMillis();
+
+ processor.generateKLine(1, Calendar.HOUR_OF_DAY, time);
+ }
+ });
+ }
+
+ /**
+ * 每日0点处理器,处理日K线
+ */
+ @Scheduled(cron = "0 0 0 * * *")
+ public void handleDayKLine(){
+ processorFactory.getProcessorMap().forEach((symbol,processor)->{
+ if(!processor.isStopKline()) {
+ Calendar calendar = Calendar.getInstance();
+ log.info("日K线:{}",calendar.getTime());
+ //将秒、微秒字段置为0
+ calendar.set(Calendar.HOUR_OF_DAY,0);
+ calendar.set(Calendar.MINUTE,0);
+ calendar.set(Calendar.SECOND,0);
+ calendar.set(Calendar.MILLISECOND,0);
+ long time = calendar.getTimeInMillis();
+ int week = calendar.get(Calendar.DAY_OF_WEEK);
+ int dayOfMonth = calendar.get(Calendar.DAY_OF_MONTH);
+ if(week == 1){
+ processor.generateKLine(1, Calendar.DAY_OF_WEEK, time);
+ }
+ if(dayOfMonth == 1){
+ processor.generateKLine(1, Calendar.DAY_OF_MONTH, time);
+ }
+ processor.generateKLine(1, Calendar.DAY_OF_YEAR,time);
+ }
+ });
+ }
+}
diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java
new file mode 100644
index 0000000..9b03546
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java
@@ -0,0 +1,98 @@
+package com.xcong.excoin.rabbit.consumer;
+
+import cn.hutool.core.util.StrUtil;
+import com.alibaba.fastjson.JSONObject;
+import com.huobi.client.model.Candlestick;
+import com.xcong.excoin.configurations.RabbitMqConfig;
+import com.xcong.excoin.modules.coin.service.OrderCoinService;
+import com.xcong.excoin.modules.exchange.service.HandleKlineService;
+import com.xcong.excoin.trade.ExchangeTrade;
+import com.xcong.excoin.utils.RedisUtils;
+import com.xcong.excoin.websocket.TradePlateSendWebSocket;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * @author wzy
+ * @date 2020-05-25
+ **/
+@Slf4j
+@Component
+public class ExchangeConsumer {
+
+ @Resource
+ private TradePlateSendWebSocket tradePlateSendWebSocket;
+
+ @Resource
+ private RedisUtils redisUtils;
+
+ @Resource
+ private HandleKlineService handleKlineService;
+
+ @Resource
+ private OrderCoinService orderCoinService;
+
+ /**
+ * 发送盘口信息
+ * @param content
+ */
+ @RabbitListener(queues = RabbitMqConfig.QUEUE_TRADE_PLATE)
+ public void tradePlate(String content) {
+ log.info("#---->{}#", content);
+ tradePlateSendWebSocket.sendMessagePlate(content,null);
+ }
+
+ /**
+ * 处理订单
+ * @param content
+ */
+ @RabbitListener(queues = RabbitMqConfig.QUEUE_HANDLE_TRADE)
+ public void handleTradeExchange(String content) {
+ log.info("#---->{}#", content);
+ List<ExchangeTrade> exchangeTrades = JSONObject.parseArray(content, ExchangeTrade.class);
+ // 处理K线 并更新最新价
+ handleKlineService.handleExchangeOrderToKline(exchangeTrades);
+ // 推送最新K线
+ String symbol = exchangeTrades.get(0).getSymbol();
+ String symbolUsdt = symbol;
+ if(!symbol.contains("USDT")){
+ symbolUsdt = symbol+"/USDT";
+ }
+ String key = "NEW_KINE_{}";
+ key = StrUtil.format(key, symbolUsdt);
+ Object o = redisUtils.get(key);
+ Map<String, Candlestick> currentKlineMap = (Map<String, Candlestick> )o;
+ Set<Map.Entry<String, Candlestick>> entries = currentKlineMap.entrySet();
+ for(Map.Entry<String, Candlestick> map : entries){
+ tradePlateSendWebSocket.sendMessageKline(symbolUsdt,map.getKey(),JSONObject.toJSONString(map.getValue()),null);
+ }
+ // 处理用户订单
+ orderCoinService.handleOrder(exchangeTrades);
+ }
+
+ /**
+ * 更新最新K线
+ * @param content
+ */
+// @RabbitListener(queues = RabbitMqConfig.QUEUE_TRADE_PLATE)
+// public void newKling(String content) {
+// log.info("#---->{}#", content);
+// // 最新K线的币种
+// String key = "NEW_KINE_{}";
+// key = StrUtil.format(key, content);
+// Object o = redisUtils.get(key);
+// Map<String, Candlestick> currentKlineMap = (Map<String, Candlestick>)o;
+// // 推送最新K线
+// Set<Map.Entry<String, Candlestick>> entries = currentKlineMap.entrySet();
+// for(Map.Entry<String, Candlestick> map : entries){
+// tradePlateSendWebSocket.sendMessageKline(content,map.getKey(),JSONObject.toJSONString(map.getValue()),null);
+// }
+//
+// }
+}
diff --git a/src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java b/src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java
new file mode 100644
index 0000000..42e67d4
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java
@@ -0,0 +1,47 @@
+package com.xcong.excoin.rabbit.producer;
+
+import cn.hutool.core.util.IdUtil;
+import com.xcong.excoin.configurations.RabbitMqConfig;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.rabbit.connection.CorrelationData;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author wzy
+ * @date 2020-05-25
+ **/
+@Slf4j
+@Component
+public class ExchangeProducer implements RabbitTemplate.ConfirmCallback {
+
+ private RabbitTemplate rabbitTemplate;
+
+ @Autowired
+ public ExchangeProducer(RabbitTemplate rabbitTemplate) {
+ this.rabbitTemplate = rabbitTemplate;
+ rabbitTemplate.setConfirmCallback(this);
+ }
+
+ public void sendPlateMsg(String content) {
+ CorrelationData correlationData = new CorrelationData(IdUtil.simpleUUID());
+ rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_B, RabbitMqConfig.ROUTINGKEY_TRADE_PLATE, content, correlationData);
+ }
+
+ public void sendHandleTrade(String content) {
+ CorrelationData correlationData = new CorrelationData(IdUtil.simpleUUID());
+ rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_B, RabbitMqConfig.ROUTINGKEY_HANDLE_TRADE, content, correlationData);
+ }
+
+
+ @Override
+ public void confirm(CorrelationData correlationData, boolean ack, String cause) {
+ log.info("#----->{}#", correlationData);
+ if (ack) {
+ log.info("success");
+ } else {
+ log.info("--->{}", cause);
+ }
+ }
+}
diff --git a/src/main/java/com/xcong/excoin/trade/CoinTrader.java b/src/main/java/com/xcong/excoin/trade/CoinTrader.java
new file mode 100644
index 0000000..46289af
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/trade/CoinTrader.java
@@ -0,0 +1,762 @@
+package com.xcong.excoin.trade;
+
+import com.alibaba.fastjson.JSON;
+
+import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity;
+import com.xcong.excoin.modules.coin.service.OrderCoinService;
+import com.xcong.excoin.rabbit.producer.ExchangeProducer;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+public class CoinTrader {
+ private String symbol;
+ private ExchangeProducer exchangeProducer;
+ private OrderCoinService orderCoinService;
+ //交易币种的精度
+ private int coinScale = 4;
+ //基币的精度
+ private int baseCoinScale = 4;
+ private Logger logger = LoggerFactory.getLogger(CoinTrader.class);
+ //买入限价订单链表,价格从高到低排列
+ private TreeMap<BigDecimal, MergeOrder> buyLimitPriceQueue;
+ //卖出限价订单链表,价格从低到高排列
+ private TreeMap<BigDecimal, MergeOrder> sellLimitPriceQueue;
+ //买入市价订单链表,按时间从小到大排序
+ private LinkedList<OrderCoinsEntity> buyMarketQueue;
+ //卖出市价订单链表,按时间从小到大排序
+ private LinkedList<OrderCoinsEntity> sellMarketQueue;
+ //卖盘盘口信息
+ private TradePlate sellTradePlate;
+ //买盘盘口信息
+ private TradePlate buyTradePlate;
+ //是否暂停交易
+ private boolean tradingHalt = false;
+ private boolean ready = false;
+ //交易对信息
+ //private ExchangeCoinPublishType publishType;
+ private String clearTime;
+
+ private SimpleDateFormat dateTimeFormat;
+
+
+ public CoinTrader(String symbol) {
+ this.symbol = symbol;
+ initialize();
+ }
+
+ /**
+ * 初始化交易线程
+ */
+ public void initialize() {
+ logger.info("init CoinTrader for symbol {}", symbol);
+ //买单队列价格降序排列
+ buyLimitPriceQueue = new TreeMap<>(Comparator.reverseOrder());
+ //卖单队列价格升序排列
+ this.sellLimitPriceQueue = new TreeMap<>(Comparator.naturalOrder());
+ this.buyMarketQueue = new LinkedList<>();
+ this.sellMarketQueue = new LinkedList<>();
+ this.sellTradePlate = new TradePlate(symbol, OrderCoinsEntity.ORDERTYPE_SELL);
+ this.buyTradePlate = new TradePlate(symbol, OrderCoinsEntity.ORDERTYPE_BUY);
+ this.dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ }
+
+ /**
+ * 增加限价订单到队列,买入单按从价格高到低排,卖出单按价格从低到高排
+ *
+ * @param exchangeOrder
+ */
+ public void addLimitPriceOrder(OrderCoinsEntity exchangeOrder) {
+ if (exchangeOrder.getTradeType() != OrderCoinsEntity.TRADETYPE_FIXEDPRICE) {
+ return;
+ }
+ //logger.info("addLimitPriceOrder,orderId = {}", exchangeOrder.getOrderId());
+ TreeMap<BigDecimal, MergeOrder> list;
+ if (exchangeOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
+ list = buyLimitPriceQueue;
+ // 添加盘口信息 TODO
+ buyTradePlate.add(exchangeOrder);
+ if (ready) {
+ // 发送盘口信息
+ sendTradePlateMessage(buyTradePlate, sellTradePlate);
+ }
+ } else {
+ list = sellLimitPriceQueue;
+ sellTradePlate.add(exchangeOrder);
+ if (ready) {
+ sendTradePlateMessage(buyTradePlate, sellTradePlate);
+ }
+ }
+ synchronized (list) {
+ MergeOrder mergeOrder = list.get(exchangeOrder.getEntrustPrice());
+ if (mergeOrder == null) {
+ mergeOrder = new MergeOrder();
+ mergeOrder.add(exchangeOrder);
+ list.put(exchangeOrder.getEntrustPrice(), mergeOrder);
+ } else {
+ mergeOrder.add(exchangeOrder);
+ }
+ }
+ }
+
+ public void addMarketPriceOrder(OrderCoinsEntity exchangeOrder) {
+ if (exchangeOrder.getTradeType() != OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
+ return;
+ }
+ logger.info("addMarketPriceOrder,orderId = {}", exchangeOrder.getId());
+ LinkedList<OrderCoinsEntity> list = exchangeOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY ? buyMarketQueue : sellMarketQueue;
+ synchronized (list) {
+ list.addLast(exchangeOrder);
+ }
+ }
+
+ public void trade(List<OrderCoinsEntity> orders) throws ParseException {
+ if (tradingHalt) {
+ return;
+ }
+ for (OrderCoinsEntity order : orders) {
+ trade(order);
+ }
+ }
+
+
+ /**
+ * 主动交易输入的订单,交易不完成的会输入到队列
+ *
+ * @param exchangeOrder
+ * @throws ParseException
+ */
+ public void trade(OrderCoinsEntity exchangeOrder) {
+ if (tradingHalt) {
+ return;
+ }
+ //logger.info("trade order={}",exchangeOrder);
+ if (!symbol.equalsIgnoreCase(exchangeOrder.getSymbol())) {
+ logger.info("unsupported symbol,coin={},base={}", exchangeOrder.getSymbol(), "USDT");
+ return;
+ }
+ // 如果
+ if (exchangeOrder.getEntrustAmount().compareTo(BigDecimal.ZERO) <= 0 || exchangeOrder.getEntrustAmount().subtract(exchangeOrder.getDealAmount()).compareTo(BigDecimal.ZERO) <= 0) {
+ return;
+ }
+
+ TreeMap<BigDecimal, MergeOrder> limitPriceOrderList;
+ LinkedList<OrderCoinsEntity> marketPriceOrderList;
+ if (exchangeOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
+ // 买单 和限价卖单以及市价市场卖单队列对比 完成撮合
+ limitPriceOrderList = sellLimitPriceQueue;
+ marketPriceOrderList = sellMarketQueue;
+ } else {
+ limitPriceOrderList = buyLimitPriceQueue;
+ marketPriceOrderList = buyMarketQueue;
+ }
+ if (exchangeOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
+ //logger.info(">>>>>市价单>>>交易与限价单交易");
+ //与限价单交易
+ matchMarketPriceWithLPList(limitPriceOrderList, exchangeOrder);
+ } else if (exchangeOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_FIXEDPRICE) {
+ //限价单价格必须大于0
+ if (exchangeOrder.getEntrustPrice().compareTo(BigDecimal.ZERO) <= 0) {
+ return;
+ }
+
+ //logger.info(">>>>>限价单>>>交易与限价单交易");
+ //先与限价单交易
+ matchLimitPriceWithLPList(limitPriceOrderList, exchangeOrder, false);
+ if (exchangeOrder.getEntrustCnt().compareTo(exchangeOrder.getDealCnt()) > 0) {
+ //logger.info(">>>>限价单未交易完>>>>与市价单交易>>>>");
+ //后与市价单交易
+ matchLimitPriceWithMPList(marketPriceOrderList, exchangeOrder);
+ }
+ }
+ }
+
+ /**
+ * 限价委托单与限价队列匹配
+ *
+ * @param lpList 限价对手单队列
+ * @param focusedOrder 交易订单
+ */
+ public void matchLimitPriceWithLPList(TreeMap<BigDecimal, MergeOrder> lpList, OrderCoinsEntity focusedOrder, boolean canEnterList) {
+ List<ExchangeTrade> exchangeTrades = new ArrayList<>();
+ List<OrderCoinsEntity> completedOrders = new ArrayList<>();
+ synchronized (lpList) {
+ Iterator<Map.Entry<BigDecimal, MergeOrder>> mergeOrderIterator = lpList.entrySet().iterator();
+ boolean exitLoop = false;
+ while (!exitLoop && mergeOrderIterator.hasNext()) {
+ Map.Entry<BigDecimal, MergeOrder> entry = mergeOrderIterator.next();
+ MergeOrder mergeOrder = entry.getValue();
+ Iterator<OrderCoinsEntity> orderIterator = mergeOrder.iterator();
+ //买入单需要匹配的价格不大于委托价,否则退出
+ if (focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && mergeOrder.getPrice().compareTo(focusedOrder.getEntrustPrice()) > 0) {
+ break;
+ }
+ //卖出单需要匹配的价格不小于委托价,否则退出
+ if (focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_SELL && mergeOrder.getPrice().compareTo(focusedOrder.getEntrustPrice()) < 0) {
+ break;
+ }
+ while (orderIterator.hasNext()) {
+ OrderCoinsEntity matchOrder = orderIterator.next();
+ //处理匹配
+ ExchangeTrade trade = processMatch(focusedOrder, matchOrder);
+ exchangeTrades.add(trade);
+ //判断匹配单是否完成
+ if (matchOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) {
+ //当前匹配的订单完成交易,删除该订单
+ orderIterator.remove();
+ completedOrders.add(matchOrder);
+ }
+ //判断交易单是否完成
+ if (focusedOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) {
+ //交易完成
+ completedOrders.add(focusedOrder);
+ //退出循环
+ exitLoop = true;
+ break;
+ }
+ }
+ if (mergeOrder.size() == 0) {
+ mergeOrderIterator.remove();
+ }
+ }
+ }
+ //如果还没有交易完,订单压入列表中
+ if (focusedOrder.getDealCnt().compareTo(focusedOrder.getEntrustCnt()) < 0 && canEnterList) {
+ addLimitPriceOrder(focusedOrder);
+ }
+ //每个订单的匹配批量推送
+ handleExchangeTrade(exchangeTrades);
+ if (completedOrders.size() > 0) {
+ orderCompleted(completedOrders);
+ TradePlate plate = focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY ? sellTradePlate : buyTradePlate;
+ sendTradePlateMessage(buyTradePlate, sellTradePlate);
+ }
+ }
+
+ /**
+ * 限价委托单与市价队列匹配
+ *
+ * @param mpList 市价对手单队列
+ * @param focusedOrder 交易订单
+ */
+ public void matchLimitPriceWithMPList(LinkedList<OrderCoinsEntity> mpList, OrderCoinsEntity focusedOrder) {
+ List<ExchangeTrade> exchangeTrades = new ArrayList<>();
+ List<OrderCoinsEntity> completedOrders = new ArrayList<>();
+ synchronized (mpList) {
+ Iterator<OrderCoinsEntity> iterator = mpList.iterator();
+ while (iterator.hasNext()) {
+ OrderCoinsEntity matchOrder = iterator.next();
+ ExchangeTrade trade = processMatch(focusedOrder, matchOrder);
+ logger.info(">>>>>" + trade);
+ if (trade != null) {
+ exchangeTrades.add(trade);
+ }
+ //判断匹配单是否完成,市价单amount为成交量
+ if (matchOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) {
+ iterator.remove();
+ completedOrders.add(matchOrder);
+ }
+ //判断吃单是否完成,判断成交量是否完成
+ if (focusedOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) {
+ //交易完成
+ completedOrders.add(focusedOrder);
+ //退出循环
+ break;
+ }
+ }
+ }
+ //如果还没有交易完,订单压入列表中
+ if (focusedOrder.getDealCnt().compareTo(focusedOrder.getEntrustCnt()) < 0) {
+ addLimitPriceOrder(focusedOrder);
+ }
+ //每个订单的匹配批量推送
+ handleExchangeTrade(exchangeTrades);
+ orderCompleted(completedOrders);
+ }
+
+
+ /**
+ * 市价委托单与限价对手单列表交易
+ *
+ * @param lpList 限价对手单列表
+ * @param focusedOrder 待交易订单
+ */
+ public void matchMarketPriceWithLPList(TreeMap<BigDecimal, MergeOrder> lpList, OrderCoinsEntity focusedOrder) {
+ List<ExchangeTrade> exchangeTrades = new ArrayList<>();
+ List<OrderCoinsEntity> completedOrders = new ArrayList<>();
+ // 加锁 同步
+ synchronized (lpList) {
+ Iterator<Map.Entry<BigDecimal, MergeOrder>> mergeOrderIterator = lpList.entrySet().iterator();
+ boolean exitLoop = false;
+ while (!exitLoop && mergeOrderIterator.hasNext()) {
+ Map.Entry<BigDecimal, MergeOrder> entry = mergeOrderIterator.next();
+ MergeOrder mergeOrder = entry.getValue();
+ Iterator<OrderCoinsEntity> orderIterator = mergeOrder.iterator();
+ while (orderIterator.hasNext()) {
+ OrderCoinsEntity matchOrder = orderIterator.next();
+ //处理匹配 用户单和买卖盘内的委托单进行比较,得到可以交易的量
+ // 例如 市价买单 1000个 卖1 500 卖2 300 卖3 500 此时从卖1开始吃到卖3只剩300个
+ ExchangeTrade trade = processMatch(focusedOrder, matchOrder);
+ if (trade != null) {
+ exchangeTrades.add(trade);
+ }
+ //判断匹配单是否完成
+ if (matchOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) {
+ //当前匹配的订单完成交易,删除该订单
+ orderIterator.remove();
+ completedOrders.add(matchOrder);
+ }
+ //判断焦点订单是否完成
+ if (focusedOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) {
+ completedOrders.add(focusedOrder);
+ //退出循环
+ exitLoop = true;
+ break;
+ }
+ }
+ if (mergeOrder.size() == 0) {
+ mergeOrderIterator.remove();
+ }
+ }
+ }
+ //如果还没有交易完,订单压入列表中,市价买单按成交量算
+ if (focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_SELL && focusedOrder.getDealCnt().compareTo(focusedOrder.getEntrustCnt()) < 0
+ || focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && focusedOrder.getDealAmount().compareTo(focusedOrder.getEntrustAmount()) < 0) {
+ addMarketPriceOrder(focusedOrder);
+ }
+ //每个订单的匹配批量推送
+ handleExchangeTrade(exchangeTrades);
+ if (completedOrders.size() > 0) {
+ orderCompleted(completedOrders);
+ TradePlate plate = focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY ? sellTradePlate : buyTradePlate;
+ sendTradePlateMessage(buyTradePlate, sellTradePlate);
+ }
+ }
+
+ /**
+ * 计算委托单剩余可成交的数量
+ *
+ * @param order 委托单
+ * @param dealPrice 成交价
+ * @return
+ */
+ private BigDecimal calculateTradedAmount(OrderCoinsEntity order, BigDecimal dealPrice) {
+ if (order.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && order.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
+ //剩余成交量 TODO ?
+ // 委托量-成交量=剩余量
+ BigDecimal leftTurnover = order.getEntrustAmount().subtract(order.getDealAmount());
+ return leftTurnover.divide(dealPrice, coinScale, BigDecimal.ROUND_DOWN);
+ } else {
+ return order.getEntrustCnt().subtract(order.getDealCnt());
+ }
+ }
+
+ /**
+ * 调整市价单剩余成交额,当剩余成交额不足时设置订单完成
+ *
+ * @param order
+ * @param dealPrice
+ * @return
+ */
+ private BigDecimal adjustMarketOrderTurnover(OrderCoinsEntity order, BigDecimal dealPrice) {
+ if (order.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && order.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
+// BigDecimal leftTurnover = order.getAmount().subtract(order.getTurnover());
+// if(leftTurnover.divide(dealPrice,coinScale,BigDecimal.ROUND_DOWN)
+// .compareTo(BigDecimal.ZERO)==0){
+// order.setTurnover(order.getAmount());
+// return leftTurnover;
+// }
+ }
+ return BigDecimal.ZERO;
+ }
+
+ /**
+ * 处理两个匹配的委托订单
+ *
+ * @param focusedOrder 焦点单
+ * @param matchOrder 匹配单
+ * @return
+ */
+ private ExchangeTrade processMatch(OrderCoinsEntity focusedOrder, OrderCoinsEntity matchOrder) {
+ //需要交易的数量,成交量,成交价,可用数量
+ BigDecimal needAmount, dealPrice, availAmount;
+ //如果匹配单是限价单,则以其价格为成交价
+ if (matchOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_FIXEDPRICE) {
+ dealPrice = matchOrder.getEntrustPrice();
+ } else {
+ dealPrice = focusedOrder.getEntrustPrice();
+ }
+ //成交价必须大于0
+ if (dealPrice.compareTo(BigDecimal.ZERO) <= 0) {
+ return null;
+ }
+ // 需要的成交量
+ needAmount = calculateTradedAmount(focusedOrder, dealPrice);
+ // 队列单可提供的成交量
+ availAmount = calculateTradedAmount(matchOrder, dealPrice);
+ //计算成交量 取少的
+ BigDecimal tradedAmount = (availAmount.compareTo(needAmount) >= 0 ? needAmount : availAmount);
+ logger.info("dealPrice={},amount={}", dealPrice, tradedAmount);
+ //如果成交额为0说明剩余额度无法成交,退出
+ if (tradedAmount.compareTo(BigDecimal.ZERO) == 0) {
+ return null;
+ }
+
+ //计算成交额,成交额要保留足够精度
+ BigDecimal turnover = tradedAmount.multiply(dealPrice);
+ matchOrder.setDealCnt(matchOrder.getDealCnt().add(tradedAmount));
+ // 成交金额
+ matchOrder.setDealAmount(matchOrder.getDealAmount().add(turnover));
+ // 用户单成交量
+ focusedOrder.setDealCnt(focusedOrder.getDealCnt().add(tradedAmount));
+ // 用户单成交金额
+ focusedOrder.setDealAmount(focusedOrder.getDealAmount().add(turnover));
+
+ //创建成交记录
+ ExchangeTrade exchangeTrade = new ExchangeTrade();
+ exchangeTrade.setSymbol(symbol);
+ // 成交量
+ exchangeTrade.setAmount(tradedAmount);
+ exchangeTrade.setDirection(focusedOrder.getOrderType());
+ // 成交价格
+ exchangeTrade.setPrice(dealPrice);
+ // 成交金额
+ exchangeTrade.setBuyTurnover(turnover);
+ exchangeTrade.setSellTurnover(turnover);
+ //校正市价单剩余成交额
+ if (OrderCoinsEntity.TRADETYPE_MARKETPRICE == focusedOrder.getTradeType() && focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
+ BigDecimal adjustTurnover = adjustMarketOrderTurnover(focusedOrder, dealPrice);
+ exchangeTrade.setBuyTurnover(turnover.add(adjustTurnover));
+ } else if (OrderCoinsEntity.TRADETYPE_MARKETPRICE == matchOrder.getTradeType() && matchOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
+ BigDecimal adjustTurnover = adjustMarketOrderTurnover(matchOrder, dealPrice);
+ exchangeTrade.setBuyTurnover(turnover.add(adjustTurnover));
+ }
+
+ if (focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
+ exchangeTrade.setBuyOrderId(focusedOrder.getId());
+ exchangeTrade.setSellOrderId(matchOrder.getId());
+ } else {
+ exchangeTrade.setBuyOrderId(matchOrder.getId());
+ exchangeTrade.setSellOrderId(focusedOrder.getId());
+ }
+
+ exchangeTrade.setTime(Calendar.getInstance().getTimeInMillis());
+ if (matchOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_FIXEDPRICE) {
+ if (matchOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
+ buyTradePlate.remove(matchOrder, tradedAmount);
+ } else {
+ sellTradePlate.remove(matchOrder, tradedAmount);
+ }
+ }
+ return exchangeTrade;
+ }
+
+
+ // 这里是交易完的单
+ public void handleExchangeTrade(List<ExchangeTrade> trades) {
+ //logger.info("handleExchangeTrade:{}", trades);
+ if (trades.size() > 0) {
+ int maxSize = 1000;
+ //发送消息,key为交易对符号
+ if (trades.size() > maxSize) {
+ int size = trades.size();
+ for (int index = 0; index < size; index += maxSize) {
+ int length = (size - index) > maxSize ? maxSize : size - index;
+ List<ExchangeTrade> subTrades = trades.subList(index, index + length);
+ exchangeProducer.sendHandleTrade(JSON.toJSONString(subTrades));
+ //orderCoinService.handleOrder(subTrades);
+ }
+ } else {
+ trades.forEach(e -> {
+ System.out.println(e);
+ });
+ exchangeProducer.sendHandleTrade(JSON.toJSONString(trades));
+ //orderCoinService.handleOrder(trades);
+ // kafkaTemplate.send("exchange-trade", JSON.toJSONString(trades));
+ }
+ // 更新最新K线 TODO
+
+ }
+ }
+
+ /**
+ * 订单完成,执行消息通知,订单数超1000个要拆分发送
+ *
+ * @param orders
+ */
+ public void orderCompleted(List<OrderCoinsEntity> orders) {
+ //logger.info("orderCompleted ,order={}",orders);
+ if (orders.size() > 0) {
+ int maxSize = 1000;
+ if (orders.size() > maxSize) {
+ int size = orders.size();
+ for (int index = 0; index < size; index += maxSize) {
+ int length = (size - index) > maxSize ? maxSize : size - index;
+ List<OrderCoinsEntity> subOrders = orders.subList(index, index + length);
+ // TODO 通知订单完成
+ //kafkaTemplate.send("exchange-order-completed", JSON.toJSONString(subOrders));
+ }
+ } else {
+ // kafkaTemplate.send("exchange-order-completed", JSON.toJSONString(orders));
+ }
+ }
+ }
+
+ /**
+ * 发送盘口变化消息
+ *
+ * @param buyTradePlate sellTradePlate
+ */
+ public void sendTradePlateMessage(TradePlate buyTradePlate, TradePlate sellTradePlate) {
+ //防止并发引起数组越界,造成盘口倒挂 TODO
+ List<List<BigDecimal>> plate;
+ List<BigDecimal> plateItem;
+ TradePlateModel tradePlateModel = new TradePlateModel();
+ // 转换格式
+ if (buyTradePlate != null && buyTradePlate.getItems() != null) {
+ plate = new ArrayList<>();
+ LinkedList<TradePlateItem> items = buyTradePlate.getItems();
+ for (TradePlateItem item : items) {
+ plateItem = new ArrayList<>(2);
+ BigDecimal price = item.getPrice();
+ BigDecimal amount = item.getAmount();
+ plateItem.add(price);
+ plateItem.add(amount);
+ plate.add(plateItem);
+ }
+ tradePlateModel.setBuy(plate);
+ }
+
+ if (sellTradePlate != null && sellTradePlate.getItems() != null) {
+ plate = new ArrayList<>();
+ LinkedList<TradePlateItem> items = sellTradePlate.getItems();
+ for (TradePlateItem item : items) {
+ plateItem = new ArrayList<>(2);
+ BigDecimal price = item.getPrice();
+ BigDecimal amount = item.getAmount();
+ plateItem.add(price);
+ plateItem.add(amount);
+ plate.add(plateItem);
+ }
+ tradePlateModel.setSell(plate);
+ }
+
+ // 盘口发生变化通知TODO
+
+ exchangeProducer.sendPlateMsg(JSON.toJSONString(tradePlateModel));
+ }
+
+ /**
+ * 取消委托订单
+ *
+ * @param exchangeOrder
+ * @return
+ */
+ public OrderCoinsEntity cancelOrder(OrderCoinsEntity exchangeOrder) {
+ logger.info("cancelOrder,orderId={}", exchangeOrder.getId());
+ if (exchangeOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
+ //处理市价单
+ Iterator<OrderCoinsEntity> orderIterator;
+ List<OrderCoinsEntity> list = null;
+ if (exchangeOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
+ list = this.buyMarketQueue;
+ } else {
+ list = this.sellMarketQueue;
+ }
+ synchronized (list) {
+ orderIterator = list.iterator();
+ while ((orderIterator.hasNext())) {
+ OrderCoinsEntity order = orderIterator.next();
+ if (order.getId().equals(exchangeOrder.getId())) {
+ orderIterator.remove();
+ onRemoveOrder(order);
+ return order;
+ }
+ }
+ }
+ } else {
+ //处理限价单
+ TreeMap<BigDecimal, MergeOrder> list = null;
+ Iterator<MergeOrder> mergeOrderIterator;
+ if (exchangeOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
+ list = this.buyLimitPriceQueue;
+ } else {
+ list = this.sellLimitPriceQueue;
+ }
+ synchronized (list) {
+ MergeOrder mergeOrder = list.get(exchangeOrder.getEntrustPrice());
+ if (mergeOrder != null) {
+ Iterator<OrderCoinsEntity> orderIterator = mergeOrder.iterator();
+ while (orderIterator.hasNext()) {
+ OrderCoinsEntity order = orderIterator.next();
+ if (order.getId().equals(exchangeOrder.getId())) {
+ orderIterator.remove();
+ if (mergeOrder.size() == 0) {
+ list.remove(exchangeOrder.getEntrustPrice());
+ }
+ onRemoveOrder(order);
+ return order;
+ }
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ public void onRemoveOrder(OrderCoinsEntity order) {
+ if (order.getTradeType() == OrderCoinsEntity.TRADETYPE_FIXEDPRICE) {
+ if (order.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
+ buyTradePlate.remove(order);
+ sendTradePlateMessage(buyTradePlate, sellTradePlate);
+ } else {
+ sellTradePlate.remove(order);
+ sendTradePlateMessage(buyTradePlate, sellTradePlate);
+ }
+ }
+ }
+
+
+ public TradePlate getTradePlate(ExchangeOrderDirection direction) {
+ if (direction == ExchangeOrderDirection.BUY) {
+ return buyTradePlate;
+ } else {
+ return sellTradePlate;
+ }
+ }
+
+
+ /**
+ * 查询交易器里的订单
+ *
+ * @param orderId
+ * @param type
+ * @param direction
+ * @return
+ */
+ public OrderCoinsEntity findOrder(String orderId, int type, int direction) {
+ if (type == OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
+ LinkedList<OrderCoinsEntity> list;
+ if (direction == OrderCoinsEntity.ORDERTYPE_BUY) {
+ list = this.buyMarketQueue;
+ } else {
+ list = this.sellMarketQueue;
+ }
+ synchronized (list) {
+ Iterator<OrderCoinsEntity> orderIterator = list.iterator();
+ while ((orderIterator.hasNext())) {
+ OrderCoinsEntity order = orderIterator.next();
+ if (order.getId().equals(orderId)) {
+ return order;
+ }
+ }
+ }
+ } else {
+ TreeMap<BigDecimal, MergeOrder> list;
+ if (direction == OrderCoinsEntity.ORDERTYPE_BUY) {
+ list = this.buyLimitPriceQueue;
+ } else {
+ list = this.sellLimitPriceQueue;
+ }
+ synchronized (list) {
+ Iterator<Map.Entry<BigDecimal, MergeOrder>> mergeOrderIterator = list.entrySet().iterator();
+ while (mergeOrderIterator.hasNext()) {
+ Map.Entry<BigDecimal, MergeOrder> entry = mergeOrderIterator.next();
+ MergeOrder mergeOrder = entry.getValue();
+ Iterator<OrderCoinsEntity> orderIterator = mergeOrder.iterator();
+ while ((orderIterator.hasNext())) {
+ OrderCoinsEntity order = orderIterator.next();
+ if (order.getId().equals(orderId)) {
+ return order;
+ }
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ public TreeMap<BigDecimal, MergeOrder> getBuyLimitPriceQueue() {
+ return buyLimitPriceQueue;
+ }
+
+ public LinkedList<OrderCoinsEntity> getBuyMarketQueue() {
+ return buyMarketQueue;
+ }
+
+ public TreeMap<BigDecimal, MergeOrder> getSellLimitPriceQueue() {
+ return sellLimitPriceQueue;
+ }
+
+ public LinkedList<OrderCoinsEntity> getSellMarketQueue() {
+ return sellMarketQueue;
+ }
+
+
+ public void setCoinScale(int scale) {
+ this.coinScale = scale;
+ }
+
+ public void setBaseCoinScale(int scale) {
+ this.baseCoinScale = scale;
+ }
+
+ public boolean isTradingHalt() {
+ return this.tradingHalt;
+ }
+
+ /**
+ * 暂停交易,不接收新的订单
+ */
+ public void haltTrading() {
+ this.tradingHalt = true;
+ }
+
+ /**
+ * 恢复交易
+ */
+ public void resumeTrading() {
+ this.tradingHalt = false;
+ }
+
+ public void stopTrading() {
+ //TODO:停止交易,取消当前所有订单
+ }
+
+ public boolean getReady() {
+ return this.ready;
+ }
+
+ public void setReady(boolean ready) {
+ this.ready = ready;
+ }
+
+ public void setClearTime(String clearTime) {
+ this.clearTime = clearTime;
+ }
+
+ public int getLimitPriceOrderCount(ExchangeOrderDirection direction) {
+ int count = 0;
+ TreeMap<BigDecimal, MergeOrder> queue = direction == ExchangeOrderDirection.BUY ? buyLimitPriceQueue : sellLimitPriceQueue;
+ Iterator<Map.Entry<BigDecimal, MergeOrder>> mergeOrderIterator = queue.entrySet().iterator();
+ while (mergeOrderIterator.hasNext()) {
+ Map.Entry<BigDecimal, MergeOrder> entry = mergeOrderIterator.next();
+ MergeOrder mergeOrder = entry.getValue();
+ count += mergeOrder.size();
+ }
+ return count;
+ }
+
+ public OrderCoinService getOrderCoinService() {
+ return orderCoinService;
+ }
+
+ public void setOrderCoinService(OrderCoinService orderCoinService) {
+ this.orderCoinService = orderCoinService;
+ }
+}
diff --git a/src/main/java/com/xcong/excoin/trade/CoinTraderFactory.java b/src/main/java/com/xcong/excoin/trade/CoinTraderFactory.java
new file mode 100644
index 0000000..e4eb159
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/trade/CoinTraderFactory.java
@@ -0,0 +1,40 @@
+package com.xcong.excoin.trade;
+
+import org.springframework.stereotype.Service;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+@Service
+public class CoinTraderFactory {
+
+ private ConcurrentHashMap<String, CoinTrader> traderMap;
+
+ public CoinTraderFactory() {
+ traderMap = new ConcurrentHashMap<>();
+ }
+
+ //添加,已存在的无法添加
+ public void addTrader(String symbol, CoinTrader trader) {
+ if(!traderMap.containsKey(symbol)) {
+ traderMap.put(symbol, trader);
+ }
+ }
+
+ //重置,即使已经存在也会覆盖
+ public void resetTrader(String symbol, CoinTrader trader) {
+ traderMap.put(symbol, trader);
+ }
+
+ public boolean containsTrader(String symbol) {
+ return traderMap.containsKey(symbol);
+ }
+
+ public CoinTrader getTrader(String symbol) {
+ return traderMap.get(symbol);
+ }
+
+ public ConcurrentHashMap<String, CoinTrader> getTraderMap() {
+ return traderMap;
+ }
+
+}
diff --git a/src/main/java/com/xcong/excoin/trade/ExchangeOrderDirection.java b/src/main/java/com/xcong/excoin/trade/ExchangeOrderDirection.java
new file mode 100644
index 0000000..8da593e
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/trade/ExchangeOrderDirection.java
@@ -0,0 +1,5 @@
+package com.xcong.excoin.trade;
+
+public enum ExchangeOrderDirection {
+ BUY,SELL;
+}
diff --git a/src/main/java/com/xcong/excoin/trade/ExchangeTrade.java b/src/main/java/com/xcong/excoin/trade/ExchangeTrade.java
new file mode 100644
index 0000000..ea56d87
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/trade/ExchangeTrade.java
@@ -0,0 +1,27 @@
+package com.xcong.excoin.trade;
+
+import com.alibaba.fastjson.JSON;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+
+/**
+ * 撮合交易信息
+ */
+@Data
+public class ExchangeTrade implements Serializable{
+ private String symbol;
+ private BigDecimal price;
+ private BigDecimal amount;
+ private BigDecimal buyTurnover;
+ private BigDecimal sellTurnover;
+ private int direction;
+ private Long buyOrderId;
+ private Long sellOrderId;
+ private Long time;
+ @Override
+ public String toString() {
+ return JSON.toJSONString(this);
+ }
+}
diff --git a/src/main/java/com/xcong/excoin/trade/MergeOrder.java b/src/main/java/com/xcong/excoin/trade/MergeOrder.java
new file mode 100644
index 0000000..76f1909
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/trade/MergeOrder.java
@@ -0,0 +1,42 @@
+package com.xcong.excoin.trade;
+
+import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class MergeOrder {
+ private List<OrderCoinsEntity> orders = new ArrayList<>();
+
+ //最后位置添加一个
+ public void add(OrderCoinsEntity order){
+ orders.add(order);
+ }
+
+
+ public OrderCoinsEntity get(){
+ return orders.get(0);
+ }
+
+ public int size(){
+ return orders.size();
+ }
+
+ public BigDecimal getPrice(){
+ return orders.get(0).getEntrustPrice();
+ }
+
+ public Iterator<OrderCoinsEntity> iterator(){
+ return orders.iterator();
+ }
+
+ public BigDecimal getTotalAmount() {
+ BigDecimal total = new BigDecimal(0);
+ for(OrderCoinsEntity item : orders) {
+ total = total.add(item.getEntrustCnt());
+ }
+ return total;
+ }
+}
diff --git a/src/main/java/com/xcong/excoin/trade/TradePlate.java b/src/main/java/com/xcong/excoin/trade/TradePlate.java
new file mode 100644
index 0000000..fe24e0b
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/trade/TradePlate.java
@@ -0,0 +1,181 @@
+package com.xcong.excoin.trade;
+
+
+import com.alibaba.fastjson.JSONObject;
+import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import java.math.BigDecimal;
+import java.util.LinkedList;
+
+/**
+ * 盘口信息
+ */
+@Data
+@Slf4j
+public class TradePlate {
+ private LinkedList<TradePlateItem> items;
+ //最大深度
+ private int maxDepth = 100;
+ //方向 订单类型 1、买入2、卖出
+ private int direction;
+ private String symbol;
+ public TradePlate(){
+
+ }
+
+ public TradePlate(String symbol,int direction) {
+ this.direction = direction;
+ this.symbol = symbol;
+ items = new LinkedList<>();
+ }
+
+ public boolean add(OrderCoinsEntity exchangeOrder) {
+ //log.info("add TradePlate order={}",exchangeOrder);
+ synchronized (items) {
+ int index = 0;
+ if (exchangeOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
+ return false;
+ }
+ if (exchangeOrder.getOrderType() != direction) {
+ return false;
+ }
+ if (items.size() > 0) {
+ for (index = 0; index < items.size(); index++) {
+ TradePlateItem item = items.get(index);
+ if ((exchangeOrder.getTradeType() == OrderCoinsEntity.ORDERTYPE_BUY && item.getPrice().compareTo(exchangeOrder.getEntrustPrice()) > 0)
+ || (exchangeOrder.getTradeType() == OrderCoinsEntity.ORDERTYPE_SELL && item.getPrice().compareTo(exchangeOrder.getEntrustPrice()) < 0)) {
+ continue;
+ } else if (item.getPrice().compareTo(exchangeOrder.getEntrustPrice()) == 0) {
+ BigDecimal deltaAmount = exchangeOrder.getEntrustCnt().subtract(exchangeOrder.getDealCnt());
+ item.setAmount(item.getAmount().add(deltaAmount));
+ return true;
+ } else {
+ break;
+ }
+ }
+ }
+ if(index < maxDepth) {
+ TradePlateItem newItem = new TradePlateItem();
+ newItem.setAmount(exchangeOrder.getEntrustCnt().subtract(exchangeOrder.getDealCnt()));
+ newItem.setPrice(exchangeOrder.getEntrustPrice());
+ items.add(index, newItem);
+ }
+ }
+ return true;
+ }
+
+ public void remove(OrderCoinsEntity order,BigDecimal amount) {
+ synchronized (items) {
+ //log.info("items>>init_size={},orderPrice={}",items.size(),order.getPrice());
+ for (int index = 0; index < items.size(); index++) {
+ TradePlateItem item = items.get(index);
+ if (item.getPrice().compareTo(order.getEntrustPrice()) == 0) {
+ item.setAmount(item.getAmount().subtract(amount));
+ if (item.getAmount().compareTo(BigDecimal.ZERO) <= 0) {
+ items.remove(index);
+ }
+ //log.info("items>>final_size={},itemAmount={},itemPrice={}",items.size(),item.getAmount(),item.getPrice());
+ return;
+ }
+ }
+ log.info("items>>return_size={}",items.size());
+ }
+ }
+
+ public void remove(OrderCoinsEntity order){
+ remove(order,order.getEntrustCnt().subtract(order.getDealCnt()));
+ }
+
+ public void setItems(LinkedList<TradePlateItem> items){
+ this.items = items;
+ }
+
+ public BigDecimal getHighestPrice(){
+ if(items.size() == 0) {
+ return BigDecimal.ZERO;
+ }
+ if(direction == OrderCoinsEntity.ORDERTYPE_BUY){
+ return items.getFirst().getPrice();
+ }
+ else{
+ return items.getLast().getPrice();
+ }
+ }
+
+ public int getDepth(){
+ return items.size();
+ }
+
+
+ public BigDecimal getLowestPrice(){
+ if(items.size() == 0) {
+ return BigDecimal.ZERO;
+ }
+ if(direction == OrderCoinsEntity.ORDERTYPE_BUY){
+ return items.getLast().getPrice();
+ }
+ else{
+ return items.getFirst().getPrice();
+ }
+ }
+
+ /**
+ * 获取委托量最大的档位
+ * @return
+ */
+ public BigDecimal getMaxAmount(){
+ if(items.size() == 0) {
+ return BigDecimal.ZERO;
+ }
+ BigDecimal amount = BigDecimal.ZERO;
+ for(TradePlateItem item:items){
+ if(item.getAmount().compareTo(amount)>0){
+ amount = item.getAmount();
+ }
+ }
+ return amount;
+ }
+
+ /**
+ * 获取委托量最小的档位
+ * @return
+ */
+ public BigDecimal getMinAmount(){
+ if(items.size() == 0) {
+ return BigDecimal.ZERO;
+ }
+ BigDecimal amount = items.getFirst().getAmount();
+ for(TradePlateItem item:items){
+ if(item.getAmount().compareTo(amount) < 0){
+ amount = item.getAmount();
+ }
+ }
+ return amount;
+ }
+
+ public JSONObject toJSON(){
+ JSONObject json = new JSONObject();
+ json.put("direction",direction);
+ json.put("maxAmount",getMaxAmount());
+ json.put("minAmount",getMinAmount());
+ json.put("highestPrice",getHighestPrice());
+ json.put("lowestPrice",getLowestPrice());
+ json.put("symbol",getSymbol());
+ json.put("items",items);
+ return json;
+ }
+
+ public JSONObject toJSON(int limit){
+ JSONObject json = new JSONObject();
+ json.put("direction",direction);
+ json.put("maxAmount",getMaxAmount());
+ json.put("minAmount",getMinAmount());
+ json.put("highestPrice",getHighestPrice());
+ json.put("lowestPrice",getLowestPrice());
+ json.put("symbol",getSymbol());
+ json.put("items",items.size() > limit ? items.subList(0,limit) : items);
+ return json;
+ }
+}
diff --git a/src/main/java/com/xcong/excoin/trade/TradePlateItem.java b/src/main/java/com/xcong/excoin/trade/TradePlateItem.java
new file mode 100644
index 0000000..9a6f83a
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/trade/TradePlateItem.java
@@ -0,0 +1,11 @@
+package com.xcong.excoin.trade;
+
+import lombok.Data;
+
+import java.math.BigDecimal;
+
+@Data
+public class TradePlateItem {
+ private BigDecimal price;
+ private BigDecimal amount;
+}
diff --git a/src/main/java/com/xcong/excoin/trade/TradePlateModel.java b/src/main/java/com/xcong/excoin/trade/TradePlateModel.java
new file mode 100644
index 0000000..9760925
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/trade/TradePlateModel.java
@@ -0,0 +1,13 @@
+package com.xcong.excoin.trade;
+
+import lombok.Data;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+
+@Data
+public class TradePlateModel {
+ private List<List<BigDecimal>> buy = new ArrayList<>();
+ private List<List<BigDecimal>> sell = new ArrayList<>();
+}
diff --git a/src/main/java/com/xcong/excoin/utils/CoinTypeConvert.java b/src/main/java/com/xcong/excoin/utils/CoinTypeConvert.java
index fc2dc6c..e9ff270 100644
--- a/src/main/java/com/xcong/excoin/utils/CoinTypeConvert.java
+++ b/src/main/java/com/xcong/excoin/utils/CoinTypeConvert.java
@@ -22,6 +22,8 @@
return "EOS/USDT";
case "etcusdt":
return "ETC/USDT";
+ case "nekkusdt":
+ return "NEKK/USDT";
default:
return null;
}
@@ -43,6 +45,8 @@
return "EOS_NEW_PRICE";
case "ETC/USDT":
return "ETC_NEW_PRICE";
+ case "NEKK/USDT":
+ return "NEKK_NEW_PRICE";
default:
return null;
}
diff --git a/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java b/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java
new file mode 100644
index 0000000..b955f11
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java
@@ -0,0 +1,220 @@
+package com.xcong.excoin.websocket;
+
+import cn.hutool.core.util.StrUtil;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.xcong.excoin.common.contants.AppContants;
+import com.xcong.excoin.utils.CoinTypeConvert;
+import com.xcong.excoin.utils.RedisUtils;
+import com.xcong.excoin.utils.SpringContextHolder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import javax.websocket.*;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Slf4j
+@ServerEndpoint(value = "/trade/market")
+@Component
+public class TradePlateSendWebSocket {
+ @Resource
+ RedisUtils redisUtils;
+
+ /**
+ * 记录当前在线连接数
+ */
+ private static AtomicInteger onlineCount = new AtomicInteger(0);
+
+
+ private static Map<String, Map<String, Session>> tradeplateClients = new ConcurrentHashMap<>();
+
+ private static Map<String, Map<String, Session>> klineClients = new ConcurrentHashMap<>();
+
+ /**
+ * 连接建立成功调用的方法
+ */
+ @OnOpen
+ public void onOpen(Session session) {
+ onlineCount.incrementAndGet(); // 在线数加1
+ log.info("有新连接加入:{},当前在线人数为:{}", session.getId(), onlineCount.get());
+ }
+
+ /**
+ * 连接关闭调用的方法
+ */
+ @OnClose
+ public void onClose(Session session) {
+ onlineCount.decrementAndGet(); // 在线数减1
+// Collection<Map<String, Session>> values = tradeplateClients.values();
+// if(CollectionUtils.isNotEmpty(values)){
+// for(Map<String,Session> map : values){
+// map.remove(session.getId());
+// }
+// }
+ log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get());
+ }
+
+ /**
+ * 收到客户端消息后调用的方法
+ *
+ * @param message 客户端发送过来的消息
+ */
+ @OnMessage
+ public void onMessage(String message, Session session) {
+ log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message);
+ // 订阅方法 {sub: 'market.' + symbol + '.depth.' + this._caculateType(),id: symbol
+ //}
+ JSONObject jsonObject = JSON.parseObject(message);
+ // 盘口的判断
+ if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("depth")) {
+ String sub = jsonObject.get("sub").toString();
+ String symbol = sub.split("\\.")[1];
+ symbol = CoinTypeConvert.convert(symbol);
+ if (tradeplateClients.containsKey(symbol)) {
+ tradeplateClients.get(symbol).put(session.getId(), session);
+ } else {
+ Map<String, Session> map = new HashMap<>();
+ map.put(session.getId(), session);
+ tradeplateClients.put(symbol, map);
+ }
+ }
+ // 取消盘口订阅
+ if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("depth")) {
+ // `market.${symbol}.kline.${strPeriod}
+ String unsub = jsonObject.get("unsub").toString();
+ String[] split = unsub.split("\\.");
+ String symbol = split[1];
+ symbol = CoinTypeConvert.convert(symbol);
+ String key = symbol;
+ if (tradeplateClients.containsKey(key)) {
+ tradeplateClients.get(key).remove(session.getId());
+ }
+ }
+
+
+ // 最新K线订阅
+
+ // 根据消息判断这个用户需要订阅哪种数据
+ // {sub: `market.${symbol}.kline.${strPeriod}`,
+ // symbol: symbol,
+ // period: strPeriod
+ //}
+ // 取消订阅 {unsub: xxx(标识)}
+ if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("kline")) {
+ // 订阅
+ String sub = jsonObject.get("sub").toString();
+ String[] split = sub.split("\\.");
+ String symbol = split[1];
+ symbol = CoinTypeConvert.convert(symbol);
+ String period = split[3];
+ String key = symbol + "-" + period;
+ if (klineClients.containsKey(key)) {
+ // 有这个币种K线
+ Map<String, Session> stringSessionMap = klineClients.get(key);
+ if (!stringSessionMap.containsKey(session.getId())) {
+ stringSessionMap.put(session.getId(), session);
+ }
+ } else {
+ Map<String, Session> stringSessionMap = new HashMap<>();
+ stringSessionMap.put(session.getId(), session);
+ klineClients.put(key, stringSessionMap);
+ }
+ }
+
+ // 取消订阅
+ if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("kline")) {
+ // `market.${symbol}.kline.${strPeriod}
+ String unsub = jsonObject.get("unsub").toString();
+ String[] split = unsub.split("\\.");
+ String strPeriod = split[3];
+ String symbol = split[1];
+ symbol = CoinTypeConvert.convert(symbol);
+ String key = symbol + "-" + strPeriod;
+ if (klineClients.containsKey(key)) {
+ klineClients.get(key).remove(session.getId());
+ }
+ }
+
+ // 历史K线订阅
+ // {req: "market.nekkusdt.kline.1min", symbol: "nekkusdt", period: "1min"}
+ if (jsonObject.containsKey("req") && jsonObject.get("req").toString().contains("kline")) {
+ String sub = jsonObject.get("req").toString();
+ String[] split = sub.split("\\.");
+ String symbol = split[1];
+ symbol = CoinTypeConvert.convert(symbol);
+ String period = split[3];
+ //String key = symbol+"-"+period;
+ // String key = "KINE_BCH/USDT_1week";
+ String key = "KINE_{}_{}";
+ // 币币k线数据
+ key = StrUtil.format(key, symbol, period);
+ RedisUtils bean = SpringContextHolder.getBean(RedisUtils.class);
+ Object o = bean.get(key);
+ sendMessageHistory(JSON.toJSONString(o), session);
+ }
+ }
+
+ @OnError
+ public void onError(Session session, Throwable error) {
+ log.error("发生错误");
+ error.printStackTrace();
+ }
+
+ /**
+ * 群发消息
+ *
+ * @param message 消息内容
+ */
+ public void sendMessagePlate(String message, Session fromSession) {
+ if (tradeplateClients.containsKey("nekkusdt")) {
+ Map<String, Session> nekk = tradeplateClients.get("nekkusdt");
+ for (Map.Entry<String, Session> sessionEntry : nekk.entrySet()) {
+ Session toSession = sessionEntry.getValue();
+ // 排除掉自己
+ //if (!fromSession.getId().equals(toSession.getId())) {
+ log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message);
+ boolean open = toSession.isOpen();
+ if (open) {
+ toSession.getAsyncRemote().sendText(message);
+ }
+
+ // }
+ }
+ }
+
+ }
+
+ public void sendMessageHistory(String message, Session toSession) {
+ log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message);
+ boolean open = toSession.isOpen();
+ if (open) {
+ toSession.getAsyncRemote().sendText(message);
+ }
+ }
+
+ public void sendMessageKline(String symbol, String period, String message, Session fromSession) {
+ String key = symbol + "-" + period;
+ if (klineClients.containsKey(key)) {
+ Map<String, Session> stringSessionMap = klineClients.get(key);
+ for (Map.Entry<String, Session> sessionEntry : stringSessionMap.entrySet()) {
+ Session toSession = sessionEntry.getValue();
+ // 排除掉自己
+ //if (!fromSession.getId().equals(toSession.getId())) {
+ log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message);
+ boolean open = toSession.isOpen();
+ if (open) {
+ toSession.getAsyncRemote().sendText(message);
+ }
+ // }
+ }
+ }
+ }
+}
diff --git a/src/main/resources/mapper/member/MemberWalletCoinDao.xml b/src/main/resources/mapper/member/MemberWalletCoinDao.xml
index 79979c5..e7fd894 100644
--- a/src/main/resources/mapper/member/MemberWalletCoinDao.xml
+++ b/src/main/resources/mapper/member/MemberWalletCoinDao.xml
@@ -39,5 +39,21 @@
where id=#{id}
</update>
+ <update id="updateWalletBalance" parameterType="map">
+ update member_wallet_coin
+ <set>
+ <if test="availableBalance != null">
+ available_balance = IFNULL(available_balance, 0) + #{availableBalance},
+ </if>
+ <if test="totalBalance != null">
+ total_balance = IFNULL(total_balance, 0) + #{totalBalance},
+ </if>
+ <if test="frozenBalance != null">
+ frozen_balance = IFNULL(frozen_balance, 0) + #{frozenBalance},
+ </if>
+ </set>
+ where id=#{id}
+ </update>
+
</mapper>
\ No newline at end of file
diff --git a/src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml b/src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml
index 03a89ae..303667f 100644
--- a/src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml
+++ b/src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml
@@ -1,38 +1,55 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
-<mapper namespace="com.xcong.excoin.modules.coin.dao.OrderCoinDealDao">
-
- <select id="selectAllWalletCoinOrder" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity">
- select * from coins_order_deal
- <where>
- <if test="memberId != null and memberId != ''">
- and member_id = #{memberId}
- </if>
- </where>
- order by create_time desc
+<mapper namespace="com.xcong.excoin.modules.coin.dao.OrderCoinDealDao">
+
+ <select id="selectAllWalletCoinOrder" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity">
+ select * from coins_order_deal
+ <where>
+ <if test="memberId != null and memberId != ''">
+ and member_id = #{memberId}
+ </if>
+ </where>
+ order by create_time desc
+ </select>
+
+ <select id="selectOrderCoinDealByTime" resultType="com.xcong.excoin.trade.ExchangeTrade">
+ SELECT
+ symbol symbol,
+ deal_price price,
+ symbol_cnt amount,
+ deal_amount buyTurnover,
+ deal_amount sellTurnover,
+ order_type direction,
+ create_time time
+ from coins_order_deal
+ where symbol = #{symbol}
+ and order_type = 1
+ and order_status = 3
+ and create_time between #{startTime} and #{endTime}
</select>
- <select id="selectAllWalletCoinOrderBySymbol" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity">
- select * from coins_order_deal
- <where>
- <if test="memberId != null and memberId != ''">
- and member_id = #{memberId}
- </if>
- <if test="symbol != null and symbol != ''">
- and symbol = #{symbol}
- </if>
- </where>
- order by create_time desc
- </select>
-
- <select id="selectWalletCoinOrder" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity">
+ <select id="selectAllWalletCoinOrderBySymbol"
+ resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity">
+ select * from coins_order_deal
+ <where>
+ <if test="memberId != null and memberId != ''">
+ and member_id = #{memberId}
+ </if>
+ <if test="symbol != null and symbol != ''">
+ and symbol = #{symbol}
+ </if>
+ </where>
+ order by create_time desc
+ </select>
+
+ <select id="selectWalletCoinOrder" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity">
select * from coins_order_deal where order_id= #{orderId} and member_id = #{memberId}
</select>
-
- <select id="findAllWalletCoinOrderInPage" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity">
+
+ <select id="findAllWalletCoinOrderInPage" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity">
select * from coins_order_deal
<if test="record != null">
<where>
- <if test="record.memberId != null" >
+ <if test="record.memberId != null">
and member_id=#{record.memberId}
</if>
<if test="record.symbol != null and record.symbol != ''">
@@ -42,5 +59,5 @@
</if>
order by create_time desc
</select>
-
+
</mapper>
diff --git a/src/test/java/com/xcong/excoin/GuijiTest.java b/src/test/java/com/xcong/excoin/GuijiTest.java
index 6b99989..8babbbe 100644
--- a/src/test/java/com/xcong/excoin/GuijiTest.java
+++ b/src/test/java/com/xcong/excoin/GuijiTest.java
@@ -116,8 +116,8 @@
e.printStackTrace();
}
}
-
-
+
+
}
--
Gitblit v1.9.1