Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions src/AgentObserver.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Swis\Agents;

use OpenAI\Responses\Responses\CreateStreamedResponse;
use Swis\Agents\Interfaces\AgentInterface;
use Swis\Agents\Interfaces\MessageInterface;
use Swis\Agents\Orchestrator\RunContext;
Expand Down Expand Up @@ -42,6 +43,43 @@ public function onResponseInterval(AgentInterface $agent, Payload $payload, RunC
{
}

/**
* Called when an agent generates a complete reasoning message
*
* @param AgentInterface $agent The agent that generated the response
* @param MessageInterface $message The complete reasoning message that was generated
* @param RunContext $context The execution context
* @return void
*/
public function onReasoning(AgentInterface $agent, MessageInterface $message, RunContext $context): void
{
}

/**
* Called during streaming when a partial reasoning token is received
*
* @param AgentInterface $agent The agent that generated the response token
* @param Payload $payload The reasoning payload
* @param RunContext $context The execution context
* @return void
*/
public function onReasoningInterval(AgentInterface $agent, Payload $payload, RunContext $context): void
{
}

/**
* Called during streaming when an SSE event is received
*
* @param RunContext $context The run context
* @param AgentInterface $agent The agent that generated the reasoning token
* @param string $event The name of the event
* @param CreateStreamedResponse $response The response belonging to the event
* @return void
*/
public function onStreamEvent(AgentInterface $agent, string $event, CreateStreamedResponse $response, RunContext $context): void
{
}

/**
* Called when an agent initiates a tool call
*
Expand Down
47 changes: 47 additions & 0 deletions src/Orchestrator/ObserverInvoker.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Swis\Agents\Orchestrator;

use OpenAI\Responses\Responses\CreateStreamedResponse;
use Swis\Agents\Interfaces\AgentInterface;
use Swis\Agents\Interfaces\MessageInterface;
use Swis\Agents\Response\Payload;
Expand Down Expand Up @@ -48,6 +49,52 @@ public function agentOnResponseInterval(RunContext $context, AgentInterface $age
}
}

/**
* Notify all agent observers about a reasoning message
*
* @param RunContext $context The run context
* @param AgentInterface $agent The agent that generated the reasoning
* @param MessageInterface $message The reasoning message that was generated
* @return void
*/
public function agentOnReasoning(RunContext $context, AgentInterface $agent, MessageInterface $message): void
{
foreach ($context->agentObservers() as $observer) {
$observer->onReasoning($agent, $message, $context);
}
}

/**
* Notify all agent observers about an intermediate reasoning token during streaming
*
* @param RunContext $context The run context
* @param AgentInterface $agent The agent that generated the reasoning token
* @param Payload $payload The reasoning token payload
* @return void
*/
public function agentOnReasoningInterval(RunContext $context, AgentInterface $agent, Payload $payload): void
{
foreach ($context->agentObservers() as $observer) {
$observer->onReasoningInterval($agent, $payload, $context);
}
}

/**
* Notify all agent observers about an SSE event
*
* @param RunContext $context The run context
* @param AgentInterface $agent The agent that generated the reasoning token
* @param string $event The name of the event
* @param CreateStreamedResponse $response The response belonging to the event
* @return void
*/
public function agentOnStreamEvent(RunContext $context, AgentInterface $agent, string $event, CreateStreamedResponse $response): void
{
foreach ($context->agentObservers() as $observer) {
$observer->onStreamEvent($agent, $event, $response, $context);
}
}

/**
* Notify all agent observers about a tool call
*
Expand Down
24 changes: 21 additions & 3 deletions src/Response/ResponsesStreamedResponseWrapper.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@
use OpenAI\Responses\Responses\CreateStreamedResponse;
use OpenAI\Responses\Responses\Streaming\OutputItem;
use OpenAI\Responses\Responses\Streaming\OutputTextDelta;
use OpenAI\Responses\Responses\Streaming\ReasoningSummaryTextDelta;
use OpenAI\Responses\Responses\Streaming\ReasoningSummaryTextDone;
use OpenAI\Responses\StreamResponse;
use Swis\Agents\Interfaces\AgentInterface;
use Swis\Agents\Message;
use Swis\Agents\Transporters\ResponsesTransporter;

/**
Expand Down Expand Up @@ -37,10 +40,10 @@ public function __construct(
/**
* Extract text from the streamed response chunk.
*
* @param OutputTextDelta $response
* @param OutputTextDelta|ReasoningSummaryTextDelta $response
* @return string
*/
protected function getText(OutputTextDelta $response): string
protected function getText(OutputTextDelta|ReasoningSummaryTextDelta $response): string
{
return $response->delta;
}
Expand Down Expand Up @@ -110,8 +113,11 @@ public function getIterator(): Generator
assert($response instanceof CreateStreamedResponse);
$this->generated[] = $response;

$context = $this->agent->orchestrator()->context;
$context->observerInvoker()->agentOnStreamEvent($context, $this->agent, $response->event, $response);

if ($response->event === 'response.created' && $response->response instanceof CreateResponse) {
$this->agent->orchestrator()->context->withPreviousResponseId($response->response->id);
$context->withPreviousResponseId($response->response->id);

continue;
}
Expand All @@ -120,6 +126,18 @@ public function getIterator(): Generator
$currentItem = $response->response;
}

if ($response->response instanceof ReasoningSummaryTextDelta) {
$context->observerInvoker()->agentOnReasoningInterval($context, $this->agent, new Payload($this->getText($response->response), 'assistant'));

continue;
}

if ($response->response instanceof ReasoningSummaryTextDone) {
$context->observerInvoker()->agentOnReasoning($context, $this->agent, new Message('assistant', $response->response->text));

continue;
}

if ($response->response instanceof OutputTextDelta) {
yield new Payload(content: $this->getText($response->response), role: $currentItem?->item->role ?? null);

Expand Down