zainali5120
2020-10-10 f8a0008705fd8067959151ce83c5dce19e72fb85
golden交易所分布式支持
3 files added
11 files modified
274 ■■■■ changed files
src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java 23 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java 12 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java 3 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java 2 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java 26 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/rabbit/consumer/OperateOrderPriceConsumer.java 1 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/rabbit/consumer/OrderSubmitConsumer.java 34 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/rabbit/consumer/RocBlockUpdateConsumer.java 6 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java 4 ●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/rabbit/consumer/WebsocketPriceConsumer.java 1 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/rabbit/init/OrderProducerInit.java 1 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/rabbit/producer/OrderSubmitProducer.java 42 ●●●●● patch | view | raw | blame | history
src/main/resources/application-prod.yml 5 ●●●●● patch | view | raw | blame | history
src/main/resources/application-prodapp.yml 114 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
@@ -103,6 +103,12 @@
    public static final String ROUTING_KEY_ROC = "roc-transfer-routingKey";
    public static final String EXCHANGE_ROC_ORDER_SUBMIT = "roc-exchange-order-submit";
    public static final String QUEUE_ROC_ORDER_SUBMIT = "roc-order-queue-submit";
    public static final String ROUTING_KEY_ROC_ORDER_SUBMIT  = "roc-order-routingKey-submit";
    @Resource
    private ConnectionFactory connectionFactory;
@@ -138,6 +144,23 @@
        return BindingBuilder.bind(testQueue()).to(defaultExchange()).with(ROUTING_KEY_TEST);
    }
    // 交易订单
    @Bean
    public DirectExchange orderSubmitExchange() {
        return new DirectExchange(EXCHANGE_ROC_ORDER_SUBMIT);
    }
    @Bean
    public Queue ordereSubmitQueue() {
        return new Queue(QUEUE_ROC_ORDER_SUBMIT, true);
    }
    @Bean
    public Binding bindingSubmitOrder() {
        return BindingBuilder.bind(ordereSubmitQueue()).to(orderSubmitExchange()).with(ROUTING_KEY_ROC_ORDER_SUBMIT);
    }
   @Bean
    public DirectExchange usdtUpdateExchange() {
src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java
@@ -17,6 +17,7 @@
import com.xcong.excoin.modules.platform.entity.PlatformSymbolsCoinEntity;
import com.xcong.excoin.modules.symbols.constants.SymbolsConstats;
import com.xcong.excoin.rabbit.producer.OrderSubmitProducer;
import com.xcong.excoin.trade.CoinTrader;
import com.xcong.excoin.trade.CoinTraderFactory;
import com.xcong.excoin.trade.ExchangeTrade;
@@ -91,6 +92,9 @@
    @Resource
    private MemberDao memberDao;
    @Resource
    private OrderSubmitProducer orderSubmitProducer;
    @Override
@@ -465,9 +469,11 @@
//            memberWalletCoinDao.updateById(walletCoin);
            memberWalletCoinDao.updateWalletBalance(walletCoin.getId(),amount.negate(),amount.negate(),amount);
        }
        // 加入到撮合
        CoinTrader trader = factory.getTrader(symbol);
        trader.trade(order);
        // 加入到撮合 TODO  通过消息队列发送到交易撮合
        //CoinTrader trader = factory.getTrader(symbol);
        //trader.trade(order);
        order.setSymbol(symbol);
        orderSubmitProducer.sendMsg(JSONObject.toJSONString(order));
        return Result.ok(MessageSourceUtils.getString("order_service_0011"));
    }
src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java
@@ -44,7 +44,7 @@
 **/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "app", name = "trade", havingValue = "true")
