src/main/java/cc/mrbird/febs/ai/strategy/Impl/HsLlmStrategyServiceImpl.java
@@ -1,20 +1,204 @@
package cc.mrbird.febs.ai.strategy.Impl;
import cc.mrbird.febs.ai.strategy.LlmStrategyService;
import cc.mrbird.febs.ai.strategy.enumerates.LlmStrategyContextEnum;
import cc.mrbird.febs.ai.strategy.param.LlmStrategyDto;
import cc.mrbird.febs.common.entity.FebsResponse;
import cc.mrbird.febs.common.exception.FebsException;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.volcengine.ark.runtime.model.completion.chat.*;
import com.volcengine.ark.runtime.service.ArkService;
import okhttp3.ConnectionPool;
import okhttp3.Dispatcher;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Component("HsLlmStrategyService")
public class HsLlmStrategyServiceImpl implements LlmStrategyService {
    // Ark SDK client; assigned in init(), and its executor is shut down in destroy().
    private ArkService service;
    // NOTE(review): ak and sk below are credentials committed directly in source. Consider
    // loading them from external configuration and rotating these values.
    // Passed to ArkService.builder().ak(...) — presumably the access key id; confirm.
    private static final String ak = "AKLTZTQxZjMyZTUxMWJmNDEyNDkzNWExOGQ3ODllNzhhNmQ";
    // Passed to ArkService.builder().sk(...) — presumably the secret access key; confirm.
    private static final String sk = "TmpFeE1qZ3haREExTW1JeE5HRTBZVGc1WlRRNVlqWXpORGd5TWpsak5HWQ==";
    // Base URL handed to the ArkService builder.
    private static final String baseUrl = "https://ark.cn-beijing.volces.com/api/v3";
    // Value passed as the request's model(...) on every chat completion request in this class.
    private static final String LinkId = "ep-20250805124033-lhxbf";
    // Sampling settings applied to every chat completion request built in this class.
    private static final Double temperature = 0.7;
    private static final Double topP = 0.9;
    private static final Integer maxTokens = 2048;
    private static final Double frequencyPenalty = 0.0;
    @PostConstruct
    public void init() {
        // 增加连接池大小和存活时间
        ConnectionPool connectionPool = new ConnectionPool(32, 60, TimeUnit.SECONDS);
        Dispatcher dispatcher = new Dispatcher();
        // 增加并发请求数量
        dispatcher.setMaxRequests(128);
        dispatcher.setMaxRequestsPerHost(32);
        this.service = ArkService.builder()
                .dispatcher(dispatcher)
                .connectionPool(connectionPool)
                .baseUrl(baseUrl)
                .ak(ak)
                .sk(sk)
                .build();
    }
    @PreDestroy
    public void destroy() {
        if (service != null) {
            service.shutdownExecutor();
        }
    }
    private List<ChatMessage> getMessages(List<LlmStrategyDto> dto) {
        List<ChatMessage> messages = new ArrayList<>();
        for (LlmStrategyDto dtoItem : dto){
            if (StrUtil.equals(dtoItem.getRole(), ChatMessageRole.SYSTEM.value())){
                messages.add(ChatMessage.builder()
                        .role(ChatMessageRole.SYSTEM)
                        .content(dtoItem.getContent())
                        .build());
            }
            if (StrUtil.equals(dtoItem.getRole(), ChatMessageRole.USER.value())){
                messages.add(ChatMessage.builder()
                        .role(ChatMessageRole.USER)
                        .content(dtoItem.getContent())
                        .build());
            }
            if (StrUtil.equals(dtoItem.getRole(), ChatMessageRole.ASSISTANT.value())){
                messages.add(ChatMessage.builder()
                        .role(ChatMessageRole.ASSISTANT)
                        .content(dtoItem.getContent())
                        .build());
            }
        }
        return messages;
    }
    @Override
    public FebsResponse llmInvokeNonStreaming(LlmStrategyDto dto) {
        return null;
    public FebsResponse llmInvokeNonStreaming(List<LlmStrategyDto> dto) {
        if (CollUtil.isEmpty(dto)){
            throw new FebsException("火山大模型初始化异常");
        }
        List<ChatMessage> messages = getMessages(dto);
        String result = "";
        try {
            ChatCompletionRequest chatCompletionRequest = ChatCompletionRequest.builder()
                    .model(LinkId)
                    .messages(messages)
                    .stream(false)
                    .temperature(temperature) // 降低温度参数,提高确定性,可能提升速度
                    .topP(topP)        // 调整topP参数
                    .maxTokens(maxTokens)  // 减少最大token数
                    .frequencyPenalty(frequencyPenalty)
                    .build();
            List<ChatCompletionChoice> choices = service.createChatCompletion(chatCompletionRequest).getChoices();
            result = choices.stream()
                    .map(choice -> choice.getMessage().getContent())
                    .filter(contentObj -> contentObj != null)
                    .map(Object::toString)
                    .collect(Collectors.joining());
        } catch (Exception e) {
            throw new FebsException(StrUtil.format("火山大模型调用异常:{}", e.getMessage()));
        }
        return new FebsResponse().success().data(result);
    }
    @Override
    public Flux<FebsResponse> llmInvokeStreaming(LlmStrategyDto dto) {
        return null;
    public Flux<FebsResponse> llmInvokeStreamingWithThink(List<LlmStrategyDto> dto) {
        if (CollUtil.isEmpty(dto)){
            throw new FebsException("火山大模型初始化异常");
        }
        List<ChatMessage> messages = getMessages(dto);
        ChatCompletionRequest chatCompletionRequest = ChatCompletionRequest.builder()
                .model(LinkId)
                .messages(messages)
                .stream(true)
                .thinking(new ChatCompletionRequest.ChatCompletionRequestThinking("enabled"))
                .temperature(temperature) // 降低温度参数,提高确定性,可能提升速度
                .topP(topP)        // 调整topP参数
                .maxTokens(maxTokens)  // 减少最大token数
                .frequencyPenalty(frequencyPenalty)
                .build();
        return Flux.from(service.streamChatCompletion(chatCompletionRequest))
                .map(response -> {
                    if (response == null || response.getChoices() == null || response.getChoices().isEmpty()) {
                        return new FebsResponse().success().data("未响应,请重试");
                    }
                    ChatCompletionChoice choice = response.getChoices().get(0);
                    if (choice == null || choice.getMessage() == null) {
                        return new FebsResponse().success().data("END");
                    }
                    ChatMessage message = choice.getMessage();
                    HashMap<String, String> stringStringHashMap = new HashMap<>();
                    if (ObjectUtil.isNotEmpty(message.getReasoningContent())) {
                        stringStringHashMap.put(LlmStrategyContextEnum.THINK.name(),message.getReasoningContent().toString());
                    }
                    if (ObjectUtil.isNotEmpty(message.getContent())) {
                        stringStringHashMap.put(LlmStrategyContextEnum.CONTENT.name(),message.getContent().toString());
                    }
                    return new FebsResponse().success().data(stringStringHashMap);
                })
                .onErrorResume(throwable -> {
                    throw new FebsException(StrUtil.format("火山大模型流式调用AI服务失:{}",throwable));
                });
    }
    @Override
    public Flux<FebsResponse> llmInvokeStreamingNoThink(List<LlmStrategyDto> dto) {
        if (CollUtil.isEmpty(dto)){
            throw new FebsException("火山大模型初始化异常");
        }
        List<ChatMessage> messages = getMessages(dto);
        ChatCompletionRequest chatCompletionRequest = ChatCompletionRequest.builder()
                .model(LinkId)
                .messages(messages)
                .stream(true)
                .temperature(temperature) // 降低温度参数,提高确定性,可能提升速度
                .topP(topP)        // 调整topP参数
                .maxTokens(maxTokens)  // 减少最大token数
                .frequencyPenalty(frequencyPenalty)
                .build();
        return Flux.from(service.streamChatCompletion(chatCompletionRequest))
                .map(response -> {
                    if (response == null || response.getChoices() == null || response.getChoices().isEmpty()) {
                        return new FebsResponse().success().data("未响应,请重试");
                    }
                    ChatCompletionChoice choice = response.getChoices().get(0);
                    if (choice == null || choice.getMessage() == null) {
                        return new FebsResponse().success().data("END");
                    }
                    ChatMessage message = choice.getMessage();
                    HashMap<String, String> stringStringHashMap = new HashMap<>();
                    if (ObjectUtil.isNotEmpty(message.getContent())) {
                        stringStringHashMap.put(LlmStrategyContextEnum.CONTENT.name(),message.getContent().toString());
                    }
                    return new FebsResponse().success().data(stringStringHashMap);
                })
                .onErrorResume(throwable -> {
                    throw new FebsException(StrUtil.format("火山大模型流式调用AI服务失:{}",throwable));
                });
    }
}