Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 123 additions & 72 deletions crates/lucarne-telegram/src/bot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ use tracing::{debug, info, instrument, warn};
use crate::{
agents,
channel::TelegramBotCommand,
state::{BotState, LiveSession, PanelSnapshot, PanelView, RunningCommandWorkflow, WorkSession},
state::{
BotState, LiveSession, PanelSnapshot, PanelView, RunningCommandWorkflow, RunningTurn,
WorkSession,
},
turn,
};
use base64::Engine;
Expand Down Expand Up @@ -1635,6 +1638,48 @@ impl Bot {
Ok(())
}

async fn submit_and_drain_user_turn(
&self,
ws: &WorkspaceId,
handle: &WorkspaceHandle,
live: &LiveSession,
input: AgentInput,
provider_id: &str,
recorder: Arc<dyn turn::TurnEventRecorder>,
intervention_callback_registry: Option<Arc<dyn turn::AgentInterventionCallbackRegistry>>,
final_footer: Option<AgentMessageFooter>,
source_message: &MessageId,
reply_to: Option<MessageId>,
) -> Result<(RunningTurn, turn::TurnRunReport), String> {
turn::prepare_turn_drain(live).await;
let _direct_notification_guard =
DirectNotificationGuard::new(self.state.core_handle(), ws.clone());
let running_turn = self
.state
.submit_user_turn(ws, input, Some(source_message))
.await
.map_err(|err| err.to_string())?;
debug!("prompt submitted");
let report = turn::drain_turn_with_options(
&self.channel,
handle,
live,
provider_id,
turn::TurnRunOptions {
recording: Some(turn::TurnRecording {
turn_id: running_turn.turn_id.clone(),
recorder,
}),
intervention_callback_registry,
final_footer,
},
reply_to,
)
.await?;
self.state.complete_turn(&running_turn)?;
Ok((running_turn, report))
}

