Administrator
2025-08-22 fb0722b250d3f77c93a32cbdd347c474008638ca
feat(ai): 添加 AI 对话流式接口并优化相关服务

- 在 AiMemberTalkService 和 AiService 中新增 answerStream 方法支持流式回答
- 实现 ApiMemberTalkStreamDto 和 ApiMemberTalkStreamVo 用于流式对话的数据传输- 在控制器中添加 /answer-stream 接口支持流式对话
- 优化线程池配置,增加专门用于 MVC 异步处理的线程池
-调整 WebMvcConfigurer 以支持异步请求处理
7 files modified
2 files added
154 ■■■■■ changed files
src/main/java/cc/mrbird/febs/ai/controller/memberTalk/ApiMemberTalkController.java 18 ●●●● patch | view | raw | blame | history
src/main/java/cc/mrbird/febs/ai/req/memberTalk/ApiMemberTalkStreamDto.java 19 ●●●●● patch | view | raw | blame | history
src/main/java/cc/mrbird/febs/ai/res/memberTalk/ApiMemberTalkStreamVo.java 16 ●●●●● patch | view | raw | blame | history
src/main/java/cc/mrbird/febs/ai/service/AiMemberTalkService.java 4 ●●●● patch | view | raw | blame | history
src/main/java/cc/mrbird/febs/ai/service/AiService.java 4 ●●●● patch | view | raw | blame | history
src/main/java/cc/mrbird/febs/ai/service/impl/AiMemberTalkServiceImpl.java 10 ●●●●● patch | view | raw | blame | history
src/main/java/cc/mrbird/febs/ai/service/impl/AiServiceImpl.java 57 ●●●●● patch | view | raw | blame | history
src/main/java/cc/mrbird/febs/common/configure/FebsConfigure.java 15 ●●●●● patch | view | raw | blame | history
src/main/java/cc/mrbird/febs/common/configure/WebMvcConfigure.java 11 ●●●●● patch | view | raw | blame | history
src/main/java/cc/mrbird/febs/ai/controller/memberTalk/ApiMemberTalkController.java
@@ -3,7 +3,9 @@
import cc.mrbird.febs.ai.req.memberTalk.ApiMemberTalkAnswerDto;
import cc.mrbird.febs.ai.req.memberTalk.ApiMemberTalkDto;
import cc.mrbird.febs.ai.req.memberTalk.ApiMemberTalkItemPageDto;
import cc.mrbird.febs.ai.req.memberTalk.ApiMemberTalkStreamDto;
import cc.mrbird.febs.ai.res.memberTalk.ApiMemberTalkItemVo;
import cc.mrbird.febs.ai.res.memberTalk.ApiMemberTalkStreamVo;
import cc.mrbird.febs.ai.res.memberTalk.ApiMemberTalkVo;
import cc.mrbird.febs.ai.service.AiMemberTalkService;
import cc.mrbird.febs.common.entity.FebsResponse;
@@ -11,11 +13,9 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import reactor.core.publisher.Flux;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
@@ -93,4 +93,14 @@
        return emitter;
    }
    // 修改接口定义
    @ApiOperation("开始AI对话(流式)")
    @ApiResponses({
            @ApiResponse(code = 200, message = "流式响应", response = ApiMemberTalkStreamVo.class),
    })
    @GetMapping("/answer-stream")
    public Flux<FebsResponse> answerStream(@RequestParam String question) {
        return aiMemberTalkService.answerStream(question);
    }
}
src/main/java/cc/mrbird/febs/ai/req/memberTalk/ApiMemberTalkStreamDto.java
New file
@@ -0,0 +1,19 @@
package cc.mrbird.febs.ai.req.memberTalk;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import javax.validation.constraints.NotBlank;
/**
 * @author Administrator
 */
