diff --git a/core/src/commonMain/kotlin/com/google/adk/kt/apps/App.kt b/core/src/commonMain/kotlin/com/google/adk/kt/apps/App.kt index 370e7a8..e78e7e3 100644 --- a/core/src/commonMain/kotlin/com/google/adk/kt/apps/App.kt +++ b/core/src/commonMain/kotlin/com/google/adk/kt/apps/App.kt @@ -17,6 +17,7 @@ package com.google.adk.kt.apps import com.google.adk.kt.agents.BaseAgent +import com.google.adk.kt.summarizer.EventsCompactionConfig /** * Represents an LLM-backed agentic application. @@ -31,8 +32,14 @@ import com.google.adk.kt.agents.BaseAgent * * @property appName The application name. * @property rootAgent The root agent of the application's agent tree. + * @property eventsCompactionConfig Optional configuration controlling context-compaction strategies + * for sessions of this application. When `null`, no compaction runs. */ -data class App(val appName: String, val rootAgent: BaseAgent) { +data class App( + val appName: String, + val rootAgent: BaseAgent, + val eventsCompactionConfig: EventsCompactionConfig? = null, +) { init { require(IDENTIFIER_REGEX.matches(appName)) { "Invalid app name '$appName': must be a valid identifier consisting of letters, digits, " + diff --git a/core/src/commonMain/kotlin/com/google/adk/kt/processors/HistoryRewriterProcessor.kt b/core/src/commonMain/kotlin/com/google/adk/kt/processors/HistoryRewriterProcessor.kt index a5ac0e6..af978a9 100644 --- a/core/src/commonMain/kotlin/com/google/adk/kt/processors/HistoryRewriterProcessor.kt +++ b/core/src/commonMain/kotlin/com/google/adk/kt/processors/HistoryRewriterProcessor.kt @@ -50,17 +50,27 @@ internal class HistoryRewriterProcessor { shouldIncludeEventInContext(currentBranch, it) } - // Process events + // Process events. Compaction events are kept here (they carry their summary in + // actions.compaction rather than content) so processCompactionEvents can expand them below. val filteredEvents = rawFilteredEvents.mapNotNull { event -> when { + event.actions.compaction != null -> event event.content == null -> null isOtherAgentReply(agentName, event) -> presentOtherAgentMessage(event) else -> event } } + // Replace each compaction event with its summary and drop the raw events it covers. + val eventsWithCompactionApplied = + if (filteredEvents.any { it.actions.compaction != null }) { + processCompactionEvents(filteredEvents) + } else { + filteredEvents + } + // Rearrange for latest function response (merge scenarios) and async function responses - return filteredEvents + return eventsWithCompactionApplied .let { rearrangeEventsForLatestFunctionResponse(it) } .let { rearrangeEventsForAsyncFunctionResponsesInHistory(it) } .mapNotNull { event -> @@ -69,6 +79,72 @@ internal class HistoryRewriterProcessor { } } + /** + * Processes events by applying compaction. Identifies compacted ranges and filters out events + * that are covered by compaction summaries. + * + * @param events The list of events to process. + * @return The list of events with compaction applied. + */ + private fun processCompactionEvents(events: List): List { + // Extract all compaction ranges from the events. + val compactionRanges = events.mapIndexedNotNull { index, event -> + event.actions.compaction?.let { CompactionRange(index, it.startTimestamp, it.endTimestamp) } + } + val coveredIndices = coveredCompactionRangeIndices(compactionRanges) + val keptCompactionRanges = compactionRanges.filter { it.index !in coveredIndices } + + data class Item(val timestamp: Long, val index: Int, val event: Event) + + val finalItems = mutableListOf() + + // Pass 1: append all kept compaction events. + for (range in keptCompactionRanges) { + val compaction = events[range.index].actions.compaction!! + finalItems.add( + Item( + compaction.endTimestamp, + range.index, + events[range.index].copy( + author = Role.MODEL, + content = compaction.compactedContent, + timestamp = compaction.endTimestamp, + ), + ) + ) + } + + // Pass 2: append raw (non-compaction) events that don't fall into a kept compaction range. + finalItems += + events + .withIndex() + .filter { (_, event) -> event.actions.compaction == null } + .filter { (_, event) -> keptCompactionRanges.none { event.timestamp in it.start..it.end } } + .map { (index, event) -> Item(event.timestamp, index, event) } + + return finalItems.sortedWith(compareBy({ it.timestamp }, { it.index })).map { it.event } + } + + /** + * Returns the indices of [ranges] that are fully contained by another range. When two ranges are + * identical only the later one is kept; partially overlapping ranges (neither containing the + * other) are both kept. + */ + private fun coveredCompactionRangeIndices(ranges: List): Set = + ranges.filter { range -> ranges.any { it.covers(range) } }.map { it.index }.toSet() + + private data class CompactionRange(val index: Int, val start: Long, val end: Long) { + /** + * True if this range fully contains [other] -- strictly larger on at least one side, or + * identical but appearing later (so equal ranges keep only the most recent). + */ + fun covers(other: CompactionRange): Boolean = + index != other.index && + start <= other.start && + end >= other.end && + (start < other.start || end > other.end || index > other.index) + } + /** * Returns the suffix of [events] that belongs to the current turn. * @@ -220,6 +296,9 @@ internal class HistoryRewriterProcessor { * Parts with only thoughts are also considered empty. */ private fun containsEmptyContent(event: Event): Boolean { + // Compaction events carry their summary in actions.compaction rather than content; keep them so + // processCompactionEvents can expand them into summary content. + if (event.actions.compaction != null) return false val hasContent = event.content != null && diff --git a/core/src/commonMain/kotlin/com/google/adk/kt/runners/AbstractRunner.kt b/core/src/commonMain/kotlin/com/google/adk/kt/runners/AbstractRunner.kt index 822e0d7..65f93e5 100644 --- a/core/src/commonMain/kotlin/com/google/adk/kt/runners/AbstractRunner.kt +++ b/core/src/commonMain/kotlin/com/google/adk/kt/runners/AbstractRunner.kt @@ -20,6 +20,7 @@ package com.google.adk.kt.runners import com.google.adk.kt.agents.BaseAgent import com.google.adk.kt.agents.InvocationContext +import com.google.adk.kt.agents.LlmAgent import com.google.adk.kt.agents.ResumabilityConfig import com.google.adk.kt.agents.RunConfig import com.google.adk.kt.agents.findAgent @@ -41,6 +42,9 @@ import com.google.adk.kt.sessions.Session import com.google.adk.kt.sessions.SessionKey import com.google.adk.kt.sessions.SessionService import com.google.adk.kt.sessions.State +import com.google.adk.kt.summarizer.EventsCompactionConfig +import com.google.adk.kt.summarizer.LlmEventSummarizer +import com.google.adk.kt.summarizer.SlidingWindowEventCompactor import com.google.adk.kt.telemetry.trace import com.google.adk.kt.types.Blob import com.google.adk.kt.types.Content @@ -53,23 +57,43 @@ import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.toList import kotlinx.coroutines.runBlocking -/** - * An abstract base class for [Runner] implementations that provides common orchestration logic. - * - * Subclasses may be constructed either from explicit fields or from an [App] (via the [App]-based - * constructor), which supplies the [App.appName] and [App.rootAgent]. - */ -abstract class AbstractRunner( - override val appName: String, - override val agent: BaseAgent, - override val sessionService: SessionService, - override val artifactService: ArtifactService?, - override val memoryService: MemoryService?, - override val pluginManager: PluginManager, - override val resumabilityConfig: ResumabilityConfig = ResumabilityConfig(), -) : Runner { - - /** Creates a runner from an [App], taking its [App.appName] and [App.rootAgent]. */ +/** An abstract base class for [Runner] implementations that provides common orchestration logic. */ +abstract class AbstractRunner : Runner { + + val app: App? + final override val appName: String + final override val agent: BaseAgent + final override val sessionService: SessionService + final override val artifactService: ArtifactService? + final override val memoryService: MemoryService? + final override val pluginManager: PluginManager + final override val resumabilityConfig: ResumabilityConfig + + /** Creates a runner from explicit fields, not using an [App]. */ + constructor( + appName: String, + agent: BaseAgent, + sessionService: SessionService, + artifactService: ArtifactService?, + memoryService: MemoryService?, + pluginManager: PluginManager, + resumabilityConfig: ResumabilityConfig = ResumabilityConfig(), + ) { + this.appName = appName + this.agent = agent + this.sessionService = sessionService + this.artifactService = artifactService + this.memoryService = memoryService + this.pluginManager = pluginManager + this.resumabilityConfig = resumabilityConfig + this.app = null + } + + /** + * Creates a runner from an [App]. The compaction config is resolved at construction (failing fast + * if a default summarizer is required but the root agent is not an [LlmAgent]) and stored back on + * the [app], so [App.eventsCompactionConfig] returns the effective config. + */ constructor( app: App, sessionService: SessionService, @@ -77,15 +101,20 @@ abstract class AbstractRunner( memoryService: MemoryService?, pluginManager: PluginManager, resumabilityConfig: ResumabilityConfig = ResumabilityConfig(), - ) : this( - app.appName, - app.rootAgent, - sessionService, - artifactService, - memoryService, - pluginManager, - resumabilityConfig, - ) + ) { + this.appName = app.appName + this.agent = app.rootAgent + this.sessionService = sessionService + this.artifactService = artifactService + this.memoryService = memoryService + this.pluginManager = pluginManager + this.resumabilityConfig = resumabilityConfig + this.app = + app.copy( + eventsCompactionConfig = + resolveEventsCompactionConfig(app.rootAgent, app.eventsCompactionConfig) + ) + } /** * Main entry method to run the agent in this runner. @@ -125,6 +154,10 @@ abstract class AbstractRunner( // 4. Run agent with plugins emitAll(runAgentWithPlugins(context)) + + // 5. Post-invocation context compaction. Runs once the agent has finished emitting and all + // its events have been appended to `session`. + runPostInvocationCompaction(session) } .trace("invocation") @@ -680,7 +713,38 @@ abstract class AbstractRunner( return "e-" + Uuid.random() } + /** + * Runs post-invocation sliding-window context compaction over [session] when configured. A no-op + * when no compaction config was supplied or sliding-window compaction is not configured. The + * compactor appends a single summary [Event] to [session] (via [sessionService]) when the + * configured invocation interval is reached. + */ + private suspend fun runPostInvocationCompaction(session: Session) { + val config = app?.eventsCompactionConfig ?: return + if (!config.hasSlidingWindowConfig()) return + SlidingWindowEventCompactor(config).compact(session, sessionService) + } + private companion object { private val logger = LoggerFactory.getLogger(AbstractRunner::class) + + /** + * Returns [config] with a default [LlmEventSummarizer] injected when compaction is configured + * but no summarizer was supplied. The default summarizer uses [rootAgent]'s model, so + * [rootAgent] must be an [LlmAgent] in that case; otherwise an [IllegalArgumentException] is + * thrown. Returns [config] unchanged when it is `null` or already carries a summarizer. Any + * configured compaction strategy needs a summarizer, so this does not depend on which strategy + * is set. + */ + private fun resolveEventsCompactionConfig( + rootAgent: BaseAgent, + config: EventsCompactionConfig?, + ): EventsCompactionConfig? { + if (config == null || config.summarizer != null) return config + val model = + (rootAgent as? LlmAgent)?.model + ?: throw IllegalArgumentException("No BaseLlm model available for event compaction") + return config.copy(summarizer = LlmEventSummarizer(model)) + } } } diff --git a/core/src/commonTest/kotlin/com/google/adk/kt/apps/AppTest.kt b/core/src/commonTest/kotlin/com/google/adk/kt/apps/AppTest.kt index 1a230d4..aecf089 100644 --- a/core/src/commonTest/kotlin/com/google/adk/kt/apps/AppTest.kt +++ b/core/src/commonTest/kotlin/com/google/adk/kt/apps/AppTest.kt @@ -16,10 +16,12 @@ package com.google.adk.kt.apps +import com.google.adk.kt.summarizer.EventsCompactionConfig import com.google.adk.kt.testing.DummyAgent import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertFailsWith +import kotlin.test.assertNull import kotlin.test.assertSame class AppTest { @@ -34,6 +36,22 @@ class AppTest { assertSame(agent, app.rootAgent) } + @Test + fun construct_noEventsCompactionConfig_defaultsToNull() { + val app = App(appName = "my_app", rootAgent = DummyAgent()) + + assertNull(app.eventsCompactionConfig) + } + + @Test + fun construct_withEventsCompactionConfig_exposesIt() { + val config = EventsCompactionConfig(compactionInterval = 2, overlapSize = 1) + + val app = App(appName = "my_app", rootAgent = DummyAgent(), eventsCompactionConfig = config) + + assertSame(config, app.eventsCompactionConfig) + } + @Test fun construct_emptyName_throwsIllegalArgumentException() { assertFailsWith { App(appName = "", rootAgent = DummyAgent()) } diff --git a/core/src/commonTest/kotlin/com/google/adk/kt/processors/ContentsProcessorTest.kt b/core/src/commonTest/kotlin/com/google/adk/kt/processors/ContentsProcessorTest.kt index 0c2e72b..ac36fe4 100644 --- a/core/src/commonTest/kotlin/com/google/adk/kt/processors/ContentsProcessorTest.kt +++ b/core/src/commonTest/kotlin/com/google/adk/kt/processors/ContentsProcessorTest.kt @@ -25,6 +25,7 @@ import com.google.adk.kt.sessions.InMemorySessionService import com.google.adk.kt.sessions.SessionKey import com.google.adk.kt.testing.DummyAgent import com.google.adk.kt.testing.DummyModel +import com.google.adk.kt.testing.compactionEvent import com.google.adk.kt.testing.modelMessage import com.google.adk.kt.testing.testSession import com.google.adk.kt.testing.userMessage @@ -1183,6 +1184,219 @@ class ContentsProcessorTest { assertThat(result).isEmpty() } + @Test + fun process_compactionEvent_replacesCoveredEventsWithSummary() = runTest { + val processor = ContentsProcessor() + var request = LlmRequest(contents = emptyList()) + val context = + createLlmAgentTestContext( + Event(author = "user", content = userMessage("u1"), timestamp = 1L), + Event(author = "testAgent", content = modelMessage("m1"), timestamp = 2L), + Event(author = "user", content = userMessage("u2"), timestamp = 3L), + compactionEvent(startTs = 1L, endTs = 2L, timestamp = 4L, summary = "summary"), + ) + + request = processor.process(context, request) + + // u1(ts=1) and m1(ts=2) fall in [1,2] -> replaced by the summary (at ts=2); u2(ts=3) is kept. + assertThat(request.contents.map { it.parts.firstOrNull()?.text }) + .containsExactly("summary", "u2") + .inOrder() + } + + @Test + fun process_nestedCompactions_keepsOnlyOuterSummary() = runTest { + val processor = ContentsProcessor() + var request = LlmRequest(contents = emptyList()) + val context = + createLlmAgentTestContext( + Event(author = "user", content = userMessage("u1"), timestamp = 1L), + Event(author = "testAgent", content = modelMessage("m1"), timestamp = 2L), + Event(author = "user", content = userMessage("u2"), timestamp = 3L), + Event(author = "testAgent", content = modelMessage("m2"), timestamp = 4L), + compactionEvent(startTs = 1L, endTs = 2L, timestamp = 5L, summary = "inner"), + compactionEvent(startTs = 1L, endTs = 4L, timestamp = 6L, summary = "outer"), + Event(author = "user", content = userMessage("u3"), timestamp = 7L), + ) + + request = processor.process(context, request) + + // [1,2] is contained in [1,4], so only "outer" survives (covering u1..m2); u3 is kept. + assertThat(request.contents.map { it.parts.firstOrNull()?.text }) + .containsExactly("outer", "u3") + .inOrder() + } + + @Test + fun process_partiallyOverlappingCompactions_keepsBoth() = runTest { + val processor = ContentsProcessor() + var request = LlmRequest(contents = emptyList()) + val context = + createLlmAgentTestContext( + Event(author = "user", content = userMessage("u1"), timestamp = 1L), + Event(author = "testAgent", content = modelMessage("m1"), timestamp = 2L), + Event(author = "user", content = userMessage("u2"), timestamp = 3L), + Event(author = "testAgent", content = modelMessage("m2"), timestamp = 4L), + compactionEvent(startTs = 1L, endTs = 2L, timestamp = 5L, summary = "first"), + compactionEvent(startTs = 2L, endTs = 4L, timestamp = 6L, summary = "second"), + Event(author = "user", content = userMessage("u3"), timestamp = 7L), + ) + + request = processor.process(context, request) + + // [1,2] and [2,4] overlap but neither contains the other, so both summaries are kept. + assertThat(request.contents.map { it.parts.firstOrNull()?.text }) + .containsExactly("first", "second", "u3") + .inOrder() + } + + @Test + fun process_noCompaction_returnsEventsUnchanged() = runTest { + val processor = ContentsProcessor() + var request = LlmRequest(contents = emptyList()) + val context = + createLlmAgentTestContext( + Event(author = "user", content = userMessage("u1"), timestamp = 1L), + Event(author = "testAgent", content = modelMessage("m1"), timestamp = 2L), + Event(author = "user", content = userMessage("u2"), timestamp = 3L), + ) + + request = processor.process(context, request) + + assertThat(request.contents.map { it.parts.firstOrNull()?.text }) + .containsExactly("u1", "m1", "u2") + .inOrder() + } + + @Test + fun process_noCompaction_preservesOriginalEventOrder() = runTest { + val processor = ContentsProcessor() + var request = LlmRequest(contents = emptyList()) + val context = + createLlmAgentTestContext( + Event(author = "user", content = userMessage("first"), timestamp = 2L), + Event(author = "testAgent", content = modelMessage("second"), timestamp = 1L), + ) + + request = processor.process(context, request) + + assertThat(request.contents.map { it.parts.firstOrNull()?.text }) + .containsExactly("first", "second") + .inOrder() + } + + @Test + fun process_compactionAtBeginning_keepsLaterEvents() = runTest { + val processor = ContentsProcessor() + var request = LlmRequest(contents = emptyList()) + val context = + createLlmAgentTestContext( + compactionEvent(startTs = 1L, endTs = 2L, timestamp = 2L, summary = "summary"), + Event(author = "user", content = userMessage("u3"), timestamp = 3L), + Event(author = "testAgent", content = modelMessage("m4"), timestamp = 4L), + ) + + request = processor.process(context, request) + + assertThat(request.contents.map { it.parts.firstOrNull()?.text }) + .containsExactly("summary", "u3", "m4") + .inOrder() + } + + @Test + fun process_compactionAtEnd_keepsEarlierRawEvents() = runTest { + val processor = ContentsProcessor() + var request = LlmRequest(contents = emptyList()) + val context = + createLlmAgentTestContext( + Event(author = "user", content = userMessage("u1"), timestamp = 1L), + Event(author = "testAgent", content = modelMessage("m2"), timestamp = 2L), + Event(author = "user", content = userMessage("u3"), timestamp = 3L), + compactionEvent(startTs = 2L, endTs = 3L, timestamp = 4L, summary = "summary"), + ) + + request = processor.process(context, request) + + assertThat(request.contents.map { it.parts.firstOrNull()?.text }) + .containsExactly("u1", "summary") + .inOrder() + } + + @Test + fun process_twoAdjacentCompactions_keepBothSummaries() = runTest { + val processor = ContentsProcessor() + var request = LlmRequest(contents = emptyList()) + val context = + createLlmAgentTestContext( + Event(author = "user", content = userMessage("u1"), timestamp = 1L), + Event(author = "testAgent", content = modelMessage("m2"), timestamp = 2L), + compactionEvent(startTs = 1L, endTs = 2L, timestamp = 2L, summary = "summary1to2"), + Event(author = "user", content = userMessage("u3"), timestamp = 3L), + Event(author = "testAgent", content = modelMessage("m4"), timestamp = 4L), + compactionEvent(startTs = 3L, endTs = 4L, timestamp = 4L, summary = "summary3to4"), + Event(author = "user", content = userMessage("u5"), timestamp = 5L), + ) + + request = processor.process(context, request) + + // [1,2] and [3,4] each replace their range; u5 (ts=5) is uncovered and kept. + assertThat(request.contents.map { it.parts.firstOrNull()?.text }) + .containsExactly("summary1to2", "summary3to4", "u5") + .inOrder() + } + + @Test + fun process_multipleCompactions_replaceRangesAndKeepRawEventsBetween() = runTest { + val processor = ContentsProcessor() + var request = LlmRequest(contents = emptyList()) + val context = + createLlmAgentTestContext( + Event(author = "user", content = userMessage("e1"), timestamp = 1L), + Event(author = "user", content = userMessage("e2"), timestamp = 2L), + Event(author = "user", content = userMessage("e3"), timestamp = 3L), + Event(author = "user", content = userMessage("e4"), timestamp = 4L), + compactionEvent(startTs = 1L, endTs = 4L, timestamp = 4L, summary = "summary1to4"), + Event(author = "user", content = userMessage("e5"), timestamp = 5L), + Event(author = "user", content = userMessage("e6"), timestamp = 6L), + Event(author = "user", content = userMessage("e7"), timestamp = 7L), + Event(author = "user", content = userMessage("e8"), timestamp = 8L), + Event(author = "user", content = userMessage("e9"), timestamp = 9L), + compactionEvent(startTs = 6L, endTs = 9L, timestamp = 9L, summary = "summary6to9"), + Event(author = "user", content = userMessage("e10"), timestamp = 10L), + ) + + request = processor.process(context, request) + + // [1,4] and [6,9] are replaced by their summaries; e5 (gap) and e10 (after) are kept. + assertThat(request.contents.map { it.parts.firstOrNull()?.text }) + .containsExactly("summary1to4", "e5", "summary6to9", "e10") + .inOrder() + } + + @Test + fun process_compactionAppendedLate_keepsNewerEvents() = runTest { + val processor = ContentsProcessor() + var request = LlmRequest(contents = emptyList()) + val context = + createLlmAgentTestContext( + Event(author = "user", content = userMessage("e1"), timestamp = 1L), + Event(author = "user", content = userMessage("e2"), timestamp = 2L), + Event(author = "user", content = userMessage("e3"), timestamp = 3L), + Event(author = "user", content = userMessage("u4"), timestamp = 4L), + Event(author = "testAgent", content = modelMessage("m5"), timestamp = 5L), + compactionEvent(startTs = 1L, endTs = 3L, timestamp = 6L, summary = "summary1to3"), + ) + + request = processor.process(context, request) + + // The compaction covers [1,3] but was appended at ts=6; u4,m5 (after the range) survive, and + // the summary is positioned at its end timestamp (3) -- ahead of them -- not at the append + // time. + assertThat(request.contents.map { it.parts.firstOrNull()?.text }) + .containsExactly("summary1to3", "u4", "m5") + .inOrder() + } + // Helpers private suspend fun createTestContext( diff --git a/core/src/commonTest/kotlin/com/google/adk/kt/runners/AbstractRunnerTest.kt b/core/src/commonTest/kotlin/com/google/adk/kt/runners/AbstractRunnerTest.kt index d0ebe4f..2439939 100644 --- a/core/src/commonTest/kotlin/com/google/adk/kt/runners/AbstractRunnerTest.kt +++ b/core/src/commonTest/kotlin/com/google/adk/kt/runners/AbstractRunnerTest.kt @@ -20,15 +20,22 @@ package com.google.adk.kt.runners import com.google.adk.kt.agents.BaseAgent import com.google.adk.kt.agents.InvocationContext +import com.google.adk.kt.agents.LlmAgent import com.google.adk.kt.agents.ResumabilityConfig import com.google.adk.kt.annotations.ExperimentalResumabilityFeature +import com.google.adk.kt.apps.App import com.google.adk.kt.artifacts.ArtifactService import com.google.adk.kt.artifacts.InMemoryArtifactService import com.google.adk.kt.events.Event import com.google.adk.kt.events.EventActions +import com.google.adk.kt.models.LlmResponse import com.google.adk.kt.sessions.SessionKey import com.google.adk.kt.sessions.State +import com.google.adk.kt.summarizer.EventSummarizer +import com.google.adk.kt.summarizer.EventsCompactionConfig import com.google.adk.kt.testing.DummyAgent +import com.google.adk.kt.testing.DummyModel +import com.google.adk.kt.testing.compactionEvent import com.google.adk.kt.testing.modelMessage import com.google.adk.kt.testing.userFunctionResponse import com.google.adk.kt.testing.userMessage @@ -40,7 +47,11 @@ import com.google.adk.kt.types.Role import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertFailsWith +import kotlin.test.assertNotNull import kotlin.test.assertNull +import kotlin.test.assertTrue +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.toList import kotlinx.coroutines.test.runTest class AbstractRunnerTest { @@ -686,6 +697,172 @@ class AbstractRunnerTest { artifactService.lastSavedArtifact("f1"), ) } + + // ----- Post-invocation context compaction wiring (configured via App) ----- + + @Test + fun runAsync_slidingWindowConfigured_compactsAfterInterval() = runTest { + val summarizer = RecordingSummarizer(returning = compactionEvent(startTs = 0L, endTs = 0L)) + val runner = + InMemoryRunner( + app = + App( + appName = "test_app", + rootAgent = echoAgent(), + eventsCompactionConfig = + EventsCompactionConfig( + compactionInterval = 2, + overlapSize = 0, + summarizer = summarizer, + ), + ) + ) + + // The runner exposes its App (read-only); the effective compaction config is read from it. + assertEquals(summarizer, runner.app?.eventsCompactionConfig?.summarizer) + + // First invocation: only one completed invocation, below the interval. + runner.runAsync(userId = "user", sessionId = "session", newMessage = userMessage("hi")).toList() + assertTrue(summarizer.calls.isEmpty()) + + // Second invocation: the interval is reached, so compaction fires exactly once over the four + // raw events (user + model per invocation) from the two completed invocations. + runner + .runAsync(userId = "user", sessionId = "session", newMessage = userMessage("hi again")) + .toList() + assertEquals(1, summarizer.calls.size) + assertEquals(4, summarizer.calls.single().size) + + val events = + assertNotNull(runner.sessionService.getSession(SessionKey(runner.appName, "user", "session"))) + .events + assertEquals(1, events.count { it.actions.compaction != null }) + } + + @Test + fun runAsync_belowCompactionInterval_doesNotCompact() = runTest { + val summarizer = RecordingSummarizer(returning = compactionEvent(startTs = 0L, endTs = 0L)) + val runner = + InMemoryRunner( + app = + App( + appName = "test_app", + rootAgent = echoAgent(), + eventsCompactionConfig = + EventsCompactionConfig( + compactionInterval = 3, + overlapSize = 0, + summarizer = summarizer, + ), + ) + ) + + runner.runAsync(userId = "user", sessionId = "session", newMessage = userMessage("a")).toList() + runner.runAsync(userId = "user", sessionId = "session", newMessage = userMessage("b")).toList() + + assertTrue(summarizer.calls.isEmpty()) + val events = + assertNotNull(runner.sessionService.getSession(SessionKey(runner.appName, "user", "session"))) + .events + assertTrue(events.none { it.actions.compaction != null }) + // Two invocations, each appending a user and a model event, and no compaction event added. + assertEquals(4, events.size) + } + + @Test + fun runAsync_noCompactionConfig_doesNotCompact() = runTest { + val runner = InMemoryRunner(agent = echoAgent()) + + runner.runAsync(userId = "user", sessionId = "session", newMessage = userMessage("a")).toList() + runner.runAsync(userId = "user", sessionId = "session", newMessage = userMessage("b")).toList() + + val events = + assertNotNull(runner.sessionService.getSession(SessionKey(runner.appName, "user", "session"))) + .events + assertTrue(events.none { it.actions.compaction != null }) + } + + @Test + fun construct_slidingWindowWithoutSummarizerAndNonLlmAgentRoot_throws() { + assertFailsWith { + InMemoryRunner( + app = + App( + appName = "test_app", + rootAgent = DummyAgent(), + eventsCompactionConfig = EventsCompactionConfig(compactionInterval = 2, overlapSize = 0), + ) + ) + } + } + + @Test + fun construct_compactionConfigWithoutStrategyAndNonLlmAgentRoot_throws() { + // A summarizer is resolved for any compaction config (not only sliding-window), so even a + // strategy-less config requires a model and fails fast when the root is not an LlmAgent. + assertFailsWith { + InMemoryRunner( + app = + App( + appName = "test_app", + rootAgent = DummyAgent(), + eventsCompactionConfig = EventsCompactionConfig(), + ) + ) + } + } + + @Test + fun runAsync_defaultSummarizerUsesRootLlmAgentModel() = runTest { + // No summarizer supplied: the runner must build a default LlmEventSummarizer from the root + // LlmAgent's model, so the compaction summary content comes from that model. + val model = DummyModel(name = "model") { flowOf(LlmResponse(content = modelMessage("OK"))) } + val runner = + InMemoryRunner( + app = + App( + appName = "test_app", + rootAgent = LlmAgent(name = "agent", model = model), + eventsCompactionConfig = EventsCompactionConfig(compactionInterval = 2, overlapSize = 0), + ) + ) + + runner.runAsync(userId = "user", sessionId = "session", newMessage = userMessage("hi")).toList() + runner + .runAsync(userId = "user", sessionId = "session", newMessage = userMessage("hi again")) + .toList() + + val events = + assertNotNull(runner.sessionService.getSession(SessionKey(runner.appName, "user", "session"))) + .events + val compaction = events.single { it.actions.compaction != null } + assertEquals("OK", compaction.actions.compaction?.compactedContent?.parts?.firstOrNull()?.text) + } +} + +/** A [DummyAgent] that emits one model [Event] tagged with the current invocation id per turn. */ +private fun echoAgent(name: String = "agent"): DummyAgent = + DummyAgent(name = name) { context -> + emit( + Event( + author = Role.MODEL, + invocationId = context.invocationId, + content = modelMessage("resp"), + ) + ) + } + +/** + * An [EventSummarizer] that records every event list passed to [summarizeEvents] in [calls] and + * returns the preconfigured [returning] event. + */ +private class RecordingSummarizer(private val returning: Event? = null) : EventSummarizer { + val calls: MutableList> = mutableListOf() + + override suspend fun summarizeEvents(events: List): Event? { + calls.add(events.toList()) + return returning + } } /** Artifact service that rejects `fileData` parts, mirroring GCS/file-backed services. */ diff --git a/core/src/commonTest/kotlin/com/google/adk/kt/runners/InMemoryRunnerTest.kt b/core/src/commonTest/kotlin/com/google/adk/kt/runners/InMemoryRunnerTest.kt index 36ca868..966d7ca 100644 --- a/core/src/commonTest/kotlin/com/google/adk/kt/runners/InMemoryRunnerTest.kt +++ b/core/src/commonTest/kotlin/com/google/adk/kt/runners/InMemoryRunnerTest.kt @@ -26,11 +26,19 @@ import com.google.adk.kt.annotations.ExperimentalResumabilityFeature import com.google.adk.kt.apps.App import com.google.adk.kt.events.Event import com.google.adk.kt.events.EventActions +import com.google.adk.kt.models.LlmRequest import com.google.adk.kt.models.LlmResponse +import com.google.adk.kt.sessions.InMemorySessionService +import com.google.adk.kt.sessions.Session import com.google.adk.kt.sessions.SessionKey +import com.google.adk.kt.sessions.SessionService import com.google.adk.kt.sessions.State +import com.google.adk.kt.summarizer.EventSummarizer +import com.google.adk.kt.summarizer.EventsCompactionConfig +import com.google.adk.kt.summarizer.LlmEventSummarizer import com.google.adk.kt.testing.DummyAgent import com.google.adk.kt.testing.DummyModel +import com.google.adk.kt.testing.compactionEvent import com.google.adk.kt.testing.modelMessage import com.google.adk.kt.testing.simplifyEvents import com.google.adk.kt.testing.userMessage @@ -74,6 +82,84 @@ class InMemoryRunnerTest { assertThat(sessionEvents.map { it.author }).containsExactly(Role.USER, Role.MODEL).inOrder() } + @Test + fun runAsync_constructedFromApp_wiresCompactionConfig() = runTest { + val summarizer = RecordingEventSummarizer() + val app = + App( + appName = "compaction_app", + rootAgent = DummyAgent(name = "agent"), + eventsCompactionConfig = + EventsCompactionConfig(compactionInterval = 1, overlapSize = 0, summarizer = summarizer), + ) + val runner = InMemoryRunner(app = app) + + runner + .runAsync(userId = "user1", sessionId = "session1", newMessage = userMessage("hi")) + .toList() + + assertThat(runner.appName).isEqualTo("compaction_app") + assertThat(summarizer.calls).hasSize(1) + val events = + runner.sessionService.getSession(SessionKey(runner.appName, "user1", "session1"))!!.events + assertThat(events.any { it.actions.compaction != null }).isTrue() + } + + /** + * Full end-to-end of sliding-window compaction through [InMemoryRunner.runAsync]: turn 1 triggers + * post-invocation compaction, and turn 2's prompt to the LLM shows the resulting summary in place + * of turn 1's raw messages. A [MonotonicTimestampSessionService] gives appended events strictly + * increasing timestamps so the test does not depend on wall-clock resolution -- real events + * created within the same millisecond would otherwise share a timestamp, making the compaction + * range and event ordering non-deterministic across platforms (e.g. the Android unit-test + * runtime). + */ + @Test + fun runAsync_compactionEndToEnd_summaryFromTurnOneReplacesHistoryInNextPrompt() = runTest { + val agentPrompts = mutableListOf() + // The agent's model records every prompt it receives and returns a canned answer. + val agentModel = + DummyModel(name = "agent-model") { request -> + agentPrompts += request + flowOf(LlmResponse(content = modelMessage("answer"))) + } + // The compaction summarizer's model returns a recognizable summary sentinel. + val summarizerModel = + DummyModel(name = "summarizer-model") { + flowOf(LlmResponse(content = modelMessage("SUMMARY_OF_EARLIER_TURNS"))) + } + val runner = + InMemoryRunner( + app = + App( + appName = "compaction_app", + rootAgent = LlmAgent(name = "agent", model = agentModel), + eventsCompactionConfig = + EventsCompactionConfig( + compactionInterval = 1, + overlapSize = 0, + summarizer = LlmEventSummarizer(summarizerModel), + ), + ), + sessionService = MonotonicTimestampSessionService(), + ) + + // Turn 1: post-invocation compaction fires and summarizes this turn. + runner + .runAsync(userId = "user", sessionId = "session", newMessage = userMessage("first question")) + .toList() + // Turn 2: its prompt should now show the summary instead of turn 1's raw messages. + runner + .runAsync(userId = "user", sessionId = "session", newMessage = userMessage("second question")) + .toList() + + val turn2Prompt = + agentPrompts.last().contents.flatMap { it.parts }.mapNotNull { it.text }.joinToString("\n") + assertThat(turn2Prompt).contains("SUMMARY_OF_EARLIER_TURNS") + assertThat(turn2Prompt).contains("second question") + assertThat(turn2Prompt).doesNotContain("first question") + } + @Test fun runAsync_withoutFunctionResponse_usesProvidedInvocationId() = runTest { val runner = InMemoryRunner(agent = dummyAgent) @@ -345,15 +431,14 @@ class InMemoryRunnerTest { ), ) - val events = - runner - .runAsync( - userId = "user1", - sessionId = "session1", - invocationId = "test-inv", - newMessage = userMessage("New message"), - ) - .toList() + runner + .runAsync( + userId = "user1", + sessionId = "session1", + invocationId = "test-inv", + newMessage = userMessage("New message"), + ) + .toList() val allSessionEvents = runner.sessionService.getSession(SessionKey(runner.appName, "user1", "session1"))!!.events @@ -362,3 +447,33 @@ class InMemoryRunnerTest { assertThat(allSessionEvents.last().content?.parts?.get(0)?.text).isEqualTo("New message") } } + +/** + * An [EventSummarizer] that records every event list passed to [summarizeEvents] in [calls] and + * returns a fixed compaction [Event]. + */ +private class RecordingEventSummarizer( + private val returning: Event = compactionEvent(startTs = 0L, endTs = 0L) +) : EventSummarizer { + val calls: MutableList> = mutableListOf() + + override suspend fun summarizeEvents(events: List): Event { + calls.add(events.toList()) + return returning + } +} + +/** + * A [SessionService] that stamps each appended event with a strictly increasing timestamp, so tests + * driving the runner don't depend on wall-clock resolution. Real events created within the same + * millisecond would otherwise share a timestamp, making compaction ranges and event ordering + * non-deterministic across platforms (e.g. the Android unit-test runtime). + */ +private class MonotonicTimestampSessionService( + private val delegate: SessionService = InMemorySessionService() +) : SessionService by delegate { + private var nextTimestamp = 1L + + override suspend fun appendEvent(session: Session, event: Event): Event = + delegate.appendEvent(session, event.copy(timestamp = nextTimestamp++)) +} diff --git a/examples/src/main/kotlin/com/google/adk/kt/examples/compaction/CompactionDemoAgent.kt b/examples/src/main/kotlin/com/google/adk/kt/examples/compaction/CompactionDemoAgent.kt new file mode 100644 index 0000000..df5c139 --- /dev/null +++ b/examples/src/main/kotlin/com/google/adk/kt/examples/compaction/CompactionDemoAgent.kt @@ -0,0 +1,148 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.adk.kt.examples.compaction + +import com.google.adk.kt.agents.LlmAgent +import com.google.adk.kt.apps.App +import com.google.adk.kt.models.Gemini +import com.google.adk.kt.models.LlmRequest +import com.google.adk.kt.models.LlmResponse +import com.google.adk.kt.models.Model +import com.google.adk.kt.runners.InMemoryRunner +import com.google.adk.kt.sessions.SessionKey +import com.google.adk.kt.summarizer.EventsCompactionConfig +import com.google.adk.kt.summarizer.LlmEventSummarizer +import com.google.adk.kt.types.Content +import com.google.adk.kt.types.Role +import java.util.Scanner +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.emitAll +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.runBlocking + +private const val MODEL_NAME = "gemini-3.1-flash-lite" + +/** + * Interactive end-to-end demo of sliding-window context compaction. + * + * Chat with the agent in your terminal. Both the agent's model and the compaction summarizer's + * model are wrapped in a [PrintingModel] that prints every prompt before it is sent, so you can + * watch the conversation history grow and then collapse into a single summary once compaction kicks + * in (configured here to compact every two turns). + * + * If `GEMINI_API_KEY` (or `GOOGLE_API_KEY`) is set, it talks to a real Gemini model. Otherwise it + * falls back to canned replies so the compaction behavior is still fully demonstrable offline. Type + * `exit` (or an empty line) to quit; a summary of the stored session events is printed on exit. + */ +private class PrintingModel(private val label: String, private val delegate: Model) : Model { + override val name: String = delegate.name + + override fun generateContent(request: LlmRequest, stream: Boolean): Flow = flow { + println("\n >>> $label prompt (${request.contents.size} content(s)):") + request.contents.forEachIndexed { index, content -> + val text = content.parts.mapNotNull { it.text }.joinToString(" ").ifEmpty { "" } + println(" [$index] ${content.role}: $text") + } + emitAll(delegate.generateContent(request, stream)) + } +} + +/** + * A [Model] that ignores the prompt and always returns [reply]; used when no API key is available. + */ +private class CannedModel(override val name: String, private val reply: String) : Model { + override fun generateContent(request: LlmRequest, stream: Boolean): Flow = + flowOf(LlmResponse(content = Content.fromText(Role.MODEL, reply))) +} + +private fun hasApiKey(): Boolean = + !System.getenv("GEMINI_API_KEY").isNullOrBlank() || + !System.getenv("GOOGLE_API_KEY").isNullOrBlank() + +private fun realOrCanned(cannedName: String, cannedReply: String): Model = + if (hasApiKey()) Gemini(name = MODEL_NAME) else CannedModel(cannedName, cannedReply) + +fun main() = runBlocking { + val agentModel = PrintingModel("AGENT LLM", realOrCanned("agent", "(canned) Here is an answer.")) + val summarizerModel = + PrintingModel( + "SUMMARIZER LLM", + realOrCanned("summarizer", "<>"), + ) + + val app = + App( + appName = "compaction_demo", + rootAgent = LlmAgent(name = "assistant", model = agentModel), + // Compact every 2 user invocations, with no overlap, using the LLM summarizer above. + eventsCompactionConfig = + EventsCompactionConfig( + compactionInterval = 2, + overlapSize = 0, + summarizer = LlmEventSummarizer(summarizerModel), + ), + ) + val runner = InMemoryRunner(app = app) + val userId = "demo-user" + val sessionId = "demo-session" + + println("Sliding-window compaction demo. Type a message; 'exit' or an empty line quits.") + if (!hasApiKey()) { + println( + "(No GEMINI_API_KEY/GOOGLE_API_KEY set -- using canned replies; compaction still works.)" + ) + } + + val scanner = Scanner(System.`in`) + while (true) { + print("\nYou > ") + System.out.flush() + if (!scanner.hasNextLine()) break + val input = scanner.nextLine() + if (input.isBlank() || input.trim().lowercase() in setOf("exit", "quit")) break + + runner + .runAsync( + userId = userId, + sessionId = sessionId, + newMessage = Content.fromText(Role.USER, input), + ) + .collect { event -> + val text = event.content?.parts?.mapNotNull { it.text }?.joinToString(" ").orEmpty() + if (text.isNotBlank()) println("\nassistant > $text") + } + } + + println("\n========== SESSION EVENTS (raw events are kept; summaries are appended) ==========") + val session = + runner.sessionService.getSession(SessionKey("compaction_demo", userId, sessionId)) + ?: return@runBlocking + session.events.forEachIndexed { index, event -> + val compaction = event.actions.compaction + val description = + if (compaction != null) { + val summary = compaction.compactedContent.parts.mapNotNull { it.text }.joinToString(" ") + "COMPACTION SUMMARY covering [${compaction.startTimestamp}..${compaction.endTimestamp}]: " + + summary + } else { + val text = event.content?.parts?.mapNotNull { it.text }?.joinToString(" ").orEmpty() + "${event.author}: $text" + } + println(" [$index] $description") + } +}