Skip to content

Commit c62ea91

Browse files
committed
KG-576. Finalize pipeline features after agent run for the StatefulSingleUseAIAgent
- Finalize the feature processors after an agent run. After an agent run, it does not make sense to keep running feature processors. We can safely close them and re-open later on the second run if needed; - Add tests to verify graph and functional agents close processors for features.
1 parent 2fb8ad7 commit c62ea91

File tree

16 files changed

+471
-284
lines changed

16 files changed

+471
-284
lines changed

agents/agents-core/src/commonMain/kotlin/ai/koog/agents/core/agent/GraphAIAgent.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import ai.koog.agents.core.feature.config.FeatureConfig
1616
import ai.koog.agents.core.feature.pipeline.AIAgentGraphPipeline
1717
import ai.koog.agents.core.tools.ToolRegistry
1818
import ai.koog.prompt.executor.model.PromptExecutor
19-
import ai.koog.utils.io.Closeable
2019
import io.github.oshai.kotlinlogging.KotlinLogging
2120
import kotlinx.datetime.Clock
2221
import kotlin.reflect.KType
@@ -59,7 +58,7 @@ public open class GraphAIAgent<Input, Output>(
5958
) : StatefulSingleUseAIAgent<Input, Output, AIAgentGraphContextBase>(
6059
logger = logger,
6160
id = id,
62-
), Closeable {
61+
) {
6362

6463
private companion object {
6564
private val logger = KotlinLogging.logger {}

agents/agents-core/src/commonMain/kotlin/ai/koog/agents/core/agent/StatefulSingleUseAIAgent.kt

Lines changed: 65 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import kotlin.uuid.Uuid
1818
/**
1919
* Abstract base class representing a single-use AI agent with state.
2020
*
21-
* This AI agent is designed to execute a specific long-running strategy only once, and provides API to monitor and manage it's state.
21+
* This AI agent is designed to execute a specific long-running strategy only once and provides an API to monitor and manage its state.
2222
*
2323
* It maintains internal states including its running status, whether it was started, its result (if available), and
2424
* the root context associated with its execution. The class enforces safe state transitions and provides
@@ -86,87 +86,85 @@ public abstract class StatefulSingleUseAIAgent<Input, Output, TContext : AIAgent
8686
}
8787

8888
val runId = Uuid.random().toString()
89-
90-
pipeline.prepareFeatures()
89+
val context = prepareContext(agentInput, runId)
9190

9291
return withContext(
9392
AgentRunInfoContextElement(
94-
agentId = this@StatefulSingleUseAIAgent.id,
93+
agentId = id,
9594
runId = runId,
9695
agentConfig = agentConfig,
9796
strategyName = strategy.name
9897
)
9998
) {
100-
val context = prepareContext(agentInput, runId)
99+
pipeline.withPreparedPipeline {
100+
agentStateMutex.withLock {
101+
state = State.Running(context)
102+
}
101103

102-
agentStateMutex.withLock {
103-
state = State.Running(context)
104-
}
104+
logger.debug {
105+
formatLog(
106+
agentId = id,
107+
runId = runId,
108+
message = "Starting agent execution"
109+
)
110+
}
105111

106-
logger.debug {
107-
formatLog(
108-
agentId = this@StatefulSingleUseAIAgent.id,
112+
pipeline.onAgentStarting<Input, Output>(
109113
runId = runId,
110-
message = "Starting agent execution"
114+
agent = this@StatefulSingleUseAIAgent,
115+
context = context
111116
)
112-
}
113-
114-
pipeline.onAgentStarting<Input, Output>(
115-
runId = runId,
116-
agent = this@StatefulSingleUseAIAgent,
117-
context = context
118-
)
119117

120-
val result = try {
121-
strategy.execute(context = context, input = agentInput)
122-
} catch (e: Throwable) {
123-
logger.error(e) { "Execution exception reported by server!" }
124-
pipeline.onAgentExecutionFailed(
125-
agentId = this@StatefulSingleUseAIAgent.id,
126-
runId = runId,
127-
throwable = e
128-
)
129-
agentStateMutex.withLock { state = State.Failed(e) }
130-
throw e
131-
}
118+
val result = try {
119+
strategy.execute(context = context, input = agentInput)
120+
} catch (e: Throwable) {
121+
logger.error(e) { "Execution exception reported by server!" }
122+
pipeline.onAgentExecutionFailed(
123+
agentId = id,
124+
runId = runId,
125+
throwable = e
126+
)
127+
agentStateMutex.withLock { state = State.Failed(e) }
128+
throw e
129+
}
132130

133-
logger.debug {
134-
formatLog(
135-
agentId = this@StatefulSingleUseAIAgent.id,
131+
logger.debug {
132+
formatLog(
133+
agentId = id,
134+
runId = runId,
135+
message = "Finished agent execution"
136+
)
137+
}
138+
pipeline.onAgentCompleted(
139+
agentId = id,
136140
runId = runId,
137-
message = "Finished agent execution"
141+
result = result
138142
)
139-
}
140-
pipeline.onAgentCompleted(
141-
agentId = this@StatefulSingleUseAIAgent.id,
142-
runId = runId,
143-
result = result
144-
)
145143

146-
agentStateMutex.withLock {
147-
state = if (result != null) {
148-
State.Finished(result)
149-
} else {
150-
State.Failed(Exception("result is null"))
144+
agentStateMutex.withLock {
145+
state = if (result != null) {
146+
State.Finished(result)
147+
} else {
148+
State.Failed(Exception("result is null"))
149+
}
151150
}
152-
}
153151

154-
return@withContext result ?: error("result is null")
152+
result ?: error("result is null")
153+
}
155154
}
156155
}
157156

158157
/**
159158
* Closes the AI Agent and performs necessary cleanup operations.
160159
*
161160
* This method is a suspending function that ensures that the AI Agent's resources are released
162-
* when it is no longer needed. It notifies the pipeline of the agent's closure and ensures
161+
* when it is no longer required. It notifies the pipeline of the agent's closure and ensures
163162
* that any associated features or stream providers are properly closed.
164163
*
165164
* Overrides the `close` method to implement agent-specific shutdown logic.
166165
*/
167166
final override suspend fun close() {
168-
pipeline.onAgentClosing(agentId = this@StatefulSingleUseAIAgent.id)
169-
pipeline.closeFeaturesStreamProviders()
167+
// TODO: Delete Closeable interface from [AIAgent] declaration.
170168
}
171169

172170
/**
@@ -202,6 +200,22 @@ public abstract class StatefulSingleUseAIAgent<Input, Output, TContext : AIAgent
202200
*/
203201
protected fun formatLog(agentId: String, runId: String, message: String): String =
204202
"[agent id: $agentId, run id: $runId] $message"
203+
204+
/**
205+
* Executes the provided block within a prepared pipeline context.
206+
* Ensures that the necessary feature resources are initialized before the block is invoked
207+
* and properly cleaned up after finishing the block.
208+
*
209+
* @return The result of the block's execution.
210+
*/
211+
private suspend fun <T> AIAgentPipeline.withPreparedPipeline(block: suspend () -> T): T =
212+
try {
213+
prepareAllFeatures()
214+
block.invoke()
215+
} finally {
216+
onAgentClosing(agentId = id)
217+
closeAllFeaturesMessageProcessors()
218+
}
205219
}
206220

207221
/**

agents/agents-core/src/commonMain/kotlin/ai/koog/agents/core/feature/pipeline/AIAgentFunctionalPipeline.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,6 @@ public class AIAgentFunctionalPipeline(clock: Clock = Clock.System) : AIAgentPip
3333
pipeline = this,
3434
)
3535

36-
registeredFeatures[feature.key] = RegisteredFeature(featureImpl, featureConfig)
36+
super.install(feature.key, featureConfig, featureImpl)
3737
}
3838
}

agents/agents-core/src/commonMain/kotlin/ai/koog/agents/core/feature/pipeline/AIAgentGraphPipeline.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,12 @@ public class AIAgentGraphPipeline(clock: Clock = Clock.System) : AIAgentPipeline
4848
* The feature's message processors are initialized during installation.
4949
*
5050
* @param TConfig The type of the feature configuration
51-
* @param TFeature The type of the feature being installed
51+
* @param TFeatureImpl The type of the feature being installed
5252
* @param feature The feature implementation to be installed
5353
* @param configure A lambda to customize the feature configuration
5454
*/
55-
public fun <TConfig : FeatureConfig, TFeature : Any> install(
56-
feature: AIAgentGraphFeature<TConfig, TFeature>,
55+
public fun <TConfig : FeatureConfig, TFeatureImpl : Any> install(
56+
feature: AIAgentGraphFeature<TConfig, TFeatureImpl>,
5757
configure: TConfig.() -> Unit,
5858
) {
5959
val featureConfig = feature.createInitialConfig().apply { configure() }
@@ -62,7 +62,7 @@ public class AIAgentGraphPipeline(clock: Clock = Clock.System) : AIAgentPipeline
6262
pipeline = this,
6363
)
6464

65-
registeredFeatures[feature.key] = RegisteredFeature(featureImpl, featureConfig)
65+
super.install(feature.key, featureConfig, featureImpl)
6666
}
6767

6868
//region Trigger Node Handlers

0 commit comments

Comments
 (0)