From df1716a9abacac95261d686bdf0776bc7d6deca2 Mon Sep 17 00:00:00 2001
From: zainali5120 <512061637@qq.com>
Date: Sun, 13 Sep 2020 01:15:03 +0800
Subject: [PATCH] 撮合交易代码提交

---
 src/main/java/com/xcong/excoin/configurations/WebSocketConfig.java                           |   16 
 src/main/java/com/xcong/excoin/processor/MarketHandler.java                                  |   21 
 src/main/java/com/xcong/excoin/modules/exchange/service/HandleKlineService.java              |   14 
 src/main/java/com/xcong/excoin/utils/CoinTypeConvert.java                                    |    4 
 src/main/java/com/xcong/excoin/trade/ExchangeTrade.java                                      |   27 
 src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java                        |  220 ++++
 src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java                           |  392 +++++++
 src/main/java/com/xcong/excoin/processor/ExchangeOrderDirection.java                         |    5 
 pom.xml                                                                                      |    5 
 src/main/java/com/xcong/excoin/trade/TradePlateModel.java                                    |   13 
 src/main/java/com/xcong/excoin/processor/CoinProcessor.java                                  |   54 +
 src/main/java/com/xcong/excoin/trade/CoinTraderFactory.java                                  |   40 
 src/main/java/com/xcong/excoin/modules/coin/parameter/dto/SubmitSalesWalletCoinOrderDto.java |    8 
 src/main/java/com/xcong/excoin/processor/CoinThumb.java                                      |   26 
 src/main/java/com/xcong/excoin/modules/coin/entity/OrderCoinsEntity.java                     |    5 
 src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java                    |   17 
 src/test/java/com/xcong/excoin/GuijiTest.java                                                |    4 
 src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java                              |   98 +
 src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java                         |   98 +
 src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java           |  290 +++++
 src/main/java/com/xcong/excoin/modules/exchange/service/impl/HandleKlineServiceImpl.java     |   56 +
 src/main/java/com/xcong/excoin/trade/TradePlateItem.java                                     |   11 
 src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java                            |   65 +
 src/main/java/com/xcong/excoin/processor/MarketService.java                                  |  125 ++
 src/main/resources/mapper/member/MemberWalletCoinDao.xml                                     |   16 
 src/main/java/com/xcong/excoin/trade/ExchangeOrderDirection.java                             |    5 
 src/main/java/com/xcong/excoin/common/enumerates/SymbolEnum.java                             |    3 
 src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinDealDao.java                        |    3 
 src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml                               |   73 
 src/main/java/com/xcong/excoin/configurations/security/WebSecurityConfig.java                |    1 
 src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java                         |   47 
 src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java              |    7 
 src/main/java/com/xcong/excoin/trade/MergeOrder.java                                         |   42 
 src/main/java/com/xcong/excoin/trade/TradePlate.java                                         |  181 +++
 src/main/java/com/xcong/excoin/modules/member/dao/MemberWalletCoinDao.java                   |    1 
 src/main/java/com/xcong/excoin/processor/CoinProcessorFactory.java                           |   34 
 src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java                             |  159 +++
 src/main/java/com/xcong/excoin/trade/CoinTrader.java                                         |  762 +++++++++++++++
 38 files changed, 2,907 insertions(+), 41 deletions(-)

diff --git a/pom.xml b/pom.xml
index 0438973..c77623f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -76,6 +76,11 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-amqp</artifactId>
         </dependency>
+        <dependency>
+            <!-- websocket -->
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-websocket</artifactId>
+        </dependency>
 
 <!--        <dependency>-->
 <!--            <groupId>org.springframework.security</groupId>-->
diff --git a/src/main/java/com/xcong/excoin/common/enumerates/SymbolEnum.java b/src/main/java/com/xcong/excoin/common/enumerates/SymbolEnum.java
index ca590aa..70888bd 100644
--- a/src/main/java/com/xcong/excoin/common/enumerates/SymbolEnum.java
+++ b/src/main/java/com/xcong/excoin/common/enumerates/SymbolEnum.java
@@ -14,7 +14,8 @@
     ,BCH("BCH", "BCH/USDT")
     ,EOS("EOS", "EOS/USDT")
     ,XRP("XRP", "XRP/USDT")
-    ,ETC("ETC", "ETC/USDT");
+    ,ETC("ETC", "ETC/USDT")
+    ,NEKK("NEKK", "NEKK/USDT");
 
     private String name;
 
diff --git a/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java b/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
index 0a791bf..1389b69 100644
--- a/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
+++ b/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
@@ -29,6 +29,11 @@
 
     public static final String EXCHANGE_A = "biue-exchange-A";
 
+    /**
+     * 撮合交易
+     */
+    public static final String EXCHANGE_B = "biue-exchange-B";
+
 
     // 开多止盈队列
     public static final String QUEUE_MOREPRO = "QUEUE_MOREPRO_NEW";
@@ -51,6 +56,12 @@
     // 平仓队列
     public static final String QUEUE_CLOSETRADE = "QUEUE_CLOSETRADE_NEW";
 
+    // 盘口队列
+    public static final String QUEUE_TRADE_PLATE = "QUEUE_TRADE_PLATE";
+
+    // 处理交易
+    public static final String QUEUE_HANDLE_TRADE = "QUEUE_HANDLE_TRADE";
+
 
     // 开多止盈路由键
     public static final String ROUTINGKEY_MOREPRO = "ROUTINGKEY_MOREPRO";
@@ -71,6 +82,12 @@
     public static final String ROUTINGKEY_PRICEOPERATE = "ROUTINGKEY_PRICEOPERATE";
     // 平仓路由
     public static final String ROUTINGKEY_CLOSETRADE = "ROUTINGKEY_CLOSETRADE";
+
+    // 盘口理路由
+    public static final String ROUTINGKEY_TRADE_PLATE = "ROUTINGKEY_TRADE_PLATE";
+
+    // 交易订单处理
+    public static final String ROUTINGKEY_HANDLE_TRADE = "ROUTINGKEY_HANDLE_TRADE";
 
     @Resource
     private ConnectionFactory connectionFactory;
@@ -94,6 +111,7 @@
     public DirectExchange defaultExchange() {
         return new DirectExchange(EXCHANGE_ONE);
     }
+
 
     @Bean
     public Queue testQueue() {
@@ -204,6 +222,27 @@
 
 
     /**
+     * 盘口推送
+     *
+     * @return
+     */
+    @Bean
+    public Queue queuePlateTrade() {
+        return new Queue(QUEUE_TRADE_PLATE, true);
+    }
+
+    /**
+     * 交易订单处理
+     *
+     * @return
+     */
+    @Bean
+    public Queue queueHandleTrade() {
+        return new Queue(QUEUE_HANDLE_TRADE, true);
+    }
+
+
+    /**
      * 开多止盈
      *
      * @return
@@ -286,4 +325,30 @@
         return BindingBuilder.bind(queueCloseTrade()).to(orderExchange()).with(RabbitMqConfig.ROUTINGKEY_CLOSETRADE);
     }
 
+
+    @Bean
+    public DirectExchange matchTradeExchange() {
+        return new DirectExchange(EXCHANGE_B);
+    }
+
+    /**
+     * 盘口变化绑定
+     *
+     * @return
+     */
+    @Bean
+    public Binding bindingPlateTrade() {
+        return BindingBuilder.bind(queuePlateTrade()).to(matchTradeExchange()).with(RabbitMqConfig.ROUTINGKEY_TRADE_PLATE);
+    }
+
+    /**
+     *  交易订单处理
+     *
+     * @return
+     */
+    @Bean
+    public Binding bindingHandleTrade() {
+        return BindingBuilder.bind(queueHandleTrade()).to(matchTradeExchange()).with(RabbitMqConfig.ROUTINGKEY_HANDLE_TRADE);
+    }
+
 }
diff --git a/src/main/java/com/xcong/excoin/configurations/WebSocketConfig.java b/src/main/java/com/xcong/excoin/configurations/WebSocketConfig.java
new file mode 100644
index 0000000..31b5eeb
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/configurations/WebSocketConfig.java
@@ -0,0 +1,16 @@
+package com.xcong.excoin.configurations;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.socket.server.standard.ServerEndpointExporter;
+
+@Configuration
+public class WebSocketConfig {
+    /**
+     * 注入一个ServerEndpointExporter,该Bean会自动注册使用@ServerEndpoint注解申明的websocket endpoint
+     */
+    @Bean
+    public ServerEndpointExporter serverEndpointExporter() {
+        return new ServerEndpointExporter();
+    }
+}
diff --git a/src/main/java/com/xcong/excoin/configurations/security/WebSecurityConfig.java b/src/main/java/com/xcong/excoin/configurations/security/WebSecurityConfig.java
index dfe6523..dce5e35 100644
--- a/src/main/java/com/xcong/excoin/configurations/security/WebSecurityConfig.java
+++ b/src/main/java/com/xcong/excoin/configurations/security/WebSecurityConfig.java
@@ -53,6 +53,7 @@
                 .antMatchers("/api/member/getAppVersionInfo").permitAll()
                 .antMatchers("/api/orderCoin/searchSymbolResultList").permitAll()
                 .antMatchers("/api/orderCoin/findCollect").permitAll()
+                .antMatchers("/trade/**").permitAll()
                 .anyRequest().authenticated()
                 .and().apply(securityConfiguereAdapter());
     }
diff --git a/src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java b/src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java
index f9f4ece..ca953be 100644
--- a/src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java
+++ b/src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java
@@ -67,7 +67,12 @@
 		Integer tradeType = submitSalesWalletCoinOrderDto.getTradeType();
 		BigDecimal price = submitSalesWalletCoinOrderDto.getPrice();
 		BigDecimal amount = submitSalesWalletCoinOrderDto.getAmount();
-		return orderCoinService.submitSalesWalletCoinOrder(symbol,type,tradeType,price,amount);
+		if("NEKK".equals(symbol)){
+			return orderCoinService.submitSalesWalletCoinOrderWithMatch(symbol,type,tradeType,price,amount,submitSalesWalletCoinOrderDto.getEntrustAmount());
+
+		}else{
+			return orderCoinService.submitSalesWalletCoinOrder(symbol,type,tradeType,price,amount);
+		}
 	}
 	
 	/**
diff --git a/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinDealDao.java b/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinDealDao.java
index ab900b8..e38459d 100644
--- a/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinDealDao.java
+++ b/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinDealDao.java
@@ -1,7 +1,9 @@
 package com.xcong.excoin.modules.coin.dao;
 
+import java.util.Date;
 import java.util.List;
 
+import com.xcong.excoin.trade.ExchangeTrade;
 import org.apache.ibatis.annotations.Param;
 
 import com.baomidou.mybatisplus.core.mapper.BaseMapper;
@@ -21,5 +23,6 @@
 	IPage<OrderCoinsDealEntity> findAllWalletCoinOrderInPage(Page<OrderCoinsDealEntity> page,
 			@Param("record") OrderCoinsDealEntity orderCoinsDealEntity);
 
+	List<ExchangeTrade> selectOrderCoinDealByTime(@Param("symbol")String symbol, @Param("startTime")Date startTime, @Param("endTime")Date endTime);
 	
 }
diff --git a/src/main/java/com/xcong/excoin/modules/coin/entity/OrderCoinsEntity.java b/src/main/java/com/xcong/excoin/modules/coin/entity/OrderCoinsEntity.java
index 54cadd8..f536c5d 100644
--- a/src/main/java/com/xcong/excoin/modules/coin/entity/OrderCoinsEntity.java
+++ b/src/main/java/com/xcong/excoin/modules/coin/entity/OrderCoinsEntity.java
@@ -57,6 +57,11 @@
 	 * 成交价
 	 */
 	private BigDecimal dealPrice;
+
+	/**
+	 *  市价委托时的委托金额
+	 */
+	private BigDecimal entrustAmount;
 	/**
 	 * 成交金额
 	 */
diff --git a/src/main/java/com/xcong/excoin/modules/coin/parameter/dto/SubmitSalesWalletCoinOrderDto.java b/src/main/java/com/xcong/excoin/modules/coin/parameter/dto/SubmitSalesWalletCoinOrderDto.java
index b15186a..1395732 100644
--- a/src/main/java/com/xcong/excoin/modules/coin/parameter/dto/SubmitSalesWalletCoinOrderDto.java
+++ b/src/main/java/com/xcong/excoin/modules/coin/parameter/dto/SubmitSalesWalletCoinOrderDto.java
@@ -26,12 +26,16 @@
 	private Integer tradeType;
 	
     @Min(0)
-	@NotNull(message = "数量不能为空")
+	//@NotNull(message = "数量不能为空")
 	@ApiModelProperty(value = "数量", example = "100")
 	private BigDecimal amount;
 
-	@NotNull(message = "建仓价不能为空")
+	//@NotNull(message = "建仓价不能为空")
 	@ApiModelProperty(value = "建仓价", example = "20.0000")
 	private BigDecimal price;
 
+
+	@ApiModelProperty(value = "市价时输入的总金额", example = "20.0000")
+	private BigDecimal entrustAmount;
+
 }
diff --git a/src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java b/src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java
index a8f2914..8bf291c 100644
--- a/src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java
+++ b/src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java
@@ -1,11 +1,13 @@
 package com.xcong.excoin.modules.coin.service;
 
 import java.math.BigDecimal;
+import java.util.List;
 
 import com.baomidou.mybatisplus.extension.service.IService;
 import com.xcong.excoin.common.response.Result;
 import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity;
 import com.xcong.excoin.modules.coin.parameter.dto.FindAllWalletCoinOrderDto;
+import com.xcong.excoin.trade.ExchangeTrade;
 
 public interface OrderCoinService extends IService<OrderCoinsEntity>{
 	
@@ -15,6 +17,19 @@
 
 	Result submitSalesWalletCoinOrder(String symbol, Integer type, Integer tradeType, BigDecimal price,
 			BigDecimal amount);
+
+	/**
+	 *  需要撮合交易的币种提交买卖单
+	 * @param symbol
+	 * @param type
+	 * @param tradeType
+	 * @param price
+	 * @param amount
+	 * @param entrustAmount
+	 * @return
+	 */
+	Result submitSalesWalletCoinOrderWithMatch(String symbol, Integer type, Integer tradeType, BigDecimal price,
+			BigDecimal amount,BigDecimal entrustAmount);
 
 	public Result getEntrustWalletCoinOrder(String symbol, Integer status);
 
@@ -34,4 +49,6 @@
 
 	public void dealEntrustCoinOrder();
 
+	public void handleOrder(List<ExchangeTrade> trades);
+
 }
diff --git a/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java b/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java
index c96299f..200f4a7 100644
--- a/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java
+++ b/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java
@@ -1,14 +1,9 @@
 package com.xcong.excoin.modules.coin.service.impl;
 
 import java.math.BigDecimal;
+import java.text.ParseException;
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.GregorianCalendar;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import javax.annotation.Resource;
 
@@ -16,6 +11,10 @@
 import com.xcong.excoin.modules.platform.entity.PlatformCnyUsdtExchangeEntity;
 import com.xcong.excoin.modules.platform.entity.PlatformSymbolsCoinEntity;
 
+import com.xcong.excoin.trade.CoinTrader;
+import com.xcong.excoin.trade.CoinTraderFactory;
+import com.xcong.excoin.trade.ExchangeTrade;
+import org.apache.commons.collections.CollectionUtils;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
@@ -79,6 +78,10 @@
     RedisUtils redisUtils;
     @Resource
     PlatformSymbolsCoinDao platformSymbolsCoinDao;
