src/main/java/cc/mrbird/febs/ai/controller/memberTalk/ApiMemberTalkController.java
@@ -3,7 +3,9 @@ import cc.mrbird.febs.ai.req.memberTalk.ApiMemberTalkAnswerDto; import cc.mrbird.febs.ai.req.memberTalk.ApiMemberTalkDto; import cc.mrbird.febs.ai.req.memberTalk.ApiMemberTalkItemPageDto; import cc.mrbird.febs.ai.req.memberTalk.ApiMemberTalkStreamDto; import cc.mrbird.febs.ai.res.memberTalk.ApiMemberTalkItemVo; import cc.mrbird.febs.ai.res.memberTalk.ApiMemberTalkStreamVo; import cc.mrbird.febs.ai.res.memberTalk.ApiMemberTalkVo; import cc.mrbird.febs.ai.service.AiMemberTalkService; import cc.mrbird.febs.common.entity.FebsResponse; @@ -11,11 +13,9 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.*; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import reactor.core.publisher.Flux; import javax.servlet.http.HttpServletRequest; import java.io.IOException; @@ -93,4 +93,14 @@ return emitter; } // 修改接口定义 @ApiOperation("开始AI对话(流式)") @ApiResponses({ @ApiResponse(code = 200, message = "流式响应", response = ApiMemberTalkStreamVo.class), }) @GetMapping("/answer-stream") public Flux<FebsResponse> answerStream(@RequestParam String question) { return aiMemberTalkService.answerStream(question); } } src/main/java/cc/mrbird/febs/ai/req/memberTalk/ApiMemberTalkStreamDto.java
New file @@ -0,0 +1,19 @@ package cc.mrbird.febs.ai.req.memberTalk; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.Data; import javax.validation.constraints.NotBlank; /** * @author Administrator */ @Data @ApiModel(value = "ApiMemberTalkStreamDto", description = "参数") public class ApiMemberTalkStreamDto { @NotBlank(message = "用户提问不能为空") @ApiModelProperty(value = "用户提问", example = "10") private String question; } src/main/java/cc/mrbird/febs/ai/res/memberTalk/ApiMemberTalkStreamVo.java
New file @@ -0,0 +1,16 @@ package cc.mrbird.febs.ai.res.memberTalk; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.Data; /** * @author Administrator */ @Data @ApiModel(value = "ApiMemberTalkStreamVo", description = "参数") public class ApiMemberTalkStreamVo { @ApiModelProperty(value = "消息") private String content; } src/main/java/cc/mrbird/febs/ai/service/AiMemberTalkService.java
@@ -4,10 +4,12 @@ import cc.mrbird.febs.ai.req.memberTalk.ApiMemberTalkAnswerDto; import cc.mrbird.febs.ai.req.memberTalk.ApiMemberTalkDto; import cc.mrbird.febs.ai.req.memberTalk.ApiMemberTalkItemPageDto; import cc.mrbird.febs.ai.req.memberTalk.ApiMemberTalkStreamDto; import cc.mrbird.febs.ai.res.ai.AiResponse; import cc.mrbird.febs.common.entity.FebsResponse; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.service.IService; import reactor.core.publisher.Flux; import java.util.Date; import java.util.function.Consumer; @@ -36,4 +38,6 @@ AiMemberTalk add(String memberUuid, String productId, Date nowTime); FebsResponse historyPage(ApiMemberTalkItemPageDto dto); Flux<FebsResponse> answerStream(String question); } src/main/java/cc/mrbird/febs/ai/service/AiService.java
@@ -4,6 +4,8 @@ import cc.mrbird.febs.ai.req.ai.AiRequest; import cc.mrbird.febs.ai.res.ai.AiResponse; import cc.mrbird.febs.ai.res.ai.Report; import cc.mrbird.febs.common.entity.FebsResponse; import reactor.core.publisher.Flux; import java.util.List; import java.util.function.Consumer; @@ -26,4 +28,6 @@ * @return 解析后的报告对象 */ Report extractReportData(String modelOutput); Flux<FebsResponse> answerStream(String question); } src/main/java/cc/mrbird/febs/ai/service/impl/AiMemberTalkServiceImpl.java
@@ -10,6 +10,7 @@ import cc.mrbird.febs.ai.req.memberTalk.ApiMemberTalkAnswerDto; import cc.mrbird.febs.ai.req.memberTalk.ApiMemberTalkDto; import cc.mrbird.febs.ai.req.memberTalk.ApiMemberTalkItemPageDto; import cc.mrbird.febs.ai.req.memberTalk.ApiMemberTalkStreamDto; import cc.mrbird.febs.ai.res.ai.AiResponse; import cc.mrbird.febs.ai.res.memberTalk.ApiMemberTalkVo; import cc.mrbird.febs.ai.service.AiMemberTalkItemService; @@ -30,6 +31,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import reactor.core.publisher.Flux; import java.util.Date; import java.util.List; @@ -256,4 +258,12 @@ dto.setMemberUuid(memberUuid); return aiMemberTalkItemService.historyPage(dto); } @Override public Flux<FebsResponse> answerStream(String question) { // String memberUuid = LoginUserUtil.getLoginUser().getMemberUuid(); return aiService.answerStream(question); } } src/main/java/cc/mrbird/febs/ai/service/impl/AiServiceImpl.java
@@ -7,8 +7,10 @@ import cc.mrbird.febs.ai.res.ai.AiResponse; import cc.mrbird.febs.ai.res.ai.RadarDataItem; import cc.mrbird.febs.ai.res.ai.Report; import cc.mrbird.febs.ai.res.memberTalk.ApiMemberTalkStreamVo; import cc.mrbird.febs.ai.service.AiProductRoleService; import cc.mrbird.febs.ai.service.AiService; import cc.mrbird.febs.common.entity.FebsResponse; import cn.hutool.core.collection.CollUtil; import cn.hutool.json.JSONUtil; import com.fasterxml.jackson.core.JsonProcessingException; @@ -23,10 +25,12 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import reactor.core.publisher.Flux; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -354,6 +358,59 @@ } } // 修改服务实现 @Override public Flux<FebsResponse> answerStream(String question) { log.info("----- standard request -----"); final ChatMessage systemMessage = ChatMessage.builder() .role(ChatMessageRole.SYSTEM) .content("你是豆包,是由字节跳动开发的 AI 人工智能助手") .build(); final ChatMessage userMessage = ChatMessage.builder() .role(ChatMessageRole.USER) .content(question) .build(); List<ChatMessage> messages = Arrays.asList(systemMessage, userMessage); ChatCompletionRequest chatCompletionRequest = ChatCompletionRequest.builder() .model("ep-20250805124033-lhxbf") .messages(messages) .stream(true) .temperature(0.7) .topP(0.9) .maxTokens(2048) .frequencyPenalty(0.0) .build(); return Flux.from(service.streamChatCompletion(chatCompletionRequest)) .map(response -> { if (response == null || response.getChoices() == null || response.getChoices().isEmpty()) { return new FebsResponse().success().data("END"); } ChatCompletionChoice choice = response.getChoices().get(0); if (choice == null || choice.getMessage() == null) { return new FebsResponse().success().data("END"); } Object contentObj = choice.getMessage().getContent(); String content = contentObj == null ? "" : contentObj.toString(); ApiMemberTalkStreamVo apiMemberTalkStreamVo = new ApiMemberTalkStreamVo(); apiMemberTalkStreamVo.setContent(content); return new FebsResponse().success().data(apiMemberTalkStreamVo); }) .onErrorResume(throwable -> { log.error("流式调用AI服务失败,问题输入: {}", question, throwable); FebsResponse errorResponse = new FebsResponse().message("AI服务调用失败"); return Flux.just(errorResponse); }); } private Report tryRepairTruncatedJson(String truncatedJson) { String[] repairAttempts = { truncatedJson + "\"}}}", src/main/java/cc/mrbird/febs/common/configure/FebsConfigure.java
@@ -34,6 +34,21 @@ private final FebsProperties properties; // 专门用于MVC异步处理的线程池 @Bean("mvcTaskExecutor") public ThreadPoolTaskExecutor mvcTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(50); executor.setQueueCapacity(300); executor.setThreadNamePrefix("mvc-async-"); executor.setWaitForTasksToCompleteOnShutdown(true); executor.setAwaitTerminationSeconds(60); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } @Bean(FebsConstant.ASYNC_POOL) public ThreadPoolTaskExecutor asyncThreadPoolTaskExecutor(){ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); src/main/java/cc/mrbird/febs/common/configure/WebMvcConfigure.java
@@ -2,9 +2,12 @@ import cc.mrbird.febs.common.interceptor.LoginInterceptor; import org.springframework.context.annotation.Configuration; import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer; import org.springframework.web.servlet.config.annotation.InterceptorRegistration; import org.springframework.web.servlet.config.annotation.InterceptorRegistry; import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; import javax.annotation.Resource; /** * @author xxx @@ -12,6 +15,13 @@ **/ @Configuration public class WebMvcConfigure implements WebMvcConfigurer { @Resource private FebsConfigure febsConfigure; @Override public void configureAsyncSupport(AsyncSupportConfigurer configurer) { configurer.setTaskExecutor(febsConfigure.mvcTaskExecutor()); // 使用专门的线程池 configurer.setDefaultTimeout(30_000); } @Override public void addInterceptors(InterceptorRegistry registry) { @@ -52,5 +62,6 @@ registration.excludePathPatterns("/api/ai/product/**"); registration.excludePathPatterns("/api/ai/productPoint/**"); registration.excludePathPatterns("/api/ai/test/**"); registration.excludePathPatterns("/api/ai/memberTalk/answer-stream"); } }