package cc.mrbird.febs.ai.strategy.Impl;

import cc.mrbird.febs.ai.entity.AiTalkItem;
import cc.mrbird.febs.ai.res.ai.Report;
import cc.mrbird.febs.ai.res.memberTalk.ApiMemberTalkStreamVo;
import cc.mrbird.febs.ai.strategy.LlmStrategyService;
import cc.mrbird.febs.ai.strategy.param.LlmStrategyDto;
import cc.mrbird.febs.common.entity.FebsResponse;
import cc.mrbird.febs.common.exception.FebsException;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.dashscope.common.Message;
import com.alibaba.dashscope.common.Role;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.volcengine.ark.runtime.model.completion.chat.*;
import com.volcengine.ark.runtime.service.ArkService;
import okhttp3.ConnectionPool;
import okhttp3.Dispatcher;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * {@link LlmStrategyService} implementation backed by the Volcengine Ark chat-completion API.
 *
 * <p>The Ark client is created once in {@link #init()} and its executor is shut down in
 * {@link #destroy()}. Both invocation styles (blocking and streaming) send the same sampling
 * parameters; the streaming variant additionally enables the model's "thinking" output.
 */
@Component("HsLlmStrategyService")
public class HsLlmStrategyServiceImpl implements LlmStrategyService {

    /** Value passed as the {@code model} of every chat-completion request. */
    private static final String ENDPOINT_ID = "ep-20250805124033-lhxbf";

    /** Sampling temperature; kept low for more deterministic output (may also improve speed). */
    private static final double TEMPERATURE = 0.7;

    /** Nucleus-sampling (top-p) value sent with every request. */
    private static final double TOP_P = 0.9;

    /** Upper bound on generated tokens; kept small on purpose. */
    private static final int MAX_TOKENS = 2048;

    /** Frequency penalty sent with every request. */
    private static final double FREQUENCY_PENALTY = 0.0;

    /** Roles that are forwarded to the model; messages with any other role are dropped. */
    private static final ChatMessageRole[] SUPPORTED_ROLES = {
            ChatMessageRole.SYSTEM, ChatMessageRole.USER, ChatMessageRole.ASSISTANT
    };

    private ArkService service;

    /**
     * Builds the shared {@link ArkService} client with an enlarged connection pool and
     * higher request concurrency limits.
     */
    @PostConstruct
    public void init() {
        // Larger connection pool with a 60-second keep-alive.
        ConnectionPool connectionPool = new ConnectionPool(32, 60, TimeUnit.SECONDS);

        // Raise the number of concurrent requests (overall and per host).
        Dispatcher dispatcher = new Dispatcher();
        dispatcher.setMaxRequests(128);
        dispatcher.setMaxRequestsPerHost(32);

        // NOTE(review): the access key / secret key below are hard-coded in source control.
        // They should be moved to external configuration and rotated; left unchanged here so
        // that runtime behaviour is preserved.
        this.service = ArkService.builder()
                .dispatcher(dispatcher)
                .connectionPool(connectionPool)
                .baseUrl("https://ark.cn-beijing.volces.com/api/v3")
                .ak("AKLTZTQxZjMyZTUxMWJmNDEyNDkzNWExOGQ3ODllNzhhNmQ")
                .sk("TmpFeE1qZ3haREExTW1JeE5HRTBZVGc1WlRRNVlqWXpORGd5TWpsak5HWQ")
                .build();
    }

    /** Shuts down the Ark client's executor, if the client was created. */
    @PreDestroy
    public void destroy() {
        if (service != null) {
            service.shutdownExecutor();
        }
    }

    /**
     * Converts the strategy DTOs into Ark chat messages, preserving order.
     *
     * @param dto conversation entries; entries whose role is not system/user/assistant are skipped
     * @return the chat messages to send to the model
     */
    private List<ChatMessage> getMessages(List<LlmStrategyDto> dto) {
        List<ChatMessage> messages = new ArrayList<>();
        for (LlmStrategyDto dtoItem : dto) {
            ChatMessageRole role = resolveRole(dtoItem.getRole());
            if (role == null) {
                continue;
            }
            messages.add(ChatMessage.builder()
                    .role(role)
                    .content(dtoItem.getContent())
                    .build());
        }
        return messages;
    }

    /**
     * Maps a role name to the matching supported {@link ChatMessageRole}.
     *
     * @param role role name as carried by the DTO; may be {@code null}
     * @return the matching role, or {@code null} when the name is not a supported role
     */
    private static ChatMessageRole resolveRole(String role) {
        for (ChatMessageRole candidate : SUPPORTED_ROLES) {
            if (StrUtil.equals(role, candidate.value())) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Creates a {@link FebsException} that keeps the triggering throwable reachable.
     *
     * <p>Only the message constructor of {@code FebsException} is used in this file, so the
     * original throwable is attached as a suppressed exception to keep its stack trace.
     *
     * @param message exception message
     * @param origin  the throwable that triggered the failure; may be {@code null}
     * @return the exception to throw or emit
     */
    private static FebsException failure(String message, Throwable origin) {
        FebsException failure = new FebsException(message);
        if (origin != null) {
            failure.addSuppressed(origin);
        }
        return failure;
    }

    /**
     * Sends the conversation to the model and waits for the complete answer.
     *
     * @param dto conversation entries; must not be empty
     * @return a success response whose data is the concatenated content of all returned choices
     * @throws FebsException when {@code dto} is empty or the model call fails
     */
    @Override
    public FebsResponse llmInvokeNonStreaming(List<LlmStrategyDto> dto) {
        if (CollUtil.isEmpty(dto)) {
            throw new FebsException("火山大模型初始化异常");
        }
        List<ChatMessage> messages = getMessages(dto);

        String result;
        try {
            ChatCompletionRequest chatCompletionRequest = ChatCompletionRequest.builder()
                    .model(ENDPOINT_ID)
                    .messages(messages)
                    .stream(false)
                    .temperature(TEMPERATURE)
                    .topP(TOP_P)
                    .maxTokens(MAX_TOKENS)
                    .frequencyPenalty(FREQUENCY_PENALTY)
                    .build();

            List<ChatCompletionChoice> choices =
                    service.createChatCompletion(chatCompletionRequest).getChoices();

            // Choices without a message or without content contribute nothing to the answer.
            result = choices.stream()
                    .map(ChatCompletionChoice::getMessage)
                    .filter(Objects::nonNull)
                    .map(ChatMessage::getContent)
                    .filter(Objects::nonNull)
                    .map(Object::toString)
                    .collect(Collectors.joining());
        } catch (Exception e) {
            throw failure(StrUtil.format("火山大模型调用异常:{}", e.getMessage()), e);
        }
        return new FebsResponse().success().data(result);
    }

    /**
     * Sends the conversation to the model and streams the answer chunk by chunk, with the
     * model's "thinking" output enabled.
     *
     * @param dto conversation entries; must not be empty
     * @return a stream of success responses, one per received chunk; upstream errors are
     *         emitted as {@link FebsException}
     * @throws FebsException when {@code dto} is empty
     */
    @Override
    public Flux<FebsResponse> llmInvokeStreaming(List<LlmStrategyDto> dto) {
        if (CollUtil.isEmpty(dto)) {
            throw new FebsException("火山大模型初始化异常");
        }
        List<ChatMessage> messages = getMessages(dto);

        ChatCompletionRequest chatCompletionRequest = ChatCompletionRequest.builder()
                .model(ENDPOINT_ID)
                .messages(messages)
                .stream(true)
                .thinking(new ChatCompletionRequest.ChatCompletionRequestThinking("enabled"))
                .temperature(TEMPERATURE)
                .topP(TOP_P)
                .maxTokens(MAX_TOKENS)
                .frequencyPenalty(FREQUENCY_PENALTY)
                .build();

        return Flux.from(service.streamChatCompletion(chatCompletionRequest))
                .map(response -> toStreamResponse(response == null ? null : response.getChoices()))
                // Translate any upstream failure into the project's exception type.
                .onErrorMap(throwable ->
                        failure(StrUtil.format("火山大模型流式调用AI服务失:{}", throwable), throwable));
    }

    /**
     * Converts the choices of one streamed chunk into the response sent to the caller.
     *
     * @param choices choices of the chunk; may be {@code null} or empty
     * @return a success response carrying either a placeholder string (no choices / no message)
     *         or an {@link ApiMemberTalkStreamVo} with the chunk's content and reasoning content
     */
    private static FebsResponse toStreamResponse(List<ChatCompletionChoice> choices) {
        if (choices == null || choices.isEmpty()) {
            return new FebsResponse().success().data("未响应,请重试");
        }

        ChatCompletionChoice choice = choices.get(0);
        if (choice == null || choice.getMessage() == null) {
            return new FebsResponse().success().data("END");
        }

        ChatMessage message = choice.getMessage();
        ApiMemberTalkStreamVo apiMemberTalkStreamVo = new ApiMemberTalkStreamVo();

        // Reasoning content is only set when the chunk actually carries some.
        String reasoningContent = message.getReasoningContent();
        if (StrUtil.isNotEmpty(reasoningContent)) {
            apiMemberTalkStreamVo.setReasoningContent(reasoningContent);
        }

        // Content may be absent on a chunk; fall back to an empty string.
        Object rawContent = message.getContent();
        apiMemberTalkStreamVo.setContent(rawContent == null ? "" : rawContent.toString());

        return new FebsResponse().success().data(apiMemberTalkStreamVo);
    }
}