+
+    @Resource
+    private CoinTraderFactory factory;
+
 
     @Override
     public String generateSimpleSerialno(String userId) {
@@ -309,6 +312,174 @@
         record.setBalance(walletCoinUsdt.getAvailableBalance());
 
         memberAccountFlowEntityDao.insert(record);
+
+        return Result.ok(MessageSourceUtils.getString("order_service_0011"));
+    }
+
+    @Override
+    public Result submitSalesWalletCoinOrderWithMatch(String symbol, Integer type, Integer tradeType, BigDecimal price, BigDecimal amount, BigDecimal entrustAmount) {
+        //获取用户ID
+        Long memberId = 13L;
+        BigDecimal nowPriceinBigDecimal = price;
+        //查询当前价
+        //BigDecimal nowPrice = new BigDecimal(redisUtils.getString(CoinTypeConvert.convertToKey(symbol + "/USDT")));
+
+        // 获取交易管理的杠杠倍率,手续费率等信息,由平台进行设置
+        symbol = symbol.toUpperCase();
+        MemberWalletCoinEntity walletCoin = memberWalletCoinDao.selectWalletCoinBymIdAndCode(memberId, symbol);
+        if (ObjectUtil.isEmpty(walletCoin)) {
+            return Result.fail(MessageSourceUtils.getString("order_service_0003"));
+        }
+        // 查询交易设置
+        PlatformTradeSettingEntity tradeSetting = platformTradeSettingDao.findTradeSetting();
+        if (ObjectUtil.isEmpty(tradeSetting)) {
+            return Result.fail(MessageSourceUtils.getString("order_service_0009"));
+        }
+        // 手续费用(手续费=建仓价X数量X手续费率)
+        BigDecimal closingPrice ;
+        // 总费用分两种 1,限价交易 是价格*数量+手续费 2,市价交易 用户输入的金额+手续费
+        //总费用 = 成交价*数量+手续费
+        BigDecimal totalPayPrice ;
+        if(OrderCoinsEntity.TRADETYPE_FIXEDPRICE.equals(tradeType) ){
+            // 限价
+            closingPrice = price.multiply(amount).multiply(tradeSetting.getCoinFeeRatio());
+            totalPayPrice = price.multiply(amount).add(closingPrice);
+        }else{
+            // 市价
+            closingPrice = entrustAmount.multiply(tradeSetting.getCoinFeeRatio());
+            totalPayPrice = entrustAmount.add(closingPrice);
+        }
+       // BigDecimal totalPayPricCoin = nowPrice.multiply(amount).add(closingPrice);
+
+        String walletCode = MemberWalletCoinEnum.WALLETCOINCODE.getValue();
+        MemberWalletCoinEntity walletCoinUsdt = memberWalletCoinDao.selectWalletCoinBymIdAndCode(memberId, walletCode);
+        if (OrderCoinsEntity.ORDERTYPE_BUY.equals(type)) {
+            //买入,所需总费用跟用户USDT金额进行比较
+            BigDecimal availableBalance = walletCoinUsdt.getAvailableBalance();
+            if (totalPayPrice.compareTo(availableBalance) > 0) {
+                return Result.fail(MessageSourceUtils.getString("order_service_0010"));
+            }
+        } else {
+            //卖出,用户币是否足够
+            BigDecimal availableBalance = walletCoin.getAvailableBalance();
+            if (amount.compareTo(availableBalance) > 0) {
+                return Result.fail(MessageSourceUtils.getString("order_service_0010"));
+            }
+        }
+
+        // 首先将单插入到数据库主表(委托表)
+        // 创建订单
+        OrderCoinsEntity order = new OrderCoinsEntity();
+        //根据委托类型生成不同数据
+        // 如果是限价交易直接插入主表数据
+        order.setMemberId(memberId);
+        order.setOrderNo(generateSimpleSerialno(memberId.toString()));
+        order.setOrderType(type);
+        order.setSymbol(symbol);
+        //order.setMarkPrice(nowPrice);
+
+        // 成交量 先设置为0
+        order.setDealCnt(BigDecimal.ZERO);
+        // 成交价
+        //order.setDealPrice(price);
+        // 成交金额
+        order.setDealAmount(BigDecimal.ZERO);
+        order.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DODING);
+        order.setTradeType(tradeType);
+        // 手续费
+        order.setFeeAmount(closingPrice);
+        if(OrderCoinsEntity.TRADETYPE_FIXEDPRICE.equals(tradeType)){
+
+                // 限价 是需要价格和数量 可以得到成交金额
+                // 下单量
+                order.setEntrustCnt(amount);
+                // 下单价格
+                order.setEntrustPrice(price);
+                order.setEntrustAmount(amount.multiply(price));
+
+
+        }else{
+            if(OrderCoinsEntity.ORDERTYPE_BUY.equals(type)){
+                // 市价 只有金额
+                order.setEntrustAmount(entrustAmount);
+            }else{
+                // 下单量
+                order.setEntrustCnt(amount);
+                // 下单价格
+                order.setEntrustPrice(price);
+                order.setEntrustAmount(amount.multiply(price));
+            }
+
+        }
+        orderCoinsDao.insert(order);
+
+        //更新用户钱包信息
+        //冻结相应的资产
+        if (OrderCoinsEntity.ORDERTYPE_BUY.equals(type)) {
+            //如果是买入,所对应的币种增加,USDT账户减少金额
+            BigDecimal availableBalance = walletCoinUsdt.getAvailableBalance().subtract(totalPayPrice);
+            BigDecimal frozenBalance = walletCoinUsdt.getFrozenBalance().add(totalPayPrice);
+            walletCoinUsdt.setAvailableBalance(availableBalance);
+            walletCoinUsdt.setFrozenBalance(frozenBalance);
+            memberWalletCoinDao.updateById(walletCoinUsdt);
+        } else {
+            //如果是卖出,币种减少,USDT增加
+            BigDecimal availableBalance = walletCoin.getAvailableBalance().subtract(amount);
+            BigDecimal frozenBalance = walletCoin.getFrozenBalance().add(amount);
+            walletCoin.setAvailableBalance(availableBalance);
+            walletCoin.setFrozenBalance(frozenBalance);
+            memberWalletCoinDao.updateById(walletCoin);
+        }
+        // 加入到撮合
+        CoinTrader trader = factory.getTrader(symbol);
+//        if(trader==null){
+//
+//             trader = new CoinTrader("NEKK");
+//            //newTrader.setKafkaTemplate(kafkaTemplate);
+//            //newTrader.setBaseCoinScale(coin.getBaseCoinScale());
+//            //newTrader.setCoinScale(coin.getCoinScale());
+//            // newTrader.setPublishType(coin.getPublishType());
+//            //newTrader.setClearTime(coin.getClearTime());
+//
+//            // 创建成功以后需要对未处理订单预处理
+//            //log.info("======CoinTrader Process: " + symbol + "======");
+//            List<OrderCoinsEntity> orders = orderCoinsDao.selectAllEntrustingCoinOrderList();
+//            if(CollectionUtils.isNotEmpty(orders)){
+//                List<OrderCoinsEntity> tradingOrders = new ArrayList<>();
+//                List<OrderCoinsEntity> completedOrders = new ArrayList<>();
+//                orders.forEach(order1 -> {
+//                    tradingOrders.add(order1);
+//                });
+//                try {
+//                    trader.trade(tradingOrders);
+//                } catch (ParseException e) {
+//                    e.printStackTrace();
+//                    // log.info("异常:trader.trade(tradingOrders);");
+//                }
+//            }
+//
+//            trader.setReady(true);
+//            factory.addTrader(symbol, trader);
+//        }
+        trader.trade(order);
+
+
+//        // 流水记录 TODO
+//        MemberAccountFlowEntity record = new MemberAccountFlowEntity();
+//        record.setMemberId(memberId);
+//        if (OrderCoinsEntity.ORDERTYPE_BUY.equals(type)) {
+//            record.setPrice(totalPayPrice.setScale(4, BigDecimal.ROUND_DOWN));
+//            record.setSource(MemberAccountFlowEntity.SOURCE_BUY + symbol);
+//            record.setRemark(MemberAccountFlowEntity.REMARK_BUY + symbol + ":" + amount);
+//        } else {
+//            record.setPrice(totalPayPrice.negate().setScale(4, BigDecimal.ROUND_DOWN));
+//            record.setSource(MemberAccountFlowEntity.SOURCE_SALE + symbol);
+//            record.setRemark(MemberAccountFlowEntity.REMARK_SALE + symbol + ":" + amount);
+//        }
+//        record.setSymbol(symbol);
+//        record.setBalance(walletCoinUsdt.getAvailableBalance());
+//
+//        memberAccountFlowEntityDao.insert(record);
 
         return Result.ok(MessageSourceUtils.getString("order_service_0011"));
     }
@@ -638,4 +809,109 @@
             }
         }
     }
+
+    public void handleOrder(List<ExchangeTrade> trades){
+        // 处理撮合交易的订单
+        for(ExchangeTrade exchangeTrade : trades){
+
+            BigDecimal amount = exchangeTrade.getAmount();
+            Long buyOrderId = exchangeTrade.getBuyOrderId();
+            BigDecimal buyTurnover = exchangeTrade.getBuyTurnover();
+            int direction = exchangeTrade.getDirection();
+            BigDecimal price = exchangeTrade.getPrice();
+            Long sellOrderId = exchangeTrade.getSellOrderId();
+            // 买卖单都需要处理
+            // 买单
+            OrderCoinsEntity buyOrderCoinsEntity = orderCoinsDao.selectById(buyOrderId);
+            if(buyOrderCoinsEntity!=null){
+                // 比较剩余的量
+                BigDecimal dealAmount = buyOrderCoinsEntity.getDealAmount();
+                // 单的总金额
+                BigDecimal entrustAmount = buyOrderCoinsEntity.getEntrustAmount();
+                BigDecimal add = dealAmount.add(buyTurnover);
+                // 创建一个完成的单
+                OrderCoinsDealEntity detail = new OrderCoinsDealEntity();
+                detail.setMemberId(buyOrderCoinsEntity.getMemberId());
+                //detail.setOrderId(order.getId());
+                detail.setOrderNo(buyOrderCoinsEntity.getOrderNo());
+                detail.setOrderType(buyOrderCoinsEntity.getOrderType());
+                detail.setTradeType(buyOrderCoinsEntity.getTradeType());
+                detail.setSymbol(buyOrderCoinsEntity.getSymbol());
+                detail.setSymbolCnt(amount);
+                detail.setEntrustPrice(buyOrderCoinsEntity.getEntrustPrice());
+                detail.setDealPrice(price);
+                detail.setDealAmount(buyTurnover);
+                //detail.setFeeAmount(closingPrice);
+                detail.setOrderStatus(OrderCoinsDealEntity.ORDERSTATUS_DONE);
+                orderCoinDealDao.insert(detail);
+                // 如果这个单成交完  更改状态
+                if(add.compareTo(entrustAmount)>=0){
+                    OrderCoinsEntity update = new OrderCoinsEntity();
+                    update.setId(buyOrderId);
+                    update.setDealAmount(entrustAmount);
+                    update.setDealCnt(buyOrderCoinsEntity.getDealCnt().add(amount));
+                    update.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE);
+                    update.setUpdateTime(new Date());
+                    orderCoinsDao.updateById(update);
+                }else{
+                    OrderCoinsEntity update = new OrderCoinsEntity();
+                    update.setId(buyOrderId);
+                    update.setDealAmount(add);
+                    update.setDealCnt(buyOrderCoinsEntity.getDealCnt().add(amount));
+                    update.setUpdateTime(new Date());
+                    orderCoinsDao.updateById(update);
+                }
+            }
+            // 卖单
+            OrderCoinsEntity sellOrderCoinsEntity = orderCoinsDao.selectById(sellOrderId);
+            if(sellOrderCoinsEntity!=null){
+                // 比较剩余的量
+                BigDecimal dealAmount = sellOrderCoinsEntity.getDealCnt();
+                // 单的总数量
+                BigDecimal entrustCnt = sellOrderCoinsEntity.getEntrustCnt();
+                BigDecimal add = dealAmount.add(amount);
+                // 创建一个完成的单
+                OrderCoinsDealEntity detail = new OrderCoinsDealEntity();
+                detail.setMemberId(sellOrderCoinsEntity.getMemberId());
+                //detail.setOrderId(order.getId());
+                detail.setOrderNo(sellOrderCoinsEntity.getOrderNo());
+                detail.setOrderType(sellOrderCoinsEntity.getOrderType());
+                detail.setTradeType(sellOrderCoinsEntity.getTradeType());
+                detail.setSymbol(sellOrderCoinsEntity.getSymbol());
+                detail.setSymbolCnt(amount);
+                detail.setEntrustPrice(sellOrderCoinsEntity.getEntrustPrice());
+                detail.setDealPrice(price);
+                detail.setDealAmount(buyTurnover);
+                //detail.setFeeAmount(closingPrice);
+                detail.setOrderStatus(OrderCoinsDealEntity.ORDERSTATUS_DONE);
+                orderCoinDealDao.insert(detail);
+                // 如果这个单成交完  更改状态
+                if(add.compareTo(entrustCnt)>=0){
+                    OrderCoinsEntity update = new OrderCoinsEntity();
+                    update.setId(sellOrderId);
+                    // 总成交额
+                    update.setDealAmount(buyTurnover.add(sellOrderCoinsEntity.getDealAmount()));
+                    update.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE);
+                    update.setDealCnt(entrustCnt);
+                    update.setUpdateTime(new Date());
+                    orderCoinsDao.updateById(update);
+                }else{
+                    // 未完成
+                    OrderCoinsEntity update = new OrderCoinsEntity();
+                    update.setId(sellOrderId);
+                    // 总成交额
+                    update.setDealAmount(buyTurnover.add(sellOrderCoinsEntity.getDealAmount()));
+                    update.setDealCnt(sellOrderCoinsEntity.getDealCnt().add(amount));
+                    update.setUpdateTime(new Date());
+                    orderCoinsDao.updateById(update);
+                }
+                // 卖币得到的usdt
+                MemberWalletCoinEntity memberWalletCoinEntity = memberWalletCoinDao.selectWalletCoinBymIdAndCode(sellOrderCoinsEntity.getMemberId(), sellOrderCoinsEntity.getSymbol());
+                if(memberWalletCoinEntity!=null){
+                    memberWalletCoinDao.updateWalletBalance(memberWalletCoinEntity.getId(),buyTurnover,buyTurnover,null);
+                }
+
+            }
+        }
+    }
 }
diff --git a/src/main/java/com/xcong/excoin/modules/exchange/service/HandleKlineService.java b/src/main/java/com/xcong/excoin/modules/exchange/service/HandleKlineService.java
new file mode 100644
index 0000000..cdedee9
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/modules/exchange/service/HandleKlineService.java
@@ -0,0 +1,14 @@
+package com.xcong.excoin.modules.exchange.service;
+
+import com.xcong.excoin.trade.ExchangeTrade;
+
+import java.util.List;
+
+public interface HandleKlineService {
+
+    /**
+     *  处理交易后的最新K线
+     * @param trades
+     */
+    void handleExchangeOrderToKline(List<ExchangeTrade> trades);
+}
diff --git a/src/main/java/com/xcong/excoin/modules/exchange/service/impl/HandleKlineServiceImpl.java b/src/main/java/com/xcong/excoin/modules/exchange/service/impl/HandleKlineServiceImpl.java
new file mode 100644
index 0000000..7add4f8
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/modules/exchange/service/impl/HandleKlineServiceImpl.java
@@ -0,0 +1,56 @@
+package com.xcong.excoin.modules.exchange.service.impl;
+
+import cn.hutool.core.util.StrUtil;
+import com.huobi.client.model.Candlestick;
+import com.xcong.excoin.modules.exchange.service.HandleKlineService;
+import com.xcong.excoin.processor.CoinProcessor;
+import com.xcong.excoin.processor.CoinProcessorFactory;
+import com.xcong.excoin.trade.ExchangeTrade;
+import com.xcong.excoin.utils.CoinTypeConvert;
+import com.xcong.excoin.utils.RedisUtils;
+import org.apache.commons.collections.CollectionUtils;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.math.BigDecimal;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+@Service
+public class HandleKlineServiceImpl implements HandleKlineService {
+
+    @Resource
+    private CoinProcessorFactory processorFactory;
+
+    @Resource
+    private RedisUtils redisUtils;
+
+    @Override
+    public void handleExchangeOrderToKline(List<ExchangeTrade> trades) {
+        if (CollectionUtils.isEmpty(trades)) {
+            return;
+        }
+        String symbol = trades.get(0).getSymbol();
+        String symbolUsdt = symbol;
+        if(!symbol.contains("USDT")){
+            symbolUsdt = symbol+"/USDT";
+        }
+        CoinProcessor processor = processorFactory.getProcessor(symbol);
+        Map<String, Candlestick> currentKlineMap = processor.getCurrentKlineMap();
+        Collection<Candlestick> values = currentKlineMap.values();
+        BigDecimal newPrice = trades.get(trades.size()-1).getPrice();
+        for (Candlestick candlestick : values) {
+            for (ExchangeTrade exchangeTrade : trades) {
+                processor.processTrade(candlestick, exchangeTrade);
+            }
+        }
+
+        // 存入redis,websocket去取
+        String key = "NEW_KINE_{}";
+        key = StrUtil.format(key, symbolUsdt);
+        redisUtils.set(key,currentKlineMap);
+        // 更新最新价
+        redisUtils.set(CoinTypeConvert.convertToKey(symbolUsdt), newPrice);
+    }
+}
diff --git a/src/main/java/com/xcong/excoin/modules/member/dao/MemberWalletCoinDao.java b/src/main/java/com/xcong/excoin/modules/member/dao/MemberWalletCoinDao.java
index 66a50dc..de32153 100644
--- a/src/main/java/com/xcong/excoin/modules/member/dao/MemberWalletCoinDao.java
+++ b/src/main/java/com/xcong/excoin/modules/member/dao/MemberWalletCoinDao.java
@@ -22,4 +22,5 @@
     int subFrozenBalance(@Param("memberId") Long memberId, @Param("id") Long id, @Param("amount") BigDecimal amount);
 
     int updateBlockBalance(@Param("id") Long id, @Param("availableBalance") BigDecimal availableBalance, @Param("earlyBalance") BigDecimal earlyBalance, @Param("blockNumber") Integer blockNumber);
