Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions packages/core/src/helpers/Conversation.helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ export class Conversation extends EventEmitter {
return this._id;
}

private _conversationId: string = '';

// Tool call limit tracking
private _toolCallCount: number = 0;
private _maxToolCallsPerSession: number = Infinity; // Default limit
Expand Down Expand Up @@ -169,6 +171,10 @@ export class Conversation extends EventEmitter {
this._maxToolCallsPerSession = _settings.maxToolCalls;
}

if(_settings?.conversationId) {
this._conversationId = _settings.conversationId;
}

this._baseUrl = _settings?.baseUrl;

this._agentVersion = _settings?.agentVersion;
Expand Down Expand Up @@ -808,7 +814,7 @@ export class Conversation extends EventEmitter {

//we force the conversationId header after checking that it was not remotely set
if (!reqConfig.headers['x-conversation-id']) {
reqConfig.headers['x-conversation-id'] = this.id;
reqConfig.headers['x-conversation-id'] = this._conversationId || this.id;
}
if (canRunLocally && !requiresRemoteCall) {
console.log('RUNNING AGENT LOCALLY');
Expand Down Expand Up @@ -1037,7 +1043,7 @@ export class Conversation extends EventEmitter {
messages = this._context.messages; // preserve messages
}

this._context = new LLMContext(this.llmInference, this.systemPrompt, this._llmContextStore);
this._context = new LLMContext(this.llmInference, this.systemPrompt, this._llmContextStore, this._conversationId);
} else {
this._toolsConfig = null;
this._reqMethods = null;
Expand Down
115 changes: 99 additions & 16 deletions packages/core/src/subsystems/AgentManager/AgentRuntime.class.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Component } from '@sre/Components/Component.class';
import { Agent } from './Agent.class';

import { ConnectorService } from '@sre/Core/ConnectorsService';
import { Logger } from '@sre/helpers/Log.helper';
import { LLMCache } from '@sre/MemoryManager/LLMCache';
import { RuntimeContext } from '@sre/MemoryManager/RuntimeContext';
Expand All @@ -22,7 +23,7 @@ const AgentRuntimeUnavailable = new Proxy(
};
}
},
}
},
);
export class AgentRuntime {
private static processResults: any = {};
Expand Down Expand Up @@ -182,15 +183,97 @@ export class AgentRuntime {
//if xDebugId is equal to agent session, it means that the debugging features are not active
this._debugActive = this.xDebugId != agent.sessionId;

const xCacheId = agent.agentRequest.header('X-CACHE-ID') || '';
this.llmCache = new LLMCache(AccessCandidate.agent(this.agent.id), xCacheId);
//this.xCacheId =
this.initLLMCache();
}

private getDebugConversationKey(): string {
return `debug:${this.xDebugId}:conversationId`;
}

private async initLLMCache() {
const xCacheId = this.agent.agentRequest.header('X-CACHE-ID') || '';
let conversationId = this.agent.conversationId;

/**
* Retrieve conversationId for workflow runtime using debug session mapping.
*
* Goal: Track conversation context across different runtime instances (chatbot conversation vs workflow components).
*
* When debugging chatbot:
* - Chatbot conversation runtime HAS conversationId (from chat request)
* - Workflow/debugger runtime DOES NOT have conversationId (workflow execution)
*
* Solution: Retrieve conversationId using xDebugId from the mapping stored by chatbot runtime.
*/
if (!conversationId && this._debugActive && this.xDebugId) {
const cacheConnector = ConnectorService.getCacheConnector();
const storedConversationId = await cacheConnector
.requester(AccessCandidate.agent(this.agent.id))
.get(this.getDebugConversationKey())
.catch(() => null);

if (storedConversationId) {
conversationId = storedConversationId;
this.agent.conversationId = storedConversationId;
}
}

/**
* Generate cache ID to track conversation context across different runtime instances.
*
* Priority: xCacheId (explicit override) > conversationId (for context tracking) > undefined
*
* Why conversationId is needed:
* - Enables GenAILLM's "Use Context Window" to maintain conversation history
* - When debugging chatbot: conversationId available from chatbot runtime, OR retrieved via
* debug mapping (above) for workflow runtime - both use same cache ID to share context.
*/
const cacheId = xCacheId || (conversationId ? LLMCache.generateLLMCacheId(conversationId) : undefined);
this.llmCache = new LLMCache(AccessCandidate.agent(this.agent.id), cacheId);

/**
* Store conversationId with debug session for workflow runtime to retrieve.
*
* When debugging chatbot:
* - Chatbot runtime HAS conversationId β†’ stores mapping via getDebugConversationKey()
* - Workflow runtime retrieves conversationId using xDebugId β†’ both share same LLM context cache
*
* TTL: 1 hour (sufficient for typical debug workflows)
*/
if (this._debugActive && this.xDebugId && this.agent.conversationId) {
const cacheConnector = ConnectorService.getCacheConnector();
const DEBUG_SESSION_TTL = 60 * 60; // 1 hour in seconds

cacheConnector
.requester(AccessCandidate.agent(this.agent.id))
.set(this.getDebugConversationKey(), this.agent.conversationId, null, null, DEBUG_SESSION_TTL)
.catch((err) => {
logger.warn('Failed to store debug-conversation mapping', err);
});
}
}

public async ready() {
return this.agentContext.ready();
}

/**
* Retrieves the conversation ID associated with the current debug session
* @returns The conversation ID if found, undefined otherwise
*/
public async getConversationId(): Promise<string | undefined> {
if (this.agent.conversationId) {
return this.agent.conversationId;
}

if (this._debugActive && this.xDebugId) {
const conversationId = await this.llmCache.get(this.getDebugConversationKey(), 'text');
return conversationId || undefined;
}

return undefined;
}

public destroy() {
this.sessionClosed = true;
this.sync();
Expand Down Expand Up @@ -247,11 +330,11 @@ export class AgentRuntime {
dbgActiveComponents = dbgAllComponents.filter(
(c: any) =>
c?.ctx?.active == true ||
(!c?.ctx?.output?._error && Array.isArray(c?.ctx?._job_components) && c?.ctx?._job_components.length > 0)
(!c?.ctx?.output?._error && Array.isArray(c?.ctx?._job_components) && c?.ctx?._job_components.length > 0),
);
//find waiting components that was not previously run
const dbgActiveWaitingComponents: any = dbgAllComponents.filter(
(c: any) => c?.ctx?.active == true && c?.ctx?.status && typeof c?.ctx?.output !== undefined
(c: any) => c?.ctx?.active == true && c?.ctx?.status && typeof c?.ctx?.output !== undefined,
);

const dbgActiveReadyComponents: any = dbgAllComponents.filter((c: any) => c?.ctx?.active == true && !c?.ctx?.status);
Expand All @@ -275,10 +358,10 @@ export class AgentRuntime {
}

const remainingActiveComponents: any = Object.values(ctxData?.components || []).filter(
(c: any) => c?.ctx?.active == true && !c?.ctx?.alwaysActive
(c: any) => c?.ctx?.active == true && !c?.ctx?.alwaysActive,
);
const activeAsyncComponents: any = Object.values(ctxData?.components || []).filter(
(c: any) => !c?.ctx?.output?._error && Array.isArray(c?.ctx?._job_components) && c?.ctx?._job_components.length > 0
(c: any) => !c?.ctx?.output?._error && Array.isArray(c?.ctx?._job_components) && c?.ctx?._job_components.length > 0,
);

if (remainingActiveComponents.length == 0 && activeAsyncComponents.length == 0 /*&& awaitingInputs.length == 0*/) {
Expand Down Expand Up @@ -319,7 +402,7 @@ export class AgentRuntime {
public async runCycle() {
logger.debug(
`runCycle agentId=${this.agent.id} wfReqId=${this.workflowReqId} reqTag=${this.reqTag} session=${this.xDebugRun} cycleId=${this.processID}`,
AccessCandidate.agent(this.agent.id)
AccessCandidate.agent(this.agent.id),
);
//this.checkRuntimeContext();

Expand All @@ -336,16 +419,16 @@ export class AgentRuntime {
dbgActiveComponents = dbgAllComponents.filter(
(c: any) =>
c?.ctx?.active == true ||
(!c?.ctx?.output?._error && Array.isArray(c?.ctx?._job_components) && c?.ctx?._job_components.length > 0)
(!c?.ctx?.output?._error && Array.isArray(c?.ctx?._job_components) && c?.ctx?._job_components.length > 0),
);
//find waiting components that was not previously run
const dbgActiveWaitingComponents: any = dbgAllComponents.filter(
(c: any) => c?.ctx?.active == true && c?.ctx?.status && typeof c?.ctx?.output !== undefined
(c: any) => c?.ctx?.active == true && c?.ctx?.status && typeof c?.ctx?.output !== undefined,
);
const dbgActiveReadyComponents: any = dbgAllComponents.filter(
(c: any) =>
(c?.ctx?.active == true && !c?.ctx?.status) ||
(!c?.ctx?.output?._error && Array.isArray(c?.ctx?._job_components) && c?.ctx?._job_components.length > 0)
(!c?.ctx?.output?._error && Array.isArray(c?.ctx?._job_components) && c?.ctx?._job_components.length > 0),
);
//const dbgActiveReadyComponents: any = dbgActiveComponents.filter((c: any) => c?.ctx?.active == true && !c?.ctx?.status);

Expand Down Expand Up @@ -387,7 +470,7 @@ export class AgentRuntime {

const remainingActiveComponents: any = Object.values(ctxData?.components || []).filter((c: any) => c?.ctx?.active == true);
const activeAsyncComponents: any = Object.values(ctxData?.components || []).filter(
(c: any) => !c?.ctx?.output?._error && Array.isArray(c?.ctx?._job_components) && c?.ctx?._job_components.length > 0
(c: any) => !c?.ctx?.output?._error && Array.isArray(c?.ctx?._job_components) && c?.ctx?._job_components.length > 0,
);
const dbgActiveWaitingComponents: any = dbgAllComponents.filter((c: any) => c?.ctx?.status && typeof c?.ctx?.output !== undefined);

Expand All @@ -405,7 +488,7 @@ export class AgentRuntime {
!e.result._missing_inputs &&
!e.result._in_progress &&
//check if the current result is a trigger
triggers.find((t: any) => t.id == e.id)
triggers.find((t: any) => t.id == e.id),
);

//capture workflow results
Expand All @@ -415,7 +498,7 @@ export class AgentRuntime {
e.result &&
!e.result._missing_inputs &&
//check if this is the last component in the chain
!agent.connections.find((c) => c.sourceId == e.id)
!agent.connections.find((c) => c.sourceId == e.id),
);

sessionResults = sessionResults.concat(triggersResults);
Expand Down Expand Up @@ -497,7 +580,7 @@ export class AgentRuntime {
}
return acc;
},
{ seen: {}, result: [] }
{ seen: {}, result: [] },
)
.result.filter((e) => !e.result?._exclude);

Expand Down
11 changes: 10 additions & 1 deletion packages/core/src/subsystems/MemoryManager/LLMCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ export class LLMCache {
return this._cacheId;
}

/**
* Generates a standardized LLM cache ID
* @param id - Identifier to use as cache ID (e.g., conversationId, sessionId)
* @returns LLM cache ID with 'llm_cache_' prefix
*/
public static generateLLMCacheId(id: string): string {
return `llm_cache:${id}`;
}

/**
* Creates a new LLMCache instace for a smythOS actor, the actor can be an agent, a user or a team
* This is mainly use with agent to maintain a cache of the current LLM context
Expand All @@ -30,7 +39,7 @@ export class LLMCache {
*/
constructor(candidate: AccessCandidate, cacheId?: string, ttl: number = 1 * 60 * 60) {
this._cacheConnector = ConnectorService.getCacheConnector();
this._cacheId = cacheId || 'llm_cache_' + uid();
this._cacheId = cacheId || LLMCache.generateLLMCacheId(uid());
this._ttl = ttl;
this._candidate = candidate;
}
Expand Down
20 changes: 17 additions & 3 deletions packages/core/src/subsystems/MemoryManager/LLMContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,25 @@ export class LLMContext {
return this.llmInference.model;
}
/**
* Creates a new LLM Context instance for managing conversation state.
*
* @param source a messages[] object, or smyth file system uri (smythfs://...)
* @param llmInference - The LLM inference instance to use for this context
* @param _systemPrompt - Initial system prompt for the conversation
* @param llmContextStore - Optional persistent storage for conversation messages
* @param conversationId - Optional conversation ID for cache key generation.
* Used to track cached context across runtime instances
* (e.g., debug steps with the Chatbot embodiment has two separate runtimes: one for chat conversation
* and another for workflow components inside the builder).
* Required for "Use Context Window" feature in GenAILLM component to maintain conversation history.
*/
constructor(private llmInference, _systemPrompt: string = '', llmContextStore?: ILLMContextStore) {
this._llmCache = new LLMCache(AccessCandidate.team(this.llmInference.teamId));
constructor(
private llmInference,
_systemPrompt: string = '',
llmContextStore?: ILLMContextStore,
conversationId?: string,
) {
const cacheId = conversationId ? LLMCache.generateLLMCacheId(conversationId) : undefined;
this._llmCache = new LLMCache(AccessCandidate.team(this.llmInference.teamId), cacheId);

//this._systemPrompt = _systemPrompt;
this.systemPrompt = _systemPrompt;
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/types/LLM.types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ export interface IConversationSettings {
* @default 100
*/
maxToolCalls?: number;
conversationId?: string;
}

export enum APIKeySource {
Expand Down
3 changes: 3 additions & 0 deletions packages/sdk/src/LLM/Chat.class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ export class Chat extends SDKObject {
if ((this.source as Agent)?.modes) {
this._curAgentModes = (this.source as Agent).modes.join('|');
}

this._convOptions.conversationId = this._id;

this._conversation = createConversation(this._data, this._convOptions);
}

Expand Down