Turn outcomes, input shape, the semantic event stream, sink semantics, usage channels, and RLM finish.
TurnResult.outcome is one of three categories: Finished, AgentFrameSwitch, Stopped. Branch on category; inspect the variant only when handling specific stop reasons.
pub enum TurnOutcome {
    Finished(TurnFinish),
    AgentFrameSwitch { frame_id: String, task: String },
    Stopped(TurnStop),
}

pub enum TurnFinish {
    AssistantMessage { text: String },
    FinalValue { value: serde_json::Value },
    ToolValue { tool_name: String, value: serde_json::Value },
}
Finished: clean terminal value. AssistantMessage is the prose default for both modes; the runtime commit materializes the transcript message exactly once; there is no second "final message" stream event after AssistantProseDelta. FinalValue / ToolValue come from RLM finish or a tool-authored terminal; both arrive on the stream too.
AgentFrameSwitch: a protocol plugin or tool switched to a fresh AgentFrame inside the same session. Runtime callers normally use the facade, which keeps driving the same session until a terminal outcome is reached.
Stopped: turn aborted before a terminal value. Ten TurnStop variants:
| Variant | Class | Cause |
|---|---|---|
| Cancelled | Host-driven | The cancellation token passed to TurnBuilder::cancel(...) fired. |
| InvalidInput | Host-driven | Input normalization rejected the TurnInput (e.g. missing attachment, malformed image ref). |
| Incomplete | Provider | The LLM hit its output limit mid-message without finishing. |
| ProviderError | Provider | The provider returned an error, a content-filter refusal, or a context-overflow terminal reason. |
| MaxTurns | Runtime | The protocol turn loop hit session_spec.max_turns and the final forced-reply turn finished without a terminal. |
| ToolFailure | Runtime | A tool returned ToolResult::err(...) and turn assembly flagged the turn as failed. |
| PluginAbort | Runtime | A plugin's turn-preparation or checkpoint hook returned an abort directive. |
| RuntimeError | Fatal | Internal runtime failure or a strict-termination policy with no Done event. Treat as a bug or environmental fault. |
| SubmittedError { value } | RLM | An RLM program ended with submit_error(...); value is the model-authored payload. |
| ToolError { tool_name, value } | RLM | The RLM protocol signalled a tool-authored error terminal; value is the payload. |
Host-driven stops are recoverable on the next request. Provider stops usually call for a different model or a shorter context. Runtime stops indicate a configuration or authoring problem. RuntimeError, the sole Fatal variant, is the only stop for which retrying with identical input is unlikely to help.
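The class-to-recovery mapping above can be sketched as a small lookup. This is a minimal illustration, not the library's API: `StopKind` and `StopClass` are hypothetical local mirrors of the table (payloads omitted), and the hint for the RLM class is an assumption, since the text gives no recovery guidance for it.

```rust
/// Hypothetical mirror of the variant names in the table above.
/// This is NOT the library's `TurnStop`; payloads are omitted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StopKind {
    Cancelled,
    InvalidInput,
    Incomplete,
    ProviderError,
    MaxTurns,
    ToolFailure,
    PluginAbort,
    RuntimeError,
    SubmittedError,
    ToolError,
}

/// The "Class" column of the table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StopClass {
    HostDriven,
    Provider,
    Runtime,
    Fatal,
    Rlm,
}

/// Map each variant to its class, row by row.
fn classify(kind: StopKind) -> StopClass {
    match kind {
        StopKind::Cancelled | StopKind::InvalidInput => StopClass::HostDriven,
        StopKind::Incomplete | StopKind::ProviderError => StopClass::Provider,
        StopKind::MaxTurns | StopKind::ToolFailure | StopKind::PluginAbort => StopClass::Runtime,
        StopKind::RuntimeError => StopClass::Fatal,
        StopKind::SubmittedError | StopKind::ToolError => StopClass::Rlm,
    }
}

/// Suggested next step per class.
fn next_step(class: StopClass) -> &'static str {
    match class {
        StopClass::HostDriven => "recoverable on the next request",
        StopClass::Provider => "try a different model or a shorter context",
        StopClass::Runtime => "review configuration or tool/plugin authoring",
        StopClass::Fatal => "report it; retrying identical input is unlikely to help",
        // Assumption: the source does not say how to recover from RLM stops.
        StopClass::Rlm => "surface the authored error payload to the caller",
    }
}
```

A host could call `next_step(classify(kind))` when deciding what to show the user, keeping the per-variant `match` for the few stops that need special handling.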
Session-lane conflicts and CAS backstop conflicts are not Stopped variants; they surface as Err from run()/stream(), with no partial commit. See Persistence → Session Lane And CAS.
match result.outcome {
    TurnOutcome::Finished(finish) => persist_terminal(finish)?,
    TurnOutcome::AgentFrameSwitch { frame_id, .. } => record_frame_boundary(frame_id)?,
    TurnOutcome::Stopped(stop) => match stop {
        TurnStop::Cancelled | TurnStop::InvalidInput => report_user_visible(stop),
        TurnStop::ProviderError | TurnStop::Incomplete => offer_retry(stop),
        TurnStop::MaxTurns => suggest_higher_max_turns(),
        other => record_for_diagnosis(other),
    },
}
TurnInput is text plus image refs backed by the attachment store. Filesystem syntax is a host concern.
Text: use TurnInput::text(...) or an explicit InputItem::Text. For @path-style refs, resolve in the host and inline the marker text you want the model to see.
Images: use InputItem::ImageRef with a matching entry in TurnInput::image_blobs. Images are stored as typed attachments and resolved when building provider requests.
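Because filesystem syntax is a host concern, resolving @path-style refs happens before the text reaches TurnInput::text(...). The following is a minimal sketch of one way to do that, assuming whitespace-delimited tokens; `inline_path_refs` and its `marker_for` callback are hypothetical helpers, not part of the library.

```rust
/// Replace each space-delimited `@path` token with marker text.
/// `marker_for` is a hypothetical host callback: it returns the text the
/// model should see for a path, or `None` to leave the token untouched.
fn inline_path_refs(input: &str, marker_for: impl Fn(&str) -> Option<String>) -> String {
    input
        .split(' ')
        .map(|token| match token.strip_prefix('@') {
            // A bare "@" is not a path reference, so it falls through.
            Some(path) if !path.is_empty() => {
                marker_for(path).unwrap_or_else(|| token.to_string())
            }
            _ => token.to_string(),
        })
        .collect::<Vec<_>>()
        .join(" ")
}
```

The returned string is what a host would then pass to TurnInput::text(...). A real resolver would likely also handle trailing punctuation and quoting, which this sketch ignores.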
Identity-bearing stream. Prose and reasoning arrive as deltas. Tool and code activity carries a correlation_id. RLM finish and tool-authored terminals arrive as FinalValue / ToolValue. Normal prose completion emits no terminal stream item.
Apps consume TurnActivity: id, correlation_id, and a TurnEvent payload. Hosts do not subscribe to lower-level runtime/debug graph events. There is no app-facing final-message event enum contract. Sinks match only on TurnEvent.
Use TurnBuilder::stream_to, run, and pull-style stream for one running turn. Hosts that need browser reconnect, session cursors, NDJSON, SSE, or remote DTO framing should use Streaming and reconnect.
use async_trait::async_trait;
use lash::{TurnActivity, TurnActivitySink, TurnEvent};

struct AppEvents {
    tx: AppUiTx,
    turn_state: std::sync::Mutex<TurnUiState>,
}

// Per-turn UI bookkeeping: which rows are currently open.
#[derive(Default)]
struct TurnUiState {
    reasoning: Option<UiRowId>,
    tools: std::collections::HashMap<String, UiRowId>,
    code: Option<UiRowId>,
}
#[async_trait]
impl TurnActivitySink for AppEvents {
    async fn emit(&self, activity: TurnActivity) {
        let correlation_id = activity.correlation_id.0.clone();
        match activity.event {
            TurnEvent::AssistantProseDelta { text } => {
                append_live_text(text).await;
            }
            TurnEvent::ReasoningDelta { text } => {
                // The lock guard is dropped at the end of each statement,
                // so it is never held across an .await.
                let row = self.turn_state.lock().unwrap().reasoning.clone();
                let row = upsert_reasoning_row(row, text).await;
                self.turn_state.lock().unwrap().reasoning = Some(row);
            }
            TurnEvent::ToolCallStarted { name, args, .. } => {
                let row = insert_tool_row(name, args).await;
                self.turn_state
                    .lock()
                    .unwrap()
                    .tools
                    .insert(correlation_id, row);
            }
            TurnEvent::ToolCallCompleted { name, output, .. } => {
                // Same correlation_id as the matching ToolCallStarted.
                let row = self
                    .turn_state
                    .lock()
                    .unwrap()
                    .tools
                    .remove(&correlation_id);
                update_or_insert_tool_row(row, name, output).await;
            }
            TurnEvent::CodeBlockStarted { language, code, .. } => {
                let row = insert_code_row(language, code).await;
                self.turn_state.lock().unwrap().code = Some(row);
            }
            TurnEvent::CodeBlockCompleted {
                language,
                output,
                error,
                success,
                ..
            } => {
                let row = self.turn_state.lock().unwrap().code.take();
                update_or_insert_code_row(row, language, output, error, success).await;
            }
            TurnEvent::FinalValue { value } => {
                append_live_text(render_terminal_value(&value)).await;
            }
            TurnEvent::ToolValue { tool_name, value } => {
                append_live_text(render_terminal_value(&value)).await;
                record_terminal_tool(tool_name).await;
            }
            TurnEvent::Usage {
                usage, cumulative, ..
            } => {
                update_usage(usage, cumulative).await;
            }
            TurnEvent::ChildUsage {
                source,
                usage,
                cumulative,
                ..
            } => {
                update_child_usage(source, usage, cumulative).await;
            }
            _ => {}
        }
    }
}

// Render a terminal value for display: strings verbatim, null as empty,
// anything else as pretty-printed JSON.
fn render_terminal_value(value: &serde_json::Value) -> String {
    match value {
        serde_json::Value::Null => String::new(),
        serde_json::Value::String(text) => text.clone(),
        other => serde_json::to_string_pretty(other).unwrap_or_else(|_| other.to_string()),
    }
}
ToolCallStarted and ToolCallCompleted carry an optional graph_key — the enclosing code block, matching CodeBlockStarted.graph_key — and an optional parent_call_id, the id of the parent batch call. Group rows from these fields directly instead of inferring containment from arrival order. Every Option field on TurnEvent is skip-when-None: an absent graph_key, call_id, or error is omitted from the serialized event rather than sent as null. The full channel map and stability contract live in Reporting channels.
Same correlation_id → same logical row. FinalValue / ToolValue signal a control-path terminal; they are not emitted for normal prose finishes, since that prose already streamed as AssistantProseDelta and the settled message comes from the read view.
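Grouping by field rather than by arrival order can be sketched as follows. This is an illustration under stated assumptions: `ToolRow` is a hypothetical flattened view of a tool event, the `String` key types are guesses, and treating rows without a graph_key as top-level calls is an assumption rather than documented behaviour.

```rust
use std::collections::BTreeMap;

/// Hypothetical flattened view of a tool event; only the grouping fields
/// named in the text are modelled, and their types are assumptions.
#[derive(Debug, Clone, PartialEq)]
struct ToolRow {
    name: String,
    graph_key: Option<String>,
    parent_call_id: Option<String>,
}

/// Bucket tool rows under their enclosing code block. Rows without a
/// `graph_key` land under `None` (assumed here to mean top-level calls).
fn group_by_code_block(rows: &[ToolRow]) -> BTreeMap<Option<String>, Vec<&ToolRow>> {
    let mut groups: BTreeMap<Option<String>, Vec<&ToolRow>> = BTreeMap::new();
    for row in rows {
        // The key comes from the event itself, so interleaved arrival
        // order does not affect which block a row belongs to.
        groups.entry(row.graph_key.clone()).or_default().push(row);
    }
    groups
}
```

The same shape works for parent_call_id when rows need to be nested under a batch call instead of a code block.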
TurnActivitySink::emit() is awaited by the runtime. A slow sink slows the turn. No buffering, no spawn-and-drop. Sinks own their own concurrency.
Conventional pattern: push onto an mpsc channel and return immediately; drain into UI state from a separate task. Sender::send only awaits when full; the channel bound is the backpressure dial:
use tokio::sync::mpsc;

struct ChannelSink {
    tx: mpsc::Sender<TurnActivity>,
}

#[async_trait::async_trait]
impl TurnActivitySink for ChannelSink {
    async fn emit(&self, activity: TurnActivity) {
        // send().await yields when the channel is full: the turn
        // will pause here if your UI consumer falls behind.
        let _ = self.tx.send(activity).await;
    }
}
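To see why the channel bound acts as the backpressure dial, here is a dependency-free sketch that swaps in the standard library's `sync_channel` for the tokio channel used above. `accepted_before_full` is a hypothetical helper; it uses `try_send` so the full-buffer condition shows up as an error instead of a wait.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Fill a bounded channel that nobody drains and report how many sends
/// were accepted before the bound pushed back.
fn accepted_before_full(bound: usize, attempts: usize) -> usize {
    // `_rx` stays alive for the whole function so the channel is not
    // disconnected; it simply never receives.
    let (tx, _rx) = sync_channel::<usize>(bound);
    let mut accepted = 0;
    for i in 0..attempts {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // A full buffer is the backpressure signal; an awaiting
            // sender would pause here instead of returning an error.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}
```

A small bound makes the producer feel a slow consumer almost immediately; a large bound absorbs bursts at the cost of memory and staleness.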
Sink panics or Err-shaped activities do not abort the turn. Persistence and TurnOutput.activities are independent of the sink; a dropped sink loses no activity.
Cancellation is cooperative and first-class: a cancelled turn finishes with TurnOutcome::Stopped(TurnStop::Cancelled), commits like any other turn, and leaves the session ready for the next turn. In-flight provider streaming is aborted, not awaited.
Two tiers. TurnBuilder::cancel(token) attaches a caller-owned CancellationToken (re-exported at the crate root, no tokio-util dependency needed) to one turn. LashSession::cancel_running_turns() cancels everything currently executing through that opened session and its clones: the affordance behind a UI stop button, with no token threading. It returns how many turns were signalled; a handle opened separately for the same session id is not reached.
use lash::{CancellationToken, TurnOutcome, TurnStop};

// Per-turn token: hand it to whatever can decide to stop the turn
// (an HTTP handler, a keybinding, a timeout task).
let cancel = CancellationToken::new();
let stream = session
    .turn(TurnInput::text("Summarize the incident."))
    .cancel(cancel.clone())
    .stream()?;
// elsewhere: cancel.cancel();

// Or skip token plumbing entirely: any clone of the opened session can
// stop whatever it is currently running.
let stopper = session.clone();
let cancelled_turns = stopper.cancel_running_turns();

let result = stream.finish().await?;
if matches!(result.outcome, TurnOutcome::Stopped(TurnStop::Cancelled)) {
    // The turn committed as cancelled; the session is ready for the
    // next turn.
}
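"Cooperative" means the running work checks a shared signal at safe points and returns a normal outcome instead of being torn down. The sketch below illustrates that idea only; it uses an `AtomicBool` as a stand-in for CancellationToken, and `Outcome` and `run_steps` are hypothetical names unrelated to the library's types.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical outcome type for the sketch.
#[derive(Debug, PartialEq)]
enum Outcome {
    Finished(usize),
    Cancelled { completed_steps: usize },
}

/// Run `total` steps, checking the cancellation flag before each one.
/// A cancelled run still returns a well-formed outcome, so the caller
/// can record it and move on.
fn run_steps(total: usize, cancel: &AtomicBool, mut on_step: impl FnMut(usize)) -> Outcome {
    for step in 0..total {
        if cancel.load(Ordering::SeqCst) {
            return Outcome::Cancelled { completed_steps: step };
        }
        on_step(step);
    }
    Outcome::Finished(total)
}

/// Convenience constructor for a shareable flag.
fn new_cancel_flag() -> Arc<AtomicBool> {
    Arc::new(AtomicBool::new(false))
}
```

Any holder of a clone of the flag can request the stop, which mirrors handing a token clone to an HTTP handler or a timeout task.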
Four channels, finest to coarsest:
| Channel | Notes |
|---|---|
| TraceSink | |
| TurnEvent::Usage / TurnEvent::ChildUsage | ChildUsage carries session_id and source for grouping child traffic. |
| TurnResult.usage / TurnResult.children_usage | (source, model) child breakdown. TurnResult::total_usage() sums both. |
| session.usage_report() → SessionUsageReport | source × model. Dashboards and "session so far." |

All usage surfaces use the same five canonical buckets: uncached input_tokens, total output_tokens, cache_read_input_tokens, cache_write_input_tokens, and reasoning_output_tokens. Reasoning is included in output. Source-label constants and re-exports: lash::usage.
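The bucket arithmetic can be sketched with a local struct. This is an illustration, not the library's usage type: `Usage` here is a hypothetical mirror whose field names follow the five buckets above, the `u64` type is an assumption, and `total_usage` / `all_tokens` are helper names invented for the example.

```rust
use std::ops::Add;

/// Hypothetical mirror of the five canonical buckets.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
struct Usage {
    input_tokens: u64,             // uncached input
    output_tokens: u64,            // total output, reasoning included
    cache_read_input_tokens: u64,
    cache_write_input_tokens: u64,
    reasoning_output_tokens: u64,  // already counted inside output_tokens
}

impl Add for Usage {
    type Output = Usage;

    /// Bucket-wise sum.
    fn add(self, rhs: Usage) -> Usage {
        Usage {
            input_tokens: self.input_tokens + rhs.input_tokens,
            output_tokens: self.output_tokens + rhs.output_tokens,
            cache_read_input_tokens: self.cache_read_input_tokens + rhs.cache_read_input_tokens,
            cache_write_input_tokens: self.cache_write_input_tokens + rhs.cache_write_input_tokens,
            reasoning_output_tokens: self.reasoning_output_tokens + rhs.reasoning_output_tokens,
        }
    }
}

/// Sum a turn's own usage with its children's usage.
fn total_usage(own: Usage, children: &[Usage]) -> Usage {
    children.iter().copied().fold(own, |acc, u| acc + u)
}

/// Count every token once: reasoning is not added again because it is
/// already part of `output_tokens`.
fn all_tokens(u: &Usage) -> u64 {
    u.input_tokens + u.cache_read_input_tokens + u.cache_write_input_tokens + u.output_tokens
}
```

The point of `all_tokens` is the double-counting trap: adding reasoning_output_tokens on top of output_tokens would overstate the total.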
Two RLM termination policies: FinishRequired validates finish against the configured schema and finishes as TurnFinish::FinalValue (also emitted on the stream). Natural allows a no-code prose answer that finishes as TurnFinish::AssistantMessage with no terminal stream event. For prompt construction, history, variables, and the execution loop, see the RLM protocol guide.
use lash::rlm::RlmTurnBuilderExt as _;

// FinishRequired: the turn must end with a validated finish value.
let finished = session
    .turn(TurnInput::text("Move on the board."))
    .require_finish()?
    .stream_to(&sink)
    .await?;

// Natural: a no-code prose answer is also an acceptable ending.
let natural = session
    .turn(TurnInput::text("Answer directly if no code is needed."))
    .allow_prose_or_finish()?
    .run()
    .await?;

// Handle natural.outcome the same way.
match finished.outcome {
    TurnOutcome::Finished(TurnFinish::FinalValue { value }) => {
        // Same value already arrived as TurnEvent::FinalValue.
        persist_typed_value(value)?;
    }
    TurnOutcome::Finished(TurnFinish::AssistantMessage { text }) => persist_text(text)?,
    other => handle_other_outcome(other)?,
}
Use require_finish_schema(...) when the final value must match a JSON Schema. Use RlmSessionBuilderExt::final_answer_format(...) to set the session's normal final-value presentation: root RLM sessions default to Markdown guidance, managed child sessions default to raw finish values, and schema-required turns ignore the presentation preference.
use lash::rlm::{RlmFinalAnswerFormat, RlmSessionBuilderExt as _, RlmTurnBuilderExt as _};

let session = core
    .session("analysis")
    .final_answer_format(RlmFinalAnswerFormat::RawFinalValue)?
    .open()
    .await?;

let result = session
    .turn(TurnInput::text("Return a risk rating."))
    .require_finish_schema(serde_json::json!({
        "type": "object",
        "required": ["rating"],
        "properties": {
            "rating": { "type": "string" }
        },
        "additionalProperties": false
    }))?
    .run()
    .await?;