+    int updateWalletBalance(@Param("id") Long id, @Param("availableBalance") BigDecimal availableBalance, @Param("totalBalance") BigDecimal totalBalance, @Param("frozenBalance") BigDecimal frozenBalance);
 }
diff --git a/src/main/java/com/xcong/excoin/processor/CoinProcessor.java b/src/main/java/com/xcong/excoin/processor/CoinProcessor.java
new file mode 100644
index 0000000..6400c54
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/processor/CoinProcessor.java
@@ -0,0 +1,54 @@
+package com.xcong.excoin.processor;
+
+
+import com.huobi.client.model.Candlestick;
+import com.xcong.excoin.trade.ExchangeTrade;
+import com.xcong.excoin.utils.RedisUtils;
+
+import java.util.List;
+import java.util.Map;
+
+public interface CoinProcessor {
+
+    void setIsHalt(boolean status);
+
+    void setIsStopKLine(boolean stop);
+    
+    boolean isStopKline();
+    /**
+     * 处理新生成的交易信息
+     * @param trades
+     * @return
+     */
+    void process(List<ExchangeTrade> trades);
+
+    void processTrade(Candlestick kLine, ExchangeTrade exchangeTrade);
+    /**
+     * 添加存储器
+     * @param storage
+     */
+    void addHandler(MarketHandler storage);
+
+    CoinThumb getThumb();
+
+    void setMarketService(MarketService service);
+
+    void generateKLine(int range, int field, long time);
+
+    Candlestick getKLine();
+
+    void initializeThumb();
+
+    void autoGenerate();
+
+    void resetThumb();
+
+   // void setExchangeRate(CoinExchangeRate coinExchangeRate);
+
+    void update24HVolume(long time);
+
+    //void initializeUsdRate();
+     Map<String,Candlestick> getCurrentKlineMap();
+
+     void setRedisUtils(RedisUtils redisUtils);
+}
diff --git a/src/main/java/com/xcong/excoin/processor/CoinProcessorFactory.java b/src/main/java/com/xcong/excoin/processor/CoinProcessorFactory.java
new file mode 100644
index 0000000..8c3ebda
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/processor/CoinProcessorFactory.java
@@ -0,0 +1,34 @@
+package com.xcong.excoin.processor;
+
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+@Slf4j
+@Component
+public class CoinProcessorFactory {
+    private ConcurrentHashMap<String, CoinProcessor> processorMap;
+
+    public CoinProcessorFactory() {
+        processorMap = new ConcurrentHashMap<>();
+    }
+
+    public void addProcessor(String symbol, CoinProcessor processor) {
+        log.info("CoinProcessorFactory addProcessor = {}" + symbol);
+        processorMap.put(symbol, processor);
+    }
+
+    public boolean containsProcessor(String symbol) {
+    	return processorMap != null && processorMap.containsKey(symbol);
+    }
+    
+    public CoinProcessor getProcessor(String symbol) {
+        return processorMap.get(symbol);
+    }
+
+    public ConcurrentHashMap<String, CoinProcessor> getProcessorMap() {
+        return processorMap;
+    }
+}
diff --git a/src/main/java/com/xcong/excoin/processor/CoinThumb.java b/src/main/java/com/xcong/excoin/processor/CoinThumb.java
new file mode 100644
index 0000000..1e9a198
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/processor/CoinThumb.java
@@ -0,0 +1,26 @@
+package com.xcong.excoin.processor;
+
+import lombok.Data;
+
+import java.math.BigDecimal;
+
+@Data
+public class CoinThumb {
+    private String symbol;
+    private BigDecimal open = BigDecimal.ZERO;
+    private BigDecimal high= BigDecimal.ZERO;
+    private BigDecimal low= BigDecimal.ZERO;
+    private BigDecimal close=BigDecimal.ZERO;
+    private BigDecimal chg = BigDecimal.ZERO.setScale(2);
+    private BigDecimal change = BigDecimal.ZERO.setScale(2);
+    private BigDecimal volume = BigDecimal.ZERO.setScale(2);
+    private BigDecimal turnover= BigDecimal.ZERO;
+    //昨日收盘价
+    private BigDecimal lastDayClose = BigDecimal.ZERO;
+    //交易币对usd汇率
+    private BigDecimal usdRate;
+    //基币对usd的汇率
+    private BigDecimal baseUsdRate;
+    // 交易區
+    private int zone;
+}
diff --git a/src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java b/src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java
new file mode 100644
index 0000000..ca6dc28
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java
@@ -0,0 +1,392 @@
+package com.xcong.excoin.processor;
+
+
+import com.alibaba.fastjson.JSON;
+import com.huobi.client.model.Candlestick;
+import com.xcong.excoin.trade.ExchangeTrade;
+import com.xcong.excoin.utils.RedisUtils;
+import lombok.ToString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * 默认交易处理器,产生1mK线信息
+ */
+@ToString
+public class DefaultCoinProcessor implements CoinProcessor {
+    private Logger logger = LoggerFactory.getLogger(DefaultCoinProcessor.class);
+    private String symbol;
+    private String baseCoin;
+    private Candlestick currentKLine;
+    private List<MarketHandler> handlers;
+    private CoinThumb coinThumb;
+    private MarketService service;
+    private RedisUtils redisUtils;
+    //private CoinExchangeRate coinExchangeRate;
+    //是否暂时处理
+    private Boolean isHalt = true;
+    //是否停止K线生成
+    private Boolean stopKLine = false;
+
+    /**
+     *  每个时间段的K线在生成后,生成一个最新的K线
+     */
+    private Map<String,Candlestick> currentKlineMap = new ConcurrentHashMap<>();
+
+    public DefaultCoinProcessor(String symbol, String baseCoin) {
+        //handlers = new ArrayList<>();
+        createNewKLine();
+        this.baseCoin = baseCoin;
+        this.symbol = symbol;
+    }
+
+    public String getSymbol() {
+        return symbol;
+    }
+
+    @Override
+    public void initializeThumb() {
+        Calendar calendar = Calendar.getInstance();
+        //将秒、微秒字段置为0
+        calendar.set(Calendar.SECOND, 0);
+        calendar.set(Calendar.MILLISECOND, 0);
+        long nowTime = calendar.getTimeInMillis();
+        calendar.set(Calendar.MINUTE, 0);
+        calendar.set(Calendar.HOUR_OF_DAY, 0);
+        long firstTimeOfToday = calendar.getTimeInMillis();
+        String period = "1min";
+        logger.info("initializeThumb from {} to {}", firstTimeOfToday, nowTime);
+        List<Candlestick> lines = service.findAllKLine(this.symbol, firstTimeOfToday, nowTime, period);
+        coinThumb = new CoinThumb();
+        synchronized (coinThumb) {
+            coinThumb.setSymbol(symbol);
+            for (Candlestick kline : lines) {
+                if (kline.getOpen().compareTo(BigDecimal.ZERO) == 0) {
+                    continue;
+                }
+                if (coinThumb.getOpen().compareTo(BigDecimal.ZERO) == 0) {
+                    coinThumb.setOpen(kline.getOpen());
+                }
+                if (coinThumb.getHigh().compareTo(kline.getHigh()) < 0) {
+                    coinThumb.setHigh(kline.getHigh());
+                }
+                if (kline.getLow().compareTo(BigDecimal.ZERO) > 0 && coinThumb.getLow().compareTo(kline.getLow()) > 0) {
+                    coinThumb.setLow(kline.getLow());
+                }
+                if (kline.getClose().compareTo(BigDecimal.ZERO) > 0) {
+                    coinThumb.setClose(kline.getClose());
+                }
+                coinThumb.setVolume(coinThumb.getVolume().add(kline.getVolume()));
+                // TODO
+                coinThumb.setTurnover(coinThumb.getTurnover().add(kline.getAmount()));
+            }
+            coinThumb.setChange(coinThumb.getClose().subtract(coinThumb.getOpen()));
+            // 此处计算涨幅并没有以开盘价为标准,而是以最低价
+            if (coinThumb.getLow().compareTo(BigDecimal.ZERO) > 0) {
+                coinThumb.setChg(coinThumb.getChange().divide(coinThumb.getLow(), 4, RoundingMode.UP));
+            }
+        }
+    }
+
+    public void createNewKLine() {
+        currentKLine = new Candlestick();
+        synchronized (currentKLine) {
+            Calendar calendar = Calendar.getInstance();
+            calendar.set(Calendar.SECOND, 0);
+            calendar.set(Calendar.MILLISECOND, 0);
+            //1Min时间要是下一整分钟的
+            calendar.add(Calendar.MINUTE, 1);
+            currentKLine.setTimestamp(calendar.getTimeInMillis());
+            // K线类型
+            //currentKLine.setPeriod("1min");
+            currentKLine.setCount(0);
+        }
+    }
+
+    /**
+     * 00:00:00 时重置CoinThumb
+     */
+    @Override
+    public void resetThumb() {
+        logger.info("reset coinThumb");
+        synchronized (coinThumb) {
+            coinThumb.setOpen(BigDecimal.ZERO);
+            coinThumb.setHigh(BigDecimal.ZERO);
+            //设置昨收价格
+            coinThumb.setLastDayClose(coinThumb.getClose());
+            //coinThumb.setClose(BigDecimal.ZERO);
+            coinThumb.setLow(BigDecimal.ZERO);
+            coinThumb.setChg(BigDecimal.ZERO);
+            coinThumb.setChange(BigDecimal.ZERO);
+        }
+    }
+
+//    @Override
+//    public void setExchangeRate(CoinExchangeRate coinExchangeRate) {
+//        this.coinExchangeRate = coinExchangeRate;
+//    }
+
+    @Override
+    public void update24HVolume(long time) {
+        if(coinThumb!=null) {
+            synchronized (coinThumb) {
+                Calendar calendar = Calendar.getInstance();
+                calendar.setTimeInMillis(time);
+                calendar.add(Calendar.HOUR_OF_DAY, -24);
+                long timeStart = calendar.getTimeInMillis();
+                // TODO
+                BigDecimal volume = service.findTradeVolume(this.symbol, timeStart, time);
+                coinThumb.setVolume(volume.setScale(4, RoundingMode.DOWN));
+            }
+        }
+    }
+
+//    @Override
+//    public void initializeUsdRate() {
+//        //logger.info("symbol = {} ,baseCoin = {}",this.symbol,this.baseCoin);
+//        BigDecimal baseUsdRate = coinExchangeRate.getUsdRate(baseCoin);
+//        coinThumb.setBaseUsdRate(baseUsdRate);
+//        //logger.info("setBaseUsdRate = ",baseUsdRate);
+//        BigDecimal multiply = coinThumb.getClose().multiply(coinExchangeRate.getUsdRate(baseCoin));
+//        //logger.info("setUsdRate = ",multiply);
+//        coinThumb.setUsdRate(multiply);
+//    }
+
+
+    @Override
+    public void autoGenerate() {
+        DateFormat df = new SimpleDateFormat("HH:mm:ss");
+        //logger.info("auto generate 1min kline in {},data={}", df.format(new Date(currentKLine.getTime())), JSON.toJSONString(currentKLine));
+        if(coinThumb != null) {
+            synchronized (currentKLine) {
+                //没有成交价时存储上一笔成交价
+                if(currentKLine.getOpen()==null){
+                    currentKLine.setOpen(BigDecimal.ZERO);
+                }
+                if (currentKLine.getOpen().compareTo(BigDecimal.ZERO) == 0) {
+                    currentKLine.setOpen(coinThumb.getClose());
+                    currentKLine.setLow(coinThumb.getClose());
+                    currentKLine.setHigh(coinThumb.getClose());
+                    currentKLine.setClose(coinThumb.getClose());
+                }
+                Calendar calendar = Calendar.getInstance();
+                calendar.set(Calendar.SECOND, 0);
+                calendar.set(Calendar.MILLISECOND, 0);
+                currentKLine.setTimestamp(calendar.getTimeInMillis());
+                handleKLineStorage(currentKLine);
+                createNewKLine();
+            }
+        }
+    }
+
+    @Override
+    public void setIsHalt(boolean status) {
+        this.isHalt = status;
+    }
+
+    @Override
+    public void process(List<ExchangeTrade> trades) {
+        if (!isHalt) {
+            if (trades == null || trades.size() == 0) {
+                return;
+            }
+            synchronized (currentKLine) {
+                for (ExchangeTrade exchangeTrade : trades) {
+                    //处理K线
+                    processTrade(currentKLine, exchangeTrade);
+                    //处理今日概况信息
+                    logger.debug("处理今日概况信息");
+                    handleThumb(exchangeTrade);
+                    //存储并推送成交信息
+                    handleTradeStorage(exchangeTrade);
+                }
+            }
+        }
+    }
+
+    public void processTrade(Candlestick kLine, ExchangeTrade exchangeTrade) {
+        if (kLine.getClose().compareTo(BigDecimal.ZERO) == 0) {
+            //第一次设置K线值
+            kLine.setOpen(exchangeTrade.getPrice());
+            kLine.setHigh(exchangeTrade.getPrice());
+            kLine.setLow(exchangeTrade.getPrice());
+            kLine.setClose(exchangeTrade.getPrice());
+        } else {
+            kLine.setHigh(exchangeTrade.getPrice().max(kLine.getHigh()));
+            kLine.setLow(exchangeTrade.getPrice().min(kLine.getLow()));
+            kLine.setClose(exchangeTrade.getPrice());
+        }
+        kLine.setCount(kLine.getCount() + 1);
+        kLine.setVolume(kLine.getVolume().add(exchangeTrade.getAmount()));
+        BigDecimal turnover = exchangeTrade.getPrice().multiply(exchangeTrade.getAmount());
+       // kLine.setTurnover(kLine.getTurnover().add(turnover));
+    }
+
+    public void handleTradeStorage(ExchangeTrade exchangeTrade) {
+        for (MarketHandler storage : handlers) {
+            storage.handleTrade(symbol, exchangeTrade, coinThumb);
+        }
+    }
+
+    public void handleKLineStorage(Candlestick kLine) {
+        // 存储交易信息 TODO 发送最新的一根K线
+//        for (MarketHandler storage : handlers) {
+//            storage.handleKLine(symbol, kLine);
+//        }
+    }
+
+    public void handleThumb(ExchangeTrade exchangeTrade) {
+        logger.info("handleThumb symbol = {}", this.symbol);
+        synchronized (coinThumb) {
+            if (coinThumb.getOpen().compareTo(BigDecimal.ZERO) == 0) {
+                //第一笔交易记为开盘价
+                coinThumb.setOpen(exchangeTrade.getPrice());
+            }
+            coinThumb.setHigh(exchangeTrade.getPrice().max(coinThumb.getHigh()));
+            if (coinThumb.getLow().compareTo(BigDecimal.ZERO) == 0) {
+                coinThumb.setLow(exchangeTrade.getPrice());
+            } else {
+                coinThumb.setLow(exchangeTrade.getPrice().min(coinThumb.getLow()));
+            }
+            coinThumb.setClose(exchangeTrade.getPrice());
+            coinThumb.setVolume(coinThumb.getVolume().add(exchangeTrade.getAmount()).setScale(4, RoundingMode.UP));
+            BigDecimal turnover = exchangeTrade.getPrice().multiply(exchangeTrade.getAmount()).setScale(4, RoundingMode.UP);
+            coinThumb.setTurnover(coinThumb.getTurnover().add(turnover));
+            BigDecimal change = coinThumb.getClose().subtract(coinThumb.getOpen());
+            coinThumb.setChange(change);
+            if (coinThumb.getLow().compareTo(BigDecimal.ZERO) > 0) {
+                coinThumb.setChg(change.divide(coinThumb.getLow(), 4, BigDecimal.ROUND_UP));
+            }
+            if ("USDT".equalsIgnoreCase(baseCoin)) {
+                logger.info("setUsdRate", exchangeTrade.getPrice());
+                coinThumb.setUsdRate(exchangeTrade.getPrice());
+            } else {
+
+            }
+            //coinThumb.setBaseUsdRate(coinExchangeRate.getUsdRate(baseCoin));
+            //coinThumb.setUsdRate(exchangeTrade.getPrice().multiply(coinExchangeRate.getUsdRate(baseCoin)));
+            //logger.info("setUsdRate", exchangeTrade.getPrice().multiply(coinExchangeRate.getUsdRate(baseCoin)));
+            //logger.info("thumb = {}", coinThumb);
+        }
+    }
+
+    @Override
+    public void addHandler(MarketHandler storage) {
+        handlers.add(storage);
+    }
+
+
+    @Override
+    public CoinThumb getThumb() {
+        return coinThumb;
+    }
+
+    @Override
+    public void setMarketService(MarketService service) {
+        this.service = service;
+    }
+
+    @Override
+    public void generateKLine(int range, int field, long time) {
+        Calendar calendar = Calendar.getInstance();
+        calendar.setTimeInMillis(time);
+        DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        long endTick = calendar.getTimeInMillis();
+        String endTime = df.format(calendar.getTime());
+        //往前推range个时间单位
+        calendar.add(field, -range);
+        String fromTime = df.format(calendar.getTime());
+        long startTick = calendar.getTimeInMillis();
+        System.out.println("time range from " + fromTime + " to " + endTime);
+        List<ExchangeTrade> exchangeTrades = service.findTradeByTimeRange(this.symbol, startTick, endTick);
+
+        Candlestick kLine = new Candlestick();
+        kLine.setTimestamp(endTick);
+        kLine.setAmount(BigDecimal.ZERO);
+        kLine.setClose(BigDecimal.ZERO);
+        kLine.setLow(BigDecimal.ZERO);
+        kLine.setOpen(BigDecimal.ZERO);
+        kLine.setVolume(BigDecimal.ZERO);
+        kLine.setHigh(BigDecimal.ZERO);
+        String rangeUnit = "";
+        if (field == Calendar.MINUTE) {
+            rangeUnit = "min";
+        } else if (field == Calendar.HOUR_OF_DAY) {
+            rangeUnit = "hour";
+        } else if (field == Calendar.DAY_OF_WEEK) {
+            rangeUnit = "week";
+        } else if (field == Calendar.DAY_OF_YEAR) {
+            rangeUnit = "day";
+        } else if (field == Calendar.MONTH) {
+            rangeUnit = "month";
+        }
+       // kLine.setPeriod(range + rangeUnit);
+        String period = range + rangeUnit;
+        // 处理K线信息
+        for (ExchangeTrade exchangeTrade : exchangeTrades) {
+            processTrade(kLine, exchangeTrade);
+        }
+        // 如果开盘价为0,则设置为前一个价格
+        if(kLine.getOpen().compareTo(BigDecimal.ZERO) == 0) {
+        	kLine.setOpen(coinThumb.getClose());
+        	kLine.setClose(coinThumb.getClose());
+        	kLine.setLow(coinThumb.getClose());
+        	kLine.setHigh(coinThumb.getClose());
+        }
+        logger.info("generate " + range + rangeUnit + " kline in {},data={}", df.format(new Date(kLine.getTimestamp())), JSON.toJSONString(kLine));
+        service.saveKLine(symbol,period, kLine);
+        // 生成一个对应的新K线 后续的交易会更新这个最新K线数据
+        Candlestick newKline = new Candlestick();
+        //kLine.setTimestamp(endTick);
+        newKline.setAmount(BigDecimal.ZERO);
+        newKline.setClose(kLine.getClose());
+        newKline.setLow(kLine.getClose());
+        // 开盘价是上个K线的收盘价
+        newKline.setOpen(kLine.getClose());
+        newKline.setVolume(BigDecimal.ZERO);
+        newKline.setHigh(kLine.getClose());
+        currentKlineMap.put(period,newKline);
+
+        // 存储昨日K线
+        if("day".equals(rangeUnit)){
+            redisUtils.set("NEKK/USDT",kLine);
+        }
+    }
+
+    @Override
+    public Candlestick getKLine() {
+        return currentKLine;
+    }
+
+	@Override
+	public void setIsStopKLine(boolean stop) {
+		this.stopKLine = stop;
+	}
+
+	@Override
+	public boolean isStopKline() {
+		return this.stopKLine;
+	}
+
+
+	@Override
+    public Map<String, Candlestick> getCurrentKlineMap() {
+        return currentKlineMap;
+    }
+
+    @Override
+    public void setRedisUtils(RedisUtils redisUtils) {
+        this.redisUtils = redisUtils;
+    }
+
+    public void setCurrentKlineMap(Map<String, Candlestick> currentKlineMap) {
+        this.currentKlineMap = currentKlineMap;
+    }
+}
diff --git a/src/main/java/com/xcong/excoin/processor/ExchangeOrderDirection.java b/src/main/java/com/xcong/excoin/processor/ExchangeOrderDirection.java
new file mode 100644
index 0000000..4583886
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/processor/ExchangeOrderDirection.java
@@ -0,0 +1,5 @@
+package com.xcong.excoin.processor;
+
+public enum ExchangeOrderDirection {
+    BUY,SELL;
+}
diff --git a/src/main/java/com/xcong/excoin/processor/MarketHandler.java b/src/main/java/com/xcong/excoin/processor/MarketHandler.java
new file mode 100644
index 0000000..526fa71
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/processor/MarketHandler.java
@@ -0,0 +1,21 @@
+package com.xcong.excoin.processor;
+
+import com.huobi.client.model.Candlestick;
+import com.xcong.excoin.trade.ExchangeTrade;
+
+public interface MarketHandler {
+
+    /**
+     * 存储交易信息
+     * @param exchangeTrade
+     */
+    void handleTrade(String symbol, ExchangeTrade exchangeTrade, CoinThumb thumb);
+
+
+    /**
+     * 存储K线信息
+     *
+     * @param kLine
+     */
+    void handleKLine(String symbol, Candlestick kLine);
+}
diff --git a/src/main/java/com/xcong/excoin/processor/MarketService.java b/src/main/java/com/xcong/excoin/processor/MarketService.java
new file mode 100644
index 0000000..11c6c56
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/processor/MarketService.java
@@ -0,0 +1,125 @@
+package com.xcong.excoin.processor;
+
+
+import com.huobi.client.model.Candlestick;
+import com.xcong.excoin.modules.coin.dao.OrderCoinDealDao;
+import com.xcong.excoin.trade.ExchangeTrade;
+import com.xcong.excoin.utils.RedisUtils;
+import org.apache.commons.collections.CollectionUtils;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+@Service
+public class MarketService {
+//    @Autowired
+//    private MongoTemplate mongoTemplate;
+
+    @Resource
+    private OrderCoinDealDao orderCoinDealDao;
+
+    @Resource
+    private RedisUtils redisUtils;
+
+    public List<Candlestick> findAllKLine(String symbol, String peroid) {
+//        Sort sort = new Sort(new Sort.Order(Sort.Direction.DESC,"time"));
+//        Query query = new Query().with(sort).limit(1000);
+//
+//        return mongoTemplate.find(query,KLine.class,"exchange_kline_"+symbol+"_"+peroid);
+        return null;
+    }
+
+    public List<Candlestick> findAllKLine(String symbol, long fromTime, long toTime, String period) {
+//        Criteria criteria = Criteria.where("time").gte(fromTime).andOperator(Criteria.where("time").lte(toTime));
+//        Sort sort = new Sort(new Sort.Order(Sort.Direction.ASC,"time"));
+//        Query query = new Query(criteria).with(sort);
+//        List<KLine> kLines = mongoTemplate.find(query,KLine.class,"exchange_kline_"+symbol+"_"+ period);
+//        return kLines;
+        String key = "KINE_" + symbol + "_" + period;
+        Object data = redisUtils.get(key);
+        List list = new ArrayList();
+        if (data != null) {
+            list = (List) data;
+        }
+        return list;
+    }
+//
+//    public ExchangeTrade findFirstTrade(String symbol,long fromTime,long toTime){
+//        Criteria criteria = Criteria.where("time").gte(fromTime).andOperator(Criteria.where("time").lte(toTime));
+//        Sort sort = new Sort(new Sort.Order(Sort.Direction.ASC,"time"));
+//        Query query = new Query(criteria).with(sort);
+//        return mongoTemplate.findOne(query,ExchangeTrade.class,"exchange_trade_"+symbol);
+//    }
+
+//    public ExchangeTrade findLastTrade(String symbol,long fromTime,long toTime){
+//        Criteria criteria = Criteria.where("time").gte(fromTime).andOperator(Criteria.where("time").lte(toTime));
+//        Sort sort = new Sort(new Sort.Order(Sort.Direction.DESC,"time"));
+//        Query query = new Query(criteria).with(sort);
+//        return mongoTemplate.findOne(query,ExchangeTrade.class,"exchange_trade_"+symbol);
+//    }
+//
+//    public ExchangeTrade findTrade(String symbol, long fromTime, long toTime, Sort.Order order){
+//        Criteria criteria = Criteria.where("time").gte(fromTime).andOperator(Criteria.where("time").lte(toTime));
+//        Sort sort = new Sort(order);
+//        Query query = new Query(criteria).with(sort);
+//        return mongoTemplate.findOne(query,ExchangeTrade.class,"exchange_trade_"+symbol);
+//    }
+
+    public List<ExchangeTrade> findTradeByTimeRange(String symbol, long timeStart, long timeEnd) {
+//        Criteria criteria = Criteria.where("time").gte(timeStart).andOperator(Criteria.where("time").lt(timeEnd));
+//        Sort sort = new Sort(new Sort.Order(Sort.Direction.ASC,"time"));
+//        Query query = new Query(criteria).with(sort);
+//
+//        return mongoTemplate.find(query,ExchangeTrade.class,"exchange_trade_"+symbol);
+        return orderCoinDealDao.selectOrderCoinDealByTime(symbol, new Date(timeStart), new Date(timeStart));
+        // return null;
+    }
+
+    public void saveKLine(String symbol, String period, Candlestick kLine) {
+        // 先获取
+        String key = "KINE_" + symbol + "_" + period;
+        Object data = redisUtils.get(key);
+        List list = new ArrayList();
+        if (data != null) {
+            list = (List) data;
+        }
+        list.add(kLine);
+        redisUtils.set("KINE_" + symbol + "_" + period, list);
+        //  mongoTemplate.insert(kLine,"exchange_kline_"+symbol+"_"+kLine.getPeriod());
+    }
+
+    /**
+     * 查找某时间段内的交易量
+     *
+     * @param symbol
+     * @param timeStart
+     * @param timeEnd
+     * @return
+     */
+    public BigDecimal findTradeVolume(String symbol, long timeStart, long timeEnd) {
+//        Criteria criteria = Criteria.where("time").gt(timeStart)
+//                .andOperator(Criteria.where("time").lte(timeEnd));
+//                //.andOperator(Criteria.where("volume").gt(0));
+//        Sort sort = new Sort(new Sort.Order(Sort.Direction.ASC,"time"));
+//        Query query = new Query(criteria).with(sort);
+//        List<KLine> kLines =  mongoTemplate.find(query,KLine.class,"exchange_kline_"+symbol+"_1min");
+//        BigDecimal totalVolume = BigDecimal.ZERO;
+//        for(KLine kLine:kLines){
+//            totalVolume = totalVolume.add(kLine.getVolume());
+//        }
+//        return totalVolume;
+        List<ExchangeTrade> exchangeTrades = orderCoinDealDao.selectOrderCoinDealByTime(symbol, new Date(timeStart), new Date(timeStart));
+        BigDecimal totalVolume = BigDecimal.ZERO;
+        if (CollectionUtils.isNotEmpty(exchangeTrades)) {
+            for (ExchangeTrade kLine : exchangeTrades) {
+                totalVolume = totalVolume.add(kLine.getBuyTurnover());
+            }
+        }
+
+        return totalVolume;
+    }
+}
diff --git a/src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java b/src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java
new file mode 100644
index 0000000..90a3e9c
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java
@@ -0,0 +1,98 @@
+package com.xcong.excoin.quartz.job;
+
+import com.alibaba.fastjson.JSON;
+import com.huobi.client.SubscriptionClient;
+import com.huobi.client.SubscriptionOptions;
+import com.huobi.client.model.Candlestick;
+import com.huobi.client.model.enums.CandlestickInterval;
+import com.xcong.excoin.modules.coin.dao.OrderCoinsDao;
+import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity;
+import com.xcong.excoin.modules.coin.service.OrderCoinService;
+import com.xcong.excoin.modules.symbols.service.SymbolsService;
+import com.xcong.excoin.processor.CoinProcessor;
+import com.xcong.excoin.processor.CoinProcessorFactory;
+import com.xcong.excoin.processor.DefaultCoinProcessor;
+import com.xcong.excoin.processor.MarketService;
+import com.xcong.excoin.rabbit.pricequeue.WebsocketPriceService;
+import com.xcong.excoin.trade.CoinTrader;
+import com.xcong.excoin.trade.CoinTraderFactory;
+import com.xcong.excoin.utils.CoinTypeConvert;
+import com.xcong.excoin.utils.RedisUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+import java.math.BigDecimal;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ *  开启撮合交易
+ *
+ * @author wzy
+ * @date 2020-05-28
+ **/
+@Slf4j
+@Component
+//@ConditionalOnProperty(prefix = "app", name = "trade", havingValue = "true")
+public class CoinTradeInitJob {
+
+    @Resource
+    private OrderCoinsDao orderCoinsDao;
+
+    @Resource
+    private CoinTraderFactory factory;
+
+    @Resource
+    private OrderCoinService coinService;
+
+    @Resource
+    private MarketService marketService;
+
+    @Resource
+    private CoinProcessorFactory processorFactory;
+
+    @PostConstruct
+    public void initCoinTrade() {
+        log.info("#=======撮合交易器开启=======#");
+        String symbol = "NEKK";
+        CoinTrader newTrader = new CoinTrader(symbol);
+        newTrader.setOrderCoinService(coinService);
+        //newTrader.setKafkaTemplate(kafkaTemplate);
+        //newTrader.setBaseCoinScale(coin.getBaseCoinScale());
+        //newTrader.setCoinScale(coin.getCoinScale());
+        // newTrader.setPublishType(coin.getPublishType());
+        //newTrader.setClearTime(coin.getClearTime());
+
+        // 创建成功以后需要对未处理订单预处理
+        log.info("======CoinTrader Process: " + symbol + "======");
+        List<OrderCoinsEntity> orders = orderCoinsDao.selectAllEntrustingCoinOrderList();
+        List<OrderCoinsEntity> tradingOrders = new ArrayList<>();
+        List<OrderCoinsEntity> completedOrders = new ArrayList<>();
+        orders.forEach(order -> {
+            tradingOrders.add(order);
+        });
+        try {
+            newTrader.trade(tradingOrders);
+        } catch (ParseException e) {
+            e.printStackTrace();
+            log.info("异常:trader.trade(tradingOrders);");
+        }
+        newTrader.setReady(true);
+        factory.addTrader(symbol, newTrader);
+
+        // 创建K线生成器
+        CoinProcessor processor = new DefaultCoinProcessor(symbol, "USDT");
+        processor.setMarketService(marketService);
+        //processor.setExchangeRate(exchangeRate);
+        processor.initializeThumb();
+        //processor.initializeUsdRate();
+        processor.setIsHalt(false);
+
+        processorFactory.addProcessor(symbol, processor);
+    }
+}
diff --git a/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java b/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java
new file mode 100644
index 0000000..110c1fc
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java
@@ -0,0 +1,159 @@
+package com.xcong.excoin.quartz.job;
+
+import com.alibaba.fastjson.JSON;
+import com.huobi.client.model.Candlestick;
+import com.xcong.excoin.processor.CoinProcessorFactory;
+import com.xcong.excoin.trade.TradePlateModel;
+import com.xcong.excoin.utils.RedisUtils;
+import com.xcong.excoin.websocket.TradePlateSendWebSocket;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+
+/**
+ * 生成各时间段的K线信息
+ *
+ */
+@Component
+@Slf4j
+public class KLineGeneratorJob {
+    @Resource
+    private CoinProcessorFactory processorFactory;
+
+    @Resource
+	private TradePlateSendWebSocket plateSendWebSocket;
+
+	@Resource
+	private RedisUtils redisUtils;
+
+	@Scheduled(cron = "0/2 * * * * *")
+    public void tradePlate(){
+		redisUtils.set("NEKK_NEW_PRICE",new BigDecimal(Math.random()));
+		Candlestick candlestick = new Candlestick();
+		candlestick.setOpen(new BigDecimal("10.33"));
+		candlestick.setHigh(new BigDecimal("15.23"));
+		candlestick.setVolume(new BigDecimal("12121.34"));
+		candlestick.setLow(new BigDecimal("8.234"));
+		candlestick.setAmount(new BigDecimal("1199"));
+		candlestick.setTimestamp(1599840000);
+		candlestick.setId(1599840000L);
+		candlestick.setCount(100002);
+		candlestick.setClose(new BigDecimal("12.2323"));
+		redisUtils.set("NEKK/USDT",candlestick);
+		// [[10244.21, 0.000855], [10243.7, 0.008777], [10243.59, 0.14], [10243.37, 0.467663]]
+		TradePlateModel tradePlateModel = new TradePlateModel();
+		List<BigDecimal> buy;
+		List<BigDecimal> sell;
+		for(int i=0;i<5;i++){
+			buy = new ArrayList<>(2);
+			buy.add(new BigDecimal(Math.random()*i));
+			buy.add(new BigDecimal(Math.random()*i));
+			tradePlateModel.getBuy().add(buy);
+			sell = new ArrayList<>(2);
+			sell.add(new BigDecimal(Math.random()*i*2));
+			sell.add(new BigDecimal(Math.random()*i*2));
+			tradePlateModel.getSell().add(sell);
+		}
+
+		plateSendWebSocket.sendMessagePlate(JSON.toJSONString(tradePlateModel),null);
+		plateSendWebSocket.sendMessageKline("nekkusdt","1min","{amount: 114419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 311958.06091543}",null);
+		plateSendWebSocket.sendMessageKline("nekkusdt","5min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null);
+		plateSendWebSocket.sendMessageKline("nekkusdt","15min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null);
+	}
+
+    /**
+     * 每分钟定时器,处理分钟K线
+     */
+    @Scheduled(cron = "0 * * * * *")
+    public void handle5minKLine(){
+
+        Calendar calendar = Calendar.getInstance();
+        log.debug("分钟K线:{}",calendar.getTime());
+        //将秒、微秒字段置为0
+        calendar.set(Calendar.SECOND,0);
+        calendar.set(Calendar.MILLISECOND,0);
+        long time = calendar.getTimeInMillis();
+        int minute = calendar.get(Calendar.MINUTE);
+        int hour = calendar.get(Calendar.HOUR_OF_DAY);
+        processorFactory.getProcessorMap().forEach((symbol,processor)->{
+        	if(!processor.isStopKline()) {
+	            log.debug("生成{}分钟k线:{}",symbol);
+	            //生成1分钟K线
+	            processor.autoGenerate();
+	            //更新24H成交量
+	            processor.update24HVolume(time);
+	            if(minute%5 == 0) {
+	            	// 五分钟的当前K线
+	                processor.generateKLine(5, Calendar.MINUTE, time);
+	            }
+	            if(minute%10 == 0){
+	                processor.generateKLine(10, Calendar.MINUTE,time);
+	            }
+	            if(minute%15 == 0){
+	                processor.generateKLine(15, Calendar.MINUTE,time);
+	            }
+	            if(minute%30 == 0){
+	                processor.generateKLine(30, Calendar.MINUTE,time);
+	            }
+	            if(hour == 0 && minute == 0){
+	                processor.resetThumb();
+	            }
+        	}
+        });
+    }
+
+    /**
+     * 每小时运行
+     */
+    @Scheduled(cron = "0 0 * * * *")
+    public void handleHourKLine(){
+        processorFactory.getProcessorMap().forEach((symbol,processor)-> {
+        	if(!processor.isStopKline()) {
+	            Calendar calendar = Calendar.getInstance();
+	            log.info("小时K线:{}",calendar.getTime());
+	            //将秒、微秒字段置为0
+	            calendar.set(Calendar.MINUTE, 0);
+	            calendar.set(Calendar.SECOND, 0);
+	            calendar.set(Calendar.MILLISECOND, 0);
+	            long time = calendar.getTimeInMillis();
+	
+	            processor.generateKLine(1, Calendar.HOUR_OF_DAY, time);
+        	}
+        });
+    }
+
+    /**
+     * 每日0点处理器,处理日K线
+     */
+    @Scheduled(cron = "0 0 0 * * *")
+    public void handleDayKLine(){
+        processorFactory.getProcessorMap().forEach((symbol,processor)->{
+        	if(!processor.isStopKline()) {
+	            Calendar calendar = Calendar.getInstance();
+	            log.info("日K线:{}",calendar.getTime());
+	            //将秒、微秒字段置为0
+	            calendar.set(Calendar.HOUR_OF_DAY,0);
+	            calendar.set(Calendar.MINUTE,0);
+	            calendar.set(Calendar.SECOND,0);
+	            calendar.set(Calendar.MILLISECOND,0);
+	            long time = calendar.getTimeInMillis();
+	            int week = calendar.get(Calendar.DAY_OF_WEEK);
+	            int dayOfMonth = calendar.get(Calendar.DAY_OF_MONTH);
+	            if(week == 1){
+	                processor.generateKLine(1, Calendar.DAY_OF_WEEK, time);
+	            }
+	            if(dayOfMonth == 1){
+	                processor.generateKLine(1, Calendar.DAY_OF_MONTH, time);
+	            }
+	            processor.generateKLine(1, Calendar.DAY_OF_YEAR,time);
+        	}
+        });
+    }
+}
diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java
new file mode 100644
index 0000000..9b03546
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java
@@ -0,0 +1,98 @@
+package com.xcong.excoin.rabbit.consumer;
+
+import cn.hutool.core.util.StrUtil;
+import com.alibaba.fastjson.JSONObject;
+import com.huobi.client.model.Candlestick;
+import com.xcong.excoin.configurations.RabbitMqConfig;
+import com.xcong.excoin.modules.coin.service.OrderCoinService;
+import com.xcong.excoin.modules.exchange.service.HandleKlineService;
+import com.xcong.excoin.trade.ExchangeTrade;
+import com.xcong.excoin.utils.RedisUtils;
+import com.xcong.excoin.websocket.TradePlateSendWebSocket;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * @author wzy
+ * @date 2020-05-25
+ **/
+@Slf4j
+@Component
+public class ExchangeConsumer {
+
+    @Resource
+    private TradePlateSendWebSocket tradePlateSendWebSocket;
+
+    @Resource
+    private RedisUtils redisUtils;
+
+    @Resource
+    private HandleKlineService handleKlineService;
+
+    @Resource
+    private OrderCoinService orderCoinService;
+
+    /**
+     *  发送盘口信息
+     * @param content
+     */
+    @RabbitListener(queues = RabbitMqConfig.QUEUE_TRADE_PLATE)
+    public void tradePlate(String content) {
+        log.info("#---->{}#", content);
+        tradePlateSendWebSocket.sendMessagePlate(content,null);
+    }
+
+    /**
+     *  处理订单
+     * @param content
+     */
+    @RabbitListener(queues = RabbitMqConfig.QUEUE_HANDLE_TRADE)
+    public void handleTradeExchange(String content) {
+        log.info("#---->{}#", content);
+        List<ExchangeTrade> exchangeTrades = JSONObject.parseArray(content, ExchangeTrade.class);
+        // 处理K线 并更新最新价
+        handleKlineService.handleExchangeOrderToKline(exchangeTrades);
+        // 推送最新K线
+        String symbol = exchangeTrades.get(0).getSymbol();
+        String symbolUsdt = symbol;
+        if(!symbol.contains("USDT")){
+            symbolUsdt = symbol+"/USDT";
+        }
+        String key = "NEW_KINE_{}";
+        key = StrUtil.format(key, symbolUsdt);
+        Object o = redisUtils.get(key);
+        Map<String, Candlestick> currentKlineMap = (Map<String, Candlestick> )o;
+        Set<Map.Entry<String, Candlestick>> entries = currentKlineMap.entrySet();
+        for(Map.Entry<String, Candlestick> map : entries){
+            tradePlateSendWebSocket.sendMessageKline(symbolUsdt,map.getKey(),JSONObject.toJSONString(map.getValue()),null);
+        }
+        // 处理用户订单
+        orderCoinService.handleOrder(exchangeTrades);
+    }
+
+    /**
+     *  更新最新K线
+     * @param content
+     */
+//    @RabbitListener(queues = RabbitMqConfig.QUEUE_TRADE_PLATE)
+//    public void newKling(String content) {
+//        log.info("#---->{}#", content);
+//        // 最新K线的币种
+//        String key = "NEW_KINE_{}";
+//        key = StrUtil.format(key, content);
+//        Object o = redisUtils.get(key);
+//        Map<String, Candlestick> currentKlineMap = (Map<String, Candlestick>)o;
+//        // 推送最新K线
+//        Set<Map.Entry<String, Candlestick>> entries = currentKlineMap.entrySet();
+//        for(Map.Entry<String, Candlestick> map : entries){
+//            tradePlateSendWebSocket.sendMessageKline(content,map.getKey(),JSONObject.toJSONString(map.getValue()),null);
+//        }
+//
+//    }
+}
diff --git a/src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java b/src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java
new file mode 100644
index 0000000..42e67d4
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java
@@ -0,0 +1,47 @@
+package com.xcong.excoin.rabbit.producer;
+
+import cn.hutool.core.util.IdUtil;
+import com.xcong.excoin.configurations.RabbitMqConfig;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.rabbit.connection.CorrelationData;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author wzy
+ * @date 2020-05-25
+ **/
+@Slf4j
+@Component
+public class ExchangeProducer implements RabbitTemplate.ConfirmCallback {
+
+    private RabbitTemplate rabbitTemplate;
+
+    @Autowired
+    public ExchangeProducer(RabbitTemplate rabbitTemplate) {
+        this.rabbitTemplate = rabbitTemplate;
+        rabbitTemplate.setConfirmCallback(this);
+    }
+
+    public void sendPlateMsg(String content) {
+        CorrelationData correlationData = new CorrelationData(IdUtil.simpleUUID());
+        rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_B, RabbitMqConfig.ROUTINGKEY_TRADE_PLATE, content, correlationData);
+    }
+
+    public void sendHandleTrade(String content) {
+        CorrelationData correlationData = new CorrelationData(IdUtil.simpleUUID());
+        rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_B, RabbitMqConfig.ROUTINGKEY_HANDLE_TRADE, content, correlationData);
+    }
+
+
+    @Override
+    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
+        log.info("#----->{}#", correlationData);
+        if (ack) {
+            log.info("success");
+        } else {
+            log.info("--->{}", cause);
+        }
+    }
+}
diff --git a/src/main/java/com/xcong/excoin/trade/CoinTrader.java b/src/main/java/com/xcong/excoin/trade/CoinTrader.java
new file mode 100644
index 0000000..46289af
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/trade/CoinTrader.java
@@ -0,0 +1,762 @@
+package com.xcong.excoin.trade;
+
+import com.alibaba.fastjson.JSON;
+
+import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity;
+import com.xcong.excoin.modules.coin.service.OrderCoinService;
+import com.xcong.excoin.rabbit.producer.ExchangeProducer;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+public class CoinTrader {
+    private String symbol;
+    private ExchangeProducer exchangeProducer;
+    private OrderCoinService orderCoinService;
+    //交易币种的精度
+    private int coinScale = 4;
+    //基币的精度
+    private int baseCoinScale = 4;
+    private Logger logger = LoggerFactory.getLogger(CoinTrader.class);
+    //买入限价订单链表,价格从高到低排列
+    private TreeMap<BigDecimal, MergeOrder> buyLimitPriceQueue;
+    //卖出限价订单链表,价格从低到高排列
+    private TreeMap<BigDecimal, MergeOrder> sellLimitPriceQueue;
+    //买入市价订单链表,按时间从小到大排序
+    private LinkedList<OrderCoinsEntity> buyMarketQueue;
+    //卖出市价订单链表,按时间从小到大排序
+    private LinkedList<OrderCoinsEntity> sellMarketQueue;
+    //卖盘盘口信息
+    private TradePlate sellTradePlate;
+    //买盘盘口信息
+    private TradePlate buyTradePlate;
+    //是否暂停交易
+    private boolean tradingHalt = false;
+    private boolean ready = false;
+    //交易对信息
+    //private ExchangeCoinPublishType publishType;
+    private String clearTime;
+
+    private SimpleDateFormat dateTimeFormat;
+
+
+    public CoinTrader(String symbol) {
+        this.symbol = symbol;
+        initialize();
+    }
+
+    /**
+     * 初始化交易线程
+     */
+    public void initialize() {
+        logger.info("init CoinTrader for symbol {}", symbol);
+        //买单队列价格降序排列
+        buyLimitPriceQueue = new TreeMap<>(Comparator.reverseOrder());
+        //卖单队列价格升序排列
+        this.sellLimitPriceQueue = new TreeMap<>(Comparator.naturalOrder());
+        this.buyMarketQueue = new LinkedList<>();
+        this.sellMarketQueue = new LinkedList<>();
+        this.sellTradePlate = new TradePlate(symbol, OrderCoinsEntity.ORDERTYPE_SELL);
+        this.buyTradePlate = new TradePlate(symbol, OrderCoinsEntity.ORDERTYPE_BUY);
+        this.dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    }
+
+    /**
+     * 增加限价订单到队列,买入单按从价格高到低排,卖出单按价格从低到高排
+     *
+     * @param exchangeOrder
+     */
+    public void addLimitPriceOrder(OrderCoinsEntity exchangeOrder) {
+        if (exchangeOrder.getTradeType() != OrderCoinsEntity.TRADETYPE_FIXEDPRICE) {
+            return;
+        }
+        //logger.info("addLimitPriceOrder,orderId = {}", exchangeOrder.getOrderId());
+        TreeMap<BigDecimal, MergeOrder> list;
+        if (exchangeOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
+            list = buyLimitPriceQueue;
+            // 添加盘口信息 TODO
+            buyTradePlate.add(exchangeOrder);
+            if (ready) {
+                // 发送盘口信息
+                sendTradePlateMessage(buyTradePlate, sellTradePlate);
+            }
+        } else {
+            list = sellLimitPriceQueue;
+            sellTradePlate.add(exchangeOrder);
+            if (ready) {
+                sendTradePlateMessage(buyTradePlate, sellTradePlate);
+            }
+        }
+        synchronized (list) {
+            MergeOrder mergeOrder = list.get(exchangeOrder.getEntrustPrice());
+            if (mergeOrder == null) {
+                mergeOrder = new MergeOrder();
+                mergeOrder.add(exchangeOrder);
+                list.put(exchangeOrder.getEntrustPrice(), mergeOrder);
+            } else {
+                mergeOrder.add(exchangeOrder);
+            }
+        }
+    }
+
+    public void addMarketPriceOrder(OrderCoinsEntity exchangeOrder) {
+        if (exchangeOrder.getTradeType() != OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
+            return;
+        }
+        logger.info("addMarketPriceOrder,orderId = {}", exchangeOrder.getId());
+        LinkedList<OrderCoinsEntity> list = exchangeOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY ? buyMarketQueue : sellMarketQueue;
+        synchronized (list) {
+            list.addLast(exchangeOrder);
+        }
+    }
+
+    public void trade(List<OrderCoinsEntity> orders) throws ParseException {
+        if (tradingHalt) {
+            return;
+        }
+        for (OrderCoinsEntity order : orders) {
+            trade(order);
+        }
+    }
+
+
+    /**
+     * 主动交易输入的订单,交易不完成的会输入到队列
+     *
+     * @param exchangeOrder
+     * @throws ParseException
+     */
+    public void trade(OrderCoinsEntity exchangeOrder) {
+        if (tradingHalt) {
+            return;
+        }
+        //logger.info("trade order={}",exchangeOrder);
+        if (!symbol.equalsIgnoreCase(exchangeOrder.getSymbol())) {
+            logger.info("unsupported symbol,coin={},base={}", exchangeOrder.getSymbol(), "USDT");
+            return;
+        }
+        // 如果
+        if (exchangeOrder.getEntrustAmount().compareTo(BigDecimal.ZERO) <= 0 || exchangeOrder.getEntrustAmount().subtract(exchangeOrder.getDealAmount()).compareTo(BigDecimal.ZERO) <= 0) {
+            return;
+        }
+
+        TreeMap<BigDecimal, MergeOrder> limitPriceOrderList;
+        LinkedList<OrderCoinsEntity> marketPriceOrderList;
+        if (exchangeOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
+            // 买单 和限价卖单以及市价市场卖单队列对比 完成撮合
+            limitPriceOrderList = sellLimitPriceQueue;
+            marketPriceOrderList = sellMarketQueue;
+        } else {
+            limitPriceOrderList = buyLimitPriceQueue;
+            marketPriceOrderList = buyMarketQueue;
+        }
+        if (exchangeOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
+            //logger.info(">>>>>市价单>>>交易与限价单交易");
+            //与限价单交易
+            matchMarketPriceWithLPList(limitPriceOrderList, exchangeOrder);
+        } else if (exchangeOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_FIXEDPRICE) {
+            //限价单价格必须大于0
+            if (exchangeOrder.getEntrustPrice().compareTo(BigDecimal.ZERO) <= 0) {
+                return;
+            }
+
+            //logger.info(">>>>>限价单>>>交易与限价单交易");
+            //先与限价单交易
+            matchLimitPriceWithLPList(limitPriceOrderList, exchangeOrder, false);
+            if (exchangeOrder.getEntrustCnt().compareTo(exchangeOrder.getDealCnt()) > 0) {
+                //logger.info(">>>>限价单未交易完>>>>与市价单交易>>>>");
+                //后与市价单交易
+                matchLimitPriceWithMPList(marketPriceOrderList, exchangeOrder);
+            }
+        }
+    }
+
+    /**
+     * 限价委托单与限价队列匹配
+     *
+     * @param lpList       限价对手单队列
+     * @param focusedOrder 交易订单
+     */
+    public void matchLimitPriceWithLPList(TreeMap<BigDecimal, MergeOrder> lpList, OrderCoinsEntity focusedOrder, boolean canEnterList) {
+        List<ExchangeTrade> exchangeTrades = new ArrayList<>();
+        List<OrderCoinsEntity> completedOrders = new ArrayList<>();
+        synchronized (lpList) {
+            Iterator<Map.Entry<BigDecimal, MergeOrder>> mergeOrderIterator = lpList.entrySet().iterator();
+            boolean exitLoop = false;
+            while (!exitLoop && mergeOrderIterator.hasNext()) {
+                Map.Entry<BigDecimal, MergeOrder> entry = mergeOrderIterator.next();
+                MergeOrder mergeOrder = entry.getValue();
+                Iterator<OrderCoinsEntity> orderIterator = mergeOrder.iterator();
+                //买入单需要匹配的价格不大于委托价,否则退出
+                if (focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && mergeOrder.getPrice().compareTo(focusedOrder.getEntrustPrice()) > 0) {
+                    break;
+                }
+                //卖出单需要匹配的价格不小于委托价,否则退出
+                if (focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_SELL && mergeOrder.getPrice().compareTo(focusedOrder.getEntrustPrice()) < 0) {
+                    break;
+                }
+                while (orderIterator.hasNext()) {
+                    OrderCoinsEntity matchOrder = orderIterator.next();
+                    //处理匹配
+                    ExchangeTrade trade = processMatch(focusedOrder, matchOrder);
+                    exchangeTrades.add(trade);
+                    //判断匹配单是否完成
+                    if (matchOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) {
+                        //当前匹配的订单完成交易,删除该订单
+                        orderIterator.remove();
+                        completedOrders.add(matchOrder);
+                    }
+                    //判断交易单是否完成
+                    if (focusedOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) {
+                        //交易完成
+                        completedOrders.add(focusedOrder);
+                        //退出循环
+                        exitLoop = true;
+                        break;
+                    }
+                }
+                if (mergeOrder.size() == 0) {
+                    mergeOrderIterator.remove();
+                }
+            }
+        }
+        //如果还没有交易完,订单压入列表中
+        if (focusedOrder.getDealCnt().compareTo(focusedOrder.getEntrustCnt()) < 0 && canEnterList) {
+            addLimitPriceOrder(focusedOrder);
+        }
+        //每个订单的匹配批量推送
+        handleExchangeTrade(exchangeTrades);
+        if (completedOrders.size() > 0) {
+            orderCompleted(completedOrders);
+            TradePlate plate = focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY ? sellTradePlate : buyTradePlate;
+            sendTradePlateMessage(buyTradePlate, sellTradePlate);
+        }
+    }
+
+    /**
+     * 限价委托单与市价队列匹配
+     *
+     * @param mpList       市价对手单队列
+     * @param focusedOrder 交易订单
+     */
+    public void matchLimitPriceWithMPList(LinkedList<OrderCoinsEntity> mpList, OrderCoinsEntity focusedOrder) {
+        List<ExchangeTrade> exchangeTrades = new ArrayList<>();
+        List<OrderCoinsEntity> completedOrders = new ArrayList<>();
+        synchronized (mpList) {
+            Iterator<OrderCoinsEntity> iterator = mpList.iterator();
+            while (iterator.hasNext()) {
+                OrderCoinsEntity matchOrder = iterator.next();
+                ExchangeTrade trade = processMatch(focusedOrder, matchOrder);
+                logger.info(">>>>>" + trade);
+                if (trade != null) {
+                    exchangeTrades.add(trade);
+                }
+                //判断匹配单是否完成,市价单amount为成交量
+                if (matchOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) {
+                    iterator.remove();
+                    completedOrders.add(matchOrder);
+                }
+                //判断吃单是否完成,判断成交量是否完成
+                if (focusedOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) {
+                    //交易完成
+                    completedOrders.add(focusedOrder);
+                    //退出循环
+                    break;
+                }
+            }
+        }
+        //如果还没有交易完,订单压入列表中
+        if (focusedOrder.getDealCnt().compareTo(focusedOrder.getEntrustCnt()) < 0) {
+            addLimitPriceOrder(focusedOrder);
+        }
+        //每个订单的匹配批量推送
+        handleExchangeTrade(exchangeTrades);
+        orderCompleted(completedOrders);
+    }
+
+
+    /**
+     * 市价委托单与限价对手单列表交易
+     *
+     * @param lpList       限价对手单列表
+     * @param focusedOrder 待交易订单
+     */
+    public void matchMarketPriceWithLPList(TreeMap<BigDecimal, MergeOrder> lpList, OrderCoinsEntity focusedOrder) {
+        List<ExchangeTrade> exchangeTrades = new ArrayList<>();
+        List<OrderCoinsEntity> completedOrders = new ArrayList<>();
+        // 加锁 同步
+        synchronized (lpList) {
+            Iterator<Map.Entry<BigDecimal, MergeOrder>> mergeOrderIterator = lpList.entrySet().iterator();
+            boolean exitLoop = false;
+            while (!exitLoop && mergeOrderIterator.hasNext()) {
+                Map.Entry<BigDecimal, MergeOrder> entry = mergeOrderIterator.next();
+                MergeOrder mergeOrder = entry.getValue();
+                Iterator<OrderCoinsEntity> orderIterator = mergeOrder.iterator();
+                while (orderIterator.hasNext()) {
+                    OrderCoinsEntity matchOrder = orderIterator.next();
+                    //处理匹配 用户单和买卖盘内的委托单进行比较,得到可以交易的量
+                    // 例如 市价买单 1000个 卖1 500 卖2 300 卖3 500 此时从卖1开始吃到卖3只剩300个
+                    ExchangeTrade trade = processMatch(focusedOrder, matchOrder);
+                    if (trade != null) {
+                        exchangeTrades.add(trade);
+                    }
+                    //判断匹配单是否完成
+                    if (matchOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) {
+                        //当前匹配的订单完成交易,删除该订单
+                        orderIterator.remove();
+                        completedOrders.add(matchOrder);
+                    }
+                    //判断焦点订单是否完成
+                    if (focusedOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) {
+                        completedOrders.add(focusedOrder);
+                        //退出循环
+                        exitLoop = true;
+                        break;
+                    }
+                }
+                if (mergeOrder.size() == 0) {
+                    mergeOrderIterator.remove();
+                }
+            }
+        }
+        //如果还没有交易完,订单压入列表中,市价买单按成交量算
+        if (focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_SELL && focusedOrder.getDealCnt().compareTo(focusedOrder.getEntrustCnt()) < 0
+                || focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && focusedOrder.getDealAmount().compareTo(focusedOrder.getEntrustAmount()) < 0) {
+            addMarketPriceOrder(focusedOrder);
+        }
+        //每个订单的匹配批量推送
+        handleExchangeTrade(exchangeTrades);
+        if (completedOrders.size() > 0) {
+            orderCompleted(completedOrders);
+            TradePlate plate = focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY ? sellTradePlate : buyTradePlate;
+            sendTradePlateMessage(buyTradePlate, sellTradePlate);
+        }
+    }
+
+    /**
+     * 计算委托单剩余可成交的数量
+     *
+     * @param order     委托单
+     * @param dealPrice 成交价
+     * @return
+     */
+    private BigDecimal calculateTradedAmount(OrderCoinsEntity order, BigDecimal dealPrice) {
+        if (order.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && order.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
+            //剩余成交量 TODO ?
+            // 委托量-成交量=剩余量
+            BigDecimal leftTurnover = order.getEntrustAmount().subtract(order.getDealAmount());
+            return leftTurnover.divide(dealPrice, coinScale, BigDecimal.ROUND_DOWN);
+        } else {
+            return order.getEntrustCnt().subtract(order.getDealCnt());
+        }
+    }
+
+    /**
+     * 调整市价单剩余成交额,当剩余成交额不足时设置订单完成
+     *
+     * @param order
+     * @param dealPrice
+     * @return
+     */
+    private BigDecimal adjustMarketOrderTurnover(OrderCoinsEntity order, BigDecimal dealPrice) {
+        if (order.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && order.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
+//            BigDecimal leftTurnover = order.getAmount().subtract(order.getTurnover());
+//            if(leftTurnover.divide(dealPrice,coinScale,BigDecimal.ROUND_DOWN)
+//                    .compareTo(BigDecimal.ZERO)==0){
+//                order.setTurnover(order.getAmount());
+//                return leftTurnover;
+//            }
+        }
+        return BigDecimal.ZERO;
+    }
+
+    /**
+     * 处理两个匹配的委托订单
+     *
+     * @param focusedOrder 焦点单
+     * @param matchOrder   匹配单
+     * @return
+     */
+    private ExchangeTrade processMatch(OrderCoinsEntity focusedOrder, OrderCoinsEntity matchOrder) {
+        //需要交易的数量,成交量,成交价,可用数量
+        BigDecimal needAmount, dealPrice, availAmount;
+        //如果匹配单是限价单,则以其价格为成交价
+        if (matchOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_FIXEDPRICE) {
+            dealPrice = matchOrder.getEntrustPrice();
+        } else {
+            dealPrice = focusedOrder.getEntrustPrice();
+        }
+        //成交价必须大于0
+        if (dealPrice.compareTo(BigDecimal.ZERO) <= 0) {
+            return null;
+        }
+        // 需要的成交量
+        needAmount = calculateTradedAmount(focusedOrder, dealPrice);
+        // 队列单可提供的成交量
+        availAmount = calculateTradedAmount(matchOrder, dealPrice);
+        //计算成交量 取少的
+        BigDecimal tradedAmount = (availAmount.compareTo(needAmount) >= 0 ? needAmount : availAmount);
+        logger.info("dealPrice={},amount={}", dealPrice, tradedAmount);
+        //如果成交额为0说明剩余额度无法成交,退出
+        if (tradedAmount.compareTo(BigDecimal.ZERO) == 0) {
+            return null;
+        }
+
+        //计算成交额,成交额要保留足够精度
+        BigDecimal turnover = tradedAmount.multiply(dealPrice);
+        matchOrder.setDealCnt(matchOrder.getDealCnt().add(tradedAmount));
+        // 成交金额
+        matchOrder.setDealAmount(matchOrder.getDealAmount().add(turnover));
+        // 用户单成交量
+        focusedOrder.setDealCnt(focusedOrder.getDealCnt().add(tradedAmount));
+        // 用户单成交金额
+        focusedOrder.setDealAmount(focusedOrder.getDealAmount().add(turnover));
+
+        //创建成交记录
+        ExchangeTrade exchangeTrade = new ExchangeTrade();
+        exchangeTrade.setSymbol(symbol);
+        // 成交量
+        exchangeTrade.setAmount(tradedAmount);
+        exchangeTrade.setDirection(focusedOrder.getOrderType());
+        // 成交价格
+        exchangeTrade.setPrice(dealPrice);
+        // 成交金额
+        exchangeTrade.setBuyTurnover(turnover);
+        exchangeTrade.setSellTurnover(turnover);
+        //校正市价单剩余成交额
+        if (OrderCoinsEntity.TRADETYPE_MARKETPRICE == focusedOrder.getTradeType() && focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
+            BigDecimal adjustTurnover = adjustMarketOrderTurnover(focusedOrder, dealPrice);
+            exchangeTrade.setBuyTurnover(turnover.add(adjustTurnover));
+        } else if (OrderCoinsEntity.TRADETYPE_MARKETPRICE == matchOrder.getTradeType() && matchOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
+            BigDecimal adjustTurnover = adjustMarketOrderTurnover(matchOrder, dealPrice);
+            exchangeTrade.setBuyTurnover(turnover.add(adjustTurnover));
+        }
+
+        if (focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
+            exchangeTrade.setBuyOrderId(focusedOrder.getId());
+            exchangeTrade.setSellOrderId(matchOrder.getId());
+        } else {
+            exchangeTrade.setBuyOrderId(matchOrder.getId());
+            exchangeTrade.setSellOrderId(focusedOrder.getId());
+        }
+
+        exchangeTrade.setTime(Calendar.getInstance().getTimeInMillis());
+        if (matchOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_FIXEDPRICE) {
+            if (matchOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
+                buyTradePlate.remove(matchOrder, tradedAmount);
+            } else {
+                sellTradePlate.remove(matchOrder, tradedAmount);
+            }
+        }
+        return exchangeTrade;
+    }
+
+
+    // 这里是交易完的单
+    public void handleExchangeTrade(List<ExchangeTrade> trades) {
+        //logger.info("handleExchangeTrade:{}", trades);
+        if (trades.size() > 0) {
+            int maxSize = 1000;
+            //发送消息,key为交易对符号
+            if (trades.size() > maxSize) {
+                int size = trades.size();
+                for (int index = 0; index < size; index += maxSize) {
+                    int length = (size - index) > maxSize ? maxSize : size - index;
+                    List<ExchangeTrade> subTrades = trades.subList(index, index + length);
+                    exchangeProducer.sendHandleTrade(JSON.toJSONString(subTrades));
+                    //orderCoinService.handleOrder(subTrades);
+                }
+            } else {
+                trades.forEach(e -> {
+                    System.out.println(e);
+                });
+                exchangeProducer.sendHandleTrade(JSON.toJSONString(trades));
+                //orderCoinService.handleOrder(trades);
+                // kafkaTemplate.send("exchange-trade", JSON.toJSONString(trades));
+            }
+            // 更新最新K线 TODO
+
+        }
+    }
+
+    /**
+     * 订单完成,执行消息通知,订单数超1000个要拆分发送
+     *
+     * @param orders
+     */
+    public void orderCompleted(List<OrderCoinsEntity> orders) {
+        //logger.info("orderCompleted ,order={}",orders);
+        if (orders.size() > 0) {
+            int maxSize = 1000;
+            if (orders.size() > maxSize) {
+                int size = orders.size();
+                for (int index = 0; index < size; index += maxSize) {
+                    int length = (size - index) > maxSize ? maxSize : size - index;
+                    List<OrderCoinsEntity> subOrders = orders.subList(index, index + length);
+                    // TODO 通知订单完成
+                    //kafkaTemplate.send("exchange-order-completed", JSON.toJSONString(subOrders));
+                }
+            } else {
+                // kafkaTemplate.send("exchange-order-completed", JSON.toJSONString(orders));
+            }
+        }
+    }
+
+    /**
+     * 发送盘口变化消息
+     *
+     * @param buyTradePlate sellTradePlate
+     */
+    public void sendTradePlateMessage(TradePlate buyTradePlate, TradePlate sellTradePlate) {
+        //防止并发引起数组越界,造成盘口倒挂 TODO
+        List<List<BigDecimal>> plate;
+        List<BigDecimal> plateItem;
+        TradePlateModel tradePlateModel = new TradePlateModel();
+        // 转换格式
+        if (buyTradePlate != null && buyTradePlate.getItems() != null) {
+            plate = new ArrayList<>();
+            LinkedList<TradePlateItem> items = buyTradePlate.getItems();
+            for (TradePlateItem item : items) {
+                plateItem = new ArrayList<>(2);
+                BigDecimal price = item.getPrice();
+                BigDecimal amount = item.getAmount();
+                plateItem.add(price);
+                plateItem.add(amount);
+                plate.add(plateItem);
+            }
+            tradePlateModel.setBuy(plate);
+        }
+
+        if (sellTradePlate != null && sellTradePlate.getItems() != null) {
+            plate = new ArrayList<>();
+            LinkedList<TradePlateItem> items = sellTradePlate.getItems();
+            for (TradePlateItem item : items) {
+                plateItem = new ArrayList<>(2);
+                BigDecimal price = item.getPrice();
+                BigDecimal amount = item.getAmount();
+                plateItem.add(price);
+                plateItem.add(amount);
+                plate.add(plateItem);
+            }
+            tradePlateModel.setSell(plate);
+        }
+
+        // 盘口发生变化通知TODO
+
+        exchangeProducer.sendPlateMsg(JSON.toJSONString(tradePlateModel));
+    }
+
+    /**
+     * 取消委托订单
+     *
+     * @param exchangeOrder
+     * @return
+     */
+    public OrderCoinsEntity cancelOrder(OrderCoinsEntity exchangeOrder) {
+        logger.info("cancelOrder,orderId={}", exchangeOrder.getId());
+        if (exchangeOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
+            //处理市价单
+            Iterator<OrderCoinsEntity> orderIterator;
+            List<OrderCoinsEntity> list = null;
+            if (exchangeOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
+                list = this.buyMarketQueue;
+            } else {
+                list = this.sellMarketQueue;
+            }
+            synchronized (list) {
+                orderIterator = list.iterator();
+                while ((orderIterator.hasNext())) {
+                    OrderCoinsEntity order = orderIterator.next();
+                    if (order.getId().equals(exchangeOrder.getId())) {
+                        orderIterator.remove();
+                        onRemoveOrder(order);
+                        return order;
+                    }
+                }
+            }
+        } else {
+            //处理限价单
+            TreeMap<BigDecimal, MergeOrder> list = null;
+            Iterator<MergeOrder> mergeOrderIterator;
+            if (exchangeOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
+                list = this.buyLimitPriceQueue;
+            } else {
+                list = this.sellLimitPriceQueue;
+            }
+            synchronized (list) {
+                MergeOrder mergeOrder = list.get(exchangeOrder.getEntrustPrice());
+                if (mergeOrder != null) {
+                    Iterator<OrderCoinsEntity> orderIterator = mergeOrder.iterator();
+                    while (orderIterator.hasNext()) {
+                        OrderCoinsEntity order = orderIterator.next();
+                        if (order.getId().equals(exchangeOrder.getId())) {
+                            orderIterator.remove();
+                            if (mergeOrder.size() == 0) {
+                                list.remove(exchangeOrder.getEntrustPrice());
+                            }
+                            onRemoveOrder(order);
+                            return order;
+                        }
+                    }
+                }
+            }
+        }
+        return null;
+    }
+
+    public void onRemoveOrder(OrderCoinsEntity order) {
+        if (order.getTradeType() == OrderCoinsEntity.TRADETYPE_FIXEDPRICE) {
+            if (order.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
+                buyTradePlate.remove(order);
+                sendTradePlateMessage(buyTradePlate, sellTradePlate);
+            } else {
+                sellTradePlate.remove(order);
+                sendTradePlateMessage(buyTradePlate, sellTradePlate);
+            }
+        }
+    }
+
+
+    public TradePlate getTradePlate(ExchangeOrderDirection direction) {
+        if (direction == ExchangeOrderDirection.BUY) {
+            return buyTradePlate;
+        } else {
+            return sellTradePlate;
+        }
+    }
+
+
+    /**
+     * 查询交易器里的订单
+     *
+     * @param orderId
+     * @param type
+     * @param direction
+     * @return
+     */
+    public OrderCoinsEntity findOrder(String orderId, int type, int direction) {
+        if (type == OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
+            LinkedList<OrderCoinsEntity> list;
+            if (direction == OrderCoinsEntity.ORDERTYPE_BUY) {
+                list = this.buyMarketQueue;
+            } else {
+                list = this.sellMarketQueue;
+            }
+            synchronized (list) {
+                Iterator<OrderCoinsEntity> orderIterator = list.iterator();
+                while ((orderIterator.hasNext())) {
+                    OrderCoinsEntity order = orderIterator.next();
+                    if (order.getId().equals(orderId)) {
+                        return order;
+                    }
+                }
+            }
+        } else {
+            TreeMap<BigDecimal, MergeOrder> list;
+            if (direction == OrderCoinsEntity.ORDERTYPE_BUY) {
+                list = this.buyLimitPriceQueue;
+            } else {
+                list = this.sellLimitPriceQueue;
+            }
+            synchronized (list) {
+                Iterator<Map.Entry<BigDecimal, MergeOrder>> mergeOrderIterator = list.entrySet().iterator();
+                while (mergeOrderIterator.hasNext()) {
+                    Map.Entry<BigDecimal, MergeOrder> entry = mergeOrderIterator.next();
+                    MergeOrder mergeOrder = entry.getValue();
+                    Iterator<OrderCoinsEntity> orderIterator = mergeOrder.iterator();
+                    while ((orderIterator.hasNext())) {
+                        OrderCoinsEntity order = orderIterator.next();
+                        if (order.getId().equals(orderId)) {
+                            return order;
+                        }
+                    }
+                }
+            }
+        }
+        return null;
+    }
+
+    public TreeMap<BigDecimal, MergeOrder> getBuyLimitPriceQueue() {
+        return buyLimitPriceQueue;
+    }
+
+    public LinkedList<OrderCoinsEntity> getBuyMarketQueue() {
+        return buyMarketQueue;
+    }
+
+    public TreeMap<BigDecimal, MergeOrder> getSellLimitPriceQueue() {
+        return sellLimitPriceQueue;
+    }
+
+    public LinkedList<OrderCoinsEntity> getSellMarketQueue() {
+        return sellMarketQueue;
+    }
+
+
+    public void setCoinScale(int scale) {
+        this.coinScale = scale;
+    }
+
+    public void setBaseCoinScale(int scale) {
+        this.baseCoinScale = scale;
+    }
+
+    public boolean isTradingHalt() {
+        return this.tradingHalt;
+    }
+
+    /**
+     * 暂停交易,不接收新的订单
+     */
+    public void haltTrading() {
+        this.tradingHalt = true;
+    }
+
+    /**
+     * 恢复交易
+     */
+    public void resumeTrading() {
+        this.tradingHalt = false;
+    }
+
+    public void stopTrading() {
+        //TODO:停止交易,取消当前所有订单
+    }
+
+    public boolean getReady() {
+        return this.ready;
+    }
+
+    public void setReady(boolean ready) {
+        this.ready = ready;
+    }
+
+    public void setClearTime(String clearTime) {
+        this.clearTime = clearTime;
+    }
+
+    public int getLimitPriceOrderCount(ExchangeOrderDirection direction) {
+        int count = 0;
+        TreeMap<BigDecimal, MergeOrder> queue = direction == ExchangeOrderDirection.BUY ? buyLimitPriceQueue : sellLimitPriceQueue;
+        Iterator<Map.Entry<BigDecimal, MergeOrder>> mergeOrderIterator = queue.entrySet().iterator();
+        while (mergeOrderIterator.hasNext()) {
+            Map.Entry<BigDecimal, MergeOrder> entry = mergeOrderIterator.next();
+            MergeOrder mergeOrder = entry.getValue();
+            count += mergeOrder.size();
+        }
+        return count;
+    }
+
+    public OrderCoinService getOrderCoinService() {
+        return orderCoinService;
+    }
+
+    public void setOrderCoinService(OrderCoinService orderCoinService) {
+        this.orderCoinService = orderCoinService;
+    }
+}
diff --git a/src/main/java/com/xcong/excoin/trade/CoinTraderFactory.java b/src/main/java/com/xcong/excoin/trade/CoinTraderFactory.java
new file mode 100644
index 0000000..e4eb159
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/trade/CoinTraderFactory.java
@@ -0,0 +1,40 @@
+package com.xcong.excoin.trade;
+
+import org.springframework.stereotype.Service;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+@Service
+public class CoinTraderFactory {
+
+	private ConcurrentHashMap<String, CoinTrader> traderMap;
+
+	public CoinTraderFactory() {
+		traderMap = new ConcurrentHashMap<>();
+	}
+
+	//添加,已存在的无法添加
+	public void addTrader(String symbol, CoinTrader trader) {
+		if(!traderMap.containsKey(symbol)) {
+			traderMap.put(symbol, trader);
+		}
+	}
+
+	//重置,即使已经存在也会覆盖
+	public void resetTrader(String symbol, CoinTrader trader) {
+		traderMap.put(symbol, trader);
+	}
+	
+	public boolean containsTrader(String symbol) {
+		return traderMap.containsKey(symbol);
+	}
+	
+	public CoinTrader getTrader(String symbol) {
+		return traderMap.get(symbol);
+	}
+
+	public ConcurrentHashMap<String, CoinTrader> getTraderMap() {
+		return traderMap;
+	}
+
+}
diff --git a/src/main/java/com/xcong/excoin/trade/ExchangeOrderDirection.java b/src/main/java/com/xcong/excoin/trade/ExchangeOrderDirection.java
new file mode 100644
index 0000000..8da593e
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/trade/ExchangeOrderDirection.java
@@ -0,0 +1,5 @@
+package com.xcong.excoin.trade;
+
+public enum ExchangeOrderDirection {
+    BUY,SELL;
+}
diff --git a/src/main/java/com/xcong/excoin/trade/ExchangeTrade.java b/src/main/java/com/xcong/excoin/trade/ExchangeTrade.java
new file mode 100644
index 0000000..ea56d87
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/trade/ExchangeTrade.java
@@ -0,0 +1,27 @@
+package com.xcong.excoin.trade;
+
+import com.alibaba.fastjson.JSON;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+
+/**
+ * 撮合交易信息
+ */
+@Data
+public class ExchangeTrade implements Serializable{
+    private String symbol;
+    private BigDecimal price;
+    private BigDecimal amount;
+    private BigDecimal buyTurnover;
+    private BigDecimal sellTurnover;
+    private int direction;
+    private Long buyOrderId;
+    private Long sellOrderId;
+    private Long time;
+    @Override
+    public String toString() {
+        return  JSON.toJSONString(this);
+    }
+}
diff --git a/src/main/java/com/xcong/excoin/trade/MergeOrder.java b/src/main/java/com/xcong/excoin/trade/MergeOrder.java
new file mode 100644
index 0000000..76f1909
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/trade/MergeOrder.java
@@ -0,0 +1,42 @@
+package com.xcong.excoin.trade;
+
+import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class MergeOrder {
+    private List<OrderCoinsEntity> orders = new ArrayList<>();
+
+    //最后位置添加一个
+    public void add(OrderCoinsEntity order){
+        orders.add(order);
+    }
+
+
+    public OrderCoinsEntity get(){
+        return orders.get(0);
+    }
+
+    public int size(){
+        return orders.size();
+    }
+
+    public BigDecimal getPrice(){
+        return orders.get(0).getEntrustPrice();
+    }
+
+    public Iterator<OrderCoinsEntity> iterator(){
+        return orders.iterator();
+    }
+    
+    public BigDecimal getTotalAmount() {
+    	BigDecimal total = new BigDecimal(0);
+    	for(OrderCoinsEntity item : orders) {
+    		total = total.add(item.getEntrustCnt());
+    	}
+    	return total;
+    }
+}
diff --git a/src/main/java/com/xcong/excoin/trade/TradePlate.java b/src/main/java/com/xcong/excoin/trade/TradePlate.java
new file mode 100644
index 0000000..fe24e0b
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/trade/TradePlate.java
@@ -0,0 +1,181 @@
+package com.xcong.excoin.trade;
+
+
+import com.alibaba.fastjson.JSONObject;
+import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import java.math.BigDecimal;
+import java.util.LinkedList;
+
+/**
+ * 盘口信息
+ */
+@Data
+@Slf4j
+public class TradePlate {
+    private LinkedList<TradePlateItem> items;
+    //最大深度
+    private int maxDepth = 100;
+    //方向 订单类型 1、买入2、卖出
+    private int  direction;
+    private String symbol;
+    public TradePlate(){
+
+    }
+
+    public TradePlate(String symbol,int direction) {
+        this.direction = direction;
+        this.symbol = symbol;
+        items = new LinkedList<>();
+    }
+
+    public boolean add(OrderCoinsEntity exchangeOrder) {
+        //log.info("add TradePlate order={}",exchangeOrder);
+        synchronized (items) {
+            int index = 0;
+            if (exchangeOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
+                return false;
+            }
+            if (exchangeOrder.getOrderType() != direction) {
+                return false;
+            }
+            if (items.size() > 0) {
+                for (index = 0; index < items.size(); index++) {
+                    TradePlateItem item = items.get(index);
+                    if ((exchangeOrder.getTradeType() == OrderCoinsEntity.ORDERTYPE_BUY && item.getPrice().compareTo(exchangeOrder.getEntrustPrice()) > 0)
+                            || (exchangeOrder.getTradeType() == OrderCoinsEntity.ORDERTYPE_SELL && item.getPrice().compareTo(exchangeOrder.getEntrustPrice()) < 0)) {
+                        continue;
+                    } else if (item.getPrice().compareTo(exchangeOrder.getEntrustPrice()) == 0) {
+                        BigDecimal deltaAmount = exchangeOrder.getEntrustCnt().subtract(exchangeOrder.getDealCnt());
+                        item.setAmount(item.getAmount().add(deltaAmount));
+                        return true;
+                    } else {
+                        break;
+                    }
+                }
+            }
+            if(index < maxDepth) {
+                TradePlateItem newItem = new TradePlateItem();
+                newItem.setAmount(exchangeOrder.getEntrustCnt().subtract(exchangeOrder.getDealCnt()));
+                newItem.setPrice(exchangeOrder.getEntrustPrice());
+                items.add(index, newItem);
+            }
+        }
+        return true;
+    }
+
+    public void remove(OrderCoinsEntity order,BigDecimal amount) {
+        synchronized (items) {
+            //log.info("items>>init_size={},orderPrice={}",items.size(),order.getPrice());
+            for (int index = 0; index < items.size(); index++) {
+                TradePlateItem item = items.get(index);
+                if (item.getPrice().compareTo(order.getEntrustPrice()) == 0) {
+                    item.setAmount(item.getAmount().subtract(amount));
+                    if (item.getAmount().compareTo(BigDecimal.ZERO) <= 0) {
+                        items.remove(index);
+                    }
+                    //log.info("items>>final_size={},itemAmount={},itemPrice={}",items.size(),item.getAmount(),item.getPrice());
+                    return;
+                }
+            }
+            log.info("items>>return_size={}",items.size());
+        }
+    }
+
+    public void remove(OrderCoinsEntity order){
+        remove(order,order.getEntrustCnt().subtract(order.getDealCnt()));
+    }
+
+    public void setItems(LinkedList<TradePlateItem> items){
+        this.items = items;
+    }
+
+    public BigDecimal getHighestPrice(){
+        if(items.size() == 0) {
+            return BigDecimal.ZERO;
+        }
+        if(direction == OrderCoinsEntity.ORDERTYPE_BUY){
+            return items.getFirst().getPrice();
+        }
+        else{
+            return items.getLast().getPrice();
+        }
+    }
+
+    public int getDepth(){
+        return items.size();
+    }
+
+
+    public BigDecimal getLowestPrice(){
+        if(items.size() == 0) {
+            return BigDecimal.ZERO;
+        }
+        if(direction == OrderCoinsEntity.ORDERTYPE_BUY){
+            return items.getLast().getPrice();
+        }
+        else{
+            return items.getFirst().getPrice();
+        }
+    }
+
+    /**
+     * 获取委托量最大的档位
+     * @return
+     */
+    public BigDecimal getMaxAmount(){
+        if(items.size() == 0) {
+            return BigDecimal.ZERO;
+        }
+        BigDecimal amount = BigDecimal.ZERO;
+        for(TradePlateItem item:items){
+            if(item.getAmount().compareTo(amount)>0){
+                amount = item.getAmount();
+            }
+        }
+        return amount;
+    }
+
+    /**
+     * 获取委托量最小的档位
+     * @return
+     */
+    public BigDecimal getMinAmount(){
+        if(items.size() == 0) {
+            return BigDecimal.ZERO;
+        }
+        BigDecimal amount = items.getFirst().getAmount();
+        for(TradePlateItem item:items){
+            if(item.getAmount().compareTo(amount) < 0){
+                amount = item.getAmount();
+            }
+        }
+        return amount;
+    }
+
+    public JSONObject toJSON(){
+        JSONObject json = new JSONObject();
+        json.put("direction",direction);
+        json.put("maxAmount",getMaxAmount());
+        json.put("minAmount",getMinAmount());
+        json.put("highestPrice",getHighestPrice());
+        json.put("lowestPrice",getLowestPrice());
+        json.put("symbol",getSymbol());
+        json.put("items",items);
+        return json;
+    }
+
+    public JSONObject toJSON(int limit){
+        JSONObject json = new JSONObject();
+        json.put("direction",direction);
+        json.put("maxAmount",getMaxAmount());
+        json.put("minAmount",getMinAmount());
+        json.put("highestPrice",getHighestPrice());
+        json.put("lowestPrice",getLowestPrice());
+        json.put("symbol",getSymbol());
+        json.put("items",items.size() > limit ? items.subList(0,limit) : items);
+        return json;
+    }
+}
diff --git a/src/main/java/com/xcong/excoin/trade/TradePlateItem.java b/src/main/java/com/xcong/excoin/trade/TradePlateItem.java
new file mode 100644
index 0000000..9a6f83a
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/trade/TradePlateItem.java
@@ -0,0 +1,11 @@
+package com.xcong.excoin.trade;
+
+import lombok.Data;
+
+import java.math.BigDecimal;
+
+@Data
+public class TradePlateItem {
+    private BigDecimal price;
+    private BigDecimal amount;
+}
diff --git a/src/main/java/com/xcong/excoin/trade/TradePlateModel.java b/src/main/java/com/xcong/excoin/trade/TradePlateModel.java
new file mode 100644
index 0000000..9760925
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/trade/TradePlateModel.java
@@ -0,0 +1,13 @@
+package com.xcong.excoin.trade;
+
+import lombok.Data;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+
+@Data
+public class TradePlateModel {
+    private List<List<BigDecimal>> buy  = new ArrayList<>();
+    private List<List<BigDecimal>> sell = new ArrayList<>();
+}
diff --git a/src/main/java/com/xcong/excoin/utils/CoinTypeConvert.java b/src/main/java/com/xcong/excoin/utils/CoinTypeConvert.java
index fc2dc6c..e9ff270 100644
--- a/src/main/java/com/xcong/excoin/utils/CoinTypeConvert.java
+++ b/src/main/java/com/xcong/excoin/utils/CoinTypeConvert.java
@@ -22,6 +22,8 @@
                 return "EOS/USDT";
             case "etcusdt":
                 return "ETC/USDT";
