@@ -16,7 +16,7 @@
package com.embabel.agent.api.common.support

import com.embabel.agent.api.common.*
-import com.embabel.agent.api.common.support.streaming.StreamingCapabilityDetector
+import com.embabel.agent.spi.support.streaming.StreamingCapabilityDetector
import com.embabel.agent.api.tool.ArtifactSinkingTool
import com.embabel.agent.api.tool.Tool
import com.embabel.agent.api.tool.ToolCallContext
@@ -35,11 +35,10 @@ import com.embabel.agent.core.internal.LlmOperations
import com.embabel.agent.core.support.LlmInteraction
import com.embabel.agent.core.support.safelyGetTools
import com.embabel.agent.experimental.primitive.Determination
+import com.embabel.agent.core.internal.streaming.StreamingLlmOperationsFactory
import com.embabel.agent.spi.loop.ToolChainingInjectionStrategy
import com.embabel.agent.spi.loop.ToolInjectionStrategy
import com.embabel.agent.spi.loop.ToolNotFoundPolicy
-import com.embabel.agent.spi.support.springai.ChatClientLlmOperations
-import com.embabel.agent.spi.support.springai.streaming.StreamingChatClientOperations
import com.embabel.chat.AssistantMessage
import com.embabel.chat.ImagePart
import com.embabel.chat.Message
@@ -325,8 +324,7 @@ internal data class OperationContextDelegate(
}

override fun generateStream(): Flux<String> {
-        val llmOperations = context.agentPlatform().platformServices.llmOperations as ChatClientLlmOperations
-        val streamingLlmOperations = StreamingChatClientOperations(llmOperations)
+        val streamingLlmOperations = streamingFactory().createStreamingOperations(llm)

return streamingLlmOperations.generateStream(
messages = messages,
@@ -337,8 +335,7 @@
}

override fun <T> createObjectStream(itemClass: Class<T>): Flux<T> {
-        val llmOperations = context.agentPlatform().platformServices.llmOperations as ChatClientLlmOperations
-        val streamingLlmOperations = StreamingChatClientOperations(llmOperations)
+        val streamingLlmOperations = streamingFactory().createStreamingOperations(llm)

return streamingLlmOperations.createObjectStream(
messages = messages,
@@ -350,8 +347,7 @@
}

override fun <T> createObjectStreamWithThinking(itemClass: Class<T>): Flux<StreamingEvent<T>> {
-        val llmOperations = context.agentPlatform().platformServices.llmOperations as ChatClientLlmOperations
-        val streamingLlmOperations = StreamingChatClientOperations(llmOperations)
+        val streamingLlmOperations = streamingFactory().createStreamingOperations(llm)
return streamingLlmOperations.createObjectStreamWithThinking(
messages = messages,
interaction = streamingInteraction(),
@@ -381,6 +377,15 @@
)
}

+    private fun streamingFactory(): StreamingLlmOperationsFactory {
+        val llmOperations = context.agentPlatform().platformServices.llmOperations
+        return llmOperations as? StreamingLlmOperationsFactory
+            ?: throw UnsupportedOperationException(
+                "Streaming not supported: LlmOperations (${llmOperations::class.simpleName}) " +
+                    "does not implement StreamingLlmOperationsFactory"
+            )
+    }

override fun supportsThinking(): Boolean = true

// Patterned after createObject() - uses ProcessContext flow
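Reviewer note: a minimal caller-side sketch of the streaming surface these overrides back. The `runner` handle is an assumed StreamingPromptRunner-style object; only the Reactor plumbing is the point here.

// Illustrative consumer, not part of this PR: print chunks as they arrive,
// then block for the concatenated response.
val response: String? = runner.generateStream()
    .doOnNext { chunk -> print(chunk) }
    .reduce("") { acc, chunk -> acc + chunk }
    .block()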

This file was deleted.

@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package com.embabel.agent.spi.streaming
+package com.embabel.agent.core.internal.streaming

import com.embabel.agent.api.event.LlmRequestEvent
import com.embabel.agent.core.Action
@@ -22,12 +22,13 @@ import com.embabel.agent.core.support.LlmInteraction
import com.embabel.chat.Message
import com.embabel.chat.UserMessage
import com.embabel.common.core.streaming.StreamingEvent
+import org.jetbrains.annotations.ApiStatus
import reactor.core.publisher.Flux

/**
* Streaming extension of LlmOperations for real-time LLM response processing.
*
- * This SPI interface provides reactive streaming capabilities that support
+ * This internal interface provides reactive streaming capabilities that support
* the API layer StreamingPromptRunner interfaces, enabling:
* - Real-time processing of LLM responses as they arrive
* - Streaming lists of objects from JSONL responses
@@ -37,6 +38,7 @@ import reactor.core.publisher.Flux
* All streaming methods return Project Reactor Flux streams for integration
* with Spring WebFlux and other reactive frameworks.
*/
+@ApiStatus.Internal
interface StreamingLlmOperations {

/**
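A consumption sketch against this interface: the `ops`, `messages`, `interaction`, and `action` bindings are assumed, and the `itemClass`/`action` parameter names are guesses extrapolated from the call sites earlier in this diff. `Person` is a stand-in item type.

// Illustrative only: stream typed objects parsed from a JSONL response
// and keep the first three.
val firstThree: List<Person> = ops.createObjectStream(
    messages = messages,
    interaction = interaction,
    itemClass = Person::class.java,
    action = action,
)
    .take(3)
    .collectList()
    .block() ?: emptyList()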
@@ -0,0 +1,41 @@
/*
* Copyright 2024-2026 Embabel Pty Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.embabel.agent.core.internal.streaming

import com.embabel.common.ai.model.LlmOptions
import org.jetbrains.annotations.ApiStatus

/**
* Factory interface for creating [StreamingLlmOperations] instances.
*
* This interface is separate from [com.embabel.agent.core.internal.LlmOperations]
* to maintain interface segregation. Implementations can choose to implement
* both interfaces or just one, depending on their capabilities.
*
* @see StreamingLlmOperations
* @see com.embabel.agent.core.internal.LlmOperations
*/
@ApiStatus.Internal
interface StreamingLlmOperationsFactory {

Check warning on line 32 in embabel-agent-api/src/main/kotlin/com/embabel/agent/core/internal/streaming/StreamingLlmOperationsFactory.kt
SonarCloud Code Analysis: Make this interface functional or replace it with a function type.
See more on https://sonarcloud.io/project/issues?id=embabel_embabel-agent&issues=AZ2I4KUOyB-eWHH5WTa_&open=AZ2I4KUOyB-eWHH5WTa_&pullRequest=1600

/**
* Create a [StreamingLlmOperations] instance configured with the given options.
*
* @param options LLM options including model selection criteria
* @return A streaming operations instance for the selected LLM
*/
fun createStreamingOperations(options: LlmOptions): StreamingLlmOperations
}
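Regarding the SonarCloud warning above: with a single abstract method, the type could be declared as a Kotlin `fun interface`, which keeps the named internal type while also allowing lambda implementations. A sketch of that suggested shape (not what this PR ships):

@ApiStatus.Internal
fun interface StreamingLlmOperationsFactory {
    fun createStreamingOperations(options: LlmOptions): StreamingLlmOperations
}

// Classes such as AbstractLlmOperations could still implement it as before,
// and tests could stub it inline:
val stubFactory = StreamingLlmOperationsFactory { options ->
    error("stub streaming operations for $options")
}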
@@ -16,6 +16,7 @@
package com.embabel.agent.spi

import com.embabel.agent.spi.loop.LlmMessageSender
+import com.embabel.agent.spi.loop.streaming.LlmMessageStreamer
import com.embabel.common.ai.model.LlmMetadata
import com.embabel.common.ai.model.LlmOptions
import com.embabel.common.ai.prompt.PromptContributor
@@ -48,6 +49,27 @@ interface LlmService<THIS : LlmService<THIS>> : LlmMetadata, PromptContributorCo
*/
fun createMessageSender(options: LlmOptions): LlmMessageSender

+    /**
+     * Create a message streamer for this LLM configured with the given options.
+     *
+     * The message streamer handles streaming LLM API calls. Tool execution is
+     * handled internally by the underlying LLM framework during streaming.
+     *
+     * @param options Configuration options for the LLM call (temperature, max tokens, etc.)
+     * @return A message streamer configured for this LLM
+     */
+    fun createMessageStreamer(options: LlmOptions): LlmMessageStreamer
+
+    /**
+     * Check if this LLM service supports streaming operations.
+     *
+     * Each LlmService instance is bound to a specific model, so this checks
+     * whether that particular model supports streaming.
+     *
+     * @return true if the underlying model supports streaming, false otherwise
+     */
+    fun supportsStreaming(): Boolean

/**
* Returns a copy of this LLM service with the specified knowledge cutoff date.
*
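A small usage sketch for the new members, with `llmService` and `options` assumed to be in scope: callers are expected to gate on supportsStreaming() before requesting a streamer.

// Illustrative guard: only ask for a streamer when the bound model supports it.
val streamer: LlmMessageStreamer? =
    if (llmService.supportsStreaming()) llmService.createMessageStreamer(options)
    else null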
@@ -21,6 +21,9 @@ import com.embabel.agent.api.tool.Tool
import com.embabel.agent.core.Action
import com.embabel.agent.core.AgentProcess
import com.embabel.agent.core.internal.LlmOperations
+import com.embabel.agent.core.internal.streaming.StreamingLlmOperations
+import com.embabel.agent.core.internal.streaming.StreamingLlmOperationsFactory
+import com.embabel.agent.spi.support.streaming.StreamingLlmOperationsImpl
import com.embabel.agent.core.support.InvalidLlmReturnTypeException
import com.embabel.agent.core.support.LlmInteraction
import com.embabel.agent.spi.AutoLlmSelectionCriteriaResolver
@@ -37,6 +40,7 @@ import com.embabel.common.ai.model.ModelSelectionCriteria
import com.embabel.common.ai.model.PreResolvedModelSelectionCriteria
import com.embabel.common.core.thinking.ThinkingResponse
import com.embabel.common.util.time
+import com.fasterxml.jackson.databind.ObjectMapper
import jakarta.validation.ConstraintViolation
import jakarta.validation.Validator
import java.lang.reflect.Field
@@ -67,7 +71,8 @@ abstract class AbstractLlmOperations(
protected val dataBindingProperties: LlmDataBindingProperties,
protected val promptsProperties: LlmOperationsPromptsProperties = LlmOperationsPromptsProperties(),
protected val asyncer: Asyncer,
-) : LlmOperations {
+    internal open val objectMapper: ObjectMapper,
+) : LlmOperations, StreamingLlmOperationsFactory {

protected val logger: Logger = LoggerFactory.getLogger(javaClass)

@@ -419,6 +424,17 @@
return modelProvider.getLlm(crit)
}

+    override fun createStreamingOperations(options: LlmOptions): StreamingLlmOperations {
+        val llmService = chooseLlm(options)
+        val messageStreamer = llmService.createMessageStreamer(options)
+        return StreamingLlmOperationsImpl(
+            messageStreamer = messageStreamer,
+            objectMapper = objectMapper,
+            llmService = llmService,
+            toolDecorator = toolDecorator,
+        )
+    }

protected abstract fun <O> doTransformIfPossible(
messages: List<Message>,
interaction: LlmInteraction,
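Because AbstractLlmOperations now implements the factory, the safe cast in OperationContextDelegate.streamingFactory() succeeds for any of its subclasses. A sketch, with the `toolLoopLlmOperations` and `options` instances assumed:

// Illustrative only: an AbstractLlmOperations subclass doubles as a streaming factory,
// so no ChatClientLlmOperations-specific cast is needed anymore.
val factory: StreamingLlmOperationsFactory = toolLoopLlmOperations
val streamingOps: StreamingLlmOperations = factory.createStreamingOperations(options)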
@@ -58,8 +58,6 @@ import java.time.Duration
import java.time.Instant
import javax.annotation.concurrent.ThreadSafe

-const val PROMPT_ELEMENT_SEPARATOR = "\n----\n"

/**
* Output converter abstraction for parsing LLM output.
* Framework-agnostic interface that can be implemented by Spring AI converters or others.
@@ -106,7 +104,7 @@ open class ToolLoopLlmOperations(
dataBindingProperties: LlmDataBindingProperties = LlmDataBindingProperties(),
autoLlmSelectionCriteriaResolver: AutoLlmSelectionCriteriaResolver = AutoLlmSelectionCriteriaResolver.DEFAULT,
promptsProperties: LlmOperationsPromptsProperties = LlmOperationsPromptsProperties(),
-    internal open val objectMapper: ObjectMapper = jacksonObjectMapper().registerModule(JavaTimeModule()),
+    objectMapper: ObjectMapper = jacksonObjectMapper().registerModule(JavaTimeModule()),
protected val observationRegistry: ObservationRegistry = ObservationRegistry.NOOP,
asyncer: Asyncer = ExecutorAsyncer(java.util.concurrent.Executors.newCachedThreadPool()),
protected val toolLoopFactory: ToolLoopFactory = ToolLoopFactory.create(ToolLoopConfiguration(), asyncer, AutoCorrectionPolicy()),
@@ -120,6 +118,7 @@
autoLlmSelectionCriteriaResolver = autoLlmSelectionCriteriaResolver,
promptsProperties = promptsProperties,
asyncer = asyncer,
+        objectMapper = objectMapper,
) {

override fun <O> doTransform(
@@ -631,10 +630,7 @@
protected fun buildPromptContributions(
interaction: LlmInteraction,
llm: LlmService<*>,
-    ): String {
-        return (interaction.promptContributors + llm.promptContributors)
-            .joinToString(PROMPT_ELEMENT_SEPARATOR) { it.contribution() }
-    }
+    ): String = buildPromptContributionsString(interaction.promptContributors, llm.promptContributors)

/**
* Build initial messages for the tool loop, including system prompt contributions and schema.
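The deleted PROMPT_ELEMENT_SEPARATOR constant and inline join suggest the logic now lives in a shared buildPromptContributionsString helper. Its presumed shape, inferred from the removed body (signature and location are assumptions, not the actual definition):

// Presumed equivalent of the removed body: join contributions from both sources
// with the old "\n----\n" separator.
fun buildPromptContributionsString(
    interactionContributors: List<PromptContributor>,
    llmContributors: List<PromptContributor>,
    separator: String = "\n----\n",
): String = (interactionContributors + llmContributors).joinToString(separator) { it.contribution() }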