From d23645e976981bc9b670eea1d469fe8a36be309c Mon Sep 17 00:00:00 2001 From: KKSU <15274802129@163.com> Date: Wed, 17 Apr 2024 17:19:53 +0800 Subject: [PATCH] 55测试环境 --- src/main/java/com/xcong/excoin/rabbit/consumer/WebsocketPriceConsumer.java | 248 +++++++++++++++++++++++++++++-------------------- 1 files changed, 145 insertions(+), 103 deletions(-) diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/WebsocketPriceConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/WebsocketPriceConsumer.java index d7f8234..12a9257 100644 --- a/src/main/java/com/xcong/excoin/rabbit/consumer/WebsocketPriceConsumer.java +++ b/src/main/java/com/xcong/excoin/rabbit/consumer/WebsocketPriceConsumer.java @@ -3,133 +3,175 @@ import com.alibaba.fastjson.JSONArray; import com.rabbitmq.client.Channel; import com.xcong.excoin.configurations.RabbitMqConfig; +import com.xcong.excoin.modules.contract.service.RabbitOrderService; +import com.xcong.excoin.modules.contract.service.impl.OrderWebsocketServiceImpl; import com.xcong.excoin.rabbit.pricequeue.OrderModel; +import com.xcong.excoin.rabbit.pricequeue.whole.WholePriceDataModel; +import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; +import javax.annotation.Resource; import java.util.List; /** - * APP和后台打包都开启 + * APP和后台打包都开启 * + * @author helius */ +@Slf4j @Component -@ConditionalOnProperty(name="useRabbit",havingValue="true") +@ConditionalOnProperty(prefix = "app", name = "rabbit-consumer", havingValue = "true") public class WebsocketPriceConsumer { - //@Autowired - //OrderWebsocketService orderWebsocketService; + @Resource + OrderWebsocketServiceImpl orderWebsocketService; - //@Autowired - //OrderService orderService; + @Resource + RabbitOrderService orderService; - /** - * 开多止盈 - * @param message 消息体 - * @param channel 信道 - */ - @RabbitListener(queues = RabbitMqConfig.QUEUE_MOREPRO) - public void onMessageMorePro(Message message, Channel channel) { - String content = new String(message.getBody()); - System.out.println("我收到消息了开多止盈:"+content); - List<OrderModel> list = JSONArray.parseArray(content,OrderModel.class); - // 开始处理 TODO - //orderWebsocketService.dealOrderFromMq(list,9); - } - // 1:买入委托2:开多3:开空4:平多5:平空6:爆仓平多7:爆仓平空8:撤单9:止盈平多10:止盈平空11:止损平多12:止损平空 + /** + * 开多止盈 + * + * @param message 消息体 + * @param channel 信道 + */ + @RabbitListener(queues = RabbitMqConfig.QUEUE_MOREPRO) + public void onMessageMorePro(Message message, Channel channel) { + String content = new String(message.getBody()); + log.info("==message-price-consumer==我收到消息了开多止盈 : {}", content); + List<OrderModel> list = JSONArray.parseArray(content, OrderModel.class); + // 开始处理 + orderWebsocketService.dealOrderFromMq(list, 9); + } + // 1:买入委托2:开多3:开空4:平多5:平空6:爆仓平多7:爆仓平空8:撤单9:止盈平多10:止盈平空11:止损平多12:止损平空 - /** - * 开空止盈 - * @param message - * @param channel - */ - @RabbitListener(queues = RabbitMqConfig.QUEUE_LESSPRO) - public void onMessageLessPro(Message message, Channel channel) { - String content = new String(message.getBody()); - System.out.println("我收到消息了开空止盈:"+content); - // 开始处理 - List<OrderModel> list = JSONArray.parseArray(content,OrderModel.class); - // 开始处理 - //orderWebsocketService.dealOrderFromMq(list,10); - } + /** + * 开空止盈 + * + * @param message + * @param channel + */ + @RabbitListener(queues = RabbitMqConfig.QUEUE_LESSPRO) + public void onMessageLessPro(Message message, Channel channel) { + String content = new String(message.getBody()); + log.info("==message-price-consumer==我收到消息了开空止盈 : {}", content); + // 开始处理 + List<OrderModel> list = JSONArray.parseArray(content, OrderModel.class); + // 开始处理 + orderWebsocketService.dealOrderFromMq(list, 10); + } - /** - * 开多止损 - * @param message - * @param channel - */ - @RabbitListener(queues = RabbitMqConfig.QUEUE_MORELOSS) - public void onMessageMoreLoss(Message message, Channel channel) { - String content = new String(message.getBody()); - System.out.println("我收到消息了开多止损:"+content); - // 开始处理 - List<OrderModel> list = JSONArray.parseArray(content,OrderModel.class); - // 开始处理 - //orderWebsocketService.dealOrderFromMq(list,11); - } + /** + * 开多止损 + * + * @param message + * @param channel + */ + @RabbitListener(queues = RabbitMqConfig.QUEUE_MORELOSS) + public void onMessageMoreLoss(Message message, Channel channel) { + String content = new String(message.getBody()); + log.info("==message-price-consumer==我收到消息了开多止损 : {}", content); + // 开始处理 + List<OrderModel> list = JSONArray.parseArray(content, OrderModel.class); + // 开始处理 + orderWebsocketService.dealOrderFromMq(list, 11); + } - /** - * 开空止损 - * @param message - * @param channel - */ - @RabbitListener(queues = RabbitMqConfig.QUEUE_LESSLOSS) - public void onMessageLessLoss(Message message, Channel channel) { - String content = new String(message.getBody()); - System.out.println("我收到消息了开空止损:"+content); - // 开始处理 - List<OrderModel> list = JSONArray.parseArray(content,OrderModel.class); - // 开始处理 - //orderWebsocketService.dealOrderFromMq(list,12); - } + /** + * 开空止损 + * + * @param message + * @param channel + */ + @RabbitListener(queues = RabbitMqConfig.QUEUE_LESSLOSS) + public void onMessageLessLoss(Message message, Channel channel) { + String content = new String(message.getBody()); + log.info("==message-price-consumer==我收到消息了开空止损 : {}", content); + // 开始处理 + List<OrderModel> list = JSONArray.parseArray(content, OrderModel.class); + // 开始处理 + orderWebsocketService.dealOrderFromMq(list, 12); + } - /** - * 限价委托 - * @param message - * @param channel - */ - @RabbitListener(queues = RabbitMqConfig.QUEUE_LIMIT) - public void onMessageLimit(Message message, Channel channel) { - String content = new String(message.getBody()); - System.out.println("我收到消息了限价委托:"+content); - // 开始处理 - List<OrderModel> list = JSONArray.parseArray(content,OrderModel.class); - // 开始处理 - //orderWebsocketService.dealForLimitMq(list); - } + /** + * 限价委托 + * + * @param message + * @param channel + */ + @RabbitListener(queues = RabbitMqConfig.QUEUE_LIMIT) + public void onMessageLimit(Message message, Channel channel) { + String content = new String(message.getBody()); + log.info("==message-price-consumer==我收到消息了限价委托 : {}", content); + // 开始处理 + List<OrderModel> list = JSONArray.parseArray(content, OrderModel.class); + // 开始处理 + orderWebsocketService.dealForLimitMq(list); + } - /** - * 爆仓消费者 - * @param message - * @param channel - */ - @RabbitListener(queues = RabbitMqConfig.QUEUE_COINOUT) - public void onMessageCoinout(Message message, Channel channel) { - String content = new String(message.getBody()); - System.out.println("我收到消息了爆仓:"+content); - // 开始处理 - List<OrderModel> list = JSONArray.parseArray(content,OrderModel.class); - // 开始处理 - //orderWebsocketService.dealOrderFromMq(list,6); - } + /** + * 爆仓消费者 + * + * @param message + * @param channel + */ + @RabbitListener(queues = RabbitMqConfig.QUEUE_COINOUT) + public void onMessageCoinout(Message message, Channel channel) { + String content = new String(message.getBody()); + log.info("==message-price-consumer==我收到消息了爆仓 : {}", content); + // 开始处理 + List<OrderModel> list = JSONArray.parseArray(content, OrderModel.class); + // 开始处理 + orderWebsocketService.dealOrderFromMq(list, 6); + } - /** - * 平仓 - * @param message - * @param channel - */ - @RabbitListener(queues = RabbitMqConfig.QUEUE_CLOSETRADE) - public void onMessageCloseTrade(Message message, Channel channel) { - String content = new String(message.getBody()); - System.out.println("我收到消息了平仓:"+content); - // 订单 - List<Long> ids = JSONArray.parseArray(content, Long.class); - //orderService.closeTradeForMq(ids); - } + /** + * 平仓 + * + * @param message + * @param channel + */ + @RabbitListener(queues = RabbitMqConfig.QUEUE_CLOSETRADE) + public void onMessageCloseTrade(Message message, Channel channel) { + String content = new String(message.getBody()); + log.info("==message-price-consumer==我收到消息了平仓: {}", content); + // 订单 + List<Long> ids = JSONArray.parseArray(content, Long.class); + orderService.cancelHoldOrder(ids); + } + + /** + * 委托平仓 + * + * @param message + * @param channel + */ + @RabbitListener(queues = RabbitMqConfig.QUEUE_LIMIT_CLOSE) + public void onMessageLimitClose(Message message, Channel channel) { + String content = new String(message.getBody()); + log.info("==message-price-consumer==我收到消息了委托平仓: {}", content); + List<OrderModel> list = JSONArray.parseArray(content, OrderModel.class); + orderService.entrustCloseOrder(list); + } + + /** + * 全仓爆仓 + * + * @param message + * @param channel + */ + @RabbitListener(queues = RabbitMqConfig.QUEUE_WHOLE_BOMB) + public void onMessageWholeBomb(Message message, Channel channel) { + String content = new String(message.getBody()); + WholePriceDataModel wholePriceDataModel = JSONArray.parseObject(content, WholePriceDataModel.class); + log.info("==message-price-consumer==我收到消息了全仓爆仓: {}", wholePriceDataModel); + orderService.wholeBombOrder(wholePriceDataModel); + } } -- Gitblit v1.9.1