+            case "nekkusdt":
+                return "NEKK/USDT";
             default:
                 return null;
         }
@@ -43,6 +45,8 @@
                 return "EOS_NEW_PRICE";
             case "ETC/USDT":
                 return "ETC_NEW_PRICE";
+            case "NEKK/USDT":
+                return "NEKK_NEW_PRICE";
             default:
                 return null;
         }
diff --git a/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java b/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java
new file mode 100644
index 0000000..b955f11
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java
@@ -0,0 +1,220 @@
+package com.xcong.excoin.websocket;
+
+import cn.hutool.core.util.StrUtil;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.xcong.excoin.common.contants.AppContants;
+import com.xcong.excoin.utils.CoinTypeConvert;
+import com.xcong.excoin.utils.RedisUtils;
+import com.xcong.excoin.utils.SpringContextHolder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import javax.websocket.*;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Slf4j
+@ServerEndpoint(value = "/trade/market")
+@Component
+public class TradePlateSendWebSocket {
+    @Resource
+    RedisUtils redisUtils;
+
+    /**
+     * 记录当前在线连接数
+     */
+    private static AtomicInteger onlineCount = new AtomicInteger(0);
+
+
+    private static Map<String, Map<String, Session>> tradeplateClients = new ConcurrentHashMap<>();
+
+    private static Map<String, Map<String, Session>> klineClients = new ConcurrentHashMap<>();
+
+    /**
+     * 连接建立成功调用的方法
+     */
+    @OnOpen
+    public void onOpen(Session session) {
+        onlineCount.incrementAndGet(); // 在线数加1
+        log.info("有新连接加入:{},当前在线人数为:{}", session.getId(), onlineCount.get());
+    }
+
+    /**
+     * 连接关闭调用的方法
+     */
+    @OnClose
+    public void onClose(Session session) {
+        onlineCount.decrementAndGet(); // 在线数减1
+//        Collection<Map<String, Session>> values = tradeplateClients.values();
+//        if(CollectionUtils.isNotEmpty(values)){
+//            for(Map<String,Session> map : values){
+//                map.remove(session.getId());
+//            }
+//        }
+        log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get());
+    }
+
+    /**
+     * 收到客户端消息后调用的方法
+     *
+     * @param message 客户端发送过来的消息
+     */
+    @OnMessage
+    public void onMessage(String message, Session session) {
+        log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message);
+        // 订阅方法 {sub: 'market.' + symbol + '.depth.' + this._caculateType(),id: symbol
+        //}
+        JSONObject jsonObject = JSON.parseObject(message);
+        // 盘口的判断
+        if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("depth")) {
+            String sub = jsonObject.get("sub").toString();
+            String symbol = sub.split("\\.")[1];
+            symbol = CoinTypeConvert.convert(symbol);
+            if (tradeplateClients.containsKey(symbol)) {
+                tradeplateClients.get(symbol).put(session.getId(), session);
+            } else {
+                Map<String, Session> map = new HashMap<>();
+                map.put(session.getId(), session);
+                tradeplateClients.put(symbol, map);
+            }
+        }
+        // 取消盘口订阅
+        if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("depth")) {
+            // `market.${symbol}.kline.${strPeriod}
+            String unsub = jsonObject.get("unsub").toString();
+            String[] split = unsub.split("\\.");
+            String symbol = split[1];
+            symbol = CoinTypeConvert.convert(symbol);
+            String key = symbol;
+            if (tradeplateClients.containsKey(key)) {
+                tradeplateClients.get(key).remove(session.getId());
+            }
+        }
+
+
+        // 最新K线订阅
+
+        // 根据消息判断这个用户需要订阅哪种数据
+        // {sub: `market.${symbol}.kline.${strPeriod}`,
+        //            symbol: symbol,
+        //            period: strPeriod
+        //}
+        // 取消订阅 {unsub: xxx(标识)}
+        if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("kline")) {
+            // 订阅
+            String sub = jsonObject.get("sub").toString();
+            String[] split = sub.split("\\.");
+            String symbol = split[1];
+            symbol = CoinTypeConvert.convert(symbol);
+            String period = split[3];
+            String key = symbol + "-" + period;
+            if (klineClients.containsKey(key)) {
+                // 有这个币种K线
+                Map<String, Session> stringSessionMap = klineClients.get(key);
+                if (!stringSessionMap.containsKey(session.getId())) {
+                    stringSessionMap.put(session.getId(), session);
+                }
+            } else {
+                Map<String, Session> stringSessionMap = new HashMap<>();
+                stringSessionMap.put(session.getId(), session);
+                klineClients.put(key, stringSessionMap);
+            }
+        }
+
+        // 取消订阅
+        if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("kline")) {
+            // `market.${symbol}.kline.${strPeriod}
+            String unsub = jsonObject.get("unsub").toString();
+            String[] split = unsub.split("\\.");
+            String strPeriod = split[3];
+            String symbol = split[1];
+            symbol = CoinTypeConvert.convert(symbol);
+            String key = symbol + "-" + strPeriod;
+            if (klineClients.containsKey(key)) {
+                klineClients.get(key).remove(session.getId());
+            }
+        }
+
+        // 历史K线订阅
+        // {req: "market.nekkusdt.kline.1min", symbol: "nekkusdt", period: "1min"}
+        if (jsonObject.containsKey("req") && jsonObject.get("req").toString().contains("kline")) {
+            String sub = jsonObject.get("req").toString();
+            String[] split = sub.split("\\.");
+            String symbol = split[1];
+            symbol = CoinTypeConvert.convert(symbol);
+            String period = split[3];
+            //String key = symbol+"-"+period;
+            // String key = "KINE_BCH/USDT_1week";
+            String key = "KINE_{}_{}";
+            // 币币k线数据
+            key = StrUtil.format(key, symbol, period);
+            RedisUtils bean = SpringContextHolder.getBean(RedisUtils.class);
+            Object o = bean.get(key);
+            sendMessageHistory(JSON.toJSONString(o), session);
+        }
+    }
+
+    @OnError
+    public void onError(Session session, Throwable error) {
+        log.error("发生错误");
+        error.printStackTrace();
+    }
+
+    /**
+     * 群发消息
+     *
+     * @param message 消息内容
+     */
+    public void sendMessagePlate(String message, Session fromSession) {
+        if (tradeplateClients.containsKey("nekkusdt")) {
+            Map<String, Session> nekk = tradeplateClients.get("nekkusdt");
+            for (Map.Entry<String, Session> sessionEntry : nekk.entrySet()) {
+                Session toSession = sessionEntry.getValue();
+                // 排除掉自己
+                //if (!fromSession.getId().equals(toSession.getId())) {
+                log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message);
+                boolean open = toSession.isOpen();
+                if (open) {
+                    toSession.getAsyncRemote().sendText(message);
+                }
+
+                //  }
+            }
+        }
+
+    }
+
+    public void sendMessageHistory(String message, Session toSession) {
+        log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message);
+        boolean open = toSession.isOpen();
+        if (open) {
+            toSession.getAsyncRemote().sendText(message);
+        }
+    }
+
+    public void sendMessageKline(String symbol, String period, String message, Session fromSession) {
+        String key = symbol + "-" + period;
+        if (klineClients.containsKey(key)) {
+            Map<String, Session> stringSessionMap = klineClients.get(key);
+            for (Map.Entry<String, Session> sessionEntry : stringSessionMap.entrySet()) {
+                Session toSession = sessionEntry.getValue();
+                // 排除掉自己
+                //if (!fromSession.getId().equals(toSession.getId())) {
+                log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message);
+                boolean open = toSession.isOpen();
+                if (open) {
+                    toSession.getAsyncRemote().sendText(message);
+                }
+                //  }
+            }
+        }
+    }
+}
diff --git a/src/main/resources/mapper/member/MemberWalletCoinDao.xml b/src/main/resources/mapper/member/MemberWalletCoinDao.xml
index 79979c5..e7fd894 100644
--- a/src/main/resources/mapper/member/MemberWalletCoinDao.xml
+++ b/src/main/resources/mapper/member/MemberWalletCoinDao.xml
@@ -39,5 +39,21 @@
 		where id=#{id}
 	</update>
 
