From f8a0008705fd8067959151ce83c5dce19e72fb85 Mon Sep 17 00:00:00 2001
From: zainali5120 <512061637@qq.com>
Date: Sat, 10 Oct 2020 17:42:49 +0800
Subject: [PATCH] golden交易所分布式支持
---
src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java | 12 +
src/main/java/com/xcong/excoin/rabbit/producer/OrderSubmitProducer.java | 42 ++++++++
src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java | 23 ++++
src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java | 4
src/main/java/com/xcong/excoin/rabbit/init/OrderProducerInit.java | 1
src/main/resources/application-prod.yml | 5
src/main/java/com/xcong/excoin/rabbit/consumer/RocBlockUpdateConsumer.java | 6
src/main/java/com/xcong/excoin/rabbit/consumer/OperateOrderPriceConsumer.java | 1
src/main/java/com/xcong/excoin/rabbit/consumer/WebsocketPriceConsumer.java | 1
src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java | 3
src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java | 2
src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java | 26 ----
src/main/java/com/xcong/excoin/rabbit/consumer/OrderSubmitConsumer.java | 34 ++++++
src/main/resources/application-prodapp.yml | 114 ++++++++++++++++++++++
14 files changed, 240 insertions(+), 34 deletions(-)
diff --git a/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java b/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
index 501d615..80f17cb 100644
--- a/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
+++ b/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
@@ -103,6 +103,12 @@
public static final String ROUTING_KEY_ROC = "roc-transfer-routingKey";
+ public static final String EXCHANGE_ROC_ORDER_SUBMIT = "roc-exchange-order-submit";
+
+ public static final String QUEUE_ROC_ORDER_SUBMIT = "roc-order-queue-submit";
+
+ public static final String ROUTING_KEY_ROC_ORDER_SUBMIT = "roc-order-routingKey-submit";
+
@Resource
private ConnectionFactory connectionFactory;
@@ -138,6 +144,23 @@
return BindingBuilder.bind(testQueue()).to(defaultExchange()).with(ROUTING_KEY_TEST);
}
+ // 交易订单
+ @Bean
+ public DirectExchange orderSubmitExchange() {
+ return new DirectExchange(EXCHANGE_ROC_ORDER_SUBMIT);
+ }
+
+
+ @Bean
+ public Queue ordereSubmitQueue() {
+ return new Queue(QUEUE_ROC_ORDER_SUBMIT, true);
+ }
+
+ @Bean
+ public Binding bindingSubmitOrder() {
+ return BindingBuilder.bind(ordereSubmitQueue()).to(orderSubmitExchange()).with(ROUTING_KEY_ROC_ORDER_SUBMIT);
+ }
+
@Bean
public DirectExchange usdtUpdateExchange() {
diff --git a/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java b/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java
index 2b836cf..a8e174e 100644
--- a/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java
+++ b/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java
@@ -17,6 +17,7 @@
import com.xcong.excoin.modules.platform.entity.PlatformSymbolsCoinEntity;
import com.xcong.excoin.modules.symbols.constants.SymbolsConstats;
+import com.xcong.excoin.rabbit.producer.OrderSubmitProducer;
import com.xcong.excoin.trade.CoinTrader;
import com.xcong.excoin.trade.CoinTraderFactory;
import com.xcong.excoin.trade.ExchangeTrade;
@@ -91,6 +92,9 @@
@Resource
private MemberDao memberDao;
+
+ @Resource
+ private OrderSubmitProducer orderSubmitProducer;
@Override
@@ -465,9 +469,11 @@
// memberWalletCoinDao.updateById(walletCoin);
memberWalletCoinDao.updateWalletBalance(walletCoin.getId(),amount.negate(),amount.negate(),amount);
}
- // 加入到撮合
- CoinTrader trader = factory.getTrader(symbol);
- trader.trade(order);
+ // 加入到撮合 TODO 通过消息队列发送到交易撮合
+ //CoinTrader trader = factory.getTrader(symbol);
+ //trader.trade(order);
+ order.setSymbol(symbol);
+ orderSubmitProducer.sendMsg(JSONObject.toJSONString(order));
return Result.ok(MessageSourceUtils.getString("order_service_0011"));
}
diff --git a/src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java b/src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java
index 6f114b0..2b8c6cb 100644
--- a/src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java
+++ b/src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java
@@ -44,7 +44,7 @@
**/
@Slf4j
@Component
-@ConditionalOnProperty(prefix = "app", name = "trade", havingValue = "true")
+@ConditionalOnProperty(prefix = "app", name = "exchange-trade", havingValue = "true")
public class CoinTradeInitJob {
@Resource
@@ -101,6 +101,7 @@
// 创建K线生成器
CoinProcessor processor = new DefaultCoinProcessor(symbol, "USDT");
processor.setMarketService(marketService);
+ processor.setRedisUtils(redisUtils);
//processor.setExchangeRate(exchangeRate);
processor.initializeThumb();
//processor.initializeUsdRate();
diff --git a/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java b/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java
index 6099153..b6194ee 100644
--- a/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java
+++ b/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java
@@ -4,6 +4,7 @@
import com.xcong.excoin.modules.coin.service.OrderCoinService;
import com.xcong.excoin.processor.CoinProcessorFactory;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@@ -19,6 +20,7 @@
*/
@Component
@Slf4j
+@ConditionalOnProperty(prefix = "app", name = "exchange-trade", havingValue = "true")
public class KLineGeneratorJob {
@Resource
private CoinProcessorFactory processorFactory;
diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java
index a124d69..2ed81da 100644
--- a/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java
+++ b/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java
@@ -15,6 +15,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@@ -24,11 +25,11 @@
import java.util.Set;
/**
- * @author wzy
- * @date 2020-05-25
- **/
+ * websocket 只能后台撮合交易那台开启
+ */
@Slf4j
@Component
+@ConditionalOnProperty(prefix = "app", name = "exchange-trade", havingValue = "true")
public class ExchangeConsumer {
@Resource
@@ -115,23 +116,4 @@
orderCoinService.handleOrder(exchangeTrades);
}
- /**
- * 更新最新K线
- * @param content
- */
-// @RabbitListener(queues = RabbitMqConfig.QUEUE_TRADE_PLATE)
-// public void newKling(String content) {
-// log.info("#---->{}#", content);
-// // 最新K线的币种
-// String key = "NEW_KINE_{}";
-// key = StrUtil.format(key, content);
-// Object o = redisUtils.get(key);
-// Map<String, Candlestick> currentKlineMap = (Map<String, Candlestick>)o;
-// // 推送最新K线
-// Set<Map.Entry<String, Candlestick>> entries = currentKlineMap.entrySet();
-// for(Map.Entry<String, Candlestick> map : entries){
-// tradePlateSendWebSocket.sendMessageKline(content,map.getKey(),JSONObject.toJSONString(map.getValue()),null);
-// }
-//
-// }
}
diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/OperateOrderPriceConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/OperateOrderPriceConsumer.java
index bfb2d92..6e98b44 100644
--- a/src/main/java/com/xcong/excoin/rabbit/consumer/OperateOrderPriceConsumer.java
+++ b/src/main/java/com/xcong/excoin/rabbit/consumer/OperateOrderPriceConsumer.java
@@ -17,6 +17,7 @@
* @author helius
*/
@Component
+@Deprecated
@ConditionalOnProperty(prefix = "app", name = "newest-price-update-job-contract", havingValue = "true")
public class OperateOrderPriceConsumer {
diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/OrderSubmitConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/OrderSubmitConsumer.java
new file mode 100644
index 0000000..1f7b33b
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/rabbit/consumer/OrderSubmitConsumer.java
@@ -0,0 +1,34 @@
+package com.xcong.excoin.rabbit.consumer;
+
+import com.alibaba.fastjson.JSONObject;
+import com.xcong.excoin.configurations.RabbitMqConfig;
+import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity;
+import com.xcong.excoin.trade.CoinTrader;
+import com.xcong.excoin.trade.CoinTraderFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+
+/**
+ * 提交买卖单进入撮合系统
+ */
+@Slf4j
+@Component
+@ConditionalOnProperty(prefix = "app", name = "exchange-trade", havingValue = "true")
+public class OrderSubmitConsumer {
+
+ @Resource
+ private CoinTraderFactory factory;
+
+ @RabbitListener(queues = RabbitMqConfig.QUEUE_ROC_ORDER_SUBMIT)
+ public void doSomething(String content) {
+ log.info("#提交的订单---->{}#", content);
+ OrderCoinsEntity coinsEntity = JSONObject.parseObject(content, OrderCoinsEntity.class);
+ String symbol = coinsEntity.getSymbol();
+ CoinTrader trader = factory.getTrader(symbol);
+ trader.trade(coinsEntity);
+ }
+}
diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/RocBlockUpdateConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/RocBlockUpdateConsumer.java
index bbf2717..e2ee354 100644
--- a/src/main/java/com/xcong/excoin/rabbit/consumer/RocBlockUpdateConsumer.java
+++ b/src/main/java/com/xcong/excoin/rabbit/consumer/RocBlockUpdateConsumer.java
@@ -21,12 +21,10 @@
/**
- * 用户修改止损止盈价格、提价限价委托、下单爆仓价等消息
- * 后台打包开启 APP 不开启
- * @author helius
+ * ROC币种同步
*/
@Component
-//@ConditionalOnProperty(prefix = "app", name = "newest-price-update-job-contract", havingValue = "true")
+@ConditionalOnProperty(prefix = "app", name = "block-job", havingValue = "true")
public class RocBlockUpdateConsumer {
@Resource
diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java
index 2e375be..c9560ed 100644
--- a/src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java
+++ b/src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java
@@ -6,6 +6,7 @@
import com.xcong.excoin.modules.coin.service.BlockCoinService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@@ -16,6 +17,7 @@
**/
@Slf4j
@Component
+@ConditionalOnProperty(prefix = "app", name = "block-job", havingValue = "true")
public class UsdtUpdateConsumer {
@@ -25,7 +27,7 @@
@RabbitListener(queues = RabbitMqConfig.QUEUE_USDT_UPDATE)
public void doSomething(String content) {
- log.info("#---->{}#", content);
+ log.info("#USDT同步---->{}#", content);
EthUsdtChargeDto ethUsdtChargeDto = JSONObject.parseObject(content, EthUsdtChargeDto.class);
// 更新这个用户的余额
blockCoinService.updateEthUsdtNew(ethUsdtChargeDto);
diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/WebsocketPriceConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/WebsocketPriceConsumer.java
index b959b0e..ffbc2be 100644
--- a/src/main/java/com/xcong/excoin/rabbit/consumer/WebsocketPriceConsumer.java
+++ b/src/main/java/com/xcong/excoin/rabbit/consumer/WebsocketPriceConsumer.java
@@ -24,6 +24,7 @@
*/
@Slf4j
@Component
+@Deprecated
@ConditionalOnProperty(prefix = "app", name = "rabbit-consumer", havingValue = "true")
public class WebsocketPriceConsumer {
diff --git a/src/main/java/com/xcong/excoin/rabbit/init/OrderProducerInit.java b/src/main/java/com/xcong/excoin/rabbit/init/OrderProducerInit.java
index 9fb3756..662cd9a 100644
--- a/src/main/java/com/xcong/excoin/rabbit/init/OrderProducerInit.java
+++ b/src/main/java/com/xcong/excoin/rabbit/init/OrderProducerInit.java
@@ -28,6 +28,7 @@
@Slf4j
@Component
@ConditionalOnProperty(prefix = "app", name = "newest-price-update-job-contract", havingValue = "true")
+@Deprecated
public class OrderProducerInit {
@Resource
diff --git a/src/main/java/com/xcong/excoin/rabbit/producer/OrderSubmitProducer.java b/src/main/java/com/xcong/excoin/rabbit/producer/OrderSubmitProducer.java
new file mode 100644
index 0000000..060711c
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/rabbit/producer/OrderSubmitProducer.java
@@ -0,0 +1,42 @@
+package com.xcong.excoin.rabbit.producer;
+
+import cn.hutool.core.util.IdUtil;
+import com.xcong.excoin.configurations.RabbitMqConfig;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.rabbit.connection.CorrelationData;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author wzy
+ * @date 2020-05-25
+ **/
+@Slf4j
+@Component
+public class OrderSubmitProducer implements RabbitTemplate.ConfirmCallback {
+
+ private RabbitTemplate rabbitTemplate;
+
+ @Autowired
+ public OrderSubmitProducer(RabbitTemplate rabbitTemplate) {
+ this.rabbitTemplate = rabbitTemplate;
+ rabbitTemplate.setConfirmCallback(this);
+ }
+
+ public void sendMsg(String content) {
+ CorrelationData correlationData = new CorrelationData(IdUtil.simpleUUID());
+ rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_ROC_ORDER_SUBMIT, RabbitMqConfig.ROUTING_KEY_ROC_ORDER_SUBMIT, content, correlationData);
+ }
+
+
+ @Override
+ public void confirm(CorrelationData correlationData, boolean ack, String cause) {
+ log.info("#----->{}#", correlationData);
+ if (ack) {
+ log.info("success");
+ } else {
+ log.info("--->{}", cause);
+ }
+ }
+}
diff --git a/src/main/resources/application-prod.yml b/src/main/resources/application-prod.yml
index 171c41a..5519fb9 100644
--- a/src/main/resources/application-prod.yml
+++ b/src/main/resources/application-prod.yml
@@ -95,11 +95,10 @@
redis_expire: 3000
kline-update-job: false
newest-price-update-job: true
- #日线 该任务不能与最新价处于同一个服务器
- trade: true
+ exchange-trade: true
day-line: false
other-job: true
- loop-job: true
+ loop-job: false
rabbit-consumer: false
block-job: true
diff --git a/src/main/resources/application-prodapp.yml b/src/main/resources/application-prodapp.yml
new file mode 100644
index 0000000..edf1a56
--- /dev/null
+++ b/src/main/resources/application-prodapp.yml
@@ -0,0 +1,114 @@
+server:
+ port: 8888
+ servlet:
+ context-path: /
+
+spring:
+ profiles:
+ active: dev
+ datasource:
+ url: jdbc:mysql://rm-bp1i2g5rg5dubo9s40o.mysql.rds.aliyuncs.com:3306/db_roc?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2b8
+ username: roc_user
+ password: roc123pasd!@
+ driver-class-name: com.mysql.jdbc.Driver
+ type: com.alibaba.druid.pool.DruidDataSource
+ druid:
+ initial-size: ${spring_datasource_druid_initial_size:10}
+ max-active: ${spring_datasource_druid_max_active:20}
+ min-idle: ${spring_datasource_druid_min_idle:3}
+ #配置获取连接等待超时的时间
+ max-wait: 60000
+ pool-prepared-statements: true
+ max-pool-prepared-statement-per-connection-size: 20
+ validation-query: SELECT 'x'
+ test-on-borrow: true
+ test-on-return: true
+ test-while-idle: true
+ #配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
+ time-between-eviction-runs-millis: 60000
+ #配置一个连接在池中最小生存的时间,单位是毫秒
+ min-evictable-idle-time-millis: 300000
+ #spring.datasource.druid.max-evguide.ftlictable-idle-time-millis=
+ filters: stat,wall
+ stat-view-servlet:
+ # 默认true 内置监控页面首页/druid/index.html
+ enabled: true
+ url-pattern: /druid/*
+ # 允许清空统计数据
+ reset-enable: true
+ login-username: root
+ login-password: 123456
+ # IP白名单 多个逗号分隔
+ allow: ${spring_datasource_stat_view_servlet_allow:}
+ # IP黑名单
+ deny: ${spring_datasource_stat_view_servlet_deny:}
+ ## 国际化配置
+ messages:
+ basename: i18n/messages
+ ## redis配置
+ redis:
+ ## Redis数据库索引(默认为0)
+ database: 1
+ ## Redis服务器地址
+ host: 47.114.114.219
+ ## Redis服务器连接端口
+ port: 6379
+ ## Redis服务器连接密码(默认为空)
+ password: biyi123
+ jedis:
+ pool:
+ ## 连接池最大连接数(使用负值表示没有限制)
+ #spring.redis.pool.max-active=8
+ max-active: 300
+ ## 连接池最大阻塞等待时间(使用负值表示没有限制)
+ #spring.redis.pool.max-wait=-1
+ max-wait: -1
+ ## 连接池中的最大空闲连接
+ #spring.redis.pool.max-idle=8
+ max-idle: 100
+ ## 连接池中的最小空闲连接
+ #spring.redis.pool.min-idle=0
+ min-idle: 8
+ ## 连接超时时间(毫秒)
+ timeout: 30000
+ rabbitmq:
+ host: 47.114.114.219
+ port: 5672
+ username: roc_user
+ password: roc123456
+ publisher-confirm-type: correlated
+
+
+#custom:
+# rabbitmq:
+# host: 120.27.238.55
+# port: 5672
+# username: ct_rabbit
+# password: 123456
+
+mybatis-plus:
+ mapper-locations: classpath:mapper/**/*.xml
+
+
+app:
+ debug: false
+ redis_expire: 3000
+ kline-update-job: false
+ newest-price-update-job: false
+ exchange-trade: true
+ day-line: false
+ other-job: false
+ loop-job: false
+ rabbit-consumer: false
+ block-job: false
+
+aliyun:
+ oss:
+ end-point: https://oss-cn-hangzhou.aliyuncs.com
+ bucket-name: https://excoin.oss-cn-hangzhou.aliyuncs.com
+ access-key-id: LTAI4GBuydqbJ5bTsDP97Lpd
+ access-key-secret: vbCjQtPxABWjqtUlQfzjlA0qAY96fh
+
+rsa:
+ public_key: MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCCf8UFZK54AiK4PRu7tNd+Z8qZ77o/QXCnk25DRmygVpOEu5mGNSAvfnWmKp2pEV2RljeXq3Rid/+LQkonaebMJeXKSF0yxL/VgyeT8JaQ5gNbOrdfdlc+mFkXJyzyJt8YkvApEdPRNSU2ENBn7mgRfD0BYPM4vZ6/rv+de38FJwIDAQAB
+ private_key: MIICdgIBADANBgkqhkiG9w0BAQEFAASCAmAwggJcAgEAAoGBAIJ/xQVkrngCIrg9G7u0135nypnvuj9BcKeTbkNGbKBWk4S7mYY1IC9+daYqnakRXZGWN5erdGJ3/4tCSidp5swl5cpIXTLEv9WDJ5PwlpDmA1s6t192Vz6YWRcnLPIm3xiS8CkR09E1JTYQ0GfuaBF8PQFg8zi9nr+u/517fwUnAgMBAAECgYBhPt9NvpI4wbanvnndLczr2GJkxfzvSE+vwLCJF4C5FusFHVsxZINggQcg1V75bwRgCiXRMyYefreCSdrCditS43PzTOmE4RRrpxLlm8oubJc0C98LQ2qlN9AsUqL5IHpVGgbHDyWAwjc1GBID6nwXKpxq1/VodFqhahG9D5EZsQJBALnkb+5VTxQbiyQI4Uc9NIvAyVcNY1OisbvY6tvNgdBbJkADgAb78M1HWxxYjUqsvzggNHc7cWqWBHMgpnJaqm8CQQCztze4D7uAk7OC9MJHY5eE980J8Kk+GEZKxz4LahzU6V6dcb9GFac3wEtgilj/tOAn9y0/Q8sm9vvCIbMDzgzJAkEAqRYcqhF26LdVDOX25DHMBgLKISDQZFbsjA13M4/usHL4i+mjHrc0BcUOHu59NpuDI65HitzLAUSLr5zXSdUmiQJAW77wOg4GCejdXsB3IhzMsHwU97sdm26nC+vVV9xvJZ6Rx8zW+f9543NOx9U5BCmhuaVtOvvwDU9PTVcI3atmSQJAXAIJ5gGdtXx0DXiX4VvzNFHqgaqHMGvXyjNVkU2FYQbSAd2A6app4uRO+BkZu9dSjh14m+oXMnV2HzAN2rRnjA==
--
Gitblit v1.9.1