Administrator
1 day ago d7953d723a0b629fd9adc55ca140a93d27ec1887
feat(rabbitmq): 新增知识库同步功能

- 在AgentConsumer中添加知识库同步的RabbitMQ消费者
- 在AgentProducer中添加发送知识库同步消息的方法
- 实现AiKnowledgeFileService中的getAddKnowledge方法
- 配置新的RabbitMQ队列、交换机和绑定关系
- 注释掉原有的多个RabbitMQ监听器及相关服务注入
- 添加知识库相关常量和枚举配置
7 files modified
320 ■■■■■ changed files
src/main/java/cc/mrbird/febs/ai/service/AiKnowledgeFileService.java 2 ●●●●● patch | view | raw | blame | history
src/main/java/cc/mrbird/febs/ai/service/impl/AiKnowledgeFileServiceImpl.java 36 ●●●●● patch | view | raw | blame | history
src/main/java/cc/mrbird/febs/common/configure/RabbitConfigure.java 16 ●●●●● patch | view | raw | blame | history
src/main/java/cc/mrbird/febs/rabbit/constants/QueueConstants.java 1 ●●●● patch | view | raw | blame | history
src/main/java/cc/mrbird/febs/rabbit/consumer/AgentConsumer.java 253 ●●●● patch | view | raw | blame | history
src/main/java/cc/mrbird/febs/rabbit/enumerates/RabbitQueueEnum.java 1 ●●●● patch | view | raw | blame | history
src/main/java/cc/mrbird/febs/rabbit/producter/AgentProducer.java 11 ●●●●● patch | view | raw | blame | history
src/main/java/cc/mrbird/febs/ai/service/AiKnowledgeFileService.java
@@ -12,4 +12,6 @@
    /**
     * Adds a knowledge file described by {@code dto}.
     *
     * <p>NOTE(review): the implementing method's header is not visible in this view; the
     * visible implementation hunk saves an entity, calls
     * {@code agentProducer.sendAddKnowledge(entity.getId())} and returns a success response
     * — confirm that hunk belongs to this method.
     *
     * @param dto the knowledge file data to add
     * @return the response wrapper for the operation
     */
    FebsResponse add(AiKnowledgeFile dto);
    /**
     * Updates a knowledge file from {@code dto}.
     *
     * <p>NOTE(review): the implementation visible alongside this interface is a stub that
     * returns {@code null}; callers should not rely on a non-null response until it is
     * implemented.
     *
     * @param dto the knowledge file data to update
     * @return the response wrapper for the operation
     */
    FebsResponse update(AiKnowledgeFile dto);
    /**
     * Handles a knowledge-base sync request for the knowledge file with the given id.
     *
     * <p>Invoked by the RabbitMQ listener on the {@code KNOWLEDGE_ADD_ALI} queue, which passes
     * the message payload as {@code id}. The visible implementation returns without doing
     * anything when no record exists for the id.
     *
     * @param id id of the knowledge file to process
     */
    void getAddKnowledge(String id);
}
src/main/java/cc/mrbird/febs/ai/service/impl/AiKnowledgeFileServiceImpl.java
@@ -12,6 +12,7 @@
import cc.mrbird.febs.common.entity.FebsResponse;
import cc.mrbird.febs.common.entity.QueryRequest;
import cc.mrbird.febs.common.exception.FebsException;
import cc.mrbird.febs.rabbit.producter.AgentProducer;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
@@ -38,6 +39,7 @@
    private final AiKnowledgeFileMapper aiKnowledgeFileMapper;
    private final AiCompanyService aiCompanyService;
    private final AgentProducer agentProducer;
    @Override
    public IPage<AiKnowledgeFile> listInPage(AiKnowledgeFile dto, QueryRequest request) {
@@ -62,10 +64,26 @@
        entity.setCreatedTime(new Date());
        this.save(entity);
        agentProducer.sendAddKnowledge( entity.getId());
        return new FebsResponse().success().message("操作成功");
    }
    /**
     * Not implemented yet: ignores {@code dto} and always returns {@code null}.
     *
     * @param dto the knowledge file data to update (currently unused)
     * @return always {@code null}
     */
    @Override
    public FebsResponse update(AiKnowledgeFile dto) {
        // Stub: callers receive null rather than a FebsResponse.
        return null;
    }
    @Override
    public void getAddKnowledge(String id) {
        AiKnowledgeFile aiKnowledgeFile = this.getById(id);
        if (ObjectUtil.isNull(aiKnowledgeFile)){
            return;
        }
        String categoryId = null;
        String knowledgeId = null;
        if (StrUtil.isNotEmpty(entity.getCompanyId())){
            AiCompany aiCompany = aiCompanyService.getById(entity.getCompanyId());
        if (StrUtil.isNotEmpty(aiKnowledgeFile.getCompanyId())){
            AiCompany aiCompany = aiCompanyService.getById(aiKnowledgeFile.getCompanyId());
            if (StrUtil.isNotEmpty(aiCompany.getCategoryId())){
                categoryId = aiCompany.getCategoryId();
                knowledgeId = aiCompany.getKnowledgeId();
@@ -75,9 +93,10 @@
            knowledgeId = KnowledgeBaseUtil.DEFAULT_KNOWLEDGE_ID;
        }
        String fileId = KnowledgeBaseUtil.uploadFileToAppData(entity.getSavePath(), categoryId);
        String fileId = KnowledgeBaseUtil.uploadFileToAppData(aiKnowledgeFile.getSavePath(), categoryId);
        if (StrUtil.isBlank(fileId)){
            throw new FebsException("初始化应用数据失败");
            log.info("初始化应用数据失败");
            return ;
        }
        String jobId = KnowledgeBaseUtil.updateKnowledgeBase(fileId, knowledgeId, null);
@@ -85,13 +104,8 @@
                Wrappers.lambdaUpdate(AiKnowledgeFile.class)
                        .set(AiKnowledgeFile::getFileId, fileId)
                        .set(AiKnowledgeFile::getJobId, jobId)
                        .eq(AiKnowledgeFile::getId, entity.getId())
                );
        return new FebsResponse().success().message("操作成功");
    }
                        .eq(AiKnowledgeFile::getId, aiKnowledgeFile.getId())
        );
    /**
     * Not implemented yet: ignores {@code dto} and always returns {@code null}.
     *
     * @param dto the knowledge file data to update (currently unused)
     * @return always {@code null}
     */
    @Override
    public FebsResponse update(AiKnowledgeFile dto) {
        // Stub: callers receive null rather than a FebsResponse.
        return null;
    }
}
src/main/java/cc/mrbird/febs/common/configure/RabbitConfigure.java
@@ -270,4 +270,20 @@
        return BindingBuilder.bind(clothesAddCollectQueue()).to(clothesAddCollectExchange()).with(RabbitQueueEnum.CLOTHES_ADD_COLLECT.getRoute());
    }
    @Bean
    public DirectExchange knowledgeAddExchange() {
        return new DirectExchange(RabbitQueueEnum.KNOWLEDGE_ADD_ALI.getExchange());
    }
    @Bean
    public Queue knowledgeAddQueue() {
        return new Queue(RabbitQueueEnum.KNOWLEDGE_ADD_ALI.getQueue());
    }
    @Bean
    public Binding knowledgeAddBind() {
        return BindingBuilder.bind(knowledgeAddQueue()).to(knowledgeAddExchange()).with(RabbitQueueEnum.KNOWLEDGE_ADD_ALI.getRoute());
    }
}
src/main/java/cc/mrbird/febs/rabbit/constants/QueueConstants.java
@@ -22,4 +22,5 @@
    public static final String CLOTHES_ORDER_CANCEL_DELAY = "queue_order_delay_qay_clothes";
    public static final String CLOTHES_ADD_LIKE = "queue_clothes_add_like";
    public static final String CLOTHES_ADD_COLLECT = "queue_clothes_add_collect";
    public static final String KNOWLEDGE_ADD_ALI = "queue_knowledge_add_ali";
}
src/main/java/cc/mrbird/febs/rabbit/consumer/AgentConsumer.java
@@ -1,5 +1,6 @@
package cc.mrbird.febs.rabbit.consumer;
import cc.mrbird.febs.ai.service.AiKnowledgeFileService;
import cc.mrbird.febs.mall.service.*;
import cc.mrbird.febs.rabbit.constants.QueueConstants;
import cc.mrbird.febs.rabbit.enumerates.RabbitQueueEnum;
@@ -20,7 +21,7 @@
 **/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "system", name = "job", havingValue = "true")