+	<update id="updateWalletBalance" parameterType="map">
+		update member_wallet_coin
+		<set>
+			<if test="availableBalance != null">
+				available_balance = IFNULL(available_balance, 0) + #{availableBalance},
+			</if>
+			<if test="totalBalance != null">
+				total_balance = IFNULL(total_balance, 0) + #{totalBalance},
+			</if>
+			<if test="frozenBalance != null">
+				frozen_balance = IFNULL(frozen_balance, 0) + #{frozenBalance},
+			</if>
+		</set>
+		where id=#{id}
+	</update>
+
 
 </mapper>
\ No newline at end of file
diff --git a/src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml b/src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml
index 03a89ae..303667f 100644
--- a/src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml
+++ b/src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml
@@ -1,38 +1,55 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
-<mapper namespace="com.xcong.excoin.modules.coin.dao.OrderCoinDealDao">	
-	
-	<select id="selectAllWalletCoinOrder"  resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity">
-		 select * from coins_order_deal 
-		 <where>
-	 		<if test="memberId != null  and  memberId  != ''">
-	 			 and member_id = #{memberId}
-	 		</if>
-		 </where>
-		 order by create_time desc
+<mapper namespace="com.xcong.excoin.modules.coin.dao.OrderCoinDealDao">
+
+    <select id="selectAllWalletCoinOrder" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity">
+        select * from coins_order_deal
+        <where>
+            <if test="memberId != null  and  memberId  != ''">
+                and member_id = #{memberId}
+            </if>
+        </where>
+        order by create_time desc
+    </select>
+
+    <select id="selectOrderCoinDealByTime" resultType="com.xcong.excoin.trade.ExchangeTrade">
+		SELECT
+            symbol symbol,
+            deal_price price,
+            symbol_cnt amount,
+            deal_amount buyTurnover,
+            deal_amount sellTurnover,
+            order_type direction,
+            create_time time
+		  from coins_order_deal
+		  where symbol = #{symbol}
+		  and order_type  = 1
+		  and order_status = 3
+		  and create_time between #{startTime} and #{endTime}
 	</select>