@ConditionalOnProperty(prefix = "app", name = "exchange-trade", havingValue = "true")
public class CoinTradeInitJob {
    @Resource
@@ -101,6 +101,7 @@
        // 创建K线生成器
        CoinProcessor processor = new DefaultCoinProcessor(symbol, "USDT");
        processor.setMarketService(marketService);
        processor.setRedisUtils(redisUtils);
        //processor.setExchangeRate(exchangeRate);
        processor.initializeThumb();
        //processor.initializeUsdRate();
src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java
@@ -4,6 +4,7 @@
import com.xcong.excoin.modules.coin.service.OrderCoinService;
import com.xcong.excoin.processor.CoinProcessorFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@@ -19,6 +20,7 @@
 */
@Component
@Slf4j
@ConditionalOnProperty(prefix = "app", name = "exchange-trade", havingValue = "true")
public class KLineGeneratorJob {
    @Resource
    private CoinProcessorFactory processorFactory;
src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java
@@ -15,6 +15,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@@ -24,11 +25,11 @@
import java.util.Set;
/**
 * @author wzy
 * @date 2020-05-25
 **/
 *  websocket 只能后台撮合交易那台开启
 */
@Slf4j
@Component
@ConditionalOnProperty(prefix = "app", name = "exchange-trade", havingValue = "true")
public class ExchangeConsumer {
    @Resource
@@ -115,23 +116,4 @@
        orderCoinService.handleOrder(exchangeTrades);
    }
    /**
     *  更新最新K线
     * @param content
     */
//    @RabbitListener(queues = RabbitMqConfig.QUEUE_TRADE_PLATE)
//    public void newKling(String content) {
//        log.info("#---->{}#", content);
//        // 最新K线的币种
//        String key = "NEW_KINE_{}";
//        key = StrUtil.format(key, content);
//        Object o = redisUtils.get(key);
//        Map<String, Candlestick> currentKlineMap = (Map<String, Candlestick>)o;
//        // 推送最新K线
//        Set<Map.Entry<String, Candlestick>> entries = currentKlineMap.entrySet();
//        for(Map.Entry<String, Candlestick> map : entries){
//            tradePlateSendWebSocket.sendMessageKline(content,map.getKey(),JSONObject.toJSONString(map.getValue()),null);
//        }
//
//    }
}
src/main/java/com/xcong/excoin/rabbit/consumer/OperateOrderPriceConsumer.java
@@ -17,6 +17,7 @@
 * @author helius
 */
@Component
@Deprecated
@ConditionalOnProperty(prefix = "app", name = "newest-price-update-job-contract", havingValue = "true")
public class OperateOrderPriceConsumer {
src/main/java/com/xcong/excoin/rabbit/consumer/OrderSubmitConsumer.java
New file
@@ -0,0 +1,34 @@
package com.xcong.excoin.rabbit.consumer;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.configurations.RabbitMqConfig;
import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity;
import com.xcong.excoin.trade.CoinTrader;
import com.xcong.excoin.trade.CoinTraderFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 *  提交买卖单进入撮合系统
 */
@Slf4j
@Component
@ConditionalOnProperty(prefix = "app", name = "exchange-trade", havingValue = "true")
public class OrderSubmitConsumer {
    @Resource
    private CoinTraderFactory factory;
    @RabbitListener(queues = RabbitMqConfig.QUEUE_ROC_ORDER_SUBMIT)
    public void doSomething(String content) {
        log.info("#提交的订单---->{}#", content);
        OrderCoinsEntity coinsEntity = JSONObject.parseObject(content, OrderCoinsEntity.class);
        String symbol = coinsEntity.getSymbol();
        CoinTrader trader = factory.getTrader(symbol);
        trader.trade(coinsEntity);
    }
}
src/main/java/com/xcong/excoin/rabbit/consumer/RocBlockUpdateConsumer.java
@@ -21,12 +21,10 @@
/**
 * 用户修改止损止盈价格、提价限价委托、下单爆仓价等消息
 * 后台打包开启 APP 不开启
 * @author helius
 * ROC币种同步
 */
@Component
//@ConditionalOnProperty(prefix = "app", name = "newest-price-update-job-contract", havingValue = "true")
@ConditionalOnProperty(prefix = "app", name = "block-job", havingValue = "true")
public class RocBlockUpdateConsumer {
    @Resource
src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java
@@ -6,6 +6,7 @@
import com.xcong.excoin.modules.coin.service.BlockCoinService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@@ -16,6 +17,7 @@
 **/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "app", name = "block-job", havingValue = "true")
public class UsdtUpdateConsumer {
@@ -25,7 +27,7 @@
    @RabbitListener(queues = RabbitMqConfig.QUEUE_USDT_UPDATE)
    public void doSomething(String content) {
        log.info("#---->{}#", content);
        log.info("#USDT同步---->{}#", content);
        EthUsdtChargeDto ethUsdtChargeDto = JSONObject.parseObject(content, EthUsdtChargeDto.class);
        // 更新这个用户的余额
        blockCoinService.updateEthUsdtNew(ethUsdtChargeDto);
src/main/java/com/xcong/excoin/rabbit/consumer/WebsocketPriceConsumer.java
@@ -24,6 +24,7 @@
 */
@Slf4j
@Component
@Deprecated
@ConditionalOnProperty(prefix = "app", name = "rabbit-consumer", havingValue = "true")
public class WebsocketPriceConsumer {
src/main/java/com/xcong/excoin/rabbit/init/OrderProducerInit.java
@@ -28,6 +28,7 @@
@Slf4j
@Component
@ConditionalOnProperty(prefix = "app", name = "newest-price-update-job-contract", havingValue = "true")
@Deprecated
public class OrderProducerInit {
    @Resource
src/main/java/com/xcong/excoin/rabbit/producer/OrderSubmitProducer.java
New file
@@ -0,0 +1,42 @@
package com.xcong.excoin.rabbit.producer;
import cn.hutool.core.util.IdUtil;
import com.xcong.excoin.configurations.RabbitMqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * @author wzy
 * @date 2020-05-25
 **/
@Slf4j
@Component
public class OrderSubmitProducer implements RabbitTemplate.ConfirmCallback {
    private RabbitTemplate rabbitTemplate;
    @Autowired
    public OrderSubmitProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this);
    }
    public void sendMsg(String content) {
        CorrelationData correlationData = new CorrelationData(IdUtil.simpleUUID());
        rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_ROC_ORDER_SUBMIT, RabbitMqConfig.ROUTING_KEY_ROC_ORDER_SUBMIT, content, correlationData);
    }
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("#----->{}#", correlationData);
        if (ack) {
            log.info("success");
        } else {
            log.info("--->{}", cause);
        }
    }
}
src/main/resources/application-prod.yml
@@ -95,11 +95,10 @@
  redis_expire: 3000
  kline-update-job: false
  newest-price-update-job: true
  #日线 该任务不能与最新价处于同一个服务器
  trade: true
  exchange-trade: true
  day-line: false
  other-job: true
  loop-job: true
  loop-job: false
  rabbit-consumer: false
  block-job: true
src/main/resources/application-prodapp.yml
New file
@@ -0,0 +1,114 @@
server:
  port: 8888
  servlet:
    context-path: /
spring:
  profiles:
    active: dev
  datasource:
    url: jdbc:mysql://rm-bp1i2g5rg5dubo9s40o.mysql.rds.aliyuncs.com:3306/db_roc?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2b8
    username: roc_user
    password: roc123pasd!@
    driver-class-name: com.mysql.jdbc.Driver
    type: com.alibaba.druid.pool.DruidDataSource
    druid:
      initial-size: ${spring_datasource_druid_initial_size:10}
      max-active: ${spring_datasource_druid_max_active:20}
      min-idle: ${spring_datasource_druid_min_idle:3}
      #配置获取连接等待超时的时间
      max-wait: 60000
      pool-prepared-statements: true
      max-pool-prepared-statement-per-connection-size: 20
      validation-query: SELECT 'x'
      test-on-borrow: true
      test-on-return: true
      test-while-idle: true
      #配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
      time-between-eviction-runs-millis: 60000
      #配置一个连接在池中最小生存的时间,单位是毫秒
      min-evictable-idle-time-millis: 300000
      #spring.datasource.druid.max-evguide.ftlictable-idle-time-millis=
      filters: stat,wall
      stat-view-servlet:
        # 默认true 内置监控页面首页/druid/index.html
        enabled: true
        url-pattern: /druid/*
        # 允许清空统计数据
        reset-enable: true
        login-username: root
        login-password: 123456
        # IP白名单 多个逗号分隔
        allow: ${spring_datasource_stat_view_servlet_allow:}
        # IP黑名单
        deny: ${spring_datasource_stat_view_servlet_deny:}
  ## 国际化配置
  messages:
    basename: i18n/messages
  ## redis配置
  redis:
    ## Redis数据库索引(默认为0)
    database: 1
    ## Redis服务器地址
    host: 47.114.114.219
    ## Redis服务器连接端口
    port: 6379
    ## Redis服务器连接密码(默认为空)
    password: biyi123
    jedis:
      pool:
        ## 连接池最大连接数(使用负值表示没有限制)
        #spring.redis.pool.max-active=8
        max-active: 300
        ## 连接池最大阻塞等待时间(使用负值表示没有限制)
        #spring.redis.pool.max-wait=-1
        max-wait: -1
        ## 连接池中的最大空闲连接
        #spring.redis.pool.max-idle=8
        max-idle: 100
        ## 连接池中的最小空闲连接
        #spring.redis.pool.min-idle=0
        min-idle: 8
    ## 连接超时时间(毫秒)
    timeout: 30000
  rabbitmq:
    host: 47.114.114.219
    port: 5672
    username: roc_user
    password: roc123456
    publisher-confirm-type: correlated
#custom:
#  rabbitmq:
#    host: 120.27.238.55
#    port: 5672
#    username: ct_rabbit
#    password: 123456
mybatis-plus:
  mapper-locations: classpath:mapper/**/*.xml
app:
  debug: false
  redis_expire: 3000
  kline-update-job: false
  newest-price-update-job: false
  exchange-trade: true
  day-line: false
  other-job: false
  loop-job: false
  rabbit-consumer: false
  block-job: false
aliyun:
  oss:
    end-point: https://oss-cn-hangzhou.aliyuncs.com
    bucket-name: https://excoin.oss-cn-hangzhou.aliyuncs.com
    access-key-id: LTAI4GBuydqbJ5bTsDP97Lpd
    access-key-secret: vbCjQtPxABWjqtUlQfzjlA0qAY96fh
rsa:
  public_key: MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCCf8UFZK54AiK4PRu7tNd+Z8qZ77o/QXCnk25DRmygVpOEu5mGNSAvfnWmKp2pEV2RljeXq3Rid/+LQkonaebMJeXKSF0yxL/VgyeT8JaQ5gNbOrdfdlc+mFkXJyzyJt8YkvApEdPRNSU2ENBn7mgRfD0BYPM4vZ6/rv+de38FJwIDAQAB
  private_key: MIICdgIBADANBgkqhkiG9w0BAQEFAASCAmAwggJcAgEAAoGBAIJ/xQVkrngCIrg9G7u0135nypnvuj9BcKeTbkNGbKBWk4S7mYY1IC9+daYqnakRXZGWN5erdGJ3/4tCSidp5swl5cpIXTLEv9WDJ5PwlpDmA1s6t192Vz6YWRcnLPIm3xiS8CkR09E1JTYQ0GfuaBF8PQFg8zi9nr+u/517fwUnAgMBAAECgYBhPt9NvpI4wbanvnndLczr2GJkxfzvSE+vwLCJF4C5FusFHVsxZINggQcg1V75bwRgCiXRMyYefreCSdrCditS43PzTOmE4RRrpxLlm8oubJc0C98LQ2qlN9AsUqL5IHpVGgbHDyWAwjc1GBID6nwXKpxq1/VodFqhahG9D5EZsQJBALnkb+5VTxQbiyQI4Uc9NIvAyVcNY1OisbvY6tvNgdBbJkADgAb78M1HWxxYjUqsvzggNHc7cWqWBHMgpnJaqm8CQQCztze4D7uAk7OC9MJHY5eE980J8Kk+GEZKxz4LahzU6V6dcb9GFac3wEtgilj/tOAn9y0/Q8sm9vvCIbMDzgzJAkEAqRYcqhF26LdVDOX25DHMBgLKISDQZFbsjA13M4/usHL4i+mjHrc0BcUOHu59NpuDI65HitzLAUSLr5zXSdUmiQJAW77wOg4GCejdXsB3IhzMsHwU97sdm26nC+vVV9xvJZ6Rx8zW+f9543NOx9U5BCmhuaVtOvvwDU9PTVcI3atmSQJAXAIJ5gGdtXx0DXiX4VvzNFHqgaqHMGvXyjNVkU2FYQbSAd2A6app4uRO+BkZu9dSjh14m+oXMnV2HzAN2rRnjA==