diff --git a/packages/core/src/helpers/Conversation.helper.ts b/packages/core/src/helpers/Conversation.helper.ts index c2eb3399..97a7f727 100644 --- a/packages/core/src/helpers/Conversation.helper.ts +++ b/packages/core/src/helpers/Conversation.helper.ts @@ -76,6 +76,8 @@ export class Conversation extends EventEmitter { return this._id; } + private _conversationId: string = ''; + // Tool call limit tracking private _toolCallCount: number = 0; private _maxToolCallsPerSession: number = Infinity; // Default limit @@ -169,6 +171,10 @@ export class Conversation extends EventEmitter { this._maxToolCallsPerSession = _settings.maxToolCalls; } + if(_settings?.conversationId) { + this._conversationId = _settings.conversationId; + } + this._baseUrl = _settings?.baseUrl; this._agentVersion = _settings?.agentVersion; @@ -808,7 +814,7 @@ export class Conversation extends EventEmitter { //we force the conversationId header after checking that it was not remotely set if (!reqConfig.headers['x-conversation-id']) { - reqConfig.headers['x-conversation-id'] = this.id; + reqConfig.headers['x-conversation-id'] = this._conversationId || this.id; } if (canRunLocally && !requiresRemoteCall) { console.log('RUNNING AGENT LOCALLY'); @@ -1037,7 +1043,7 @@ export class Conversation extends EventEmitter { messages = this._context.messages; // preserve messages } - this._context = new LLMContext(this.llmInference, this.systemPrompt, this._llmContextStore); + this._context = new LLMContext(this.llmInference, this.systemPrompt, this._llmContextStore, this._conversationId); } else { this._toolsConfig = null; this._reqMethods = null; diff --git a/packages/core/src/subsystems/AgentManager/AgentRuntime.class.ts b/packages/core/src/subsystems/AgentManager/AgentRuntime.class.ts index 2b56c497..8bfd92fd 100644 --- a/packages/core/src/subsystems/AgentManager/AgentRuntime.class.ts +++ b/packages/core/src/subsystems/AgentManager/AgentRuntime.class.ts @@ -1,6 +1,7 @@ import { Component } from 
'@sre/Components/Component.class'; import { Agent } from './Agent.class'; +import { ConnectorService } from '@sre/Core/ConnectorsService'; import { Logger } from '@sre/helpers/Log.helper'; import { LLMCache } from '@sre/MemoryManager/LLMCache'; import { RuntimeContext } from '@sre/MemoryManager/RuntimeContext'; @@ -22,7 +23,7 @@ const AgentRuntimeUnavailable = new Proxy( }; } }, - } + }, ); export class AgentRuntime { private static processResults: any = {}; @@ -182,15 +183,97 @@ export class AgentRuntime { //if xDebugId is equal to agent session, it means that the debugging features are not active this._debugActive = this.xDebugId != agent.sessionId; - const xCacheId = agent.agentRequest.header('X-CACHE-ID') || ''; - this.llmCache = new LLMCache(AccessCandidate.agent(this.agent.id), xCacheId); - //this.xCacheId = + this.initLLMCache().catch((err) => logger.warn('Failed to initialize LLM cache', err)); /* async init cannot be awaited in a constructor; log instead of leaving an unhandled rejection */ + } + + private getDebugConversationKey(): string { + return `debug:${this.xDebugId}:conversationId`; + } + + private async initLLMCache() { + const xCacheId = this.agent.agentRequest.header('X-CACHE-ID') || ''; + let conversationId = this.agent.conversationId; + + /** + * Retrieve conversationId for workflow runtime using debug session mapping. + * + * Goal: Track conversation context across different runtime instances (chatbot conversation vs workflow components). + * + * When debugging chatbot: + * - Chatbot conversation runtime HAS conversationId (from chat request) + * - Workflow/debugger runtime DOES NOT have conversationId (workflow execution) + * + * Solution: Retrieve conversationId using xDebugId from the mapping stored by chatbot runtime.
+ */ + if (!conversationId && this._debugActive && this.xDebugId) { + const cacheConnector = ConnectorService.getCacheConnector(); + const storedConversationId = await cacheConnector + .requester(AccessCandidate.agent(this.agent.id)) + .get(this.getDebugConversationKey()) + .catch(() => null); + + if (storedConversationId) { + conversationId = storedConversationId; + this.agent.conversationId = storedConversationId; + } + } + + /** + * Generate cache ID to track conversation context across different runtime instances. + * + * Priority: xCacheId (explicit override) > conversationId (for context tracking) > undefined + * + * Why conversationId is needed: + * - Enables GenAILLM's "Use Context Window" to maintain conversation history + * - When debugging chatbot: conversationId available from chatbot runtime, OR retrieved via + * debug mapping (above) for workflow runtime - both use same cache ID to share context. + */ + const cacheId = xCacheId || (conversationId ? LLMCache.generateLLMCacheId(conversationId) : undefined); + this.llmCache = new LLMCache(AccessCandidate.agent(this.agent.id), cacheId); + + /** + * Store conversationId with debug session for workflow runtime to retrieve. 
+ * + * When debugging chatbot: + * - Chatbot runtime HAS conversationId → stores mapping via getDebugConversationKey() + * - Workflow runtime retrieves conversationId using xDebugId → both share same LLM context cache + * + * TTL: 1 hour (sufficient for typical debug workflows) + */ + if (this._debugActive && this.xDebugId && this.agent.conversationId) { + const cacheConnector = ConnectorService.getCacheConnector(); + const DEBUG_SESSION_TTL = 60 * 60; // 1 hour in seconds + + cacheConnector + .requester(AccessCandidate.agent(this.agent.id)) + .set(this.getDebugConversationKey(), this.agent.conversationId, null, null, DEBUG_SESSION_TTL) + .catch((err) => { + logger.warn('Failed to store debug-conversation mapping', err); + }); + } } public async ready() { return this.agentContext.ready(); } + /** + * Retrieves the conversation ID for this runtime: the agent's own ID when set, otherwise the ID mapped to the current debug session + * @returns The conversation ID if found, undefined otherwise (a failed cache lookup is treated as not found) + */ + public async getConversationId(): Promise<string | undefined> { + if (this.agent.conversationId) { + return this.agent.conversationId; + } + + if (this._debugActive && this.xDebugId) { + const conversationId = await ConnectorService.getCacheConnector().requester(AccessCandidate.agent(this.agent.id)).get(this.getDebugConversationKey()).catch(() => null); /* same connector and raw key that initLLMCache uses to store and read the mapping */ + return conversationId || undefined; + } + + return undefined; + } + public destroy() { this.sessionClosed = true; this.sync(); @@ -247,11 +330,11 @@ export class AgentRuntime { dbgActiveComponents = dbgAllComponents.filter( (c: any) => c?.ctx?.active == true || - (!c?.ctx?.output?._error && Array.isArray(c?.ctx?._job_components) && c?.ctx?._job_components.length > 0) + (!c?.ctx?.output?._error && Array.isArray(c?.ctx?._job_components) && c?.ctx?._job_components.length > 0), ); //find waiting components that was not previously run const dbgActiveWaitingComponents: any = dbgAllComponents.filter( - (c: any) => c?.ctx?.active == true && c?.ctx?.status && typeof c?.ctx?.output !== undefined + (c: any) => c?.ctx?.active == true && c?.ctx?.status && typeof c?.ctx?.output !==
undefined, ); const dbgActiveReadyComponents: any = dbgAllComponents.filter((c: any) => c?.ctx?.active == true && !c?.ctx?.status); @@ -275,10 +358,10 @@ export class AgentRuntime { } const remainingActiveComponents: any = Object.values(ctxData?.components || []).filter( - (c: any) => c?.ctx?.active == true && !c?.ctx?.alwaysActive + (c: any) => c?.ctx?.active == true && !c?.ctx?.alwaysActive, ); const activeAsyncComponents: any = Object.values(ctxData?.components || []).filter( - (c: any) => !c?.ctx?.output?._error && Array.isArray(c?.ctx?._job_components) && c?.ctx?._job_components.length > 0 + (c: any) => !c?.ctx?.output?._error && Array.isArray(c?.ctx?._job_components) && c?.ctx?._job_components.length > 0, ); if (remainingActiveComponents.length == 0 && activeAsyncComponents.length == 0 /*&& awaitingInputs.length == 0*/) { @@ -319,7 +402,7 @@ export class AgentRuntime { public async runCycle() { logger.debug( `runCycle agentId=${this.agent.id} wfReqId=${this.workflowReqId} reqTag=${this.reqTag} session=${this.xDebugRun} cycleId=${this.processID}`, - AccessCandidate.agent(this.agent.id) + AccessCandidate.agent(this.agent.id), ); //this.checkRuntimeContext(); @@ -336,16 +419,16 @@ export class AgentRuntime { dbgActiveComponents = dbgAllComponents.filter( (c: any) => c?.ctx?.active == true || - (!c?.ctx?.output?._error && Array.isArray(c?.ctx?._job_components) && c?.ctx?._job_components.length > 0) + (!c?.ctx?.output?._error && Array.isArray(c?.ctx?._job_components) && c?.ctx?._job_components.length > 0), ); //find waiting components that was not previously run const dbgActiveWaitingComponents: any = dbgAllComponents.filter( - (c: any) => c?.ctx?.active == true && c?.ctx?.status && typeof c?.ctx?.output !== undefined + (c: any) => c?.ctx?.active == true && c?.ctx?.status && typeof c?.ctx?.output !== undefined, ); const dbgActiveReadyComponents: any = dbgAllComponents.filter( (c: any) => (c?.ctx?.active == true && !c?.ctx?.status) || - (!c?.ctx?.output?._error && 
Array.isArray(c?.ctx?._job_components) && c?.ctx?._job_components.length > 0) + (!c?.ctx?.output?._error && Array.isArray(c?.ctx?._job_components) && c?.ctx?._job_components.length > 0), ); //const dbgActiveReadyComponents: any = dbgActiveComponents.filter((c: any) => c?.ctx?.active == true && !c?.ctx?.status); @@ -387,7 +470,7 @@ export class AgentRuntime { const remainingActiveComponents: any = Object.values(ctxData?.components || []).filter((c: any) => c?.ctx?.active == true); const activeAsyncComponents: any = Object.values(ctxData?.components || []).filter( - (c: any) => !c?.ctx?.output?._error && Array.isArray(c?.ctx?._job_components) && c?.ctx?._job_components.length > 0 + (c: any) => !c?.ctx?.output?._error && Array.isArray(c?.ctx?._job_components) && c?.ctx?._job_components.length > 0, ); const dbgActiveWaitingComponents: any = dbgAllComponents.filter((c: any) => c?.ctx?.status && typeof c?.ctx?.output !== undefined); @@ -405,7 +488,7 @@ export class AgentRuntime { !e.result._missing_inputs && !e.result._in_progress && //check if the current result is a trigger - triggers.find((t: any) => t.id == e.id) + triggers.find((t: any) => t.id == e.id), ); //capture workflow results @@ -415,7 +498,7 @@ export class AgentRuntime { e.result && !e.result._missing_inputs && //check if this is the last component in the chain - !agent.connections.find((c) => c.sourceId == e.id) + !agent.connections.find((c) => c.sourceId == e.id), ); sessionResults = sessionResults.concat(triggersResults); @@ -497,7 +580,7 @@ export class AgentRuntime { } return acc; }, - { seen: {}, result: [] } + { seen: {}, result: [] }, ) .result.filter((e) => !e.result?._exclude); diff --git a/packages/core/src/subsystems/MemoryManager/LLMCache.ts b/packages/core/src/subsystems/MemoryManager/LLMCache.ts index 92c07720..0952e25b 100644 --- a/packages/core/src/subsystems/MemoryManager/LLMCache.ts +++ b/packages/core/src/subsystems/MemoryManager/LLMCache.ts @@ -18,6 +18,15 @@ export class LLMCache { 
return this._cacheId; } + /** + * Generates a standardized LLM cache ID + * @param id - Identifier to use as cache ID (e.g., conversationId, sessionId) + * @returns LLM cache ID with 'llm_cache:' prefix + */ + public static generateLLMCacheId(id: string): string { + return `llm_cache:${id}`; + } + /** * Creates a new LLMCache instace for a smythOS actor, the actor can be an agent, a user or a team * This is mainly use with agent to maintain a cache of the current LLM context @@ -30,7 +39,7 @@ export class LLMCache { */ constructor(candidate: AccessCandidate, cacheId?: string, ttl: number = 1 * 60 * 60) { this._cacheConnector = ConnectorService.getCacheConnector(); - this._cacheId = cacheId || 'llm_cache_' + uid(); + this._cacheId = cacheId || LLMCache.generateLLMCacheId(uid()); this._ttl = ttl; this._candidate = candidate; } diff --git a/packages/core/src/subsystems/MemoryManager/LLMContext.ts b/packages/core/src/subsystems/MemoryManager/LLMContext.ts index f9001cd2..23eec75b 100644 --- a/packages/core/src/subsystems/MemoryManager/LLMContext.ts +++ b/packages/core/src/subsystems/MemoryManager/LLMContext.ts @@ -36,11 +36,25 @@ export class LLMContext { return this.llmInference.model; } /** + * Creates a new LLM Context instance for managing conversation state. * - * @param source a messages[] object, or smyth file system uri (smythfs://...) + * @param llmInference - The LLM inference instance to use for this context + * @param _systemPrompt - Initial system prompt for the conversation + * @param llmContextStore - Optional persistent storage for conversation messages + * @param conversationId - Optional conversation ID for cache key generation. + * Used to track cached context across runtime instances + * (e.g., debug steps with the Chatbot embodiment has two separate runtimes: one for chat conversation + * and another for workflow components inside the builder). + * Required for "Use Context Window" feature in GenAILLM component to maintain conversation history.
*/ - constructor(private llmInference, _systemPrompt: string = '', llmContextStore?: ILLMContextStore) { - this._llmCache = new LLMCache(AccessCandidate.team(this.llmInference.teamId)); + constructor( + private llmInference, + _systemPrompt: string = '', + llmContextStore?: ILLMContextStore, + conversationId?: string, + ) { + const cacheId = conversationId ? LLMCache.generateLLMCacheId(conversationId) : undefined; + this._llmCache = new LLMCache(AccessCandidate.team(this.llmInference.teamId), cacheId); //this._systemPrompt = _systemPrompt; this.systemPrompt = _systemPrompt; diff --git a/packages/core/src/types/LLM.types.ts b/packages/core/src/types/LLM.types.ts index 3225bdfd..f090c74d 100644 --- a/packages/core/src/types/LLM.types.ts +++ b/packages/core/src/types/LLM.types.ts @@ -475,6 +475,7 @@ export interface IConversationSettings { * @default 100 */ maxToolCalls?: number; + conversationId?: string; } export enum APIKeySource { diff --git a/packages/sdk/src/LLM/Chat.class.ts b/packages/sdk/src/LLM/Chat.class.ts index 8268b9b1..3a48ed32 100644 --- a/packages/sdk/src/LLM/Chat.class.ts +++ b/packages/sdk/src/LLM/Chat.class.ts @@ -222,6 +222,9 @@ export class Chat extends SDKObject { if ((this.source as Agent)?.modes) { this._curAgentModes = (this.source as Agent).modes.join('|'); } + + this._convOptions.conversationId = this._id; + this._conversation = createConversation(this._data, this._convOptions); }