-	<select id="selectAllWalletCoinOrderBySymbol"  resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity">
-		 select * from coins_order_deal 
-		 <where>
-	 		<if test="memberId != null  and  memberId  != ''">
-	 			 and member_id = #{memberId}
-	 		</if>
-	 		<if test="symbol != null  and  symbol  != ''">
-	 			 and symbol = #{symbol}
-	 		</if>
-		 </where>
-		 order by create_time desc
-	</select>
-	
-	<select id="selectWalletCoinOrder" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity">
+    <select id="selectAllWalletCoinOrderBySymbol"
+            resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity">
+        select * from coins_order_deal
+        <where>
+            <if test="memberId != null  and  memberId  != ''">
+                and member_id = #{memberId}
+            </if>
+            <if test="symbol != null  and  symbol  != ''">
+                and symbol = #{symbol}
+            </if>
+        </where>
+        order by create_time desc
+    </select>
+
+    <select id="selectWalletCoinOrder" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity">
 		select * from coins_order_deal where order_id= #{orderId} and member_id = #{memberId}
 	</select>
-	
-	<select id="findAllWalletCoinOrderInPage" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity">
+
+    <select id="findAllWalletCoinOrderInPage" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity">
         select * from coins_order_deal
         <if test="record != null">
             <where>
-                <if test="record.memberId != null" >
+                <if test="record.memberId != null">
                     and member_id=#{record.memberId}
                 </if>
                 <if test="record.symbol != null and record.symbol != ''">
@@ -42,5 +59,5 @@
         </if>
         order by create_time desc
     </select>
-	
+
 </mapper>
diff --git a/src/test/java/com/xcong/excoin/GuijiTest.java b/src/test/java/com/xcong/excoin/GuijiTest.java
index 6b99989..8babbbe 100644
--- a/src/test/java/com/xcong/excoin/GuijiTest.java
+++ b/src/test/java/com/xcong/excoin/GuijiTest.java
@@ -116,8 +116,8 @@
 			e.printStackTrace();
 		}
     }
-    
-    
+
+
     
 }
 

--
Gitblit v1.9.1