lash/turns

Turn outcomes, input shape, the semantic event stream, sink semantics, usage channels, and RLM finish.

Turn Outcomes

TurnResult.outcome is one of three categories: Finished, AgentFrameSwitch, Stopped. Branch on category; inspect the variant only when handling specific stop reasons.

TurnOutcome shape
pub enum TurnOutcome {
    Finished(TurnFinish),
    AgentFrameSwitch { frame_id: String, task: String },
    Stopped(TurnStop),
}

pub enum TurnFinish {
    AssistantMessage { text: String },
    FinalValue { value: serde_json::Value },
    ToolValue { tool_name: String, value: serde_json::Value },
}

Finished: the turn produced a clean terminal value. AssistantMessage is the prose default in both modes. The runtime commit materializes the transcript message exactly once, so no second "final message" stream event follows the AssistantProseDelta events. FinalValue and ToolValue come from an RLM finish or a tool-authored terminal, and both also arrive on the stream.

AgentFrameSwitch: a protocol plugin or tool switched to a fresh AgentFrame inside the same session. Runtime callers normally use the facade, which keeps driving the same session until a terminal outcome is reached.
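The facade's behaviour can be pictured as a loop that re-enters the session whenever a turn ends in a frame switch and returns once it sees any other outcome. The sketch below is not the facade's implementation. `Outcome` is a hypothetical, simplified mirror of TurnOutcome, the `next_turn` closure stands in for running one turn, and the `max_switches` safety cap is an assumption added for illustration.

```rust
/// Hypothetical, simplified mirror of TurnOutcome (illustration only).
#[derive(Debug, Clone, PartialEq)]
enum Outcome {
    Finished(String),
    AgentFrameSwitch { frame_id: String, task: String },
    Stopped(String),
}

/// Keep driving the same session while turns end in a frame switch.
/// `next_turn` stands in for one runtime turn and receives the task handed
/// over by the previous switch (None for the first turn). Returns the
/// terminal outcome plus the frame ids crossed along the way.
fn drive_until_terminal<F>(mut next_turn: F, max_switches: usize) -> (Outcome, Vec<String>)
where
    F: FnMut(Option<&str>) -> Outcome,
{
    let mut frames = Vec::new();
    let mut pending_task: Option<String> = None;
    loop {
        match next_turn(pending_task.as_deref()) {
            // A switch is not terminal: record the boundary and continue.
            Outcome::AgentFrameSwitch { frame_id, task } if frames.len() < max_switches => {
                frames.push(frame_id);
                pending_task = Some(task);
            }
            // Hypothetical cap so a misbehaving plugin cannot loop forever.
            Outcome::AgentFrameSwitch { .. } => {
                return (Outcome::Stopped("too many frame switches".into()), frames);
            }
            // Finished or Stopped ends the drive.
            terminal => return (terminal, frames),
        }
    }
}
```

A caller that drives turns itself, rather than through the facade, can record each frame id as a boundary in its own transcript view, as the outcome match further down does with record_frame_boundary.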

Stopped: the turn ended before producing a terminal value. There are ten TurnStop variants:

Variant | Class | Cause
--- | --- | ---
Cancelled | Host-driven | The turn was cancelled, either by the token passed to TurnBuilder::cancel(...) or by LashSession::cancel_running_turns().
InvalidInput | Host-driven | Input normalization rejected the TurnInput (e.g. a missing attachment or a malformed image ref).
Incomplete | Provider | The LLM hit its output limit mid-message without finishing.
ProviderError | Provider | The provider returned an error, a content-filter refusal, or a context-overflow terminal reason.
MaxTurns | Runtime | The protocol turn loop hit session_spec.max_turns and the final forced-reply turn finished without a terminal.
ToolFailure | Runtime | A tool returned ToolResult::err(...) and turn assembly flagged the turn as failed.
PluginAbort | Runtime | A plugin's turn-preparation or checkpoint hook returned an abort directive.
RuntimeError | Fatal | Internal runtime failure, or a strict-termination policy with no Done event. Treat as a bug or environmental fault.
SubmittedError { value } | RLM | An RLM program ended with submit_error(...); value is the model-authored payload.
ToolError { tool_name, value } | RLM | The RLM protocol signalled a tool-authored error terminal; value is the payload.

Host-driven stops are recoverable on the next request. Provider stops usually call for a different model or a shorter context. Runtime stops indicate a configuration or authoring problem. RuntimeError (the fatal class) is the only stop for which retrying the identical input is unlikely to help.
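The class-to-advice mapping above can be written down as a small lookup. This is a sketch under stated assumptions: `Stop` is a hypothetical local mirror of TurnStop with simplified payloads (not lash's field types), and the advice for the two RLM variants (surface the model-authored payload) is an assumption, since the guidance above does not cover that class.

```rust
/// Hypothetical local mirror of the ten TurnStop variants; payloads are
/// simplified to String and are not lash's real field types.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum Stop {
    Cancelled,
    InvalidInput,
    Incomplete,
    ProviderError,
    MaxTurns,
    ToolFailure,
    PluginAbort,
    RuntimeError,
    SubmittedError { value: String },
    ToolError { tool_name: String, value: String },
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum StopClass {
    HostDriven,
    Provider,
    Runtime,
    Fatal,
    Rlm,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum Advice {
    /// Fix or re-send on the next request.
    NextRequest,
    /// Try a different model or a shorter context.
    ChangeModelOrContext,
    /// Fix configuration or authoring before retrying.
    FixConfiguration,
    /// Identical-input retry is unlikely to help; investigate.
    Investigate,
    /// Assumption: show the model- or tool-authored payload to the user.
    SurfacePayload,
}

fn class_of(stop: &Stop) -> StopClass {
    match stop {
        Stop::Cancelled | Stop::InvalidInput => StopClass::HostDriven,
        Stop::Incomplete | Stop::ProviderError => StopClass::Provider,
        Stop::MaxTurns | Stop::ToolFailure | Stop::PluginAbort => StopClass::Runtime,
        Stop::RuntimeError => StopClass::Fatal,
        Stop::SubmittedError { .. } | Stop::ToolError { .. } => StopClass::Rlm,
    }
}

fn advice_for(stop: &Stop) -> Advice {
    match class_of(stop) {
        StopClass::HostDriven => Advice::NextRequest,
        StopClass::Provider => Advice::ChangeModelOrContext,
        StopClass::Runtime => Advice::FixConfiguration,
        StopClass::Fatal => Advice::Investigate,
        StopClass::Rlm => Advice::SurfacePayload,
    }
}
```

Keeping the class lookup separate from the advice lets a host change its retry policy per class without touching the per-variant table.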

Session-lane conflicts and CAS backstop conflicts are not Stopped variants; they surface as Err from run() / stream(), with no partial commit. See Persistence → Session Lane And CAS.

match result.outcome {
    TurnOutcome::Finished(finish) => persist_terminal(finish)?,
    TurnOutcome::AgentFrameSwitch { frame_id, .. } => record_frame_boundary(frame_id)?,
    TurnOutcome::Stopped(stop) => match stop {
        TurnStop::Cancelled | TurnStop::InvalidInput => report_user_visible(stop),
        TurnStop::ProviderError | TurnStop::Incomplete => offer_retry(stop),
        TurnStop::MaxTurns => suggest_higher_max_turns(),
        other => record_for_diagnosis(other),
    },
}

Input Boundary

TurnInput is text plus image refs backed by the attachment store. Filesystem syntax is a host concern.

Text

Use TurnInput::text(...) or an explicit InputItem::Text. For @path-style references, resolve the path in the host and inline whatever marker text you want the model to see.
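Host-side resolution of @path references could look like the sketch below. `inline_path_refs` is a hypothetical helper, not a lash API. It treats `@` followed by non-whitespace as a reference, asks a host-supplied resolver for the marker text, and leaves unresolved references untouched; the marker format is entirely the host's choice.

```rust
/// Hypothetical host helper: replace every `@token` (an '@' followed by
/// non-whitespace) with marker text chosen by the host. When `resolve`
/// returns None, the original `@token` text is kept verbatim.
fn inline_path_refs<F>(input: &str, mut resolve: F) -> String
where
    F: FnMut(&str) -> Option<String>,
{
    let mut out = String::with_capacity(input.len());
    let mut rest = input;
    while let Some(at) = rest.find('@') {
        // Copy everything before the '@' unchanged.
        out.push_str(&rest[..at]);
        let after = &rest[at + 1..];
        // The reference runs until the next whitespace or end of input.
        let end = after.find(char::is_whitespace).unwrap_or(after.len());
        let token = &after[..end];
        let marker = if token.is_empty() { None } else { resolve(token) };
        match marker {
            Some(m) => out.push_str(&m),
            None => {
                out.push('@');
                out.push_str(token);
            }
        }
        rest = &after[end..];
    }
    out.push_str(rest);
    out
}
```

The result is plain text, so it can be passed straight to TurnInput::text(...).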

Images

Use InputItem::ImageRef with a matching entry in TurnInput::image_blobs. Images are stored as typed attachments and resolved when provider requests are built.
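A host can catch a missing blob before submitting the turn instead of receiving an InvalidInput stop. The sketch below uses hypothetical mirrors `Item` and `Input` in place of InputItem and TurnInput, with blobs keyed by an image id; the real types and key format may differ.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified mirror of InputItem (illustration only).
#[allow(dead_code)]
enum Item {
    Text(String),
    ImageRef(String),
}

/// Hypothetical mirror of TurnInput: items plus blobs keyed by image id.
struct Input {
    items: Vec<Item>,
    image_blobs: HashMap<String, Vec<u8>>,
}

/// Host-side pre-flight check: list every ImageRef that has no matching
/// blob entry. A missing attachment is one of the InvalidInput causes.
fn missing_image_blobs(input: &Input) -> Vec<&str> {
    input
        .items
        .iter()
        .filter_map(|item| match item {
            Item::ImageRef(id) if !input.image_blobs.contains_key(id) => Some(id.as_str()),
            _ => None,
        })
        .collect()
}
```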

Semantic Events

This is an identity-bearing stream. Prose and reasoning arrive as deltas. Tool and code activity carries a correlation_id. RLM finish values and tool-authored terminals arrive as FinalValue / ToolValue. Normal prose completion emits no terminal stream item.

Three event types, only one is for your app.

Apps consume TurnActivity: id, correlation_id, and a TurnEvent payload. Hosts do not subscribe to lower-level runtime/debug graph events. There is no app-facing final-message event enum contract. Sinks match only on TurnEvent.

Turn streams are not reconnect cursors.

Use TurnBuilder::stream_to, run, and pull-style stream for one running turn. Hosts that need browser reconnect, session cursors, NDJSON, SSE, or remote DTO framing should use Streaming and reconnect.

TurnActivity sink
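// AppUiTx, UiRowId, and the helpers called below (append_live_text,
// insert_tool_row, update_usage, ...) are app-defined placeholders.
use async_trait::async_trait;
use lash::{TurnActivity, TurnActivitySink, TurnEvent};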

struct AppEvents {
    tx: AppUiTx,
    turn_state: std::sync::Mutex<TurnUiState>,
}

#[derive(Default)]
struct TurnUiState {
    reasoning: Option<UiRowId>,
    tools: std::collections::HashMap<String, UiRowId>,
    code: Option<UiRowId>,
}

#[async_trait]
impl TurnActivitySink for AppEvents {
    async fn emit(&self, activity: TurnActivity) {
        let correlation_id = activity.correlation_id.0.clone();
        match activity.event {
            TurnEvent::AssistantProseDelta { text } => {
                append_live_text(text).await;
            }
            TurnEvent::ReasoningDelta { text } => {
                let row = self.turn_state.lock().unwrap().reasoning.clone();
                let row = upsert_reasoning_row(row, text).await;
                self.turn_state.lock().unwrap().reasoning = Some(row);
            }
            TurnEvent::ToolCallStarted { name, args, .. } => {
                let row = insert_tool_row(name, args).await;
                self.turn_state
                    .lock()
                    .unwrap()
                    .tools
                    .insert(correlation_id, row);
            }
            TurnEvent::ToolCallCompleted { name, output, .. } => {
                let row = self
                    .turn_state
                    .lock()
                    .unwrap()
                    .tools
                    .remove(&correlation_id);
                update_or_insert_tool_row(row, name, output).await;
            }
            TurnEvent::CodeBlockStarted { language, code, .. } => {
                let row = insert_code_row(language, code).await;
                self.turn_state.lock().unwrap().code = Some(row);
            }
            TurnEvent::CodeBlockCompleted {
                language,
                output,
                error,
                success,
                ..
            } => {
                let row = self.turn_state.lock().unwrap().code.take();
                update_or_insert_code_row(row, language, output, error, success).await;
            }
            TurnEvent::FinalValue { value } => {
                append_live_text(render_terminal_value(&value)).await;
            }
            TurnEvent::ToolValue { tool_name, value } => {
                append_live_text(render_terminal_value(&value)).await;
                record_terminal_tool(tool_name).await;
            }
            TurnEvent::Usage {
                usage, cumulative, ..
            } => {
                update_usage(usage, cumulative).await;
            }
            TurnEvent::ChildUsage {
                source,
                usage,
                cumulative,
                ..
            } => {
                update_child_usage(source, usage, cumulative).await;
            }
            _ => {}
        }
    }
}

fn render_terminal_value(value: &serde_json::Value) -> String {
    match value {
        serde_json::Value::Null => String::new(),
        serde_json::Value::String(text) => text.clone(),
        other => serde_json::to_string_pretty(other).unwrap_or_else(|_| other.to_string()),
    }
}

Tool and code events carry their containment.

ToolCallStarted and ToolCallCompleted carry an optional graph_key — the enclosing code block, matching CodeBlockStarted.graph_key — and an optional parent_call_id, the id of the parent batch call. Group rows from these fields directly instead of inferring containment from arrival order. Every Option field on TurnEvent is skip-when-None: an absent graph_key, call_id, or error is omitted from the serialized event rather than sent as null. The full channel map and stability contract live in Reporting channels.
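Grouping by these fields instead of arrival order might look like the sketch below. `ToolRow` is a hypothetical flattened view of a ToolCallStarted event that keeps only the containment fields. The two-level nesting (code block first, then parent batch call) is an assumption about how a UI would want to lay the rows out, not a documented structure.

```rust
use std::collections::BTreeMap;

/// Hypothetical flattened view of a ToolCallStarted event; only the
/// containment fields are modelled.
#[derive(Debug, Clone)]
struct ToolRow {
    call_id: String,
    graph_key: Option<String>,
    parent_call_id: Option<String>,
}

/// graph_key -> parent_call_id -> call ids, in arrival order within a group.
type Groups = BTreeMap<Option<String>, BTreeMap<Option<String>, Vec<String>>>;

/// Group rows by enclosing code block, then by parent batch call. Rows with
/// no graph_key land under None (top-level calls); rows with no parent land
/// under None inside their block.
fn group_rows(rows: &[ToolRow]) -> Groups {
    let mut groups = Groups::new();
    for row in rows {
        groups
            .entry(row.graph_key.clone())
            .or_default()
            .entry(row.parent_call_id.clone())
            .or_default()
            .push(row.call_id.clone());
    }
    groups
}
```

Because the grouping keys come from the events themselves, interleaved arrival across blocks or batches does not change where a row ends up.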

Use event identity, not duplicate detection.

Same correlation_id → same logical row. FinalValue / ToolValue signal a control-path terminal; they are not emitted for normal prose finishes, since that prose already streamed as AssistantProseDelta and the settled message comes from the read view.
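A reducer keyed by correlation_id keeps one logical row per id without any duplicate detection. In the sketch below, `Event` is a hypothetical simplification of the tool events and `Rows` is a placeholder UI model. Creating the row on first sight, whichever event arrives first, mirrors the update-or-insert helpers in the sink example and is a defensive choice rather than documented ordering behaviour.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified tool events carrying a correlation id.
enum Event {
    Started { correlation_id: String, name: String },
    Completed { correlation_id: String, output: String },
}

#[derive(Debug, Clone, Default, PartialEq)]
struct Row {
    name: String,
    output: Option<String>,
}

/// UI state: one row per correlation_id plus first-seen order.
#[derive(Default)]
struct Rows {
    by_id: HashMap<String, Row>,
    order: Vec<String>,
}

impl Rows {
    fn apply(&mut self, event: Event) {
        match event {
            Event::Started { correlation_id, name } => {
                self.row_mut(correlation_id).name = name;
            }
            Event::Completed { correlation_id, output } => {
                self.row_mut(correlation_id).output = Some(output);
            }
        }
    }

    /// Fetch the row for an id, creating it (and recording its position)
    /// the first time the id is seen.
    fn row_mut(&mut self, id: String) -> &mut Row {
        if !self.by_id.contains_key(&id) {
            self.order.push(id.clone());
        }
        self.by_id.entry(id).or_default()
    }
}
```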

Sink Semantics

The runtime awaits TurnActivitySink::emit(), so a slow sink slows the turn. The runtime does no buffering and never spawns an emit and drops it; each sink owns its own concurrency.

The conventional pattern is to push onto an mpsc channel and return immediately, then drain into UI state from a separate task. Sender::send only waits when the channel is full, so the channel bound is the backpressure dial:

use lash::{TurnActivity, TurnActivitySink};
use tokio::sync::mpsc;

struct ChannelSink {
    tx: mpsc::Sender<TurnActivity>,
}

#[async_trait::async_trait]
impl TurnActivitySink for ChannelSink {
    async fn emit(&self, activity: TurnActivity) {
        // send().await waits while the channel is full, so the turn
        // pauses here if your UI consumer falls behind. An Err only means
        // the receiver was dropped; the activity is still persisted.
        let _ = self.tx.send(activity).await;
    }
}
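The drain side is not shown above. To keep the sketch dependency-free, it models the same bounded-channel shape with std's `sync_channel` and a thread standing in for the drain task; with tokio you would use the receiver half of the mpsc channel in a spawned task instead. `UiState` and the `String` activities are hypothetical placeholders for your UI model and TurnActivity.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
use std::thread;

/// Hypothetical UI model that the drain task folds activities into.
#[derive(Debug, Default)]
struct UiState {
    lines: Vec<String>,
}

/// Producer half, i.e. what a sink's emit() does. `send` blocks only while
/// the channel already holds `bound` undelivered items.
fn emit(tx: &SyncSender<String>, activity: String) {
    // An error only occurs once the receiver is gone; ignore it.
    let _ = tx.send(activity);
}

/// Consumer half: drain until every sender is dropped, then return state.
fn drain(rx: Receiver<String>) -> UiState {
    let mut state = UiState::default();
    for activity in rx {
        state.lines.push(activity);
    }
    state
}

/// Run a producer against a drain thread with the given channel bound.
fn run(bound: usize, activities: Vec<String>) -> UiState {
    let (tx, rx) = sync_channel(bound);
    let consumer = thread::spawn(move || drain(rx));
    for a in activities {
        emit(&tx, a);
    }
    drop(tx); // closing the channel lets drain() finish
    consumer.join().expect("drain thread panicked")
}

/// The bound as a backpressure dial: with nobody draining, a channel of
/// bound 1 accepts one item and then reports Full.
fn fills_at_bound() -> bool {
    let (tx, _rx) = sync_channel::<String>(1);
    tx.try_send("first".into()).is_ok()
        && matches!(tx.try_send("second".into()), Err(TrySendError::Full(_)))
}
```

A larger bound lets the turn run further ahead of a slow UI; a bound of zero makes every emit wait for the consumer.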

Sink panics and Err-shaped activities do not abort the turn. Persistence and TurnOutput.activities do not depend on the sink, so nothing is lost if the sink drops an activity.

Cancellation

Cancellation is cooperative and first-class: a cancelled turn finishes with TurnOutcome::Stopped(TurnStop::Cancelled), commits like any other turn, and leaves the session ready for the next turn. In-flight provider streaming is aborted, not awaited.

There are two tiers. TurnBuilder::cancel(token) attaches a caller-owned CancellationToken (re-exported at the crate root, so no tokio-util dependency is needed) to one turn. LashSession::cancel_running_turns() cancels everything currently executing through that opened session and its clones; it is the affordance behind a UI stop button and needs no token threading. It returns how many turns were signalled. A handle opened separately for the same session id is not reached.

use lash::{CancellationToken, TurnInput, TurnOutcome, TurnStop};

// Per-turn token: hand it to whatever can decide to stop the turn
// (an HTTP handler, a keybinding, a timeout task).
let cancel = CancellationToken::new();
let stream = session
    .turn(TurnInput::text("Summarize the incident."))
    .cancel(cancel.clone())
    .stream()?;
// elsewhere: cancel.cancel();

// Or skip token plumbing entirely: any clone of the opened session can
// stop whatever it is currently running.
let stopper = session.clone();
let cancelled_turns = stopper.cancel_running_turns();

let result = stream.finish().await?;
if matches!(result.outcome, TurnOutcome::Stopped(TurnStop::Cancelled)) {
    // The turn committed as cancelled; the session is ready for the
    // next turn.
}
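The reach of cancel_running_turns() can be modelled with a registry shared by clones of one handle. This std-only sketch is not how lash implements it: `Token` is a hypothetical stand-in for CancellationToken built on an `Arc<AtomicBool>`, and `SessionHandle` is a hypothetical stand-in for an opened session. It shows three documented properties: clones share the registry, a separately opened handle does not, and the return value counts the turns signalled. Removing finished turns from the registry is an assumption about what "currently executing" means.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical cancellation token: a shared flag a turn polls cooperatively.
#[derive(Clone, Default)]
struct Token(Arc<AtomicBool>);

impl Token {
    fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }
    fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

/// Hypothetical opened-session handle. Clones share one registry of running
/// turns; opening the same session id again creates a fresh registry.
#[derive(Clone, Default)]
struct SessionHandle {
    running: Arc<Mutex<Vec<Token>>>,
}

impl SessionHandle {
    fn open() -> Self {
        Self::default()
    }

    /// Register a turn and return the token it will poll.
    fn start_turn(&self) -> Token {
        let token = Token::default();
        self.running.lock().unwrap().push(token.clone());
        token
    }

    /// Drop a completed turn so later stop requests do not count it.
    fn finish_turn(&self, token: &Token) {
        self.running.lock().unwrap().retain(|t| !Arc::ptr_eq(&t.0, &token.0));
    }

    /// Signal every registered turn and return how many were signalled.
    fn cancel_running_turns(&self) -> usize {
        let running = self.running.lock().unwrap();
        for token in running.iter() {
            token.cancel();
        }
        running.len()
    }
}

/// A cooperative "turn" that checks its token before each step.
fn run_turn(token: &Token, steps: usize) -> Result<usize, &'static str> {
    for _ in 0..steps {
        if token.is_cancelled() {
            return Err("cancelled");
        }
        // one unit of real work would happen here
    }
    Ok(steps)
}
```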

Token Usage

Four channels, finest to coarsest:

- TraceSink: every provider call across every session. Use it for billing, audit, and offline analysis.
- TurnEvent::Usage / TurnEvent::ChildUsage: live during a turn, one event per LLM iteration. ChildUsage carries session_id and source for grouping child traffic.
- TurnResult.usage / TurnResult.children_usage: per-turn snapshot, with parent-only usage and a per-(source, model) child breakdown. TurnResult::total_usage() sums both.
- session.usage_report() → SessionUsageReport: session aggregate by source × model, for dashboards and "session so far" views.

All usage surfaces share the same five canonical buckets: input_tokens (uncached input only), output_tokens (total output), cache_read_input_tokens, cache_write_input_tokens, and reasoning_output_tokens. Reasoning is already counted in output_tokens, so reasoning_output_tokens is a subset of output, not an addition to it. Source-label constants and re-exports live in lash::usage.
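Bucket arithmetic and a per-(source, model) rollup can be sketched as follows. `Usage` is a hypothetical mirror of the five buckets, and the "parent" / "child" labels in the usage example are placeholders for the real constants in lash::usage. Treating the three input buckets as disjoint, so that total prompt-side tokens is their sum, is an assumption based on input_tokens being the uncached portion.

```rust
use std::collections::BTreeMap;
use std::ops::AddAssign;

/// Hypothetical mirror of the five canonical usage buckets.
#[derive(Debug, Clone, Copy, Default, PartialEq)]
struct Usage {
    input_tokens: u64,             // uncached input only
    output_tokens: u64,            // total output, reasoning included
    cache_read_input_tokens: u64,
    cache_write_input_tokens: u64,
    reasoning_output_tokens: u64,  // subset of output_tokens
}

impl AddAssign for Usage {
    fn add_assign(&mut self, o: Usage) {
        self.input_tokens += o.input_tokens;
        self.output_tokens += o.output_tokens;
        self.cache_read_input_tokens += o.cache_read_input_tokens;
        self.cache_write_input_tokens += o.cache_write_input_tokens;
        self.reasoning_output_tokens += o.reasoning_output_tokens;
    }
}

impl Usage {
    /// All prompt-side tokens, assuming the three input buckets are disjoint.
    fn total_input(&self) -> u64 {
        self.input_tokens + self.cache_read_input_tokens + self.cache_write_input_tokens
    }

    /// Non-reasoning output. Reasoning is already inside output_tokens,
    /// so adding the two buckets would double count.
    fn visible_output(&self) -> u64 {
        self.output_tokens.saturating_sub(self.reasoning_output_tokens)
    }
}

/// Roll per-call usage up into a (source, model) table and a grand total.
fn aggregate(calls: &[(&str, &str, Usage)]) -> (BTreeMap<(String, String), Usage>, Usage) {
    let mut table = BTreeMap::new();
    let mut total = Usage::default();
    for (source, model, usage) in calls {
        *table
            .entry((source.to_string(), model.to_string()))
            .or_insert_with(Usage::default) += *usage;
        total += *usage;
    }
    (table, total)
}
```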

RLM And Finish

There are two RLM termination policies. FinishRequired validates finish against the configured schema and finishes as TurnFinish::FinalValue, which is also emitted on the stream. Natural additionally allows a no-code prose answer, which finishes as TurnFinish::AssistantMessage with no terminal stream event. For prompt construction, history, variables, and the execution loop, see the RLM protocol guide.
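The two policies can be read as a small decision table over what an RLM step produced. The sketch below is illustrative only: `Policy`, `Step`, and `Terminal` are hypothetical simplifications, and the `validate` closure stands in for schema validation. Two behaviours are assumptions because the text above does not specify them: prose under FinishRequired is treated as non-terminal (the loop keeps going), and a finish value rejected by the schema is simply reported as an error.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Policy {
    FinishRequired,
    Natural,
}

/// What one RLM step produced (hypothetical simplification).
enum Step {
    Finish(String),
    Prose(String),
}

#[derive(Debug, PartialEq)]
enum Terminal {
    /// TurnFinish::FinalValue; also emitted as TurnEvent::FinalValue.
    FinalValue { value: String, terminal_event: bool },
    /// TurnFinish::AssistantMessage; no terminal stream event.
    AssistantMessage { text: String, terminal_event: bool },
}

fn settle(
    policy: Policy,
    step: Step,
    validate: impl Fn(&str) -> bool,
) -> Result<Option<Terminal>, String> {
    match (policy, step) {
        // FinishRequired checks the value against the configured schema.
        (Policy::FinishRequired, Step::Finish(v)) if !validate(v.as_str()) => {
            Err(format!("finish value rejected by schema: {v}"))
        }
        (_, Step::Finish(v)) => Ok(Some(Terminal::FinalValue { value: v, terminal_event: true })),
        (Policy::Natural, Step::Prose(t)) => {
            Ok(Some(Terminal::AssistantMessage { text: t, terminal_event: false }))
        }
        // Assumption: prose is not terminal under FinishRequired.
        (Policy::FinishRequired, Step::Prose(_)) => Ok(None),
    }
}
```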

Per-turn RLM options

use lash::rlm::RlmTurnBuilderExt as _;

let finished = session
    .turn(TurnInput::text("Move on the board."))
    .require_finish()?
    .stream_to(&sink)
    .await?;

let natural = session
    .turn(TurnInput::text("Answer directly if no code is needed."))
    .allow_prose_or_finish()?
    .run()
    .await?;

Outcome shape

match result.outcome {
    TurnOutcome::Finished(TurnFinish::FinalValue { value }) => {
        // Same value already arrived as TurnEvent::FinalValue.
        persist_typed_value(value)?;
    }
    TurnOutcome::Finished(TurnFinish::AssistantMessage { text }) => persist_text(text)?,
    other => handle_other_outcome(other)?,
}

Use require_finish_schema(...) when the final value must match a JSON Schema. Use RlmSessionBuilderExt::final_answer_format(...) to set the session's normal final-value presentation: root RLM sessions default to Markdown guidance, managed child sessions default to raw finish values, and schema-required turns ignore the presentation preference.
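The presentation rules above can be summarised as a resolution function. `Format` and `SessionKind` are hypothetical local enums, not lash types; in particular, whether RlmFinalAnswerFormat has a variant named Markdown is not confirmed here. The assumption that an explicitly configured format overrides the root/child default follows from final_answer_format(...) setting the session's presentation.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Format {
    Markdown,
    RawFinalValue,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum SessionKind {
    Root,
    ManagedChild,
}

/// Presentation that applies to a turn's final value. Returns None for
/// schema-required turns, which ignore the presentation preference.
fn effective_format(kind: SessionKind, configured: Option<Format>, schema_required: bool) -> Option<Format> {
    if schema_required {
        return None;
    }
    // Assumption: an explicit session setting wins over the default.
    Some(configured.unwrap_or(match kind {
        SessionKind::Root => Format::Markdown,
        SessionKind::ManagedChild => Format::RawFinalValue,
    }))
}
```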

use lash::rlm::{RlmFinalAnswerFormat, RlmSessionBuilderExt as _, RlmTurnBuilderExt as _};

let session = core
    .session("analysis")
    .final_answer_format(RlmFinalAnswerFormat::RawFinalValue)?
    .open()
    .await?;

let result = session
    .turn(TurnInput::text("Return a risk rating."))
    .require_finish_schema(serde_json::json!({
        "type": "object",
        "required": ["rating"],
        "properties": {
            "rating": { "type": "string" }
        },
        "additionalProperties": false
    }))?
    .run()
    .await?;