@Data
@ApiModel(value = "ApiMemberTalkStreamDto", description = "参数")
public class ApiMemberTalkStreamDto {
    @NotBlank(message = "用户提问不能为空")
    @ApiModelProperty(value = "用户提问", example = "10")
    private String question;
}
src/main/java/cc/mrbird/febs/ai/res/memberTalk/ApiMemberTalkStreamVo.java
New file
@@ -0,0 +1,16 @@
package cc.mrbird.febs.ai.res.memberTalk;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
/**
 * @author Administrator
 */
@Data
@ApiModel(value = "ApiMemberTalkStreamVo", description = "参数")
public class ApiMemberTalkStreamVo {
    @ApiModelProperty(value = "消息")
    private String content;
}
src/main/java/cc/mrbird/febs/ai/service/AiMemberTalkService.java
@@ -4,10 +4,12 @@
import cc.mrbird.febs.ai.req.memberTalk.ApiMemberTalkAnswerDto;
import cc.mrbird.febs.ai.req.memberTalk.ApiMemberTalkDto;
import cc.mrbird.febs.ai.req.memberTalk.ApiMemberTalkItemPageDto;
import cc.mrbird.febs.ai.req.memberTalk.ApiMemberTalkStreamDto;
import cc.mrbird.febs.ai.res.ai.AiResponse;
import cc.mrbird.febs.common.entity.FebsResponse;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.IService;
import reactor.core.publisher.Flux;
import java.util.Date;
import java.util.function.Consumer;
@@ -36,4 +38,6 @@
    AiMemberTalk add(String memberUuid, String productId, Date nowTime);
    FebsResponse historyPage(ApiMemberTalkItemPageDto dto);
    Flux<FebsResponse> answerStream(String question);
}
src/main/java/cc/mrbird/febs/ai/service/AiService.java
@@ -4,6 +4,8 @@
import cc.mrbird.febs.ai.req.ai.AiRequest;
import cc.mrbird.febs.ai.res.ai.AiResponse;
import cc.mrbird.febs.ai.res.ai.Report;
import cc.mrbird.febs.common.entity.FebsResponse;
import reactor.core.publisher.Flux;
import java.util.List;
import java.util.function.Consumer;
@@ -26,4 +28,6 @@
     * @return 解析后的报告对象
     */
    Report extractReportData(String modelOutput);
    Flux<FebsResponse> answerStream(String question);
}
src/main/java/cc/mrbird/febs/ai/service/impl/AiMemberTalkServiceImpl.java
@@ -10,6 +10,7 @@
import cc.mrbird.febs.ai.req.memberTalk.ApiMemberTalkAnswerDto;
import cc.mrbird.febs.ai.req.memberTalk.ApiMemberTalkDto;
import cc.mrbird.febs.ai.req.memberTalk.ApiMemberTalkItemPageDto;
import cc.mrbird.febs.ai.req.memberTalk.ApiMemberTalkStreamDto;
import cc.mrbird.febs.ai.res.ai.AiResponse;
import cc.mrbird.febs.ai.res.memberTalk.ApiMemberTalkVo;
import cc.mrbird.febs.ai.service.AiMemberTalkItemService;
@@ -30,6 +31,7 @@
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import reactor.core.publisher.Flux;
import java.util.Date;
import java.util.List;
@@ -256,4 +258,12 @@
        dto.setMemberUuid(memberUuid);
        return aiMemberTalkItemService.historyPage(dto);
    }
    @Override
    public Flux<FebsResponse> answerStream(String question) {
//        String memberUuid = LoginUserUtil.getLoginUser().getMemberUuid();
        return aiService.answerStream(question);
    }
}
src/main/java/cc/mrbird/febs/ai/service/impl/AiServiceImpl.java
@@ -7,8 +7,10 @@
import cc.mrbird.febs.ai.res.ai.AiResponse;
import cc.mrbird.febs.ai.res.ai.RadarDataItem;
import cc.mrbird.febs.ai.res.ai.Report;
import cc.mrbird.febs.ai.res.memberTalk.ApiMemberTalkStreamVo;
import cc.mrbird.febs.ai.service.AiProductRoleService;
import cc.mrbird.febs.ai.service.AiService;
import cc.mrbird.febs.common.entity.FebsResponse;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.json.JSONUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -23,10 +25,12 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@@ -354,6 +358,59 @@
        }
    }
        // 修改服务实现
    @Override
    public Flux<FebsResponse> answerStream(String question) {
        log.info("----- standard request -----");
        final ChatMessage systemMessage = ChatMessage.builder()
                .role(ChatMessageRole.SYSTEM)
                .content("你是豆包,是由字节跳动开发的 AI 人工智能助手")
                .build();
        final ChatMessage userMessage = ChatMessage.builder()
                .role(ChatMessageRole.USER)
                .content(question)
                .build();
        List<ChatMessage> messages = Arrays.asList(systemMessage, userMessage);
        ChatCompletionRequest chatCompletionRequest = ChatCompletionRequest.builder()
                .model("ep-20250805124033-lhxbf")
                .messages(messages)
                .stream(true)
                .temperature(0.7)
                .topP(0.9)
                .maxTokens(2048)
                .frequencyPenalty(0.0)
                .build();
        return Flux.from(service.streamChatCompletion(chatCompletionRequest))
                .map(response -> {
                    if (response == null || response.getChoices() == null || response.getChoices().isEmpty()) {
                        return new FebsResponse().success().data("END");
                    }
                    ChatCompletionChoice choice = response.getChoices().get(0);
                    if (choice == null || choice.getMessage() == null) {
                        return new FebsResponse().success().data("END");
                    }
                    Object contentObj = choice.getMessage().getContent();
                    String content = contentObj == null ? "" : contentObj.toString();
                    ApiMemberTalkStreamVo apiMemberTalkStreamVo = new ApiMemberTalkStreamVo();
                    apiMemberTalkStreamVo.setContent(content);
                    return new FebsResponse().success().data(apiMemberTalkStreamVo);
                })
                .onErrorResume(throwable -> {
                    log.error("流式调用AI服务失败,问题输入: {}", question, throwable);
                    FebsResponse errorResponse = new FebsResponse().message("AI服务调用失败");
                    return Flux.just(errorResponse);
                });
    }
    private Report tryRepairTruncatedJson(String truncatedJson) {
        String[] repairAttempts = {
                truncatedJson + "\"}}}",
src/main/java/cc/mrbird/febs/common/configure/FebsConfigure.java
@@ -34,6 +34,21 @@
    private final FebsProperties properties;
    // 专门用于MVC异步处理的线程池
    @Bean("mvcTaskExecutor")
    public ThreadPoolTaskExecutor mvcTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(300);
        executor.setThreadNamePrefix("mvc-async-");
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
    @Bean(FebsConstant.ASYNC_POOL)
    public ThreadPoolTaskExecutor asyncThreadPoolTaskExecutor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
src/main/java/cc/mrbird/febs/common/configure/WebMvcConfigure.java
@@ -2,9 +2,12 @@
import cc.mrbird.febs.common.interceptor.LoginInterceptor;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;
import org.springframework.web.servlet.config.annotation.InterceptorRegistration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import javax.annotation.Resource;
/**
 * @author xxx
@@ -12,6 +15,13 @@
 **/
@Configuration
public class WebMvcConfigure implements WebMvcConfigurer {
    @Resource
    private FebsConfigure febsConfigure;
    @Override
    public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
        configurer.setTaskExecutor(febsConfigure.mvcTaskExecutor()); // 使用专门的线程池
        configurer.setDefaultTimeout(30_000);
    }
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
@@ -52,5 +62,6 @@
        registration.excludePathPatterns("/api/ai/product/**");
        registration.excludePathPatterns("/api/ai/productPoint/**");
        registration.excludePathPatterns("/api/ai/test/**");
        registration.excludePathPatterns("/api/ai/memberTalk/answer-stream");
    }
}