//@ConditionalOnProperty(prefix = "system", name = "job", havingValue = "true")
public class AgentConsumer {
    @Autowired
@@ -34,130 +35,142 @@
    private IMemberProfitService memberProfitService;
    @Autowired
    private HappyActivityService happyActivityService;
    @RabbitListener(queues = QueueConstants.QUEUE_DEFAULT)
    public void agentReturn(Message message, Channel channel) {
        log.info("消费者:{}", new String(message.getBody()));
    }
    @RabbitListener(queues = QueueConstants.CLOTHES_ORDER_CANCEL_DELAY)
    public void orderCancelDelayClothes(String id) {
        try {
            apiClothesOrderService.orderCancelDelayClothes(Long.parseLong(id));
        } catch (Exception e) {
            log.error("订单超时支付异常", e);
        }
    }
    @RabbitListener(queues = "queue_order_delay_qay")
    public void orderCancelDelay(String id) {
        try {
            orderInfoService.autoCancelOrder(Long.parseLong(id));
        } catch (Exception e) {
            log.error("订单超时支付异常", e);
        }
    }
    @RabbitListener(queues = QueueConstants.AGENT_AUTO_LEVEL_UP)
    public void agentAutoLevelUp(String id) {
        log.info("收到代理自动升级消息:{}", id);
        try {
            agentService.autoUpAgentLevel(Long.parseLong(id));
        } catch (Exception e) {
            log.error("代理自动升级异常", e);
        }
    }
    @RabbitListener(queues = QueueConstants.AGENT_RETURN_MONEY)
    public void agentReturnMoney(String orderId) {
        log.info("收到返利消息:{}", orderId);
        try {
            agentService.returnMoneyToAgent(Long.parseLong(orderId));
        } catch (Exception e) {
            log.error("返利异常", e);
        }
    }
    @RabbitListener(queues = QueueConstants.ORDER_RETURN_MONEY)
    public void orderReturnMoney(String orderId) {
        log.info("收到订单返利消息:{}", orderId);
        try {
            memberProfitService.dynamicProfit(Long.parseLong(orderId));
        } catch (Exception e) {
            log.error("订单返利异常:", e);
        }
    }
    @RabbitListener(queues = QueueConstants.ORDER_COUPON)
    public void getOrderCoupon(Long orderId) {
        log.info("收到使用优惠卷消息:{}", orderId);
        try {
            memberProfitService.getOrderCoupon(orderId);
        } catch (Exception e) {
            log.error("使用优惠卷异常:", e);
        }
    }
    @Autowired
    private IVipCommonService vipCommonService;
    private AiKnowledgeFileService aiKnowledgeFileService;
    @RabbitListener(queues = QueueConstants.GET_SCORE_MSG)
    public void getScoreMsg(Long orderId) {
        log.info("收到积分消息:{}", orderId);
        try {
            vipCommonService.getScore(orderId);
        } catch (Exception e) {
            log.error("获取积分消息异常", e);
        }
    }
//    @RabbitListener(queues = QueueConstants.QUEUE_DEFAULT)
//    public void agentReturn(Message message, Channel channel) {
//        log.info("消费者:{}", new String(message.getBody()));
//    }
//
//    @RabbitListener(queues = QueueConstants.CLOTHES_ORDER_CANCEL_DELAY)
//    public void orderCancelDelayClothes(String id) {
//        try {
//            apiClothesOrderService.orderCancelDelayClothes(Long.parseLong(id));
//        } catch (Exception e) {
//            log.error("订单超时支付异常", e);
//        }
//    }
//
//    @RabbitListener(queues = "queue_order_delay_qay")
//    public void orderCancelDelay(String id) {
//        try {
//            orderInfoService.autoCancelOrder(Long.parseLong(id));
//        } catch (Exception e) {
//            log.error("订单超时支付异常", e);
//        }
//    }
//
//    @RabbitListener(queues = QueueConstants.AGENT_AUTO_LEVEL_UP)
//    public void agentAutoLevelUp(String id) {
//        log.info("收到代理自动升级消息:{}", id);
//        try {
//            agentService.autoUpAgentLevel(Long.parseLong(id));
//        } catch (Exception e) {
//            log.error("代理自动升级异常", e);
//        }
//    }
//
//    @RabbitListener(queues = QueueConstants.AGENT_RETURN_MONEY)
//    public void agentReturnMoney(String orderId) {
//        log.info("收到返利消息:{}", orderId);
//        try {
//            agentService.returnMoneyToAgent(Long.parseLong(orderId));
//        } catch (Exception e) {
//            log.error("返利异常", e);
//        }
//    }
//
//    @RabbitListener(queues = QueueConstants.ORDER_RETURN_MONEY)
//    public void orderReturnMoney(String orderId) {
//        log.info("收到订单返利消息:{}", orderId);
//        try {
//            memberProfitService.dynamicProfit(Long.parseLong(orderId));
//        } catch (Exception e) {
//            log.error("订单返利异常:", e);
//        }
//    }
//
//    @RabbitListener(queues = QueueConstants.ORDER_COUPON)
//    public void getOrderCoupon(Long orderId) {
//        log.info("收到使用优惠卷消息:{}", orderId);
//        try {
//            memberProfitService.getOrderCoupon(orderId);
//        } catch (Exception e) {
//            log.error("使用优惠卷异常:", e);
//        }
//    }
//
//    @Autowired
//    private IVipCommonService vipCommonService;
//
//    @RabbitListener(queues = QueueConstants.GET_SCORE_MSG)
//    public void getScoreMsg(Long orderId) {
//        log.info("收到积分消息:{}", orderId);
//        try {
//            vipCommonService.getScore(orderId);
//        } catch (Exception e) {
//            log.error("获取积分消息异常", e);
//        }
//    }
//
//    @RabbitListener(queues = QueueConstants.VIP_LEVEL_UP)
//    public void vipLevelUp(Long orderId) {
//        log.info("收到会员升级消息:{}", orderId);
//        try {
//            vipCommonService.levelUp(orderId);
//        } catch (Exception e) {
//            log.error("会员升级消息异常", e);
//        }
//    }
//
//    @RabbitListener(queues = QueueConstants.SALE_LEVEL_UP)
//    public void saleLevelUp(Long orderId) {
//        log.info("收到团长升级:{}", orderId);
//        try {
//            vipCommonService.saleLevelUp(orderId);
//        } catch (Exception e) {
//            log.error("团长升级消息异常", e);
//        }
//    }
//
//    @RabbitListener(queues = QueueConstants.ACTIVITY_ORDER_ITEM_CHECK)
//    public void checkActivityItem(Long orderId) {
//        log.info("收到核销活动门票:{}", orderId);
//        try {
//            happyActivityService.checkActivityItem(orderId);
//        } catch (Exception e) {
//            log.error("核销活动门票异常", e);
//        }
//    }
//
//    @RabbitListener(queues = QueueConstants.CLOTHES_ADD_LIKE)
//    public void getAddLike(Long socialId) {
//        log.info("点赞:{}", socialId);
//        try {
//            happyActivityService.getAddLike(socialId);
//        } catch (Exception e) {
//            log.error("点赞异常", e);
//        }
//    }
//
//    @RabbitListener(queues = QueueConstants.CLOTHES_ADD_COLLECT)
//    public void getAddCollect(Long socialId) {
//        log.info("收藏:{}", socialId);
//        try {
//            happyActivityService.getAddCollect(socialId);
//        } catch (Exception e) {
//            log.error("收藏异常", e);
//        }
//    }
    @RabbitListener(queues = QueueConstants.VIP_LEVEL_UP)
    public void vipLevelUp(Long orderId) {
        log.info("收到会员升级消息:{}", orderId);
    @RabbitListener(queues = QueueConstants.KNOWLEDGE_ADD_ALI)
    public void getAddKnowledge(String id) {
        log.info("知识库:{}", id);
        try {
            vipCommonService.levelUp(orderId);
            aiKnowledgeFileService.getAddKnowledge(id);
        } catch (Exception e) {
            log.error("会员升级消息异常", e);
        }
    }
    @RabbitListener(queues = QueueConstants.SALE_LEVEL_UP)
    public void saleLevelUp(Long orderId) {
        log.info("收到团长升级:{}", orderId);
        try {
            vipCommonService.saleLevelUp(orderId);
        } catch (Exception e) {
            log.error("团长升级消息异常", e);
        }
    }
    @RabbitListener(queues = QueueConstants.ACTIVITY_ORDER_ITEM_CHECK)
    public void checkActivityItem(Long orderId) {
        log.info("收到核销活动门票:{}", orderId);
        try {
            happyActivityService.checkActivityItem(orderId);
        } catch (Exception e) {
            log.error("核销活动门票异常", e);
        }
    }
    @RabbitListener(queues = QueueConstants.CLOTHES_ADD_LIKE)
    public void getAddLike(Long socialId) {
        log.info("点赞:{}", socialId);
        try {
            happyActivityService.getAddLike(socialId);
        } catch (Exception e) {
            log.error("点赞异常", e);
        }
    }
    @RabbitListener(queues = QueueConstants.CLOTHES_ADD_COLLECT)
    public void getAddCollect(Long socialId) {
        log.info("收藏:{}", socialId);
        try {
            happyActivityService.getAddCollect(socialId);
        } catch (Exception e) {
            log.error("收藏异常", e);
            log.error("知识库异常", e);
        }
    }
}
src/main/java/cc/mrbird/febs/rabbit/enumerates/RabbitQueueEnum.java
@@ -7,6 +7,7 @@
public enum RabbitQueueEnum {
    KNOWLEDGE_ADD_ALI("exchange_knowledge_add_ali", "route_key_knowledge_add_ali", QueueConstants.KNOWLEDGE_ADD_ALI),
    CLOTHES_ADD_COLLECT("exchange_clothes_add_collect", "route_key_clothes_add_collect", QueueConstants.CLOTHES_ADD_COLLECT),
    CLOTHES_ADD_LIKE("exchange_clothes_add_like", "route_key_clothes_add_like", QueueConstants.CLOTHES_ADD_LIKE),
    ACTIVITY_ORDER_ITEM_CHECK("exchange_activity_order_item_check", "route_key_activity_order_item_check", QueueConstants.ACTIVITY_ORDER_ITEM_CHECK),
src/main/java/cc/mrbird/febs/rabbit/producter/AgentProducer.java
@@ -136,4 +136,15 @@
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(RabbitQueueEnum.CLOTHES_ADD_COLLECT.getExchange(), RabbitQueueEnum.CLOTHES_ADD_COLLECT.getRoute(), socialId, correlationData);
    }
    public void sendAddKnowledge(String id) {
        log.info("知识库:{}", id);
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(
                RabbitQueueEnum.CLOTHES_ADD_COLLECT.getExchange(),
                RabbitQueueEnum.CLOTHES_ADD_COLLECT.getRoute(),
                id,
                correlationData);
    }
}