package cc.mrbird.febs.ai.service.impl; import cc.mrbird.febs.ai.enumerates.AiTypeEnum; import cc.mrbird.febs.ai.entity.AiProductRole; import cc.mrbird.febs.ai.req.ai.AiMessage; import cc.mrbird.febs.ai.req.ai.AiRequest; import cc.mrbird.febs.ai.res.ai.AiResponse; import cc.mrbird.febs.ai.res.ai.RadarDataItem; import cc.mrbird.febs.ai.res.ai.Report; import cc.mrbird.febs.ai.res.memberTalk.ApiMemberTalkStreamVo; import cc.mrbird.febs.ai.service.AiProductRoleService; import cc.mrbird.febs.ai.service.AiService; import cc.mrbird.febs.common.entity.FebsResponse; import cn.hutool.core.collection.CollUtil; import cn.hutool.json.JSONUtil; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.volcengine.ark.runtime.model.completion.chat.*; import com.volcengine.ark.runtime.service.ArkService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import okhttp3.ConnectionPool; import okhttp3.Dispatcher; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import reactor.core.publisher.Flux; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; /** * @author Administrator */ @Slf4j @Service @RequiredArgsConstructor public class AiServiceImpl implements AiService { private static final String CODE_SUCCESS = "200"; private static final String CODE_GOING_ON = "199"; private static final String CODE_NOT_FOUND = "201"; private static final String CODE_ERROR = "500"; private static final String SCHEMA_JSON = "{\n" + " \"radar_data\": {\n" + " \"problem_understanding\": \"object\",\n" + " \"fluency\": \"object\",\n" + " \"principle_adherence\": \"object\",\n" + " \"logicality\": \"object\",\n" + " \"knowledge_mastery\": \"object\"\n" + " },\n" + " \"evaluation\": {\n" + " \"highlight\": \"object\",\n" + " \"suggestion\": \"object\",\n" + " \"reference_answer\": \"object\",\n" + " \"key_knowledge\": \"object\"\n" + " }\n" + " }"; private final AiProductRoleService aiProductRoleService; private final ObjectMapper objectMapper; @Value("${ai.service.ak}") private String ak; @Value("${ai.service.sk}") private String sk; @Value("${ai.service.base-url}") private String baseUrl; private ArkService service; @PostConstruct public void init() { // 增加连接池大小和存活时间 ConnectionPool connectionPool = new ConnectionPool(32, 60, TimeUnit.SECONDS); Dispatcher dispatcher = new Dispatcher(); // 增加并发请求数量 dispatcher.setMaxRequests(128); dispatcher.setMaxRequestsPerHost(32); this.service = ArkService.builder() .dispatcher(dispatcher) .connectionPool(connectionPool) .baseUrl(baseUrl) .ak(ak) .sk(sk) .build(); } @PreDestroy public void destroy() { if (service != null) { service.shutdownExecutor(); } } @Override public AiResponse start(List aiMessageDtoList,Integer type,String productRoleId, String content, String question) { if (!StringUtils.hasText(productRoleId)) { log.warn("productRoleId 不能为空"); return buildErrorResponse(CODE_NOT_FOUND, "AI陪练不存在"); } AiProductRole aiProductRole = aiProductRoleService.getById(productRoleId); if (aiProductRole == null) { log.warn("未找到对应的角色配置,productRoleId: {}", productRoleId); return buildErrorResponse(CODE_NOT_FOUND, "AI陪练不存在"); } String promptTemplate = "作为一个智能助手,请回答我提出的问题。"; if (AiTypeEnum.QUESTION.getCode() == type){ promptTemplate = aiProductRole.getPromptHead(); } if (AiTypeEnum.ANSWER.getCode() == type){ promptTemplate = aiProductRole.getPromptTemplate()+question; } log.info("promptTemplate: {}", promptTemplate); String linkId = aiProductRole.getLinkId(); String jsonTemplate = aiProductRole.getJsonTemplate(); if ( !StringUtils.hasText(promptTemplate) || !StringUtils.hasText(linkId) || !StringUtils.hasText(jsonTemplate) ) { log.warn("角色配置不完整,promptTemplate 或 linkId 或 jsonTemplate为空,productRoleId: {}", productRoleId); return buildErrorResponse(CODE_ERROR, "角色配置不完整"); } AiRequest aiRequest = new AiRequest(); aiRequest.setPromptTemplate(promptTemplate); aiRequest.setJsonTemplate(jsonTemplate); aiRequest.setLinkId(linkId); aiRequest.setContent(content); if (CollUtil.isNotEmpty(aiMessageDtoList)){ aiRequest.setAiMessageDtoList(aiMessageDtoList); } return this.question(aiRequest); } @Override public AiResponse question(AiRequest aiRequest) { String promptTemplate = aiRequest.getPromptTemplate(); String linkId = aiRequest.getLinkId(); String content = aiRequest.getContent(); String jsonTemplate = aiRequest.getJsonTemplate(); if ( !StringUtils.hasText(promptTemplate) || !StringUtils.hasText(linkId) || !StringUtils.hasText(content) || !StringUtils.hasText(jsonTemplate) ) { log.warn("请求参数不完整,promptTemplate: {}, linkId: {}, content: {}, jsonTemplate: {}", promptTemplate, linkId, content, jsonTemplate); return buildErrorResponse(CODE_ERROR, "请求参数不完整"); } List messages = new ArrayList<>(); ChatMessage systemMessage = ChatMessage.builder().role(ChatMessageRole.SYSTEM).content(promptTemplate).build(); ChatMessage userMessage = ChatMessage.builder().role(ChatMessageRole.USER).content(content).build(); messages.add(systemMessage); if (CollUtil.isNotEmpty(aiRequest.getAiMessageDtoList())){ aiRequest.getAiMessageDtoList().forEach(aiMessageDto -> { ChatMessage message = ChatMessage.builder() .role(aiMessageDto.getRole()) .content(aiMessageDto.getContent()) .build(); messages.add(message); }); } messages.add(userMessage); try { JsonNode schemaNode = objectMapper.readTree(jsonTemplate); ChatCompletionRequest.ChatCompletionRequestResponseFormat responseFormat = new ChatCompletionRequest.ChatCompletionRequestResponseFormat( "json_schema", new ResponseFormatJSONSchemaJSONSchemaParam( "ai_response", "json数据响应", schemaNode, true ) ); ChatCompletionRequest chatCompletionRequest = ChatCompletionRequest.builder() .model(linkId) .messages(messages) .stream(false) .responseFormat(responseFormat) .temperature(0.7) // 降低温度参数,提高确定性,可能提升速度 .topP(0.9) // 调整topP参数 .maxTokens(2048) // 减少最大token数 .frequencyPenalty(0.0) .build(); List choices = service.createChatCompletion(chatCompletionRequest).getChoices(); String result = choices.stream() .map(choice -> choice.getMessage().getContent()) .filter(contentObj -> contentObj != null) .map(Object::toString) .collect(Collectors.joining()); Report report = this.extractReportData(result); return buildSuccessResponse(report, result); } catch (JsonProcessingException e) { log.error("初始化AI服务失败,JSON格式化输出初始化失败", e); return buildErrorResponse(CODE_ERROR, "AI服务调用失败"); } catch (Exception e) { log.error("调用AI服务失败,modelId: {}", linkId, e); return buildErrorResponse(CODE_ERROR, "AI服务调用失败"); } } public static void main(String[] args) { Report report = new Report(); List radarDataItems = new ArrayList<>(); RadarDataItem item1 = new RadarDataItem(); item1.setName("A"); item1.setCode("A"); item1.setScore("80"); radarDataItems.add(item1); RadarDataItem item2 = new RadarDataItem(); item2.setName("A"); item2.setCode("A"); item2.setScore("80"); radarDataItems.add(item2); report.setRadarDataItems(radarDataItems); System.out.println(JSONUtil.parse( report)); } @Override public void streamQuestion(AiRequest aiRequest, Consumer callback) { String promptTemplate = aiRequest.getPromptTemplate(); String linkId = aiRequest.getLinkId(); String content = aiRequest.getContent(); if (!StringUtils.hasText(promptTemplate) || !StringUtils.hasText(linkId) || !StringUtils.hasText(content)) { log.warn("请求参数不完整,promptTemplate: {}, linkId: {}, content: {}", promptTemplate, linkId, content); } final List messages = new ArrayList<>(); final ChatMessage systemMessage = ChatMessage.builder().role(ChatMessageRole.SYSTEM).content(promptTemplate).build(); final ChatMessage userMessage = ChatMessage.builder().role(ChatMessageRole.USER).content(content).build(); messages.add(systemMessage); messages.add(userMessage); try { JsonNode schemaNode = objectMapper.readTree(SCHEMA_JSON); ChatCompletionRequest.ChatCompletionRequestResponseFormat responseFormat = new ChatCompletionRequest.ChatCompletionRequestResponseFormat( "json_schema", new ResponseFormatJSONSchemaJSONSchemaParam( "ai_response", "json数据响应", schemaNode, true ) ); ChatCompletionRequest chatCompletionRequest = ChatCompletionRequest.builder() .model(linkId) .messages(messages) .stream(true) // 启用流式响应 .responseFormat(responseFormat) .temperature(0.7) .topP(0.9) .maxTokens(2048) .build(); service.streamChatCompletion(chatCompletionRequest) .doOnError(Throwable::printStackTrace) // 处理错误 .blockingForEach(response -> { AiResponse partialResponse = new AiResponse(); if (response.getChoices() != null && !response.getChoices().isEmpty()) { String responseStr = String.valueOf(response.getChoices().get(0).getMessage().getContent()); if (responseStr != null) { // 构造部分响应并回调 partialResponse = buildGOINGONResponse(responseStr); } }else{ partialResponse = buildPartialResponse("成功"); } callback.accept(partialResponse); }); // service.streamChatCompletion(chatCompletionRequest) // .doOnError(throwable -> { // log.error("流式调用AI服务失败", throwable); // callback.accept(buildErrorResponse(CODE_ERROR, "AI服务调用失败")); // }) // .subscribe(chatCompletionChunk -> { // // 处理每个数据块 // Object chunkContent = chatCompletionChunk.getChoices().get(0).getMessage().getContent(); // // 构造部分响应并回调 // AiResponse partialResponse = buildGOINGONResponse(chunkContent); // callback.accept(partialResponse); // }); } catch (Exception e) { log.error("调用AI服务失败", e); callback.accept(buildErrorResponse(CODE_ERROR, "AI服务调用失败")); } } private AiResponse buildGOINGONResponse(Object chunkContent) { AiResponse response = new AiResponse(); response.setCode(CODE_GOING_ON); response.setDescription("成功"); response.setResContext(chunkContent.toString()); return response; } private AiResponse buildPartialResponse(Object chunkContent) { AiResponse response = new AiResponse(); response.setCode(CODE_SUCCESS); response.setDescription("成功"); response.setResContext(chunkContent.toString()); return response; } private static final Pattern JSON_PATTERN = Pattern.compile( "<\\|FunctionCallBegin\\|>(.*?)<\\|FunctionCallEnd\\|>", Pattern.DOTALL ); @Override public Report extractReportData(String modelOutput) { Matcher matcher = JSON_PATTERN.matcher(modelOutput); if (!matcher.find()) { log.warn("未匹配到FunctionCall内容,原始输出长度: {}", modelOutput); return null; } String jsonContent = matcher.group(1); log.debug("提取到的JSON内容长度: {}", jsonContent.length()); try { return objectMapper.readValue(jsonContent, Report.class); } catch (JsonProcessingException e) { log.error("JSON解析失败,原始内容长度: {}", jsonContent.length(), e); Report repairedReport = tryRepairTruncatedJson(jsonContent); if (repairedReport != null) { log.info("成功修复截断的JSON"); return repairedReport; } return null; } } // 修改服务实现 @Override public Flux answerStream(String question) { log.info("----- standard request -----"); final ChatMessage systemMessage = ChatMessage.builder() .role(ChatMessageRole.SYSTEM) .content("你是豆包,是由字节跳动开发的 AI 人工智能助手") .build(); final ChatMessage userMessage = ChatMessage.builder() .role(ChatMessageRole.USER) .content(question) .build(); List messages = Arrays.asList(systemMessage, userMessage); ChatCompletionRequest chatCompletionRequest = ChatCompletionRequest.builder() .model("ep-20250805124033-lhxbf") .messages(messages) .stream(true) .thinking(new ChatCompletionRequest.ChatCompletionRequestThinking("enabled")) .temperature(0.7) .topP(0.9) .maxTokens(2048) .frequencyPenalty(0.0) .build(); return Flux.from(service.streamChatCompletion(chatCompletionRequest)) .map(response -> { if (response == null || response.getChoices() == null || response.getChoices().isEmpty()) { return new FebsResponse().success().data("END"); } ChatCompletionChoice choice = response.getChoices().get(0); if (choice == null || choice.getMessage() == null) { return new FebsResponse().success().data("END"); } ApiMemberTalkStreamVo apiMemberTalkStreamVo = new ApiMemberTalkStreamVo(); // 判断是否触发深度思考,触发则打印模型输出的思维链内容 ChatMessage message = choice.getMessage(); if (message.getReasoningContent()!= null &&!message.getReasoningContent().isEmpty()) { apiMemberTalkStreamVo.setReasoningContent(message.getReasoningContent()); System.out.print(message.getReasoningContent()); } String content = message.getContent() == null ? "" : message.getContent().toString(); apiMemberTalkStreamVo.setContent(content); System.out.print(content); return new FebsResponse().success().data(apiMemberTalkStreamVo); }) .onErrorResume(throwable -> { log.error("流式调用AI服务失败,问题输入: {}", question, throwable); FebsResponse errorResponse = new FebsResponse().message("AI服务调用失败"); return Flux.just(errorResponse); }); } private Report tryRepairTruncatedJson(String truncatedJson) { String[] repairAttempts = { truncatedJson + "\"}}}", truncatedJson + "}}}", truncatedJson + "}}" }; for (String attempt : repairAttempts) { try { return objectMapper.readValue(attempt, Report.class); } catch (JsonProcessingException e) { log.debug("修复尝试失败: {}", attempt); continue; } } log.warn("无法修复截断的JSON,原始内容长度: {}", truncatedJson.length()); return null; } private AiResponse buildErrorResponse(String code, String description) { AiResponse response = new AiResponse(); response.setCode(code); response.setDescription(description); return response; } private AiResponse buildSuccessResponse(String result) { AiResponse response = new AiResponse(); response.setCode(CODE_SUCCESS); response.setDescription("成功"); response.setResContext(result); return response; } private AiResponse buildSuccessResponse(Report report, String result) { AiResponse response = new AiResponse(); response.setCode(CODE_SUCCESS); response.setDescription("成功"); response.setResContext(result); response.setReport(report); return response; } }