From f8a0008705fd8067959151ce83c5dce19e72fb85 Mon Sep 17 00:00:00 2001 From: zainali5120 <512061637@qq.com> Date: Sat, 10 Oct 2020 17:42:49 +0800 Subject: [PATCH] golden交易所分布式支持 --- src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java | 12 + src/main/java/com/xcong/excoin/rabbit/producer/OrderSubmitProducer.java | 42 ++++++++ src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java | 23 ++++ src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java | 4 src/main/java/com/xcong/excoin/rabbit/init/OrderProducerInit.java | 1 src/main/resources/application-prod.yml | 5 src/main/java/com/xcong/excoin/rabbit/consumer/RocBlockUpdateConsumer.java | 6 src/main/java/com/xcong/excoin/rabbit/consumer/OperateOrderPriceConsumer.java | 1 src/main/java/com/xcong/excoin/rabbit/consumer/WebsocketPriceConsumer.java | 1 src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java | 3 src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java | 2 src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java | 26 ---- src/main/java/com/xcong/excoin/rabbit/consumer/OrderSubmitConsumer.java | 34 ++++++ src/main/resources/application-prodapp.yml | 114 ++++++++++++++++++++++ 14 files changed, 240 insertions(+), 34 deletions(-) diff --git a/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java b/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java index 501d615..80f17cb 100644 --- a/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java +++ b/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java @@ -103,6 +103,12 @@ public static final String ROUTING_KEY_ROC = "roc-transfer-routingKey"; + public static final String EXCHANGE_ROC_ORDER_SUBMIT = "roc-exchange-order-submit"; + + public static final String QUEUE_ROC_ORDER_SUBMIT = "roc-order-queue-submit"; + + public static final String ROUTING_KEY_ROC_ORDER_SUBMIT = "roc-order-routingKey-submit"; + @Resource private ConnectionFactory connectionFactory; @@ -138,6 +144,23 @@ return BindingBuilder.bind(testQueue()).to(defaultExchange()).with(ROUTING_KEY_TEST); } + // 交易订单 + @Bean + public DirectExchange orderSubmitExchange() { + return new DirectExchange(EXCHANGE_ROC_ORDER_SUBMIT); + } + + + @Bean + public Queue ordereSubmitQueue() { + return new Queue(QUEUE_ROC_ORDER_SUBMIT, true); + } + + @Bean + public Binding bindingSubmitOrder() { + return BindingBuilder.bind(ordereSubmitQueue()).to(orderSubmitExchange()).with(ROUTING_KEY_ROC_ORDER_SUBMIT); + } + @Bean public DirectExchange usdtUpdateExchange() { diff --git a/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java b/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java index 2b836cf..a8e174e 100644 --- a/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java +++ b/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java @@ -17,6 +17,7 @@ import com.xcong.excoin.modules.platform.entity.PlatformSymbolsCoinEntity; import com.xcong.excoin.modules.symbols.constants.SymbolsConstats; +import com.xcong.excoin.rabbit.producer.OrderSubmitProducer; import com.xcong.excoin.trade.CoinTrader; import com.xcong.excoin.trade.CoinTraderFactory; import com.xcong.excoin.trade.ExchangeTrade; @@ -91,6 +92,9 @@ @Resource private MemberDao memberDao; + + @Resource + private OrderSubmitProducer orderSubmitProducer; @Override @@ -465,9 +469,11 @@ // memberWalletCoinDao.updateById(walletCoin); memberWalletCoinDao.updateWalletBalance(walletCoin.getId(),amount.negate(),amount.negate(),amount); } - // 加入到撮合 - CoinTrader trader = factory.getTrader(symbol); - trader.trade(order); + // 加入到撮合 TODO 通过消息队列发送到交易撮合 + //CoinTrader trader = factory.getTrader(symbol); + //trader.trade(order); + order.setSymbol(symbol); + orderSubmitProducer.sendMsg(JSONObject.toJSONString(order)); return Result.ok(MessageSourceUtils.getString("order_service_0011")); } diff --git a/src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java b/src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java index 6f114b0..2b8c6cb 100644 --- a/src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java +++ b/src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java @@ -44,7 +44,7 @@ **/ @Slf4j @Component -@ConditionalOnProperty(prefix = "app", name = "trade", havingValue = "true") +@ConditionalOnProperty(prefix = "app", name = "exchange-trade", havingValue = "true") public class CoinTradeInitJob { @Resource @@ -101,6 +101,7 @@ // 创建K线生成器 CoinProcessor processor = new DefaultCoinProcessor(symbol, "USDT"); processor.setMarketService(marketService); + processor.setRedisUtils(redisUtils); //processor.setExchangeRate(exchangeRate); processor.initializeThumb(); //processor.initializeUsdRate(); diff --git a/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java b/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java index 6099153..b6194ee 100644 --- a/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java +++ b/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java @@ -4,6 +4,7 @@ import com.xcong.excoin.modules.coin.service.OrderCoinService; import com.xcong.excoin.processor.CoinProcessorFactory; import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @@ -19,6 +20,7 @@ */ @Component @Slf4j +@ConditionalOnProperty(prefix = "app", name = "exchange-trade", havingValue = "true") public class KLineGeneratorJob { @Resource private CoinProcessorFactory processorFactory; diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java index a124d69..2ed81da 100644 --- a/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java +++ b/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java @@ -15,6 +15,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -24,11 +25,11 @@ import java.util.Set; /** - * @author wzy - * @date 2020-05-25 - **/ + * websocket 只能后台撮合交易那台开启 + */ @Slf4j @Component +@ConditionalOnProperty(prefix = "app", name = "exchange-trade", havingValue = "true") public class ExchangeConsumer { @Resource @@ -115,23 +116,4 @@ orderCoinService.handleOrder(exchangeTrades); } - /** - * 更新最新K线 - * @param content - */ -// @RabbitListener(queues = RabbitMqConfig.QUEUE_TRADE_PLATE) -// public void newKling(String content) { -// log.info("#---->{}#", content); -// // 最新K线的币种 -// String key = "NEW_KINE_{}"; -// key = StrUtil.format(key, content); -// Object o = redisUtils.get(key); -// Map<String, Candlestick> currentKlineMap = (Map<String, Candlestick>)o; -// // 推送最新K线 -// Set<Map.Entry<String, Candlestick>> entries = currentKlineMap.entrySet(); -// for(Map.Entry<String, Candlestick> map : entries){ -// tradePlateSendWebSocket.sendMessageKline(content,map.getKey(),JSONObject.toJSONString(map.getValue()),null); -// } -// -// } } diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/OperateOrderPriceConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/OperateOrderPriceConsumer.java index bfb2d92..6e98b44 100644 --- a/src/main/java/com/xcong/excoin/rabbit/consumer/OperateOrderPriceConsumer.java +++ b/src/main/java/com/xcong/excoin/rabbit/consumer/OperateOrderPriceConsumer.java @@ -17,6 +17,7 @@ * @author helius */ @Component +@Deprecated @ConditionalOnProperty(prefix = "app", name = "newest-price-update-job-contract", havingValue = "true") public class OperateOrderPriceConsumer { diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/OrderSubmitConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/OrderSubmitConsumer.java new file mode 100644 index 0000000..1f7b33b --- /dev/null +++ b/src/main/java/com/xcong/excoin/rabbit/consumer/OrderSubmitConsumer.java @@ -0,0 +1,34 @@ +package com.xcong.excoin.rabbit.consumer; + +import com.alibaba.fastjson.JSONObject; +import com.xcong.excoin.configurations.RabbitMqConfig; +import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity; +import com.xcong.excoin.trade.CoinTrader; +import com.xcong.excoin.trade.CoinTraderFactory; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +/** + * 提交买卖单进入撮合系统 + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "app", name = "exchange-trade", havingValue = "true") +public class OrderSubmitConsumer { + + @Resource + private CoinTraderFactory factory; + + @RabbitListener(queues = RabbitMqConfig.QUEUE_ROC_ORDER_SUBMIT) + public void doSomething(String content) { + log.info("#提交的订单---->{}#", content); + OrderCoinsEntity coinsEntity = JSONObject.parseObject(content, OrderCoinsEntity.class); + String symbol = coinsEntity.getSymbol(); + CoinTrader trader = factory.getTrader(symbol); + trader.trade(coinsEntity); + } +} diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/RocBlockUpdateConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/RocBlockUpdateConsumer.java index bbf2717..e2ee354 100644 --- a/src/main/java/com/xcong/excoin/rabbit/consumer/RocBlockUpdateConsumer.java +++ b/src/main/java/com/xcong/excoin/rabbit/consumer/RocBlockUpdateConsumer.java @@ -21,12 +21,10 @@ /** - * 用户修改止损止盈价格、提价限价委托、下单爆仓价等消息 - * 后台打包开启 APP 不开启 - * @author helius + * ROC币种同步 */ @Component -//@ConditionalOnProperty(prefix = "app", name = "newest-price-update-job-contract", havingValue = "true") +@ConditionalOnProperty(prefix = "app", name = "block-job", havingValue = "true") public class RocBlockUpdateConsumer { @Resource diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java index 2e375be..c9560ed 100644 --- a/src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java +++ b/src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java @@ -6,6 +6,7 @@ import com.xcong.excoin.modules.coin.service.BlockCoinService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -16,6 +17,7 @@ **/ @Slf4j @Component +@ConditionalOnProperty(prefix = "app", name = "block-job", havingValue = "true") public class UsdtUpdateConsumer { @@ -25,7 +27,7 @@ @RabbitListener(queues = RabbitMqConfig.QUEUE_USDT_UPDATE) public void doSomething(String content) { - log.info("#---->{}#", content); + log.info("#USDT同步---->{}#", content); EthUsdtChargeDto ethUsdtChargeDto = JSONObject.parseObject(content, EthUsdtChargeDto.class); // 更新这个用户的余额 blockCoinService.updateEthUsdtNew(ethUsdtChargeDto); diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/WebsocketPriceConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/WebsocketPriceConsumer.java index b959b0e..ffbc2be 100644 --- a/src/main/java/com/xcong/excoin/rabbit/consumer/WebsocketPriceConsumer.java +++ b/src/main/java/com/xcong/excoin/rabbit/consumer/WebsocketPriceConsumer.java @@ -24,6 +24,7 @@ */ @Slf4j @Component +@Deprecated @ConditionalOnProperty(prefix = "app", name = "rabbit-consumer", havingValue = "true") public class WebsocketPriceConsumer { diff --git a/src/main/java/com/xcong/excoin/rabbit/init/OrderProducerInit.java b/src/main/java/com/xcong/excoin/rabbit/init/OrderProducerInit.java index 9fb3756..662cd9a 100644 --- a/src/main/java/com/xcong/excoin/rabbit/init/OrderProducerInit.java +++ b/src/main/java/com/xcong/excoin/rabbit/init/OrderProducerInit.java @@ -28,6 +28,7 @@ @Slf4j @Component @ConditionalOnProperty(prefix = "app", name = "newest-price-update-job-contract", havingValue = "true") +@Deprecated public class OrderProducerInit { @Resource diff --git a/src/main/java/com/xcong/excoin/rabbit/producer/OrderSubmitProducer.java b/src/main/java/com/xcong/excoin/rabbit/producer/OrderSubmitProducer.java new file mode 100644 index 0000000..060711c --- /dev/null +++ b/src/main/java/com/xcong/excoin/rabbit/producer/OrderSubmitProducer.java @@ -0,0 +1,42 @@ +package com.xcong.excoin.rabbit.producer; + +import cn.hutool.core.util.IdUtil; +import com.xcong.excoin.configurations.RabbitMqConfig; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * @author wzy + * @date 2020-05-25 + **/ +@Slf4j +@Component +public class OrderSubmitProducer implements RabbitTemplate.ConfirmCallback { + + private RabbitTemplate rabbitTemplate; + + @Autowired + public OrderSubmitProducer(RabbitTemplate rabbitTemplate) { + this.rabbitTemplate = rabbitTemplate; + rabbitTemplate.setConfirmCallback(this); + } + + public void sendMsg(String content) { + CorrelationData correlationData = new CorrelationData(IdUtil.simpleUUID()); + rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_ROC_ORDER_SUBMIT, RabbitMqConfig.ROUTING_KEY_ROC_ORDER_SUBMIT, content, correlationData); + } + + + @Override + public void confirm(CorrelationData correlationData, boolean ack, String cause) { + log.info("#----->{}#", correlationData); + if (ack) { + log.info("success"); + } else { + log.info("--->{}", cause); + } + } +} diff --git a/src/main/resources/application-prod.yml b/src/main/resources/application-prod.yml index 171c41a..5519fb9 100644 --- a/src/main/resources/application-prod.yml +++ b/src/main/resources/application-prod.yml @@ -95,11 +95,10 @@ redis_expire: 3000 kline-update-job: false newest-price-update-job: true - #日线 该任务不能与最新价处于同一个服务器 - trade: true + exchange-trade: true day-line: false other-job: true - loop-job: true + loop-job: false rabbit-consumer: false block-job: true diff --git a/src/main/resources/application-prodapp.yml b/src/main/resources/application-prodapp.yml new file mode 100644 index 0000000..edf1a56 --- /dev/null +++ b/src/main/resources/application-prodapp.yml @@ -0,0 +1,114 @@ +server: + port: 8888 + servlet: + context-path: / + +spring: + profiles: + active: dev + datasource: + url: jdbc:mysql://rm-bp1i2g5rg5dubo9s40o.mysql.rds.aliyuncs.com:3306/db_roc?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2b8 + username: roc_user + password: roc123pasd!@ + driver-class-name: com.mysql.jdbc.Driver + type: com.alibaba.druid.pool.DruidDataSource + druid: + initial-size: ${spring_datasource_druid_initial_size:10} + max-active: ${spring_datasource_druid_max_active:20} + min-idle: ${spring_datasource_druid_min_idle:3} + #配置获取连接等待超时的时间 + max-wait: 60000 + pool-prepared-statements: true + max-pool-prepared-statement-per-connection-size: 20 + validation-query: SELECT 'x' + test-on-borrow: true + test-on-return: true + test-while-idle: true + #配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 + time-between-eviction-runs-millis: 60000 + #配置一个连接在池中最小生存的时间,单位是毫秒 + min-evictable-idle-time-millis: 300000 + #spring.datasource.druid.max-evguide.ftlictable-idle-time-millis= + filters: stat,wall + stat-view-servlet: + # 默认true 内置监控页面首页/druid/index.html + enabled: true + url-pattern: /druid/* + # 允许清空统计数据 + reset-enable: true + login-username: root + login-password: 123456 + # IP白名单 多个逗号分隔 + allow: ${spring_datasource_stat_view_servlet_allow:} + # IP黑名单 + deny: ${spring_datasource_stat_view_servlet_deny:} + ## 国际化配置 + messages: + basename: i18n/messages + ## redis配置 + redis: + ## Redis数据库索引(默认为0) + database: 1 + ## Redis服务器地址 + host: 47.114.114.219 + ## Redis服务器连接端口 + port: 6379 + ## Redis服务器连接密码(默认为空) + password: biyi123 + jedis: + pool: + ## 连接池最大连接数(使用负值表示没有限制) + #spring.redis.pool.max-active=8 + max-active: 300 + ## 连接池最大阻塞等待时间(使用负值表示没有限制) + #spring.redis.pool.max-wait=-1 + max-wait: -1 + ## 连接池中的最大空闲连接 + #spring.redis.pool.max-idle=8 + max-idle: 100 + ## 连接池中的最小空闲连接 + #spring.redis.pool.min-idle=0 + min-idle: 8 + ## 连接超时时间(毫秒) + timeout: 30000 + rabbitmq: + host: 47.114.114.219 + port: 5672 + username: roc_user + password: roc123456 + publisher-confirm-type: correlated + + +#custom: +# rabbitmq: +# host: 120.27.238.55 +# port: 5672 +# username: ct_rabbit +# password: 123456 + +mybatis-plus: + mapper-locations: classpath:mapper/**/*.xml + + +app: + debug: false + redis_expire: 3000 + kline-update-job: false + newest-price-update-job: false + exchange-trade: true + day-line: false + other-job: false + loop-job: false + rabbit-consumer: false + block-job: false + +aliyun: + oss: + end-point: https://oss-cn-hangzhou.aliyuncs.com + bucket-name: https://excoin.oss-cn-hangzhou.aliyuncs.com + access-key-id: LTAI4GBuydqbJ5bTsDP97Lpd + access-key-secret: vbCjQtPxABWjqtUlQfzjlA0qAY96fh + +rsa: + public_key: MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCCf8UFZK54AiK4PRu7tNd+Z8qZ77o/QXCnk25DRmygVpOEu5mGNSAvfnWmKp2pEV2RljeXq3Rid/+LQkonaebMJeXKSF0yxL/VgyeT8JaQ5gNbOrdfdlc+mFkXJyzyJt8YkvApEdPRNSU2ENBn7mgRfD0BYPM4vZ6/rv+de38FJwIDAQAB + private_key: MIICdgIBADANBgkqhkiG9w0BAQEFAASCAmAwggJcAgEAAoGBAIJ/xQVkrngCIrg9G7u0135nypnvuj9BcKeTbkNGbKBWk4S7mYY1IC9+daYqnakRXZGWN5erdGJ3/4tCSidp5swl5cpIXTLEv9WDJ5PwlpDmA1s6t192Vz6YWRcnLPIm3xiS8CkR09E1JTYQ0GfuaBF8PQFg8zi9nr+u/517fwUnAgMBAAECgYBhPt9NvpI4wbanvnndLczr2GJkxfzvSE+vwLCJF4C5FusFHVsxZINggQcg1V75bwRgCiXRMyYefreCSdrCditS43PzTOmE4RRrpxLlm8oubJc0C98LQ2qlN9AsUqL5IHpVGgbHDyWAwjc1GBID6nwXKpxq1/VodFqhahG9D5EZsQJBALnkb+5VTxQbiyQI4Uc9NIvAyVcNY1OisbvY6tvNgdBbJkADgAb78M1HWxxYjUqsvzggNHc7cWqWBHMgpnJaqm8CQQCztze4D7uAk7OC9MJHY5eE980J8Kk+GEZKxz4LahzU6V6dcb9GFac3wEtgilj/tOAn9y0/Q8sm9vvCIbMDzgzJAkEAqRYcqhF26LdVDOX25DHMBgLKISDQZFbsjA13M4/usHL4i+mjHrc0BcUOHu59NpuDI65HitzLAUSLr5zXSdUmiQJAW77wOg4GCejdXsB3IhzMsHwU97sdm26nC+vVV9xvJZ6Rx8zW+f9543NOx9U5BCmhuaVtOvvwDU9PTVcI3atmSQJAXAIJ5gGdtXx0DXiX4VvzNFHqgaqHMGvXyjNVkU2FYQbSAd2A6app4uRO+BkZu9dSjh14m+oXMnV2HzAN2rRnjA== -- Gitblit v1.9.1