async fn run_workspace_turn(
&self,
ws: &WorkspaceId,
Expand Down Expand Up @@ -1715,35 +1760,23 @@ impl Bot {
let recorder = recorder_scope.recorder(&self.state);
let intervention_callback_registry =
recorder_scope.intervention_callback_registry(&self.state);
let mut running_turn =
self.state
.start_user_turn(ws, &live, input.text.as_ref(), Some(source_message))?;
let _direct_notification_guard =
DirectNotificationGuard::new(Arc::clone(&self.core), ws.clone());
let report = match turn::run_turn_with_options(
&self.channel,
handle,
&live,
input.clone(),
session.provider_id,
turn::TurnRunOptions {
recording: Some(turn::TurnRecording {
turn_id: running_turn.turn_id.clone(),
recorder: Arc::clone(&recorder),
}),
intervention_callback_registry: intervention_callback_registry.clone(),
final_footer: final_footer.clone(),
},
reply_to.clone(),
)
.await
let (running_turn, report) = match self
.submit_and_drain_user_turn(
ws,
handle,
&live,
input.clone(),
session.provider_id,
Arc::clone(&recorder),
intervention_callback_registry.clone(),
final_footer.clone(),
source_message,
reply_to.clone(),
)
.await
{
Ok(report) => {
self.state.complete_turn(&running_turn)?;
report
}
Ok((running_turn, report)) => (running_turn, report),
Err(err) if is_recoverable_live_session_error(&err) => {
self.state.fail_turn(&running_turn, &err)?;
warn!(
target: "lucarne_telegram::bot",
error = %err,
Expand Down Expand Up @@ -1773,44 +1806,24 @@ impl Bot {
let retry_live = self
.ensure_live_bound(&mut retry_session, force_bypass_permissions)
.await?;
running_turn = self.state.start_user_turn(
ws,
&retry_live,
input.text.as_ref(),
Some(source_message),
)?;
let retry_report = match turn::run_turn_with_options(
&self.channel,
handle,
&retry_live,
input,
session.provider_id,
turn::TurnRunOptions {
recording: Some(turn::TurnRecording {
turn_id: running_turn.turn_id.clone(),
recorder,
}),
let retry_result = self
.submit_and_drain_user_turn(
ws,
handle,
&retry_live,
input,
session.provider_id,
recorder,
intervention_callback_registry,
final_footer,
},
reply_to,
)
.await
{
Ok(retry_report) => {
self.state.complete_turn(&running_turn)?;
retry_report
}
Err(retry_err) => {
self.state.fail_turn(&running_turn, &retry_err)?;
return Err(retry_err);
}
};
source_message,
reply_to,
)
.await?;
live = retry_live;
retry_report
retry_result
}
Err(err) => {
self.state.fail_turn(&running_turn, &err)?;
let resume_ref = match session.resume_ref.clone() {
Some(resume_ref) => Some(resume_ref),
None => live_provider_resume_ref(&live).await,
Expand Down Expand Up @@ -3135,7 +3148,40 @@ impl Bot {
let observed_close = live.session.observed_close_reason().await;
if observed_close.is_none() {
let provider_ref_after = live_provider_resume_ref(&live).await;
if let (Some(expected_ref), Some(live_ref)) =
let core_workspace_id =
lucarne::control_plane::WorkspaceId::new(session.workspace.as_str());
let core_live_instance_id = lucarne::control_plane::LiveInstanceId::new(
live.session.instance_id().0.as_str(),
);
if !self
.core
.live_session_is_bound(&core_workspace_id, &core_live_instance_id)
{
let resume_ref = match session.resume_ref.clone() {
Some(resume_ref) => Some(resume_ref),
None if self.should_keep_live_only_resume_ref(
&session.workspace,
None,
provider_ref_after.as_deref(),
) =>
{
None
}
None => provider_ref_after.clone(),
};
warn!(
target: "lucarne_telegram::bot",
provider = session.provider_id,
workspace = %session.workspace.as_str(),
live_instance = %core_live_instance_id.as_str(),
"discarding live session missing from core runtime registry before turn"
);
self.state
.mark_live_dead(&session.workspace, resume_ref.clone())
.map_err(|e| e.to_string())?;
session.resume_ref = resume_ref;
session.live = None;
} else if let (Some(expected_ref), Some(live_ref)) =
(session.resume_ref.clone(), provider_ref_after.clone())
{
if expected_ref != live_ref {
Expand Down Expand Up @@ -3216,9 +3262,17 @@ impl Bot {
return Err(err);
}
};
let resume_ref = live_provider_resume_ref(&live)
.await
.or_else(|| session.resume_ref.clone());
let provider_ref_after = live_provider_resume_ref(&live).await;
let keep_live_only = self.should_keep_live_only_resume_ref(
&session.workspace,
session.resume_ref.as_deref(),
provider_ref_after.as_deref(),
);
let resume_ref = if keep_live_only {
None
} else {
provider_ref_after.or_else(|| session.resume_ref.clone())
};
self.state
.bind_live(&session.workspace, live.clone(), resume_ref.clone())
.map_err(|e| e.to_string())?;
Expand Down Expand Up @@ -7089,7 +7143,7 @@ mod tests {
}

fn test_bot_with_state(channel: Arc<RecordingChannel>, state: Arc<BotState>) -> Arc<Bot> {
let core = core_with_runtime(Arc::new(AgentRuntime::new()));
let core = state.core_handle();
Arc::new(Bot::new_with_state(
channel,
core,
Expand Down Expand Up @@ -7535,12 +7589,11 @@ mod tests {
std::fs::create_dir_all(&project).expect("project dir");
std::fs::write(project.join("README.md"), "lucarne telegram live bot e2e\n")
.expect("readme");
let db = temp.path().join("state.sqlite3");
let channel = Arc::new(RecordingChannel::default());
let runtime = Arc::new(AgentRuntime::new());
runtime.register_defaults();
let core = core_with_runtime(runtime);
let state = BotState::open_sqlite(&db).expect("open temp state");
let state = BotState::new_with_core(Arc::clone(&core));
let bot = Arc::new(Bot::new_with_state(
channel.clone(),
core,
Expand Down Expand Up @@ -7607,9 +7660,8 @@ mod tests {
ProtocolProvider::new(active_provider.adapter()).expect("recorded provider"),
));
let core = core_with_runtime(Arc::new(runtime));
let db = temp.path().join("state.sqlite3");
let channel = Arc::new(RecordingChannel::default());
let state = BotState::open_sqlite(&db).expect("open temp state");
let state = BotState::new_with_core(Arc::clone(&core));
let bot = Arc::new(Bot::new_with_state(
channel.clone(),
core,
Expand Down Expand Up @@ -7712,12 +7764,11 @@ mod tests {
("LUCARNE_TELEGRAM_LIVE_TIMEOUT", OsString::from("5")),
]);

let db = temp.path().join("state.sqlite3");
let channel = Arc::new(RecordingChannel::default());
let runtime = Arc::new(AgentRuntime::new());
runtime.register_defaults();
let core = core_with_runtime(runtime);
let state = BotState::open_sqlite(&db).expect("open temp state");
let state = BotState::new_with_core(Arc::clone(&core));
let bot = Arc::new(Bot::new_with_state(
channel.clone(),
core,
Expand Down
33 changes: 31 additions & 2 deletions crates/lucarne-telegram/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use lucarne::agent_runtime::{
AgentCapabilities, AgentError, AgentErrorKind, AgentProvider, AgentRuntime, OpenSession,
ProbeResult, ProviderId, ResumeSession,
};
use lucarne::agent_runtime::{AgentForkTargetCatalog, AgentImageInput, AgentStatus, InstanceId};
use lucarne::agent_runtime::{
AgentForkTargetCatalog, AgentImageInput, AgentInput, AgentStatus, InstanceId,
};
#[cfg(test)]
use lucarne::control_plane::ControlPlaneSqliteStore;
use lucarne::control_plane::{
Expand All @@ -21,7 +23,7 @@ use lucarne::control_plane::{
WorkspaceId as ControlWorkspaceId,
};
use lucarne::core_service::{
CoreWorkspaceEventStream, DaemonSession, LucarneCore, OpenWorkspaceRequest,
CoreWorkspaceEventStream, DaemonSession, LucarneCore, OpenWorkspaceRequest, SubmitTurnRequest,
TimelineKindProjection, TimelineProjectionItem,
};
use lucarne::event::SubAgentCall;
Expand Down Expand Up @@ -292,6 +294,32 @@ impl BotState {
));
}

pub fn core_handle(&self) -> Arc<LucarneCore> {
Arc::clone(&self.core)
}

pub async fn submit_user_turn(
&self,
ws: &WorkspaceId,
input: AgentInput,
reply_to: Option<&MessageId>,
) -> Result<RunningTurn, String> {
let submitted = self
.core
.submit_turn(SubmitTurnRequest {
workspace_id: control_workspace_id(ws),
source: TurnSource::UserMessage,
input,
reply_to_channel_message_id: reply_to
.and_then(|id| id.as_str().parse::<i64>().ok()),
})
.await
.map_err(|err| err.to_string())?;
Ok(RunningTurn {
turn_id: submitted.turn_id,
})
}

fn sync_core_session(
&self,
session: &WorkSession,
Expand Down Expand Up @@ -1691,6 +1719,7 @@ impl BotState {
)
}

#[cfg(test)]
pub fn start_user_turn(
&self,
ws: &WorkspaceId,
Expand Down
30 changes: 13 additions & 17 deletions crates/lucarne-telegram/src/turn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

use base64::Engine as _;
#[cfg(test)]
use lucarne::agent_runtime::AgentInput;
#[cfg(test)]
use lucarne::agent_runtime::InterventionRequest;
use lucarne::agent_runtime::{
AgentCommandInvocation, AgentCommandResult, AgentCommandResultData, AgentInput, AgentStatus,
AgentCommandInvocation, AgentCommandResult, AgentCommandResultData, AgentStatus,
Attachment as AgentAttachment, CommandResultEvent, Event, InstanceId, MessageEvent,
MessageRole,
};
Expand Down Expand Up @@ -447,15 +449,7 @@ fn log_event_json(value: &serde_json::Value) -> String {
log_event_text(&value.to_string(), EVENT_LOG_TEXT_MAX)
}

pub(crate) async fn run_turn_with_options(
channel: &Arc<dyn Channel>,
target: &WorkspaceHandle,
live: &LiveSession,
input: AgentInput,
provider_id: &str,
options: TurnRunOptions,
reply_to: Option<MessageId>,
) -> Result<TurnRunReport, String> {
pub(crate) async fn prepare_turn_drain(live: &LiveSession) {
// 0. Drop any leftover events from a prior turn. With
// `Event::TurnCompleted` now being the authoritative end-of-turn
// signal, this should almost always be empty in practice — if
Expand All @@ -471,14 +465,16 @@ pub(crate) async fn run_turn_with_options(
"discarded stale events from previous turn (inspect logs above for event kinds)"
);
}
}

// 1. Submit the prompt.
live.session
.submit_turn(input)
.await
.map_err(|e| e.to_string())?;
debug!("prompt submitted");

pub(crate) async fn drain_turn_with_options(
channel: &Arc<dyn Channel>,
target: &WorkspaceHandle,
live: &LiveSession,
provider_id: &str,
options: TurnRunOptions,
reply_to: Option<MessageId>,
) -> Result<TurnRunReport, String> {
drain_submitted_events(
channel,
target,
Expand Down
Loading